Skip to main content

Unboxing Packages: async Part 3

We’ve covered individual values and we’ve covered streams, but there are still a few more goodies available in the async package. These don’t fit neatly in either bucket, but when you run into situations that call for them, they’re still plenty useful.

Wrappers

Just like the collection package, async provides a set of wrapper classes. Each of these classes implements a dart:async class and forwards all calls to an inner instance of that class. This makes it easy for users to provide customized versions of those classes. In fact, the async package itself uses some of its own wrappers.

There’s a DelegatingFuture class, of course. There’s also a DelegatingStreamSubscription, and wrappers for every kind of sink or consumer you can name: DelegatingSink, DelegatingEventSink, DelegatingStreamConsumer, and of course DelegatingStreamSink. Of these, DelegatingStreamSink is used most often since it encompasses all the functionality of the other classes.

/// A [StreamSink] that takes extra arguments for [close].
class WebSocketSink extends DelegatingStreamSink {
  final WebSocket _webSocket;

  WebSocketSink(WebSocket webSocket)
      : super(webSocket),
        _webSocket = webSocket;

  /// Closes the web socket connection with the given [code] and [reason].
  Future close([int code, String reason]) => _webSocket.close(code, reason);
}

You may notice that there’s no DelegatingStream. That’s because it already exists in the core libraries! It’s called StreamView, and it’s in dart:async. Its presence in the core libraries and its inconsistent name are historical accidents, and at some point we may add a DelegatingStream that just directly extends StreamView for consistency.

RestartableTimer

The RestartableTimer class is very small, but despite that I think it’s useful enough to warrant a mention here. It’s a subclass of Timer, and new RestartableTimer(), isActive, and cancel() all work just the same as the superclass.

The big difference is the all-new reset() method. This restarts the timer, with its original timer and callback. It’ll start counting down again whether or not it already fired, which means that the callback can be called more than once.

This is really useful for implementing a heartbeat—for example, in test, we use a RestartableTimer to time out a test if it doesn’t interact with the test framework for too long.

class Invoker {
  RestartableTimer _timeoutTimer;

  Invoker(...) {
    _timeoutTimer = new RestartableTimer(new Duration(seconds: 30), () {
      registerException("The test timed out.");
    });
  }

  // Functions like `expect()` call this to notify the test runner that the test
  // is still doing work. If it's not called for thirty seconds, the test times
  // out.
  void heartbeat() {
    _timeoutTimer.reset();
  }
}

Stream Sink Utilities

Conceptually, streams have two ends: the Stream object is where events are emitted, and it’s where most of the complexity lies, but for every Stream there’s a StreamSink behind the scenes feeding it events. If your code is the one in charge of that sink, you may need utilities for dealing with them, and the async package is ready to serve.

StreamSinkTransformer

The core dart:async library includes a StreamTransformer class, which provides an abstraction for transforming streams in a predefined way. The core libraries use it for converters for formats like JSON and UTF-8. But there’s no core library class for transforming StreamSinks, so async steps in to pick up the slack.

The StreamSinkTransformer class works almost exactly like StreamTransformer: you pass in a sink to bind(), and it returns a new sink that’s transformed according to the transformer’s internal logic. It also has a new StreamSinkTransformer.fromHandlers() transformer that’s just like new StreamTransformer.fromHandlers(): it invokes the handlers when the corresponding events are added to the transformed sink.

The one novel API is the new StreamSinkTransformer.fromStreamTransformer() constructor. This transforms a StreamTransformer into a corresponding SinkTransformer, which lets you take advantage of all the useful transformers in the core libraries.

main() async {
  // WebSocket implements both Stream and StreamSink. We want to communicate
  // using JSON, so we decode the streamed events and encode the events added to
  // the sink.
  var socket = await WebSocket.connect("ws://example.com");
  var stream = socket.transform(JSON.decoder);
  var sink = const StreamSinkTransformer.fromStreamTransformer(JSON.encoder)
      .bind(socket);

  // ...
}

StreamSinkCompleter

This complements a class, not from the core libraries, but from the async package itself. In my last article, I talked about StreamCompleter, which allows you to return a stream immediately when that stream is only actually generated later on. StreamSinkCompleter performs the same role, but for sinks.

The API is pretty much entirely parallel to StreamCompleter: the sink getter returns the sink that’s being completed, and the setDestinationSink() method sets the underlying implementation of that sink. There’s even a StreamSink.fromFuture() method that directly converts a Future<StreamSink> into a StreamSink.

There is a bit of extra subtlety with the setError() method, though. Sinks don’t emit events the same way streams do, but they do have a way of surfacing errors: the done future. So when you call setError(), it causes the sink to ignore all events that are added and emit the given error through done.

StreamQueue

StreamQueue may be my favorite of all the APIs in the async package. It straddles the boundary between streams and individual values by taking a stream—which operates by pushing events to callbacks—and exposing a pull-based API structured mostly around futures.

