Skip to main content

Unboxing Packages: async Part 3

We’ve covered individual values and we’ve covered streams, but there are still a few more goodies available in the async package. These don’t fit neatly in either bucket, but when you run into situations that call for them, they’re still plenty useful.


Just like the collection package, async provides a set of wrapper classes. Each of these classes implements a dart:async class and forwards all calls to an inner instance of that class. This makes it easy for users to provide customized versions of those classes. In fact, the async package itself uses some of its own wrappers.

There’s a DelegatingFuture class, of course. There’s also a DelegatingStreamSubscription, and wrappers for every kind of sink or consumer you can name: DelegatingSink, DelegatingEventSink, DelegatingStreamConsumer, and of course DelegatingStreamSink. Of these, DelegatingStreamSink is used most often since it encompasses all the functionality of the other classes.

/// A [StreamSink] that takes extra arguments for [close].
class WebSocketSink extends DelegatingStreamSink {
  final WebSocket _webSocket;

  WebSocketSink(WebSocket webSocket)
      : super(webSocket),
        _webSocket = webSocket;

  /// Closes the web socket connection with the given [code] and [reason].
  Future close([int code, String reason]) => _webSocket.close(code, reason);

You may notice that there’s no DelegatingStream. That’s because it already exists in the core libraries! It’s called StreamView, and it’s in dart:async. Its presence in the core libraries and its inconsistent name are historical accidents, and at some point we may add a DelegatingStream that just directly extends StreamView for consistency.


The RestartableTimer class is very small, but despite that I think it’s useful enough to warrant a mention here. It’s a subclass of Timer, and new RestartableTimer(), isActive, and cancel() all work just the same as the superclass.

The big difference is the all-new reset() method. This restarts the timer, with its original timer and callback. It’ll start counting down again whether or not it already fired, which means that the callback can be called more than once.

This is really useful for implementing a heartbeat—for example, in test, we use a RestartableTimer to time out a test if it doesn’t interact with the test framework for too long.

class Invoker {
  RestartableTimer _timeoutTimer;

