Skip to main content

New site for Dart news and articles

For the latest Dart news, visit our new blog at  https://medium.com/dartlang .

Unboxing Packages: stream_channel

The stream_channel package is the youngest I’ve written about so far—the first version was published on 28 January 2016, only three months ago as I write this! But despite its youth, it fills a very important role in the Dart ecosystem by providing a common abstraction for two-way communication.

In my article on source_span, I wrote about how important it is for a package ecosystem to provide common conventions that can be used throughout the language. stream_channel is another great example of that. The core API it provides is extremely simple, just two getters and a set of rules for them to follow, but the ability for Dart code to implement protocols independent of the underlying implementation is profound.

abstract class StreamChannel<T> {
  Stream<T> get stream;
  StreamSink<T> get sink;
}

The test package uses StreamChannel to implement a protocol for running tests that works whether the tests are in an isolate, a separate process, or even an iframe in a browser window. The web_socket_channel package uses it to define a common API for WebSockets that works the same on all platforms. And having a common API means that it’s also possible to create common utility classes in the style of the async package.

The Rules

As nice and simple as the API is, it’s important to note that there’s more to being a stream channel. A channel is logically a single entity, which means that the two APIs it exposes—its stream and its sink—have to work in concert with one another. So there are a set of rules that all valid implementations of StreamChannel must follow. These rules are designed to make it easy for users to interact with stream channels without leaving resources dangling or errors unhandled.

They also model the behavior of the dart:io WebSocket class almost exactly. Since it’s the only core library API that provides a connected stream and sink, it inspired a lot of the initial stream channel design.

Single-Subscription

The core SDK defines two types of streams. The listen() method may be called any number of times for broadcast streams, but only once for single-subscription streams. The first rule of stream channels is that the stream must be single-subscription. This is the default for streams, so most users will assume it unless stated otherwise, but it’s important to be explicit.

Stream Closure

Most of the rules have to do with the stream and/or the sink closing. This is no coincidence: the moment a channel is closed is one of the highest-risk times for logic errors, because resources can fail to be freed and events can happen in unexpected orders. The second rule addresses this: once the sink is closed, the stream closes without emitting any more events.

This means that the callback passed to stream.listen() won’t ever be called once sink.close() is called. It eliminates a whole class of potential bugs where an event could sneak in before the underlying channel was fully closed.

Sink Closure

The third rule is the inverse of the second: once the stream closes, the sink silently drops all events. This means that if your code receives an onDone event from the stream, it doesn’t need to do any extra work to avoid calling sink methods; they just automatically don’t do anything, guaranteed.

This and the stream closure rule work together to ensure that both components of the channel agree about whether it’s open or closed. Having a single canonical state makes channels more straightforward to work with and reason about, and give the user a consistent way to control and react to that state.

Subscription Canceling

The fourth rule is unusual in that it’s about a connection that shouldn’t exist between the stream and the sink. It says that canceling the stream’s subscription has no effect on the sink. This means that the only way for the channel to be closed locally is by calling sink.close()—code that’s dealing with the sink can be confident that only it (and the remote endpoint) is in charge of the channel’s connection.

Error Bouncing

The fifth rule only applies to channels that don’t have a way of transmitting arbitrary errors to the remote endpoint. If a channel can’t transmit an error, it closes and forwards the error to the sink—in particular, to the sink.done future. Errors sent to a channel that can’t handle them are probably caused by bugs in the program, and forwarding them to done makes it possible to handle them without making them look like events from the remote endpoint.

Early Closure

The sixth and final rule is less of a requirement and more of a guideline. If the stream closes before it has a listener, the sink should silently drop all events if possible, but only if possible. This is tricky because the connection may not be established at all until the stream has a listener, and if it’s not established there might be no way to tell when the stream closes. But if it is possible, this ensures that no events are sent over a channel that the user expects to be closed.

Premade Stream Channels

Even in the few short months that stream_channel has existed, there are already a few classes that implement the interface to wrap commonly-used two-way communication channels.

IsolateChannel

The IsolateChannel class is part of the stream_channel package. Its default constructor, new IsolateChannel(), simply wraps an existing ReceivePort and a SendPort in a stream channel.

That’s great if you already have both ports, but what if you’re establishing the initial connection? Anyone who’s written a bunch of isolate code knows what a pain it is to correctly do the dance of sending a port that sends back another port to establish a two-way connection. To make that easier, IsolateChannel provides two utility constructors. new IsolateChannel.connectReceive() takes a ReceivePort, and new IsolateChannel.connectSend() takes the attached SendPort. They then use an internal protocol to connect ports going the other direction so you don’t have to worry about it.

