Unboxing Packages: async Part 3

We’ve covered individual values and we’ve covered streams, but there are still a few more goodies available in the async package. These don’t fit neatly in either bucket, but when you run into situations that call for them, they’re still plenty useful.

Wrappers

Just like the collection package, async provides a set of wrapper classes. Each of these classes implements a dart:async class and forwards all calls to an inner instance of that class. This makes it easy for users to provide customized versions of those classes. In fact, the async package itself uses some of its own wrappers.

There’s a DelegatingFuture class, of course. There’s also a DelegatingStreamSubscription, and wrappers for every kind of sink or consumer you can name: DelegatingSink, DelegatingEventSink, DelegatingStreamConsumer, and of course DelegatingStreamSink. Of these, DelegatingStreamSink is used most often since it encompasses all the functionality of the other classes.

/// A [StreamSink] that takes extra arguments for [close].
class WebSocketSink extends DelegatingStreamSink {
  final WebSocket _webSocket;

  WebSocketSink(WebSocket webSocket)
      : super(webSocket),
        _webSocket = webSocket;

  /// Closes the web socket connection with the given [code] and [reason].
  Future close([int code, String reason]) => _webSocket.close(code, reason);
}

You may notice that there’s no DelegatingStream. That’s because it already exists in the core libraries! It’s called StreamView, and it’s in dart:async. Its presence in the core libraries and its inconsistent name are historical accidents, and at some point we may add a DelegatingStream that just directly extends StreamView for consistency.

RestartableTimer

The RestartableTimer class is very small, but despite that I think it’s useful enough to warrant a mention here. It implements the Timer interface, and new RestartableTimer(), isActive, and cancel() all work just the same as they do for a plain Timer.

The big difference is the all-new reset() method. This restarts the timer with its original duration and callback. It’ll start counting down again whether or not it already fired, which means that the callback can be called more than once.

This is really useful for implementing a heartbeat—for example, in test, we use a RestartableTimer to time out a test if it doesn’t interact with the test framework for too long.

class Invoker {
  RestartableTimer _timeoutTimer;

  Invoker(...) {
    _timeoutTimer = new RestartableTimer(new Duration(seconds: 30), () {
      registerException("The test timed out.");
    });
  }

  // Functions like `expect()` call this to notify the test runner that the test
  // is still doing work. If it's not called for thirty seconds, the test times
  // out.
  void heartbeat() {
    _timeoutTimer.reset();
  }
}
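
To make the behavior of reset() concrete, here is a minimal sketch of a timer that gets reset both before and after it fires. The countFires() helper and the durations are made up for illustration, and the code is written for a current null-safe Dart SDK, which is why it drops the new keyword used elsewhere in this article.

```dart
import 'package:async/async.dart';

/// Resets a timer [resets] times before it can fire, lets it fire, resets it
/// once more, and returns how many times the callback ran.
///
/// Hypothetical helper for illustration; not part of the async package.
Future<int> countFires({int resets = 3}) async {
  var fires = 0;
  final timer = RestartableTimer(const Duration(milliseconds: 200), () {
    fires++;
  });

  // Each reset lands well before the 200 ms deadline, so the callback does
  // not run during this loop.
  for (var i = 0; i < resets; i++) {
    await Future<void>.delayed(const Duration(milliseconds: 20));
    timer.reset();
  }

  // Stop resetting and give the timer time to run out once.
  await Future<void>.delayed(const Duration(milliseconds: 600));

  // Resetting a timer that has already fired starts a fresh countdown, so
  // the callback runs a second time.
  timer.reset();
  await Future<void>.delayed(const Duration(milliseconds: 600));

  timer.cancel();
  return fires;
}

Future<void> main() async {
  print(await countFires()); // Expected to print 2 with these durations.
}
```

The generous gaps between the delays and the timer's duration are deliberate: the sketch depends on wall-clock timing, so tight margins would make it flaky.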

Stream Sink Utilities

Conceptually, streams have two ends: the Stream object is where events are emitted, and it’s where most of the complexity lies, but for every Stream there’s a StreamSink behind the scenes feeding it events. If your code is the one in charge of that sink, you may need utilities for dealing with it, and the async package is ready to serve.

StreamSinkTransformer

The core dart:async library includes a StreamTransformer class, which provides an abstraction for transforming streams in a predefined way. The core libraries use it for converters for formats like JSON and UTF-8. But there’s no core library class for transforming StreamSinks, so async steps in to pick up the slack.

The StreamSinkTransformer class works almost exactly like StreamTransformer: you pass in a sink to bind(), and it returns a new sink that’s transformed according to the transformer’s internal logic. It also has a new StreamSinkTransformer.fromHandlers() constructor that’s just like new StreamTransformer.fromHandlers(): it invokes the handlers when the corresponding events are added to the transformed sink.
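
As a rough sketch of the fromHandlers() flavor, the snippet below wraps a StreamController's sink so that every integer added to it is doubled before it reaches the controller. The doubledThroughSink() helper is hypothetical, and the code assumes a current null-safe Dart SDK, so it omits new.

```dart
import 'dart:async';

import 'package:async/async.dart';

/// Adds [values] to a transformed sink and returns what the inner sink
/// actually received.
///
/// Hypothetical helper for illustration; not part of the async package.
Future<List<int>> doubledThroughSink(List<int> values) async {
  final controller = StreamController<int>();

  // Start listening right away so the controller can deliver its events.
  final received = controller.stream.toList();

  // handleData runs for each event added to the transformed sink. Its second
  // argument is the sink being wrapped.
  final doubler = StreamSinkTransformer<int, int>.fromHandlers(
    handleData: (value, sink) => sink.add(value * 2),
  );
  final sink = doubler.bind(controller.sink);

  for (final value in values) {
    sink.add(value);
  }

  // With no handleDone handler, closing the transformed sink closes the
  // inner sink as well.
  await sink.close();
  return await received;
}

Future<void> main() async {
  print(await doubledThroughSink([1, 2, 3])); // => [2, 4, 6]
}
```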

The one novel API is the new StreamSinkTransformer.fromStreamTransformer() constructor. This transforms a StreamTransformer into a corresponding StreamSinkTransformer, which lets you take advantage of all the useful transformers in the core libraries.

import 'dart:convert';
import 'dart:io';

import 'package:async/async.dart';

main() async {
  // WebSocket implements both Stream and StreamSink. We want to communicate
  // using JSON, so we decode the streamed events and encode the events added to
  // the sink.
  var socket = await WebSocket.connect("ws://example.com");
  var stream = socket.transform(JSON.decoder);
  var sink = new StreamSinkTransformer.fromStreamTransformer(JSON.encoder)
      .bind(socket);

  // ...
}

StreamSinkCompleter

This class complements one that comes not from the core libraries but from the async package itself. In my last article, I talked about StreamCompleter, which allows you to return a stream immediately when that stream is only actually generated later on. StreamSinkCompleter performs the same role, but for sinks.

The API is pretty much entirely parallel to StreamCompleter: the sink getter returns the sink that’s being completed, and the setDestinationSink() method sets the underlying implementation of that sink. There’s even a StreamSinkCompleter.fromFuture() method that directly converts a Future<StreamSink> into a StreamSink.

There is a bit of extra subtlety with the setError() method, though. Sinks don’t emit events the same way streams do, but they do have a way of surfacing errors: the done future. So when you call setError(), it causes the sink to ignore all events that are added and emit the given error through done.
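
Here is a small sketch of both paths, assuming a current null-safe Dart SDK (hence no new). The helper names writeBeforeDestinationExists() and errorFromDone() are invented for this example. The first writes to the sink before its destination exists. The second shows setError() surfacing through done.

```dart
import 'dart:async';

import 'package:async/async.dart';

/// Adds an event before the destination sink is known, then another one
/// afterwards, and returns everything the destination received.
///
/// Hypothetical helper for illustration; not part of the async package.
Future<List<String>> writeBeforeDestinationExists() async {
  final completer = StreamSinkCompleter<String>();

  // The sink is usable immediately; this event is buffered.
  completer.sink.add('early');

  final controller = StreamController<String>();
  final received = controller.stream.toList();
  completer.setDestinationSink(controller.sink);

  completer.sink.add('late');
  await completer.sink.close();
  return await received;
}

/// Completes the sink with an error and returns what `done` reports.
///
/// Hypothetical helper for illustration; not part of the async package.
Future<Object?> errorFromDone() async {
  final completer = StreamSinkCompleter<String>();
  completer.setError(StateError('no sink available'));

  // Events added after setError() are silently dropped.
  completer.sink.add('dropped');

  try {
    await completer.sink.done;
  } catch (error) {
    return error;
  }
  return null;
}

Future<void> main() async {
  print(await writeBeforeDestinationExists()); // => [early, late]
  print(await errorFromDone() is StateError); // => true
}
```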

StreamQueue

StreamQueue may be my favorite of all the APIs in the async package. It straddles the boundary between streams and individual values by taking a stream—which operates by pushing events to callbacks—and exposing a pull-based API structured mostly around futures.

The simplest example is the next getter, which returns a future that completes to the next value in the stream. So the first time you call it, it returns the first event. Then it’ll return the second event, then the third, and so on. This is all true even if you call it multiple times before the first future completes—StreamQueue keeps track of which future events belong to which futures.

void main() async {
  var queue = new StreamQueue(new Stream.fromIterable([1, 2, 3]));
  var first = queue.next;
  var second = queue.next;
  var third = queue.next;
  print(await Future.wait([first, second, third])); // => [1, 2, 3]
}

If you call next and there turn out to be no events left in the stream, it’ll complete with a StateError. If you’re worried about this, you can check hasNext first to see if such a value exists.

There’s a take() method, which consumes a given number of events and returns a list containing them—or just all the remaining values, if there are fewer than the given number. There’s also skip(), which is like take() except that it ignores the values entirely.
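
The following sketch strings these calls together on a five-element stream. The drain() helper is hypothetical, and the code targets a current null-safe Dart SDK. One detail worth knowing is that skip() completes with the number of events it could not skip, so it yields 0 when the stream had enough events.

```dart
import 'dart:async';

import 'package:async/async.dart';

/// Walks through a five-event stream with skip(), take(), hasNext, and next,
/// recording the result of each call.
///
/// Hypothetical helper for illustration; not part of the async package.
Future<List<Object>> drain() async {
  final queue = StreamQueue(Stream.fromIterable([1, 2, 3, 4, 5]));
  final log = <Object>[];

  // Skips the 1. The result is the count of events that could NOT be
  // skipped, which is 0 here.
  log.add(await queue.skip(1));

  // Takes the 2 and the 3.
  log.add(await queue.take(2));

  // Asks for ten events, but only the 4 and the 5 remain.
  log.add(await queue.take(10));

  // The stream is exhausted now.
  log.add(await queue.hasNext);

  // Asking for another value anyway completes with a StateError.
  try {
    await queue.next;
  } on StateError {
    log.add('StateError');
  }

  return log;
}

Future<void> main() async {
  print(await drain()); // => [0, [2, 3], [4, 5], false, StateError]
}
```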

If you have a StreamQueue and you want a stream instead, you can use the rest getter. This returns the portion of the underlying stream that comes after all the events already claimed by earlier requests such as next or take(). We can use it to rewrite the firstAndRest() function from my last article:

/// Calls [onFirst] with the first event emitted by [source].
///
/// Returns a stream that emits all events in [source] after the first.
Stream firstAndRest(Stream source, void onFirst(value)) {
  var queue = new StreamQueue(source);
  queue.next.then(onFirst);
  return queue.rest;
}

Finally, when you’re done with a queue, you can call cancel(). This cancels the subscription to the underlying stream, but by default, it waits to do so until after the stream fires enough events to complete all outstanding futures from next and friends. However, if you pass immediate: true, it’ll cancel the subscription right away instead. The outstanding futures will complete as though the stream had closed.
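
To illustrate the immediate flavor, this sketch requests a value from a stream that never emits anything and then cancels right away. The cancelImmediately() helper is made up for the example, and the code assumes a current null-safe Dart SDK. Because the pending next future is completed as though the stream had closed, it ends with a StateError.

```dart
import 'dart:async';

import 'package:async/async.dart';

/// Requests a value that never arrives, cancels the queue immediately, and
/// describes how the pending request ended.
///
/// Hypothetical helper for illustration; not part of the async package.
Future<String> cancelImmediately() async {
  // Nothing is ever added to this controller.
  final controller = StreamController<int>();
  final queue = StreamQueue(controller.stream);

  // Attach handlers before cancelling so the error is never left unhandled.
  final outcome = queue.next.then<String>(
    (value) => 'value: $value',
    onError: (Object error) => 'error: ${error.runtimeType}',
  );

  // Without `immediate: true`, cancel() would wait for the pending request,
  // which would never complete here.
  await queue.cancel(immediate: true);

  return await outcome;
}

Future<void> main() async {
  print(await cancelImmediately()); // => error: StateError
}
```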

StreamQueue is particularly useful for testing streams. It lets you write very straightforward code that just tests each event in series, rather than fiddling around with explicit calls to Stream.listen(). For example, here’s a real test for CancelableOperation:

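import 'package:async/async.dart';
import 'package:test/test.dart';

void main() {
  test("asStream() emits an error and then closes", () async {
    var completer = new CancelableCompleter();
    var queue = new StreamQueue(completer.operation.asStream());
    expect(queue.next, throwsA("error"));
    // hasNext returns a Future, so it needs the completion() matcher.
    expect(queue.hasNext, completion(isFalse));
    completer.completeError("error");
  });
}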

In addition to writing great blog posts about cool packages, I maintain the test package, and one of my long-term plans is to introduce a set of stream matchers. These matchers will be heavily based on StreamQueue, since it’s so handy for moving through a stream event by event.

isEmpty

Three articles later, and I’m finally done showing off everything cool in the async package—at least until more cool stuff gets added! Master these tools and you’ll be well on your way to being an asynchrony expert. Maybe you’ll even find some unfilled needs, work up a solution, and send us a pull request!

Come back again in two weeks when I talk about a package that builds a brand new abstraction on top of async.