Skip to main content

New site for Dart news and articles

For the latest Dart news, visit our new blog at  https://medium.com/dartlang .

Unboxing Packages: async Part 2

Two weeks ago, I introduced you to some of the marvels available for use in the async package. But that package is so big—there are so many marvels—that I couldn’t fit them all in. In fact, I only ended up writing about APIs that deal with futures and the individual values they represent.

This week I’ll focus on the other major asynchronous type: Stream. I often find it useful to think of asynchronous types as analogous to synchronous ones, and by that metaphor if futures are individual values then streams are Iterables. But in addition to representing asynchronously-computed collections of data, streams can represent things like UI events or communication (for example over a WebSocket).

Dart’s Stream class is very powerful. In addition to dispatching events, it allows the user to pause or cancel their subscription to the stream. What’s more, the creator of the stream can be notified of pauses or cancellations (or the initial subscriptions) and take appropriate action—like closing an HTTP connection when its data stream is canceled. Building this logic into the core types makes it easy to do the right thing with streams without the author having to think about it at all.

For example, Stream.first automatically cancels its subscription once the first event arrives. So if you call WebSocket.first, it’ll close the underlying connection once you have the event you need.

The other side of this power, though, is that you need to take extra care when writing code that manipulates streams to ensure that you handle pausing and canceling correctly. If your stream transformer doesn’t forward cancellations properly, you might end up with dangling WebSocket connections, which is bad news all around.

That’s why async’s stream utilities are so useful. Not only do they help you manipulate your streams, they make sure all your listens, pauses, and cancellations are handled exactly right. I call this being cancel-correct. This gives you, the developer, freedom to hook up components as you need without worrying about any extra complexity.

LazyStream

I mentioned briefly that stream creators can be notified when an initial stream subscription is created. You can pass an onListen argument to new StreamController() to only start emitting events once a listener exists. And usually that’s enough—but sometimes you aren’t the one producing the original events, and you don’t want to have to manually forward them to the controller. That’s what LazyStream is for.

When you call new LazyStream(), you pass in a callback that returns a Stream (or a Future<Stream>). This callback will only be invoked once the LazyStream.listen is called, and its events will automatically be piped to the listen handlers. Most of the time it won’t even create an intermediate StreamController, so it’s extra efficient, too.

/// Returns the contents of the getting [url].
///
/// Only starts the request when the returned stream is listened to.
Stream<List<int>> httpStream(Uri url) {
  return new LazyStream<List<int>>(() async {
    var client = new HttpClient();
    var request = await client.getUrl(Uri.parse(url));
    return await request.close();
  });
}

StreamCompleter

If you’ve done much asynchronous programming in Dart, you’re doubtless familiar with the Completer class. It lets you return a future immediately but only fill in its result later on when whatever asynchronous work you’re doing is finished. Well, a StreamCompleter is pretty much the same thing for streams.

StreamCompleter.stream returns a stream immediately, just like Completer.future. But this stream doesn’t emit any events until you call setSourceStream(), which—like Completer.complete()—provides the concrete value. All events from the source stream are forwarded to the output stream, and of course pauses and cancellations are passed back to the source.

What if you fail to get the source stream? Call setError() instead! Like Completer.completeError, this indicates that the stream completion was unsuccessful. In a more practical sense, it makes the output stream emit the error and then close immediately.

The whole process of getting a stream’s value asynchronously sounds a lot like a Future<Stream>. There are even a few places, like WebSocket.connect(), which return such futures in practice. That’s why the StreamCompleter.fromFuture() static utility function exists. It just takes a Future<Stream> and returns a Stream:

Stream fromFuture(Future<Stream> streamFuture) {
  var completer = new StreamCompleter();
  streamFuture.then(completer.setSourceStream,
      onError: completer.setError);
  return completer.stream;
}

In some cases—especially when you want to use async/await—it’s way easier to just use fromFuture() than to manually deal with a StreamCompleter yourself.

StreamGroup

I think of StreamGroup kind of like a funnel: it takes a bunch of different input streams and merges them all into a single output stream. Like FutureGroup, which I wrote about last time, it implements Sink. This means that you call add() to add new streams to the group, and close() once you’ve added all the streams that need adding.

The events from the input streams are piped into the output stream, which is accessible by calling stream. This stream will only close once all the input streams close and close() is called, because otherwise more streams may still be added later on. And of course, it’s cancel-correct: it only listens to the source streams once the output stream has a listener, and if the output stream is paused or canceled then all the source streams are as well.

class Phase {
  final _transformers = <Transformer, TranformerRunner>{};

  /// A broadcast stream of all log events from all transformers in this phase.
  Stream<LogEvent> get onLog => _onLogGroup.stream;
  final _onLogGroup = new StreamGroup<LogEvent>.broadcast();

  void addTransformer(Transformer transformer) {
    var runner = new TransformerRunner(transformer);
    _transformers[transformer] = runner;
    _onLogGroup.add(runner.onLog);
  }
}

If you don’t care about adding streams later on, you can also use the StreamGroup.merge() static utility function. This just takes an Iterable<Stream> and merges all the streams immediately. It’s the same as adding them all to a StreamGroup and closing it, but it’s a lot cleaner than doing that manually. That’s why we call it a utility function!

StreamSplitter

If StreamGroup is a funnel, then StreamSplitter is a sprinkler: it takes a single source stream and splits it into a bunch of identical copies. Each copy independently emits the same events as the source stream, with its own buffering and everything. There’s no way at all for actions on one copy to affect any others.

A splitter is created by passing the source stream to new StreamSplitter(), and copies are created by calling split(). Once you don’t need any more copies, call close() to let the splitter know.

Closing the splitter lets it maintain cancel-correctness. Before it’s closed, it can never safely cancel or pause the source stream, since a new copy could be created that might need additional events. But once it’s closed, the source stream can be canceled as soon as all of the copies are canceled.

Like StreamGroup.merge(), there’s a handy utility method for splitting streams: StreamSplitter.splitFrom(). This takes a source stream and splits it into a set number of copies—two by default, but it can be any number you want. I use it pretty frequently when I’m debugging to see what a stream is emitting without affecting its normal usage.

SubscriptionStream

Some APIs are made for end-users to use directly in their applications, and some are made for implementing other APIs. The SubscriptionStream class falls squarely into the latter category. It’s used internally in the async package itself, and it’s good to know about in case you ever need to implement some asynchronous utilities yourself.

When you call Stream.listen() to subscribe to a stream, you get a StreamSubscription back. This is usually just used to pause and cancel events, but it can also be used to replace the event handlers by calling methods like onData()1. SubscriptionStream takes advantage of this capability to convert a subscription into a brand new stream that can itself be listened to.

When you pass a subscription into new SubscriptionStream(), its old event handlers are removed and it’s paused. This means any additional events are buffered until the SubscriptionStream.listen() is called. Once it is, it sets the new handlers on the original subscription and just returns that. Most of the time it doesn’t even create any extra intermediate objects!

/// Calls [onFirst] with the first event emitted by [source].
///
/// Returns a stream that emits all events in [source] after the first.
Stream firstAndRest(Stream source, void onFirst(value)) {
  var completer = new StreamCompleter();
  var subscription;
  subscription = source.listen((event) {
    onFirst(event);
    completer.setSourceStream(new SubscriptionStream(subscription));
  });
  return completer.stream;
}

await last

This about covers the async package’s stream APIs, but there are other APIs still to come. The package has a lot of cool stuff! Join me again in two weeks when I cover everything else, sinks and queues and timers and all.


  1. This method should really be called setOnData(), or else should be a setter with a corresponding getter, but it’s hard to change APIs in the core libraries.