Unboxing Packages: async Part 2

Two weeks ago, I introduced you to some of the marvels available for use in the async package. But that package is so big—there are so many marvels—that I couldn’t fit them all in. In fact, I only ended up writing about APIs that deal with futures and the individual values they represent.

This week I’ll focus on the other major asynchronous type: Stream. I often find it useful to think of asynchronous types as analogous to synchronous ones, and by that metaphor if futures are individual values then streams are Iterables. But in addition to representing asynchronously-computed collections of data, streams can represent things like UI events or communication (for example over a WebSocket).

Dart’s Stream class is very powerful. In addition to dispatching events, it allows the user to pause or cancel their subscription to the stream. What’s more, the creator of the stream can be notified of pauses or cancellations (or the initial subscriptions) and take appropriate action—like closing an HTTP connection when its data stream is canceled. Building this logic into the core types makes it easy to do the right thing with streams without the author having to think about it at all.

For example, Stream.first automatically cancels its subscription once the first event arrives. So if you call WebSocket.first, it’ll close the underlying connection once you have the event you need.
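Here's a minimal sketch of that behavior using only dart:async. The controller stands in for a real event source such as a WebSocket, and its onCancel callback plays the role of closing the connection. The stand-in source and the firstCancelsSource name are assumptions made up for illustration.

```dart
import 'dart:async';

/// Takes the first event from a stand-in source and reports whether the
/// source was notified that its subscription was canceled.
Future<bool> firstCancelsSource() async {
  var canceled = false;

  // Hypothetical event source. A real one might close a connection in
  // onCancel; this one only records that the callback ran.
  var controller = StreamController<int>(onCancel: () {
    canceled = true;
  });
  controller.add(1);
  controller.add(2);

  // Stream.first subscribes, waits for one event, then cancels.
  var first = await controller.stream.first;
  print('first event: $first, source canceled: $canceled');
  return canceled;
}

Future<void> main() async {
  await firstCancelsSource();
}
```

The second event is never delivered, because the subscription is gone by the time it would have been dispatched.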

The other side of this power, though, is that you need to take extra care when writing code that manipulates streams to ensure that you handle pausing and canceling correctly. If your stream transformer doesn’t forward cancellations properly, you might end up with dangling WebSocket connections, which is bad news all around.

That’s why async’s stream utilities are so useful. Not only do they help you manipulate your streams, they make sure all your listens, pauses, and cancellations are handled exactly right. I call this being cancel-correct. This gives you, the developer, freedom to hook up components as you need without worrying about any extra complexity.

LazyStream

I mentioned briefly that stream creators can be notified when an initial stream subscription is created. You can pass an onListen argument to new StreamController() to only start emitting events once a listener exists. And usually that’s enough—but sometimes you aren’t the one producing the original events, and you don’t want to have to manually forward them to the controller. That’s what LazyStream is for.

When you call new LazyStream(), you pass in a callback that returns a Stream (or a Future<Stream>). This callback is only invoked once LazyStream.listen() is called, and the events of the stream it returns are automatically piped to the listen handlers. Most of the time it won’t even create an intermediate StreamController, so it’s extra efficient, too.

import 'dart:io';

import 'package:async/async.dart';

/// Returns the contents of a GET request to [url].
///
/// Only starts the request when the returned stream is listened to.
Stream<List<int>> httpStream(Uri url) {
  return new LazyStream<List<int>>(() async {
    var client = new HttpClient();
    // [url] is already a Uri, so it's passed to getUrl() directly.
    var request = await client.getUrl(url);
    return await request.close();
  });
}
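
To see the laziness without making a network request, here's a small sketch that wraps an in-memory stream instead. The lazyDemo name and the invoked flag are illustrative assumptions; the only package API used is the LazyStream constructor.

```dart
import 'dart:async';

import 'package:async/async.dart';

/// Returns whether the callback had run before listening, whether it had
/// run after listening, and the events that were received.
Future<List<Object>> lazyDemo() async {
  var invoked = false;
  var lazy = LazyStream<int>(() {
    invoked = true;
    return Stream.fromIterable([1, 2, 3]);
  });

  // Creating the LazyStream doesn't call the callback.
  var invokedBeforeListen = invoked;

  // toList() listens to the stream, which triggers the callback.
  var events = await lazy.toList();
  return [invokedBeforeListen, invoked, events];
}

Future<void> main() async {
  print(await lazyDemo());
}
```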

StreamCompleter

If you’ve done much asynchronous programming in Dart, you’re doubtless familiar with the Completer class. It lets you return a future immediately but only fill in its result later on when whatever asynchronous work you’re doing is finished. Well, a StreamCompleter is pretty much the same thing for streams.

StreamCompleter.stream returns a stream immediately, just like Completer.future. But this stream doesn’t emit any events until you call setSourceStream(), which—like Completer.complete()—provides the concrete value. All events from the source stream are forwarded to the output stream, and of course pauses and cancellations are passed back to the source.

What if you fail to get the source stream? Call setError() instead! Like Completer.completeError, this indicates that the stream completion was unsuccessful. In a more practical sense, it makes the output stream emit the error and then close immediately.
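
As a rough sketch of both outcomes, the snippet below listens to a completer's stream before any source exists, then either supplies a source or reports an error. The function names, the delay, and the error message are all assumptions chosen for the example.

```dart
import 'dart:async';

import 'package:async/async.dart';

/// Listens to a completer's stream before any source exists, then
/// supplies the source and returns the events that were received.
Future<List<int>> completeLater() async {
  var completer = StreamCompleter<int>();

  // Subscribing now is fine: nothing is emitted until a source is set.
  var collected = completer.stream.toList();

  await Future<void>.delayed(const Duration(milliseconds: 10));
  completer.setSourceStream(Stream.fromIterable([1, 2, 3]));
  return collected;
}

/// Signals failure instead of providing a source and returns the
/// message of the error the output stream emits.
Future<String> completeWithError() async {
  var completer = StreamCompleter<int>();
  var collected = completer.stream.toList();
  completer.setError(StateError('could not open the source'));
  try {
    await collected;
    return 'no error';
  } on StateError catch (error) {
    return error.message;
  }
}

Future<void> main() async {
  print(await completeLater());
  print(await completeWithError());
}
```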

The whole process of getting a stream’s value asynchronously sounds a lot like a Future<Stream>. There are even a few places, like WebSocket.connect(), which return such futures in practice. That’s why the StreamCompleter.fromFuture() static utility function exists. It just takes a Future<Stream> and returns a Stream:

Stream fromFuture(Future<Stream> streamFuture) {
  var completer = new StreamCompleter();
  streamFuture.then(completer.setSourceStream,
      onError: completer.setError);
  return completer.stream;
}

In some cases—especially when you want to use async/await—it’s way easier to just use fromFuture() than to manually deal with a StreamCompleter yourself.
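
For instance, an async function that has to do some setup before it can produce a stream might be wrapped like this. It's only a sketch: openLog and logLines are hypothetical names, and the delay stands in for whatever asynchronous work the setup really needs.

```dart
import 'dart:async';

import 'package:async/async.dart';

/// Hypothetical setup step: waits briefly, then produces a stream.
Future<Stream<String>> openLog() async {
  await Future<void>.delayed(const Duration(milliseconds: 10));
  return Stream.fromIterable(['starting', 'running', 'done']);
}

/// Returns the log lines as a plain Stream, hiding the asynchronous setup.
Stream<String> logLines() => StreamCompleter.fromFuture(openLog());

Future<void> main() async {
  await for (var line in logLines()) {
    print(line);
  }
}
```

Callers get a Stream they can listen to right away, and they never need to know that a future was involved.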

StreamGroup

I think of StreamGroup kind of like a funnel: it takes a bunch of different input streams and merges them all into a single output stream. Like FutureGroup, which I wrote about last time, it implements Sink. This means that you call add() to add new streams to the group, and close() once you’ve added all the streams that need adding.

The events from the input streams are piped into the output stream, which is available through the stream getter. This stream will only close once all the input streams have closed and close() has been called, because until then more streams may still be added. And of course, it’s cancel-correct: it only listens to the source streams once the output stream has a listener, and if the output stream is paused or canceled then all the source streams are as well.

class Phase {
  final _transformers = <Transformer, TransformerRunner>{};

  /// A broadcast stream of all log events from all transformers in this phase.
  Stream<LogEvent> get onLog => _onLogGroup.stream;
  final _onLogGroup = new StreamGroup<LogEvent>.broadcast();

  void addTransformer(Transformer transformer) {
    var runner = new TransformerRunner(transformer);
    _transformers[transformer] = runner;
    _onLogGroup.add(runner.onLog);
  }
}

If you don’t care about adding streams later on, you can also use the StreamGroup.merge() static utility function. This just takes an Iterable<Stream> and merges all the streams immediately. It’s the same as adding them all to a StreamGroup and closing it, but it’s a lot cleaner than doing that manually. That’s why we call it a utility function!
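
A quick sketch of merge() with two in-memory streams follows. The mergeTwo name and the sample values are made up for the example. The events are sorted before being returned because the order in which the inputs interleave isn't something the example should rely on.

```dart
import 'dart:async';

import 'package:async/async.dart';

/// Merges two in-memory streams and returns every event, sorted.
Future<List<int>> mergeTwo() async {
  var merged = StreamGroup.merge([
    Stream.fromIterable([1, 2]),
    Stream.fromIterable([10, 20]),
  ]);

  // The merged stream closes once both inputs have closed.
  var events = await merged.toList();
  events.sort();
  return events;
}

Future<void> main() async {
  print(await mergeTwo());
}
```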

StreamSplitter

If StreamGroup is a funnel, then StreamSplitter is a sprinkler: it takes a single source stream and splits it into a bunch of identical copies. Each copy independently emits the same events as the source stream, with its own buffering and everything. There’s no way at all for actions on one copy to affect any others.

A splitter is created by passing the source stream to new StreamSplitter(), and copies are created by calling split(). Once you don’t need any more copies, call close() to let the splitter know.

Closing the splitter lets it maintain cancel-correctness. Before it’s closed, it can never safely cancel or pause the source stream, since a new copy could be created that might need additional events. But once it’s closed, the source stream can be canceled as soon as all of the copies are canceled.

Like StreamGroup.merge(), there’s a handy utility method for splitting streams: StreamSplitter.splitFrom(). This takes a source stream and splits it into a set number of copies—two by default, but it can be any number you want. I use it pretty frequently when I’m debugging to see what a stream is emitting without affecting its normal usage.
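
Here's roughly what that looks like with the default of two copies. The splitInTwo name and the sample events are assumptions for illustration; in a debugging session one copy would typically go to a logger while the other is handed to the code that normally consumes the stream.

```dart
import 'dart:async';

import 'package:async/async.dart';

/// Splits an in-memory stream into two copies and collects the events
/// that each copy emits.
Future<List<List<String>>> splitInTwo() {
  var source = Stream.fromIterable(['a', 'b', 'c']);

  // With no count argument, splitFrom() returns two copies.
  var copies = StreamSplitter.splitFrom(source);

  // Each copy is listened to independently and sees every event.
  return Future.wait([copies[0].toList(), copies[1].toList()]);
}

Future<void> main() async {
  print(await splitInTwo());
}
```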

SubscriptionStream

Some APIs are made for end-users to use directly in their applications, and some are made for implementing other APIs. The SubscriptionStream class falls squarely into the latter category. It’s used internally in the async package itself, and it’s good to know about in case you ever need to implement some asynchronous utilities yourself.

When you call Stream.listen() to subscribe to a stream, you get a StreamSubscription back. This is usually just used to pause and cancel events, but it can also be used to replace the event handlers by calling methods like onData() [1]. SubscriptionStream takes advantage of this capability to convert a subscription into a brand new stream that can itself be listened to.

When you pass a subscription into new SubscriptionStream(), its old event handlers are removed and it’s paused. This means any additional events are buffered until SubscriptionStream.listen() is called. Once it is, the SubscriptionStream sets the new handlers on the original subscription and just returns that. Most of the time it doesn’t even create any extra intermediate objects!

/// Calls [onFirst] with the first event emitted by [source].
///
/// Returns a stream that emits all events in [source] after the first.
Stream firstAndRest(Stream source, void onFirst(value)) {
  var completer = new StreamCompleter();
  var subscription;
  subscription = source.listen((event) {
    onFirst(event);
    completer.setSourceStream(new SubscriptionStream(subscription));
  });
  return completer.stream;
}

await last

This about covers the async package’s stream APIs, but there are other APIs still to come. The package has a lot of cool stuff! Join me again in two weeks when I cover everything else, sinks and queues and timers and all.


  1. This method should really be called setOnData(), or else should be a setter with a corresponding getter, but it’s hard to change APIs in the core libraries.