  Invoker(...) {
    _timeoutTimer = new RestartableTimer(new Duration(seconds: 30), () {
      registerException("The test timed out.");

  // Functions like `expect()` call this to notify the test runner that the test
  // is still doing work. If it's not called for thirty seconds, the test times
  // out.
  void heartbeat() {

Stream Sink Utilities

Conceptually, streams have two ends: the Stream object is where events are emitted, and it’s where most of the complexity lies, but for every Stream there’s a StreamSink behind the scenes feeding it events. If your code is the one in charge of that sink, you may need utilities for dealing with them, and the async package is ready to serve.


The core dart:async library includes a StreamTransformer class, which provides an abstraction for transforming streams in a predefined way. The core libraries use it for converters for formats like JSON and UTF-8. But there’s no core library class for transforming StreamSinks, so async steps in to pick up the slack.

The StreamSinkTransformer class works almost exactly like StreamTransformer: you pass in a sink to bind(), and it returns a new sink that’s transformed according to the transformer’s internal logic. It also has a new StreamSinkTransformer.fromHandlers() transformer that’s just like new StreamTransformer.fromHandlers(): it invokes the handlers when the corresponding events are added to the transformed sink.

The one novel API is the new StreamSinkTransformer.fromStreamTransformer() constructor. This transforms a StreamTransformer into a corresponding SinkTransformer, which lets you take advantage of all the useful transformers in the core libraries.

main() async {
  // WebSocket implements both Stream and StreamSink. We want to communicate
  // using JSON, so we decode the streamed events and encode the events added to
  // the sink.
  var socket = await WebSocket.connect("ws://");
  var stream = socket.transform(JSON.decoder);
  var sink = const StreamSinkTransformer.fromStreamTransformer(JSON.encoder)

  // ...


This complements a class, not from the core libraries, but from the async package itself. In my last article, I talked about StreamCompleter, which allows you to return a stream immediately when that stream is only actually generated later on. StreamSinkCompleter performs the same role, but for sinks.

The API is pretty much entirely parallel to StreamCompleter: the sink getter returns the sink that’s being completed, and the setDestinationSink() method sets the underlying implementation of that sink. There’s even a StreamSink.fromFuture() method that directly converts a Future<StreamSink> into a StreamSink.

There is a bit of extra subtlety with the setError() method, though. Sinks don’t emit events the same way streams do, but they do have a way of surfacing errors: the done future. So when you call setError(), it causes the sink to ignore all events that are added and emit the given error through done.


StreamQueue may be my favorite of all the APIs in the async package. It straddles the boundary between streams and individual values by taking a stream—which operates by pushing events to callbacks—and exposing a pull-based API structured mostly around futures.

The simplest example is the next getter, which returns a future that completes to the next value in the stream. So the first time you call it, it returns the first event. Then it’ll return the second event, then the third, and so on. This is all true even if you call it multiple times before the first future completes—StreamQueue keeps track of which future events belong to which futures.

void main() async {
  var queue = new StreamQueue(new Stream.fromIterable([1, 2, 3]));
  var first =;
  var second =;
  var third =;
  print(await Future.wait([first, second, third])); // => [1, 2, 3]

If you call next and there turn out to be no events left in the stream, it’ll complete with a StateError. If you’re worried about this, you can check hasNext first to see if such a value exists.

There’s a take() method, which consumes a given number of events and returns a list containing them—or just all the remaining values, if there are fewer than the given number. There’s also skip(), which is like take() except that it ignores the values entirely.

If you have a StreamQueue and you want a stream instead, you can use the rest getter. This returns the portion of the underlying stream after all the reserved events. We can use it to re-write the firstAndRest() function from my last article:

/// Calls [onFirst] with the first event emitted by [source].
/// Returns a stream that emits all events in [source] after the first.
Stream firstAndRest(Stream source, void onFirst(value)) {
  var queue = new StreamQueue(source);;

Finally, when you’re done with a queue, you can call cancel(). This cancels the subscription to the underlying stream, but by default, it waits to do so until after the stream fires enough events to complete all outstanding futures from next and friends. However, if you pass immediate: true, it’ll cancel the subscription right away instead. The outstanding futures will complete as though the stream had closed.

StreamQueue is particularly useful for testing streams. It lets you write very straightforward code that just tests each event in series, rather than fiddling around explicit calls to Stream.listen(). For example, here’s a real test for CancelableOperation:

void main() {
  test("asStream() emits an error and then closes", () async {
    var completer = new CancelableCompleter();
    var queue = new StreamQueue(completer.operation.asStream());
    expect(, throwsA("error"));
    expect(queue.hasNext, isFalse);

In addition to writing great blog posts about cool packages, I maintain the test package, and one of my long-term plans is to introduce a set of stream matchers. These matchers will be heavily based on StreamQueue, since it’s so handy for moving through a stream event by event.


Three articles later, and I’m finally done showing off everything cool in the async package—at least until more cool stuff gets added! Master these tools and you’ll be well on your way to being an asynchrony expert. Maybe you’ll even find some unfilled needs, work up a solution, and send us a pull request!

Come back again in two weeks when I talk about a package that builds a brand new abstraction on top of async.


Popular posts from this blog

Const, Static, Final, Oh my!

Posted by Seth Ladd

(This is an "oldie but a goodie" post originally written by Bob Nystrom. It is being posted here as the explanations still ring true.)

Bob writes:

"static", "final", and "const" mean entirely distinct things in Dart:

"static" means a member is available on the class itself instead of on instances of the class. That's all it means, and it isn't used for anything else. static modifies *members*.

"final" means single-assignment: a final variable or field *must* have an initializer. Once assigned a value, a final variable's value cannot be changed. final modifies *variables*.

"const" has a meaning that's a bit more complex and subtle in Dart. const modifies *values*. You can use it when creating collections, like const [1, 2, 3], and when constructing objects (instead of new) like const Point(2, 3). Here, const means that the object's entire deep state can be determ…

The new AdWords UI uses Dart — we asked why

Google just announced a re-designed AdWords experience. In case you’re not familiar with AdWords: businesses use it to advertise on and partner websites. Advertising makes up majority of Google’s revenue, so when Google decides to completely redo the customer-facing front end to it, it’s a big deal. The Dart team is proud to say that this new front end is built with Dart and Angular 2. Whenever you asked us whether Google is ‘even using Dart for anything,’ this is what we had in mind but couldn’t say aloud. Until now. We asked Joshy Joseph, the primary technical lead on the project, some questions. Joshy is focusing on things like infrastructure, application latency and development velocity, so he’s the right person to ask about Dart.Q: What exactly did we launch on Monday?It’s a complete redesign of the AdWords customer experience that is rolling out slowly as a test to a small initial set of advertisers. The most noticeable thing is probably the Material Design look and f…

Dart 1.12 Released, with Null-Aware Operators and more

Dart 1.12.0 is now released! It contains the new null-aware operators language feature, and enhancements to pub, Observatory, dartdoc, and much more.

Null-aware operators

The new null-aware operators help you reduce the amount of code required to work with references that are potentially null. This feature is a collection of syntactic sugar for traversing (potentially null) object calls, conditionally setting a variable, and evaluating two (potentially null) expressions.

Click or tap the red Run button below to see them in action.


  if null operator. `expr1 ?? expr2` evaluates to `expr1` if not `null`, otherwise `expr2`.


  null-aware assignment. `v ??= expr` causes `v` to be assigned `expr` only if `v` is `null`.


  null-aware access. `x?.p` evaluates to `x.p` if `x` is not `null`, otherwise evaluates to `null`.


  null-aware method invocation. `x?.m()` invokes `m` only if `x` is not `null`.

Learn more about Dart's null-aware operators in our Language Tour.

.packages fi…