The simplest example is the next getter, which returns a future that completes to the next value in the stream. So the first time you call it, it returns the first event. Then it’ll return the second event, then the third, and so on. This is all true even if you call it multiple times before the first future completes—StreamQueue keeps track of which future events belong to which futures.

void main() async {
  var queue = new StreamQueue(new Stream.fromIterable([1, 2, 3]));
  var first = queue.next;
  var second = queue.next;
  var third = queue.next;
  print(await Future.wait([first, second, third])); // => [1, 2, 3]
}

If you call next and there turn out to be no events left in the stream, it’ll complete with a StateError. If you’re worried about this, you can check hasNext first to see if such a value exists.

There’s a take() method, which consumes a given number of events and returns a list containing them—or just all the remaining values, if there are fewer than the given number. There’s also skip(), which is like take() except that it ignores the values entirely.

If you have a StreamQueue and you want a stream instead, you can use the rest getter. This returns the portion of the underlying stream after all the reserved events. We can use it to re-write the firstAndRest() function from my last article:

/// Calls [onFirst] with the first event emitted by [source].
///
/// Returns a stream that emits all events in [source] after the first.
Stream firstAndRest(Stream source, void onFirst(value)) {
  var queue = new StreamQueue(source);
  queue.next.then(onFirst);
  return queue.rest;
}

Finally, when you’re done with a queue, you can call cancel(). This cancels the subscription to the underlying stream, but by default, it waits to do so until after the stream fires enough events to complete all outstanding futures from next and friends. However, if you pass immediate: true, it’ll cancel the subscription right away instead. The outstanding futures will complete as though the stream had closed.

StreamQueue is particularly useful for testing streams. It lets you write very straightforward code that just tests each event in series, rather than fiddling around explicit calls to Stream.listen(). For example, here’s a real test for CancelableOperation:

void main() {
  test("asStream() emits an error and then closes", () async {
    var completer = new CancelableCompleter();
    var queue = new StreamQueue(completer.operation.asStream());
    expect(queue.next, throwsA("error"));
    expect(queue.hasNext, isFalse);
    completer.completeError("error");
  });
}

In addition to writing great blog posts about cool packages, I maintain the test package, and one of my long-term plans is to introduce a set of stream matchers. These matchers will be heavily based on StreamQueue, since it’s so handy for moving through a stream event by event.

isEmpty

Three articles later, and I’m finally done showing off everything cool in the async package—at least until more cool stuff gets added! Master these tools and you’ll be well on your way to being an asynchrony expert. Maybe you’ll even find some unfilled needs, work up a solution, and send us a pull request!

Come back again in two weeks when I talk about a package that builds a brand new abstraction on top of async.

Popular posts from this blog

Dart in 2016: The fastest growing programming language at Google, 2nd fastest growing in TIOBE Index

Dart was the fastest growing programming language at Google in 2016 with millions of lines of code written. It also made it to TIOBE Index Top 20 this month (see TIOBE's methodology ). It takes time to build something as ambitious as Dart and, in some ways, Dart is still in its infancy. But we're glad the hard work is starting to pay off. Many thanks to our amazing community! We're going to celebrate by ... releasing 1.22 next week (as per our usual 6 week release schedule).

AngularDart is going all Dart

Until now, the multiple language flavors of Angular 2 were written as TypeScript source, and then automatically compiled to both JavaScript and Dart. We're happy to announce that we’re splitting the Angular 2 codebase into two flavors – a Dart version and a TypeScript/JavaScript version – and creating a dedicated AngularDart team. This is amazing news for Dart developers because: The framework will feel more like idiomatic Dart. It will make use of Dart features that couldn't work with the TypeScript flavor. It will be faster. This is equally great news for our TypeScript and JavaScript developers, by the way. Cleaner API, performance gains, easier contributions. Read more on the Angular blog. Angular 2 for Dart is used by many teams at Google. Most famously by the AdWords team, but many other Google teams build large, mobile-friendly web apps. Some of the top requests from these teams were: make the API feel like Dart, provide a faster edit-refresh cycle, and

The new AdWords UI uses Dart — we asked why

Google just announced a re-designed AdWords experience. In case you’re not familiar with AdWords: businesses use it to advertise on google.com and partner websites. Advertising makes up majority of Google’s revenue, so when Google decides to completely redo the customer-facing front end to it, it’s a big deal. The Dart team is proud to say that this new front end is built with Dart and Angular 2. Whenever you asked us whether Google is ‘even using Dart for anything,’ this is what we had in mind but couldn’t say aloud. Until now. We asked Joshy Joseph , the primary technical lead on the project, some questions. Joshy is focusing on things like infrastructure, application latency and development velocity, so he’s the right person to ask about Dart.   Q: What exactly did we launch on Monday? It’s a complete redesign of the AdWords customer experience that is rolling out slowly as a test to a small initial set of advertisers. The most noticeable thing is probably the Material