Skip to main content

New site for Dart news and articles

For the latest Dart news, visit our new blog at  https://medium.com/dartlang .

Introducing new Streams API

TL;DR: please play with our Stream-based IO and isolates and give us feedback. See instructions near the end of the this post.

As announced some weeks ago, we have continued our library refactoring in an experimental branch to avoid unnecessary breakages. We have now reached a small milestone in our development, and would like to invite the community to play with the current library and give us some feedback.
Milestone “streams”: users have been asking for a simpler asynchronous library, where everything works in a consistent way, for some time now (see for example http://goo.gl/cWaJh for Sean Seagan’s proposal). With this milestone we finally have running code that gets us one step closer to that goal. We have now converted both the IO library and isolates to use our new class for repeating events, Streams. Both are currently only wrapping the old primitives, but eventually we want to remove support for the old API. The html library is not yet adapted but should eventually also use streams for UI events.


We have also improved the Future class, and made it simpler to use. A simple “then” method lets you apply asynchronous or synchronous functions to the result of a future, merging the three methods “chain”, “transform” and “then”. Streams and Futures should make asynchronous Dart programs easier to write and read, and should reduce some types of programming errors.

Stream

A “Stream” is an object that emits a (possibly non-terminating) sequence of events. In the IO library, these are mainly data events, represented as byte-arrays, whereas for the isolate library the data is most often lists, maps, Strings, numbers and booleans.

The main thing one does with a stream is subscribe to it, calling the method “subscribe” with optional arguments onData, onError, and onDone.


class Stream<T> {
 StreamSubscription<T> subscribe(
     {void onData(T data),
      void onError(AsyncError error),
      void onDone(),
      bool unsubscribeOnError});
 ...
}

Ignoring corner-cases, and in particular error-handling, this method basically subscribes to the stream and will then receive all events.


Example:

new File('/etc/passwd').openForRead() // returns a Stream.
   .subscribe(onData: (List<int> data) { print(data.length); });

On my machine this prints one line, “2060”, indicating that the whole file was read in one go, and that the data-handler was invoked once.


$ ls -l /etc/passwd
-rw-r--r-- 1 root root 2060 Sep 19 03:20 /etc/passwd

To change an existing subscription to a stream, one should use the return value from .subscribe, which is a StreamSubscription object. This lets one end a subscription (“unsubscribe”), attempt to throttle the stream (“pause”), or change the handlers that were set when subscribing initially.

class StreamSubscription<T> {
 void unsubscribe();
 void pause([Signal resume]);

 void onData(void handleData(T data));
 void onDone(void handleDone());
 void onError(void handleError(AsyncError error));
}

Contrary to the addEventHandler/removeEventHandler pattern we don’t allow unsubscription using the onData method, but require going through the subscription object.
The onData/onDone/onError methods have the same semantics as in Stream.subscribe. By having them on the subscription we provide the flexibility to change the subscription at a later point.

The pause method does a best effort to pause the stream (usually by transmitting the pause-request to its source). However, unless the Stream is a “BufferingStream”, there is no guarantee that the pause will actually have an effect.

We have learned a lot by looking at .Net’s RX (reactive extensions) and its port to Dart by John Evan: https://github.com/prujohn/Reactive-Dart. We feel that Iterables and Streams should be tightly linked together, like C#’s Enumerables and Observables. That is, we see Streams as the push version of Iterables. Iterables provide a sequence of data elements on demand (pull), and Streams asynchronously push the elements, and demand that they be handled. Both classes deal with sequences of data, though, and their interfaces thus provide similar functions:


class Stream<T> {
 ...
 Future<bool> get isEmpty;
 Future<int> get length;

 Future<bool> any(bool test(T element));
 Future<bool> every(bool test(T element));
 Future<bool> contains(T match);
 Future reduce(var initialValue, combine(previous, T element));
 Future<List<T>> toList();
 Future<Set<T>> toSet();
 ...
}

If you haven’t seen all of these methods on Iterable yet, it is because some of these methods have only been added to Iterable recently. Here is the relevant snippet of the Iterable class in our experimental branch:

class Iterable<T> {
 ...
 bool get isEmpty;
 int get length;

 bool any(bool test(T element));
 bool every(bool test(T element));
 bool contains(T match);
 reduce(var initialValue, combine(previous, T element));
 List<T> toList();
 Set<T> toSet();
 ...
 // See below for a discussion on the mappedTo and where change.
 // Replaces "map".
 Iterable mappedTo(convert(T element));
 // Replaces "filter".
 Iterable where(bool test(T element));
}


There are also a list of methods that transform a stream and return a new Stream. Again we make the behavior analog to methods in Iterable:

class Stream<T> {
 ...
 Stream mappedTo(convert(T element));

Stream<T> where(bool test(T element));

 // Transforms one data-event into zero or more converted
 // data-events.
 Stream expand(Iterable convert(T value));
 // Transforms one event (data, error or done) into zero or more
 // converted events.
 Stream transform(StreamTransformer transformer);

 // Stream-only method without counterpart in Iterable.
 // Returns [this] wrapped in a stream that is guaranteed to
 // respect the pause-request.
 Stream<T> buffered()
 ...
}

Both Iterable and Stream are still missing some transformation functions (see for example http://goo.gl/SfUwm for a changelist that is currently under review), but we believe that there are already enough to start playing with the streams.

A StreamController lets you create streams. The stream’s events come from a linked stream sink that the data is pushed into.

class StreamController<T> {
 ...
 Stream<T> get stream;
 StreamSink<T> get sink;
}

Data that is fed into the sink is then pushed as events on the stream.

abstract class StreamSink<T> {
 void add(T event);
 void signalError(AsyncError errorEvent);
 void close();
}

For convenience, the StreamController already implements both the Stream and StreamSink interface. The stream and sink getters return wrappers that limit the functionality. This way it is possible to pass on a stream to a different library, while still knowing that the receiver cannot trigger new events.

Streams live in the async package and are asynchronous. That is, they should not send any data before the end of the current event-loop iteration. The streams provided by the async-library guarantee this behavior. This gives the program time to attach subscribers to events. After the control returns to the event loop a stream is allowed to send its events, even if there are no subscribers. This means that data and errors can be lost if there are no subscribers.

A tricky point is that transformed streams need a final subscriber, to make events go through the transformation before being dropped. Transforming streams are not subscribing themselves until they themselves have been subscribed to.

Example:

new File('/etc/passwd').openForRead()
 .mappedTo((data) {
   print(data);
   return data.length;
 });

In this example the stream (coming from ‘openForRead’) is transformed by ‘mappedTo’, which has no subscribers. At the next event-loop iteration the file stream will start firing events. Since the mappedTo stream doesn’t have any subscribers it didn’t subscribe to the file-stream. As a consequence the file-stream has no subscribers and will just discard its events. No data will be printed.

Other library changes

This stream change didn’t happen in the void. While working on them we also improved and changed Dart in other parts of the library. We recently stabilized our experimental branch, but work in the other areas is still (more) work in progress. While we appreciate input on every part of the library we are aware that there are still things missing.

Iterable

  • Iterable.iterator() has been changed to Iterable.iterator. That is, the iterator method has been replaced by the equivalent iterator getter.

  • Iterators have been changed from next and hasNext to current and moveNext, as is the case in C#.:

abstract class Iterator<T> {
 // Returns the current element which got updated at the
 // last [moveNext]. Initially (before any [moveNext]) return
 // [:null:].
 T get current;

 // Moves to the next element and and updates [current].
 // Returns [:true:] if [this] is still in bounds. Otherwise
 // sets [current] to [:null:] and returns [:false:].
 bool moveNext();
}

Note that, contrary to how C# acts, Dart iterators don’t throw when accessed out of bounds.

Example:

var it = [1, 2].iterator;
while(it.moveNext()) {
 print(it.current);  // 1, 2
}
print(it.current);  // null.

If your code relies on the hasNext and next methods we have a HasNextIterator wrapper class for easy porting:

var it = new HasNextIterator([1, 2].iterator);
while(it.hasNext) {
 print(it.next());  // 1, 2
}


  • Iterable.map and Iterable.filter are now lazily applied and return an Iterable. That is, the mapping or filtering doesn’t happen immediately, but when the resulting iterable is iterated over. Due to the new semantics the functions have been renamed to “mappedBy” and “where”.
    We have also added “toList” and “toSet” to Iterables, which create a fresh List or Set. These force a lazy iterable to be evaluated, and store the results in a concrete collection.

Example:

Iterable iterable =
   [1, 2, 3, 4].where((x) { print(x); return x < 3; });

for (var x in iterable) {  // Prints 1, 2, 3, 4.
 /* do nothing. */
}

for (var x in iterable) {  // Prints 1, 2, 3, 4 again.
 /* do nothing. */
}

List list =
   [1, 2, 3, 4]
   .where((x) { print(x); return x < 3; })
   .toList();  // prints 1, 2, 3, 4 and stores the
               // filtered result in a fresh list.

for (var x in iterable) {  // Doesn’t print anything.
 /* do nothing. */
}

Style changes

Methods whose names are nouns and don’t take any arguments should be getters.
This means that a call to a getter does not give any guarantee on the performance. The decision to use either a getter or a method now only depends on its name and the number of arguments. For example: “isEmpty” (boolean name), “iterator” (noun name) or “length” (noun name) are all getters.
In the same way that programmers need to be aware of slow functions, programmers need to be aware of slow getters. For example a “contains” method needs to be documented that it is in O(n) and not O(1). Getters now need similar documentation. 

While we don’t explicitly forbid a getter to have a visible side-effects on its object it is generally a bad idea. In most cases this means that the name hides its intent and should be changed.

How to get the new libraries

Follow the instructions on https://code.google.com/p/dart/wiki/GettingTheSource?tm=4 but replace “branches/bleeding_edge” with “experimental/lib_v2”.

Once you are done building you can take the “sdk” directory and put it into the editor’s directory (replacing the pre-installed version). This give you syntax highlighting for the new classes and functions. Make sure to use a very recent build of the editor. The editor-team just updated the analyzer so that it accepts the new SDK without warnings about iterator being a field and not a method.

What’s next

Even during the writing of this post we already spotted areas for improvements, and new open questions. The current version of the asynchronous library is hence still a work in progress.

In the near-term future we will, however, also look more seriously into the collection library. Most of our effort will be spent on cleaning these two libraries.

In parallel we will go through open bugs and make sure that they don’t clash with the changes we propose. Once we are mostly satisfied with the libraries and are confident that the open bugs won’t require breaking changes to our new libraries we intend to merge back into bleeding_edge.

Feedback

This mail is an RFC. We want to hear your feedback. What works? What doesn’t? Are there important use-cases that we have overlooked?

If you think that there is something we need to address feel free to open a bug on http://dartbug.com/new. If you just want to send us your feedback you can also simply use this mailing-list. Both will be read by us but the issue-tracker is better if you want to make sure we don’t forget, and/or if you want to track progress.

Appendix - Examples

Example 1

import 'dart:async';
import 'dart:io';

int fib(int v) {
 if (v <= 1) return v;
 return fib(v - 1) + fib(v - 2);
}

/**
* Splits the incoming string into lines. Emits a new data
* event for each line.
*/
class LineTransformer extends StreamTransformer {
 String _buffer = "";

 void handleData(String data, Sink sink) {
   int index = data.indexOf('\n');
   if (index >= 0) {
     int start = 0;
     do {
       sink.add("$_buffer${data.substring(start, index)}");
       _buffer = "";
       start = index + 1;
       index = data.indexOf('\n', start);
     } while (index >= start);
     _buffer = data.substring(start);
   } else {
     _buffer = "$_buffer$data";
   }
 }

 void handleDone(Sink sink) {
   if (_buffer != "") sink.add(_buffer);
   sink.close();
 }
}

void main() {
 new File('numbers.dat').openForRead()
   .transform(new Utf8Decoder())
   .transform(new LineTransformer())
   .mappedBy(int.parse)
   .mappedBy(fib)
   .subscribe(onData: print);
}




Example 2 (attached as big_example.dart"):

import 'dart:async';
import 'dart:crypto';
import 'dart:io';
import 'dart:isolate';

/**
* Computes the SHA256 hash for the incoming data. Intercepts all
* events and emits the computed hash as data-event when the incoming
* is done.
*/
class SHA256Transform extends StreamTransformer {
 SHA256 hash = new SHA256();

 void handleData(List<int> data, Sink sink) {
   hash.update(data);
 }

 void handleError(ASyncError error, Sink sink) {
   sink.add(CryptoUtils.bytesToHex(hash.digest()));
   sink.close();
 }

 void handleDone(Sink sink) {
   sink.add(CryptoUtils.bytesToHex(hash.digest()));
   sink.close();
 }
}

Stream<String> compute(String path) {
 // If the path doesn't exist, it's treated as an empty file.
 print("Serving $path");
 return new File(path).openForRead()
     .transform(new SHA256Transform());
}

void initIsolate() {
 var sink = null;
 var subscription = stream.subscribe();
 subscription.onData((data) {
   compute(data[0]).pipe(data[1]);
 });
 subscription.onDone(() {
   sink.close();
 });
}

void main() {
 // Create a pool of 32 isolates to compute from. Plain round-robin
 // distribution of jobs
 var pool = new List(32);
 int current = 0;
 for (int i = 0; i < pool.length; i++) {
   pool[i] = streamSpawnFunction(initIsolate);
 }

 Sink getSink() {
   return pool[current++ % pool.length];
 }

 Stream<String> getHash(String path) {
   var sink = getSink();
   var box = new MessageBox();
   sink.add([path, box.sink]);
   return box.stream;
 }

 var server = new HttpServer();
 server.listen("0.0.0.0", 8080);
 server.defaultRequestHandler = (request, response) {
   Process.start('cat', []).then((process) {
     getHash(".${request.path}")
       .transform(new Utf8Encoder())
       .pipe(process);
     process.stdoutStream.pipe(response);
   });
 };
}