/// Spawns a worker isolate and returns a [StreamChannel] for communicating with
/// it.
Future<StreamChannel> spawnWorker() async {
  var port = new ReceivePort();
  var isolate = await Isolate.spawn(worker, port.sendPort);
  return new IsolateChannel.connectReceive(port);
}

/// The entrypoint for the worker isolate.
void worker(SendPort port) {
  var channel = new IsolateChannel.connectSend(port);
  /// ...
}

WebSocketChannel

This isn’t strictly part of the stream_channel package, but the WebSocketChannel class defined in the web_socket_channel package is another great example of a stream channel. It has two concrete implementations: IOWebSocketChannel wraps the WebSocket class from dart:io, whereas HtmlWebSocketChannel wraps the one from dart:html.

Both implementations have one constructor that wraps the underlying class, as well as another that opens a new connection. Otherwise, they provide pretty much the same API (with one platform-specific getter).

WebSocketChannel is particularly interesting because it’s not just a vanilla StreamChannel. It provides a few additional APIs: protocol includes the meta-protocol if one was negotiated, and if the socket is closed by the remote end point closeCode and closeReason indicate why. Its sink is also a custom subclass whose close() method allows the user to specify their own code and reason.

Creating New Stream Channels

If you need a stream channel for a different kind of underlying communication channel, you may need to create your own. You could just call new StreamChannel() with a stream and a sink, but be careful: if your stream and sink don’t satisfy the rules I described above, you’re liable to run into some really tricky bugs.

The new StreamChannel.withGuarantees() constructor is much safer, at the cost of providing some extra layers of wrapping. It ensures that, regardless of the behavior of the stream and sink that are passed in, the ones exposed by the channel satisfy all the rules.

StreamChannelController

If you don’t have a preexisting stream and sink, you can create a channel from scratch using the StreamChannelController class. This exposes two stream channels. The code managing the controller interacts directly with the local channel. This is connected to the foreign channel, which is meant to be returned to be used by external code.

/// Returns a [StreamChannel] that communicates over [port].
StreamChannel messagePortChannel(MessagePort port) {
  var controller = new StreamChannelController(allowForeignErrors: false);

  // Pipe all events from the message port into the local sink...
  port.listen((message) => controller.local.sink.add(message.data));

  // ...and all events from the local stream into the send port.
  controller.local.stream.listen(port.postMessage, onDone: port.close);

  // Then return the foreign controller for your users to use.
  return controller.foreign;
}

StreamChannelController automatically ensures that the stream channel rules are satisfied for both the local and remote channels. The allowForeignErrors parameter to the constructor controls how error bouncing is handled. By default, errors are passed straight from the foreign channel to the local one. But if there’s no way to deal with those errors, allowForeignErrors: false can be passed to forward those errors to foreign.sink.done instead.

MultiChannel

I’ll finish this article by talking about one of the coolest stream channel utility classes. MultiChannel allows multiple independent virtual channels to communicate over a single underlying channel—it’s similar to having multiple SendPorts all communicating with different parts of the same isolate, but it works across any channel at all.

The MultiChannel is itself a stream channel, and it’s usually used to establish the initial connection. But the most important part of the class is its virtual channels, which are created using the virtualChannel() method. Each VirtualChannel provides an opaque id that the remote endpoint can pass to virtualChannel() to create its own virtual channel connected to the local one.

/// Serializes [test] into a JSON-safe map.
Map serializeTest(MultiChannel channel, Test test) {
  // Create a virtual channel for the test so that the remote endpoint can tell
  // us to run it.
  var testChannel = channel.virtualChannel();
  testChannel.stream.listen((message) async {
    assert(message['command'] == 'run');
    testChannel.add({"result": await test.run()});
  });

  return {
    "type": "test",
    "name": test.name,
    "channel": testChannel.id
  };
}

/// Deserializes [test] into a concrete [Test] class.
Test deserializeTest(MultiChannel channel, Map test) {
  // Create a virtual channel connected to the one created in [serializeTest].
  var testChannel = channel.virtualChannel(test['channel']);
  return new Test(test['name'], testChannel);
}

The underlying stream is only closed once the initial MultiChannel and all virtual channels are closed. This lets the channels remain fully independent, but it also means it’s important to be scrupulous about closing them when their job is done.

Sink or Stream

This isn’t quite everything in the stream_channel package, but it’s all the most important parts. You’ll just have to check out the API docs for the rest! And next time you need two-way communication, you know where to look.

Join me next week when I talk about a package that’s built on top of stream_channel.