Skip to main content

Unboxing Packages: async Part 2

Two weeks ago, I introduced you to some of the marvels available for use in the async package. But that package is so big—there are so many marvels—that I couldn’t fit them all in. In fact, I only ended up writing about APIs that deal with futures and the individual values they represent.

This week I’ll focus on the other major asynchronous type: Stream. I often find it useful to think of asynchronous types as analogous to synchronous ones, and by that metaphor if futures are individual values then streams are Iterables. But in addition to representing asynchronously-computed collections of data, streams can represent things like UI events or communication (for example over a WebSocket).

Dart’s Stream class is very powerful. In addition to dispatching events, it allows the user to pause or cancel their subscription to the stream. What’s more, the creator of the stream can be notified of pauses or cancellations (or the initial subscriptions) and take appropriate action—like closing an HTTP connection when its data stream is canceled. Building this logic into the core types makes it easy to do the right thing with streams without the author having to think about it at all.

For example, Stream.first automatically cancels its subscription once the first event arrives. So if you call WebSocket.first, it’ll close the underlying connection once you have the event you need.

The other side of this power, though, is that you need to take extra care when writing code that manipulates streams to ensure that you handle pausing and canceling correctly. If your stream transformer doesn’t forward cancellations properly, you might end up with dangling WebSocket connections, which is bad news all around.

That’s why async’s stream utilities are so useful. Not only do they help you manipulate your streams, they make sure all your listens, pauses, and cancellations are handled exactly right. I call this being cancel-correct. This gives you, the developer, freedom to hook up components as you need without worrying about any extra complexity.


I mentioned briefly that stream creators can be notified when an initial stream subscription is created. You can pass an onListen argument to new StreamController() to only start emitting events once a listener exists. And usually that’s enough—but sometimes you aren’t the one producing the original events, and you don’t want to have to manually forward them to the controller. That’s what LazyStream is for.

When you call new LazyStream(), you pass in a callback that returns a Stream (or a Future<Stream>). This callback will only be invoked once the LazyStream.listen is called, and its events will automatically be piped to the listen handlers. Most of the time it won’t even create an intermediate StreamController, so it’s extra efficient, too.

