
Unboxing Packages: async Part 3

We’ve covered individual values and we’ve covered streams, but there are still a few more goodies available in the async package. These don’t fit neatly in either bucket, but when you run into situations that call for them, they’re still plenty useful.


Just like the collection package, async provides a set of wrapper classes. Each of these classes implements a dart:async class and forwards all calls to an inner instance of that class. This makes it easy for users to provide customized versions of those classes. In fact, the async package itself uses some of its own wrappers.

There’s a DelegatingFuture class, of course. There’s also a DelegatingStreamSubscription, and wrappers for every kind of sink or consumer you can name: DelegatingSink, DelegatingEventSink, DelegatingStreamConsumer, and of course DelegatingStreamSink. Of these, DelegatingStreamSink is used most often since it encompasses all the functionality of the other classes.

import 'dart:io';

import 'package:async/async.dart';

/// A [StreamSink] that takes extra arguments for [close].
class WebSocketSink extends DelegatingStreamSink {
  final WebSocket _webSocket;

  WebSocketSink(WebSocket webSocket)
      : super(webSocket),
        _webSocket = webSocket;

  /// Closes the web socket connection with the given [code] and [reason].
  Future close([int code, String reason]) => _webSocket.close(code, reason);
}

You may notice that there’s no DelegatingStream. That’s because it already exists in the core libraries! It’s called StreamView, and it’s in dart:async. Its presence in the core libraries and its inconsistent name are historical accidents, and at some point we may add a DelegatingStream that just directly extends StreamView for consistency.


The RestartableTimer class is very small, but despite that I think it’s useful enough to warrant a mention here. It’s a subclass of Timer, and new RestartableTimer(), isActive, and cancel() all work just the same as the superclass.

The big difference is the all-new reset() method. This restarts the timer with its original duration and callback. It’ll start counting down again whether or not it already fired, which means that the callback can be called more than once.
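To make the "called more than once" point concrete, here's a minimal sketch. The countFirings() name and the durations are made up for illustration, and the snippet is written for current null-safe Dart, so it leaves out `new`.

```dart
import 'dart:async';

import 'package:async/async.dart';

/// Hypothetical helper: lets a short timer fire, resets it, and reports how
/// many times the callback ran in total.
Future<int> countFirings() async {
  var firings = 0;
  final timer = RestartableTimer(const Duration(milliseconds: 20), () {
    firings++;
  });

  // Wait long enough for the timer to fire once.
  await Future<void>.delayed(const Duration(milliseconds: 100));

  // reset() starts the countdown again even though the timer already fired.
  timer.reset();
  await Future<void>.delayed(const Duration(milliseconds: 100));

  timer.cancel();
  return firings;
}

Future<void> main() async {
  print(await countFirings()); // => 2
}
```

Calling reset() before the first countdown ends would simply postpone the callback, which is the behavior a heartbeat relies on.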

This is really useful for implementing a heartbeat—for example, in test, we use a RestartableTimer to time out a test if it doesn’t interact with the test framework for too long.

class Invoker {
  RestartableTimer _timeoutTimer;

  Invoker(...) {
    _timeoutTimer = new RestartableTimer(new Duration(seconds: 30), () {
      registerException("The test timed out.");
    });
  }

  // Functions like `expect()` call this to notify the test runner that the test
  // is still doing work. If it's not called for thirty seconds, the test times
  // out.
  void heartbeat() {
    _timeoutTimer.reset();
  }
}

Stream Sink Utilities

Conceptually, streams have two ends: the Stream object is where events are emitted, and it’s where most of the complexity lies, but for every Stream there’s a StreamSink behind the scenes feeding it events. If your code is the one in charge of that sink, you may need utilities for dealing with it, and the async package is ready to serve.


The core dart:async library includes a StreamTransformer class, which provides an abstraction for transforming streams in a predefined way. The core libraries use it for converters for formats like JSON and UTF-8. But there’s no core library class for transforming StreamSinks, so async steps in to pick up the slack.

The StreamSinkTransformer class works almost exactly like StreamTransformer: you pass in a sink to bind(), and it returns a new sink that’s transformed according to the transformer’s internal logic. It also has a new StreamSinkTransformer.fromHandlers() constructor that’s just like new StreamTransformer.fromHandlers(): it invokes the handlers when the corresponding events are added to the transformed sink.
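As a rough illustration of fromHandlers() and bind(), here's a sketch that upper-cases every string added to a sink. The upperCased() helper and the StreamController standing in for a real destination are assumptions of mine, and the code targets current null-safe Dart.

```dart
import 'dart:async';

import 'package:async/async.dart';

/// Hypothetical helper: adds [words] to a transformed sink and returns what
/// actually reaches the underlying controller.
Future<List<String>> upperCased(List<String> words) async {
  final controller = StreamController<String>();

  // Each event added to the transformed sink is upper-cased before it is
  // forwarded to the inner sink.
  final transformer = StreamSinkTransformer<String, String>.fromHandlers(
    handleData: (data, sink) => sink.add(data.toUpperCase()),
  );
  final sink = transformer.bind(controller.sink);

  // Start listening before adding events so that close() can complete.
  final received = controller.stream.toList();
  for (final word in words) {
    sink.add(word);
  }
  await sink.close();
  return received;
}

Future<void> main() async {
  print(await upperCased(['a', 'b'])); // => [A, B]
}
```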

The one novel API is the new StreamSinkTransformer.fromStreamTransformer() constructor. This transforms a StreamTransformer into a corresponding StreamSinkTransformer, which lets you take advantage of all the useful transformers in the core libraries.

import 'dart:convert';
import 'dart:io';

import 'package:async/async.dart';

main() async {
  // WebSocket implements both Stream and StreamSink. We want to communicate
  // using JSON, so we decode the streamed events and encode the events added to
  // the sink.
  var socket = await WebSocket.connect("ws://");
  var stream = socket.transform(JSON.decoder);
  // JSON.encoder is a getter, not a constant expression, so this uses `new`.
  var sink = new StreamSinkTransformer.fromStreamTransformer(JSON.encoder)
      .bind(socket);

  // ...
}


The StreamSinkCompleter class complements a class that comes not from the core libraries but from the async package itself. In my last article, I talked about StreamCompleter, which allows you to return a stream immediately when that stream is only actually generated later on. StreamSinkCompleter performs the same role, but for sinks.

The API is pretty much entirely parallel to StreamCompleter: the sink getter returns the sink that’s being completed, and the setDestinationSink() method sets the underlying implementation of that sink. There’s even a StreamSinkCompleter.fromFuture() method that directly converts a Future<StreamSink> into a StreamSink.

There is a bit of extra subtlety with the setError() method, though. Sinks don’t emit events the same way streams do, but they do have a way of surfacing errors: the done future. So when you call setError(), it causes the sink to ignore all events that are added and emit the given error through done.
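Here's a small sketch of both behaviors: events added before the destination is known get forwarded once it's set, and an error passed to setError() comes out of done. The function names and the StreamController used as the destination are assumptions for illustration, written for current null-safe Dart.

```dart
import 'dart:async';

import 'package:async/async.dart';

/// Hypothetical helper: adds events before and after the destination sink is
/// set, and returns everything the destination receives.
Future<List<int>> completeLater() async {
  final completer = StreamSinkCompleter<int>();
  final sink = completer.sink;

  // No destination exists yet, so these events are buffered.
  sink.add(1);
  sink.add(2);

  final controller = StreamController<int>();
  final received = controller.stream.toList();
  completer.setDestinationSink(controller.sink);

  sink.add(3);
  await sink.close();
  return received;
}

/// Hypothetical helper: returns the error that surfaces through `done` after
/// setError(), or null if `done` completes normally.
Future<Object?> errorFromDone() async {
  final completer = StreamSinkCompleter<int>();
  completer.setError(StateError('no sink available'));
  completer.sink.add(1); // Ignored: the sink has already failed.
  try {
    await completer.sink.done;
    return null;
  } catch (error) {
    return error;
  }
}

Future<void> main() async {
  print(await completeLater()); // => [1, 2, 3]
  final error = await errorFromDone();
  print(error is StateError); // => true
}
```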


StreamQueue may be my favorite of all the APIs in the async package. It straddles the boundary between streams and individual values by taking a stream—which operates by pushing events to callbacks—and exposing a pull-based API structured mostly around futures.

The simplest example is the next getter, which returns a future that completes to the next value in the stream. So the first time you call it, it returns the first event. Then it’ll return the second event, then the third, and so on. This is all true even if you call it multiple times before the first future completes—StreamQueue keeps track of which future events belong to which futures.

import 'dart:async';

import 'package:async/async.dart';

void main() async {
  var queue = new StreamQueue(new Stream.fromIterable([1, 2, 3]));
  var first = queue.next;
  var second = queue.next;
  var third = queue.next;
  print(await Future.wait([first, second, third])); // => [1, 2, 3]
}

If you call next and there turn out to be no events left in the stream, it’ll complete with a StateError. If you’re worried about this, you can check hasNext first to see if such a value exists.
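A common way to combine the two is a loop that checks hasNext before every next. This is only a sketch: sumAll() is a made-up helper, and it's written for current null-safe Dart.

```dart
import 'dart:async';

import 'package:async/async.dart';

/// Hypothetical helper: pulls events one at a time and adds them up.
Future<int> sumAll(Stream<int> source) async {
  final queue = StreamQueue<int>(source);
  var total = 0;

  // hasNext completes with false once the stream closes, so the call to next
  // inside the loop never hits the "no events left" StateError.
  while (await queue.hasNext) {
    total += await queue.next;
  }
  return total;
}

Future<void> main() async {
  print(await sumAll(Stream.fromIterable([1, 2, 3]))); // => 6
}
```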

There’s a take() method, which consumes a given number of events and returns a list containing them—or just all the remaining values, if there are fewer than the given number. There’s also skip(), which is like take() except that it ignores the values entirely.
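The following sketch shows take() and skip() side by side, including the case where take() asks for more events than are left. The takeAndSkip() helper is hypothetical and written for current null-safe Dart.

```dart
import 'dart:async';

import 'package:async/async.dart';

/// Hypothetical helper: takes two events, skips one, then asks for more
/// events than the stream has left.
Future<List<List<int>>> takeAndSkip() async {
  final queue = StreamQueue<int>(Stream.fromIterable([1, 2, 3, 4, 5]));

  final firstTwo = await queue.take(2); // Consumes 1 and 2.
  await queue.skip(1); // Discards 3.
  final remaining = await queue.take(10); // Only 4 and 5 are left.

  return [firstTwo, remaining];
}

Future<void> main() async {
  print(await takeAndSkip()); // => [[1, 2], [4, 5]]
}
```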

If you have a StreamQueue and you want a stream instead, you can use the rest getter. This returns the portion of the underlying stream that comes after all the events already claimed by earlier calls to next, take(), and skip(). We can use it to rewrite the firstAndRest() function from my last article:

/// Calls [onFirst] with the first event emitted by [source].
/// Returns a stream that emits all events in [source] after the first.
Stream firstAndRest(Stream source, void onFirst(value)) {
  var queue = new StreamQueue(source);
  queue.next.then(onFirst);
  return queue.rest;
}

Finally, when you’re done with a queue, you can call cancel(). This cancels the subscription to the underlying stream, but by default, it waits to do so until after the stream fires enough events to complete all outstanding futures from next and friends. However, if you pass immediate: true, it’ll cancel the subscription right away instead. The outstanding futures will complete as though the stream had closed.
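To see what "as though the stream had closed" means for a pending next, here's a sketch with a stream that never emits anything. The cancelWhilePending() helper and its return strings are made up for illustration, and the code is written for current null-safe Dart.

```dart
import 'dart:async';

import 'package:async/async.dart';

/// Hypothetical helper: requests an event that never arrives, cancels the
/// queue immediately, and describes how the pending request completed.
Future<String> cancelWhilePending() async {
  // Nothing is ever added to this controller.
  final controller = StreamController<int>();
  final queue = StreamQueue<int>(controller.stream);

  // Attach handlers right away so the error is never left unhandled.
  final outcome = queue.next.then<String>(
    (value) => 'value $value',
    onError: (Object error) =>
        error is StateError ? 'StateError' : 'other error',
  );

  // With immediate: true, the queue doesn't wait for the pending request.
  await queue.cancel(immediate: true);
  return outcome;
}

Future<void> main() async {
  print(await cancelWhilePending()); // => StateError
}
```

Without immediate: true, this particular cancel() would never complete, because the stream never produces the event the pending request is waiting for.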

StreamQueue is particularly useful for testing streams. It lets you write very straightforward code that just tests each event in series, rather than fiddling around with explicit calls to Stream.listen(). For example, here’s a real test for CancelableOperation:

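void main() {
  test("asStream() emits an error and then closes", () async {
    var completer = new CancelableCompleter();
    var queue = new StreamQueue(completer.operation.asStream());
    expect(queue.next, throwsA("error"));
    expect(queue.hasNext, completion(isFalse));
    // Assumed step: complete the operation with an error so the stream has
    // something to emit.
    completer.completeError("error");
  });
}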

In addition to writing great blog posts about cool packages, I maintain the test package, and one of my long-term plans is to introduce a set of stream matchers. These matchers will be heavily based on StreamQueue, since it’s so handy for moving through a stream event by event.


Three articles later, and I’m finally done showing off everything cool in the async package—at least until more cool stuff gets added! Master these tools and you’ll be well on your way to being an asynchrony expert. Maybe you’ll even find some unfilled needs, work up a solution, and send us a pull request!

Come back again in two weeks when I talk about a package that builds a brand new abstraction on top of async.
