
Unboxing Packages: async Part 2

Two weeks ago, I introduced you to some of the marvels available for use in the async package. But that package is so big—there are so many marvels—that I couldn’t fit them all in. In fact, I only ended up writing about APIs that deal with futures and the individual values they represent.

This week I’ll focus on the other major asynchronous type: Stream. I often find it useful to think of asynchronous types as analogous to synchronous ones, and by that metaphor if futures are individual values then streams are Iterables. But in addition to representing asynchronously-computed collections of data, streams can represent things like UI events or communication (for example over a WebSocket).

Dart’s Stream class is very powerful. In addition to dispatching events, it allows the user to pause or cancel their subscription to the stream. What’s more, the creator of the stream can be notified of pauses or cancellations (or the initial subscriptions) and take appropriate action—like closing an HTTP connection when its data stream is canceled. Building this logic into the core types makes it easy to do the right thing with streams without the author having to think about it at all.

For example, Stream.first automatically cancels its subscription once the first event arrives. So if you call WebSocket.first, it’ll close the underlying connection once you have the event you need.
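
To see that cancellation happen, here’s a minimal sketch that uses only dart:async. The lifecycleOfFirst() helper is a hypothetical name for illustration; it records which StreamController callbacks fire around a single call to first.

```dart
import 'dart:async';

/// Records the controller callbacks triggered by a single `first` call.
/// (Hypothetical helper, not part of any package.)
Future<List<String>> lifecycleOfFirst() async {
  var log = <String>[];
  var controller = new StreamController<int>(
      onListen: () => log.add('listen'),
      onCancel: () => log.add('cancel'));
  controller..add(1)..add(2)..add(3);

  // `first` subscribes, takes one event, and cancels on its own.
  var first = await controller.stream.first;
  log.add('first: $first');
  return log;
}

Future<void> main() async {
  print(await lifecycleOfFirst()); // [listen, cancel, first: 1]
}
```

The cancel callback runs before the future completes, and that callback is the hook a real stream source would use to release its connection.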

The other side of this power, though, is that you need to take extra care when writing code that manipulates streams to ensure that you handle pausing and canceling correctly. If your stream transformer doesn’t forward cancellations properly, you might end up with dangling WebSocket connections, which is bad news all around.

That’s why async’s stream utilities are so useful. Not only do they help you manipulate your streams, they make sure all your listens, pauses, and cancellations are handled exactly right. I call this being cancel-correct. This gives you, the developer, freedom to hook up components as you need without worrying about any extra complexity.


LazyStream

I mentioned briefly that stream creators can be notified when an initial stream subscription is created. You can pass an onListen argument to new StreamController() to only start emitting events once a listener exists. And usually that’s enough, but sometimes you aren’t the one producing the original events, and you don’t want to have to manually forward them to the controller. That’s what LazyStream is for.
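
As a rough sketch of the onListen approach, the hypothetical ticks() function below starts its timer only when a listener shows up and stops it when the listener cancels. The function name and the timer-based source are assumptions made for illustration.

```dart
import 'dart:async';

/// Emits 0, 1, 2, ... once per [interval], starting only when listened to.
/// (Hypothetical example; pausing is not handled, so events are simply
/// buffered by the controller while the subscription is paused.)
Stream<int> ticks(Duration interval) {
  Timer? timer;
  var count = 0;
  late StreamController<int> controller;
  controller = new StreamController<int>(
      onListen: () {
        timer = new Timer.periodic(interval, (_) => controller.add(count++));
      },
      onCancel: () => timer?.cancel());
  return controller.stream;
}

Future<void> main() async {
  var stream = ticks(const Duration(milliseconds: 10));
  // No timer exists yet; it is created by the listen below.
  print(await stream.take(3).toList()); // [0, 1, 2]
}
```

Here take(3) cancels its subscription after the third event, which triggers onCancel and stops the timer.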

When you call new LazyStream(), you pass in a callback that returns a Stream (or a Future<Stream>). This callback will only be invoked once LazyStream.listen() is called, and the events of the stream it returns will automatically be piped to the listen handlers. Most of the time it won’t even create an intermediate StreamController, so it’s extra efficient, too.

import 'dart:io';
import 'package:async/async.dart';

/// Returns the contents of getting [url].
/// Only starts the request when the returned stream is listened to.
Stream<List<int>> httpStream(Uri url) {
  return new LazyStream<List<int>>(() async {
    var client = new HttpClient();
    var request = await client.getUrl(url);
    return await request.close();
  });
}


StreamCompleter

If you’ve done much asynchronous programming in Dart, you’re doubtless familiar with the Completer class. It lets you return a future immediately but only fill in its result later on when whatever asynchronous work you’re doing is finished. Well, a StreamCompleter is pretty much the same thing for streams. StreamCompleter.stream returns a stream immediately, just like Completer.future. But this stream doesn’t emit any events until you call setSourceStream(), which, like Completer.complete(), provides the concrete value. All events from the source stream are forwarded to the output stream, and of course pauses and cancellations are passed back to the source.

What if you fail to get the source stream? Call setError() instead! Like Completer.completeError, this indicates that the stream completion was unsuccessful. In a more practical sense, it makes the output stream emit the error and then close immediately.
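
Here’s a small sketch of both outcomes, assuming package:async is available. The delayedNumbers() helper and its fail flag are hypothetical; the delay stands in for whatever asynchronous work produces the source stream.

```dart
import 'dart:async';
import 'package:async/async.dart';

/// Returns a stream right away and fills it in a little later.
/// (Hypothetical helper; [fail] simulates not getting a source stream.)
Stream<int> delayedNumbers({bool fail = false}) {
  var completer = new StreamCompleter<int>();
  new Future.delayed(const Duration(milliseconds: 10), () {
    if (fail) {
      // The output stream emits this error and then closes.
      completer.setError(new StateError('no source stream'));
    } else {
      completer.setSourceStream(new Stream.fromIterable([1, 2, 3]));
    }
  });
  return completer.stream;
}

Future<void> main() async {
  print(await delayedNumbers().toList()); // [1, 2, 3]
  try {
    await delayedNumbers(fail: true).toList();
  } on StateError catch (error) {
    print('failed: ${error.message}'); // failed: no source stream
  }
}
```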

The whole process of getting a stream’s value asynchronously sounds a lot like a Future<Stream>. There are even a few places, like WebSocket.connect(), which return such futures in practice. That’s why the StreamCompleter.fromFuture() static utility function exists. It just takes a Future<Stream> and returns a Stream:

Stream fromFuture(Future<Stream> streamFuture) {
  var completer = new StreamCompleter();
  streamFuture.then(completer.setSourceStream,
      onError: completer.setError);
  return completer.stream;
}

In some cases—especially when you want to use async/await—it’s way easier to just use fromFuture() than to manually deal with a StreamCompleter yourself.


StreamGroup

I think of StreamGroup kind of like a funnel: it takes a bunch of different input streams and merges them all into a single output stream. Like FutureGroup, which I wrote about last time, it implements Sink. This means that you call add() to add new streams to the group, and close() once you’ve added all the streams that need adding.

The events from the input streams are piped into the output stream, which is accessible through the StreamGroup.stream getter. This stream will only close once all the input streams close and close() is called, because otherwise more streams may still be added later on. And of course, it’s cancel-correct: it only listens to the source streams once the output stream has a listener, and if the output stream is paused or canceled then all the source streams are as well.

class Phase {
  final _transformers = <Transformer, TransformerRunner>{};

  /// A broadcast stream of all log events from all transformers in this phase.
  Stream<LogEvent> get onLog => _onLogGroup.stream;
  final _onLogGroup = new StreamGroup<LogEvent>.broadcast();

  void addTransformer(Transformer transformer) {
    var runner = new TransformerRunner(transformer);
    _transformers[transformer] = runner;
    // Assumes TransformerRunner exposes its log events as an `onLog` stream.
    _onLogGroup.add(runner.onLog);
  }
}

If you don’t care about adding streams later on, you can also use the StreamGroup.merge() static utility function. This just takes an Iterable<Stream> and merges all the streams immediately. It’s the same as adding them all to a StreamGroup and closing it, but it’s a lot cleaner than doing that manually. That’s why we call it a utility function!
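
A quick sketch of merge() in action, assuming package:async is available. The mergedSorted() helper is a hypothetical name, and the result is sorted because I wouldn’t rely on any particular interleaving of events from different sources.

```dart
import 'dart:async';
import 'package:async/async.dart';

/// Merges two small streams and returns their events in sorted order.
/// (Hypothetical helper for illustration.)
Future<List<String>> mergedSorted() async {
  var letters = new Stream.fromIterable(['a', 'b']);
  var more = new Stream.fromIterable(['c']);

  // The merged stream closes once every input stream has closed.
  var events = await StreamGroup.merge([letters, more]).toList();
  events.sort();
  return events;
}

Future<void> main() async {
  print(await mergedSorted()); // [a, b, c]
}
```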


StreamSplitter

If StreamGroup is a funnel, then StreamSplitter is a sprinkler: it takes a single source stream and splits it into a bunch of identical copies. Each copy independently emits the same events as the source stream, with its own buffering and everything. There’s no way at all for actions on one copy to affect any others.

A splitter is created by passing the source stream to new StreamSplitter(), and copies are created by calling split(). Once you don’t need any more copies, call close() to let the splitter know.

Closing the splitter lets it maintain cancel-correctness. Before it’s closed, it can never safely cancel or pause the source stream, since a new copy could be created that might need additional events. But once it’s closed, the source stream can be canceled as soon as all of the copies are canceled.

Like StreamGroup.merge(), there’s a handy utility method for splitting streams: StreamSplitter.splitFrom(). This takes a source stream and splits it into a set number of copies—two by default, but it can be any number you want. I use it pretty frequently when I’m debugging to see what a stream is emitting without affecting its normal usage.
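
That debugging trick might look something like the sketch below, assuming package:async is available. The sumWithTap() helper is hypothetical: one copy just records what goes by, while the other is consumed as usual.

```dart
import 'dart:async';
import 'package:async/async.dart';

/// Sums [source] while a second copy records every event in [seen].
/// (Hypothetical helper for illustration.)
Future<int> sumWithTap(Stream<int> source, List<int> seen) async {
  // With no count argument, splitFrom() returns two copies.
  var copies = StreamSplitter.splitFrom(source);

  // The first copy only observes; it can't disturb the second.
  var tapDone = copies[0].forEach(seen.add);
  var total = await copies[1].fold<int>(0, (sum, event) => sum + event);
  await tapDone;
  return total;
}

Future<void> main() async {
  var seen = <int>[];
  print(await sumWithTap(new Stream.fromIterable([1, 2, 3]), seen)); // 6
  print(seen); // [1, 2, 3]
}
```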


SubscriptionStream

Some APIs are made for end-users to use directly in their applications, and some are made for implementing other APIs. The SubscriptionStream class falls squarely into the latter category. It’s used internally in the async package itself, and it’s good to know about in case you ever need to implement some asynchronous utilities yourself.

When you call Stream.listen() to subscribe to a stream, you get a StreamSubscription back. This is usually just used to pause and cancel events, but it can also be used to replace the event handlers by calling methods like onData() [1]. SubscriptionStream takes advantage of this capability to convert a subscription into a brand new stream that can itself be listened to.

When you pass a subscription into new SubscriptionStream(), its old event handlers are removed and it’s paused. This means any additional events are buffered until SubscriptionStream.listen() is called. Once it is, the SubscriptionStream sets the new handlers on the original subscription and just returns that. Most of the time it doesn’t even create any extra intermediate objects!

/// Calls [onFirst] with the first event emitted by [source].
/// Returns a stream that emits all events in [source] after the first.
Stream firstAndRest(Stream source, void onFirst(value)) {
  var completer = new StreamCompleter();
  var subscription;
  subscription = source.listen((event) {
    onFirst(event);
    // Hand the live subscription over; later events go to the new stream.
    completer.setSourceStream(new SubscriptionStream(subscription));
  });
  return completer.stream;
}

await last

This about covers the async package’s stream APIs, but there are other APIs still to come. The package has a lot of cool stuff! Join me again in two weeks when I cover everything else, sinks and queues and timers and all.

  1. This method should really be called setOnData(), or else should be a setter with a corresponding getter, but it’s hard to change APIs in the core libraries.