/// Returns the contents of the getting [url].
/// Only starts the request when the returned stream is listened to.
Stream<List<int>> httpStream(Uri url) {
  return new LazyStream<List<int>>(() async {
    var client = new HttpClient();
    var request = await client.getUrl(Uri.parse(url));
    return await request.close();


If you’ve done much asynchronous programming in Dart, you’re doubtless familiar with the Completer class. It lets you return a future immediately but only fill in its result later on when whatever asynchronous work you’re doing is finished. Well, a StreamCompleter is pretty much the same thing for streams. returns a stream immediately, just like Completer.future. But this stream doesn’t emit any events until you call setSourceStream(), which—like Completer.complete()—provides the concrete value. All events from the source stream are forwarded to the output stream, and of course pauses and cancellations are passed back to the source.

What if you fail to get the source stream? Call setError() instead! Like Completer.completeError, this indicates that the stream completion was unsuccessful. In a more practical sense, it makes the output stream emit the error and then close immediately.

The whole process of getting a stream’s value asynchronously sounds a lot like a Future<Stream>. There are even a few places, like WebSocket.connect(), which return such futures in practice. That’s why the StreamCompleter.fromFuture() static utility function exists. It just takes a Future<Stream> and returns a Stream:

Stream fromFuture(Future<Stream> streamFuture) {
  var completer = new StreamCompleter();
      onError: completer.setError);

In some cases—especially when you want to use async/await—it’s way easier to just use fromFuture() than to manually deal with a StreamCompleter yourself.


I think of StreamGroup kind of like a funnel: it takes a bunch of different input streams and merges them all into a single output stream. Like FutureGroup, which I wrote about last time, it implements Sink. This means that you call add() to add new streams to the group, and close() once you’ve added all the streams that need adding.

The events from the input streams are piped into the output stream, which is accessible by calling stream. This stream will only close once all the input streams close and close() is called, because otherwise more streams may still be added later on. And of course, it’s cancel-correct: it only listens to the source streams once the output stream has a listener, and if the output stream is paused or canceled then all the source streams are as well.

class Phase {
  final _transformers = <Transformer, TranformerRunner>{};

  /// A broadcast stream of all log events from all transformers in this phase.
  Stream<LogEvent> get onLog =>;
  final _onLogGroup = new StreamGroup<LogEvent>.broadcast();

  void addTransformer(Transformer transformer) {
    var runner = new TransformerRunner(transformer);
    _transformers[transformer] = runner;

If you don’t care about adding streams later on, you can also use the StreamGroup.merge() static utility function. This just takes an Iterable<Stream> and merges all the streams immediately. It’s the same as adding them all to a StreamGroup and closing it, but it’s a lot cleaner than doing that manually. That’s why we call it a utility function!


If StreamGroup is a funnel, then StreamSplitter is a sprinkler: it takes a single source stream and splits it into a bunch of identical copies. Each copy independently emits the same events as the source stream, with its own buffering and everything. There’s no way at all for actions on one copy to affect any others.

A splitter is created by passing the source stream to new StreamSplitter(), and copies are created by calling split(). Once you don’t need any more copies, call close() to let the splitter know.

Closing the splitter lets it maintain cancel-correctness. Before it’s closed, it can never safely cancel or pause the source stream, since a new copy could be created that might need additional events. But once it’s closed, the source stream can be canceled as soon as all of the copies are canceled.

Like StreamGroup.merge(), there’s a handy utility method for splitting streams: StreamSplitter.splitFrom(). This takes a source stream and splits it into a set number of copies—two by default, but it can be any number you want. I use it pretty frequently when I’m debugging to see what a stream is emitting without affecting its normal usage.


Some APIs are made for end-users to use directly in their applications, and some are made for implementing other APIs. The SubscriptionStream class falls squarely into the latter category. It’s used internally in the async package itself, and it’s good to know about in case you ever need to implement some asynchronous utilities yourself.

When you call Stream.listen() to subscribe to a stream, you get a StreamSubscription back. This is usually just used to pause and cancel events, but it can also be used to replace the event handlers by calling methods like onData()1. SubscriptionStream takes advantage of this capability to convert a subscription into a brand new stream that can itself be listened to.

When you pass a subscription into new SubscriptionStream(), its old event handlers are removed and it’s paused. This means any additional events are buffered until the SubscriptionStream.listen() is called. Once it is, it sets the new handlers on the original subscription and just returns that. Most of the time it doesn’t even create any extra intermediate objects!

/// Calls [onFirst] with the first event emitted by [source].
/// Returns a stream that emits all events in [source] after the first.
Stream firstAndRest(Stream source, void onFirst(value)) {
  var completer = new StreamCompleter();
  var subscription;
  subscription = source.listen((event) {
    completer.setSourceStream(new SubscriptionStream(subscription));

await last

This about covers the async package’s stream APIs, but there are other APIs still to come. The package has a lot of cool stuff! Join me again in two weeks when I cover everything else, sinks and queues and timers and all.

  1. This method should really be called setOnData(), or else should be a setter with a corresponding getter, but it’s hard to change APIs in the core libraries.

Popular posts from this blog

A stronger Dart for everyone

We are constantly asking ourselves:
How do we make developers even more productive when writing Dart apps? We believe that a critical part of the answer to this question is to make strongmode – a sound static type system for Dart – the standard for all Dart developers.

Teams that use Dart to build apps like Soundtrap, AdWords, AdSense, and Greentea say they really enjoy using strong mode features, such as early error detection. In fact, teams that have switched completely to strong mode cite not only early error detection but also better code readability and maintainability as major benefits. We hear this both from small teams and – even more so – from large teams with hundreds of developers writing and maintaining millions of lines of Dart code. As Björn Sperber from Soundtrap says,
Strong mode and the smooth integration with IntelliJ is a joy to use and a huge improvement. If you’ve tried out Flutter, you’ve already used strong mode checks from the Dart analyzer.

Given the benefits …

AngularDart 3.0: Easy upgrade, better performance

AngularDart 3.0 is now available. It brings better performance and smaller generated code, while also making you more productive.

Version 3.0 is an evolution: although it has some breaking changes (detailed below), it is a smooth upgrade due to minimal public API adjustments. Most of the progress is under the hood—in code quality, stability, generated code size, performance, and developer experience.

Code quality:
2731 instances of making the framework code more type safe (using sound Dart).The AngularDart framework code size is down by 12%.Many additional style updates to the codebase:Changed to use idiomatic <T> for generic methods.Removed NgZoneImpl, all the code exists in NgZone now.Stability:
Many CSS encapsulation fixes due to update with csslib package.Fixed bugs with IE11.

For the Mail sample app, we see 30% faster time-to-interactive (currently 3812 ms on a simulated 3G connection, measured via Lighthouse).Our large app benchmark shows 2x faster render times of…

Dart 1.24: Faster edit-refresh cycle on the web & new function type syntax

Dart 1.24 is now available. It includes the Dart Development Compiler and supports a new generic function type syntax. Get it now!

Figure 1: DDC debugging in Chrome.

Some notable changes in this release:
pub serve now has support for the Dart Development Compiler. Unlike dart2js, this new compiler is modular, which allows pub to do incremental re-builds for pub serve.In practice what that means is you can edit your Dart files, refresh in Chrome (or other supported browsers), and see your edits almost immediately. This is because pub is only recompiling your package, not all packages that you depend on.There is one caveat with the new compiler, which is that your package and your dependencies must all be strong mode clean.You can also use the new compiler to run your tests in Chrome much more quickly than you can with dart2js.Read more in the changelog.You can now publish packages that depend on the Flutter SDK to pub. Moreover, has started tagging Flutter plugins with …