Message Systems in Programming: Part 6 of 7 – Streams

Streams

Streams are merely Arrays that emit change events through a callback when items are added to them.

That’s it.

People try to make them sound magical, complicated, or like some core savior technique of programming life. They’re not. They’ve been around in networking code since long before me. They’re how GitHub visualizes different branches. They’re just Arrays that emit change events to a callback, and you can filter those events before you get them. You don’t even have to use events; you could use any value you want. Most implementations also play nice with Promises.

Streams are like Events, except you can use Array methods on them BEFORE the event is given to the callback handler, and listeners can pause further events from being emitted. They take the asynchronous flattening abilities of Promises, the filtering abilities of functional Arrays (think all those Array methods in Underscore), and the one-to-many broadcast abilities of Events. They have a flattening power of their own once you start merging streams together. When used with Promises, you can work around race conditions much like you do for Events.

As a side note, Node’s current version of Streams at the time of this writing is actually based on EventEmitter.

For example, here’s Dart asking for the weather in JSON:

import 'dart:html';
import 'package:frappe/frappe.dart';

String url = 'http://api.openweathermap.org/data/2.5/weather?q=Richmond,VA&units=imperial';
// wrap the Future from HttpRequest.request as a single-value Stream
EventStream weather = new EventStream(HttpRequest.request(url, responseType: 'json').asStream());
weather.listen((data)
{
	print(data.response);
});

In Dart using Frappe, you use listen to hear about when streams fire events. Here’s RxJS doing the same thing:

function getWeather()
{
    return $.get('http://api.openweathermap.org/data/2.5/weather?q=Richmond,VA&units=imperial')
    .promise();
}
// source stream; here we assume clicks on a hypothetical refresh button
var refreshClicks = Rx.Observable.fromEvent($('#refreshButton'), 'click');
var currentWeather = refreshClicks.flatMapLatest(getWeather);
currentWeather.subscribe(function(data)
{
   	console.log(data);
});

And in Bacon.js, another rad, and less intimidating, JavaScript Stream library:

function getWeather()
{
    var promise = $.get('http://api.openweathermap.org/data/2.5/weather?q=Richmond,VA&units=imperial')
    .promise();
    return Bacon.fromPromise(promise);
}
getWeather().onValue(function(data)
{
	console.log(data);
});

Like addEventListener, you pass in a callback that fires when events happen. Like events, listen will pass in whatever the Stream emits. While Events follow the convention of passing an event object as the single and only parameter, if you’re used to smaller Promises, or are putting streaming code on top of existing Promise code, primitive values or even multiple values may be common, and that’s OK. JavaScript will work.

Strongly Typed Streams

Dart, however, will give you compiler warnings if you don’t match the types. If you skip the strong typing but still expect a certain type, you’ll get a runtime exception instead.

For example, this is good:

StreamController<String> controller = new StreamController<String>();
EventStream<String> nameChanges = new EventStream<String>(controller.stream);
nameChanges.listen((String someName)
{
	print("someName: $someName");
});
controller.add("Jesse");

This will give you a compiler warning:

StreamController<String> controller = new StreamController<String>();
EventStream<String> nameChanges = new EventStream<String>(controller.stream);
nameChanges.listen((int someName)
{
	print("someName: $someName");
});
controller.add("Jesse");

And this will compile just fine, yet blow up at runtime:

StreamController controller = new StreamController();
EventStream nameChanges = new EventStream(controller.stream);
nameChanges.listen((someName)
{
	int oldAge = 70 + someName;
	print("oldAge: $oldAge");
});
controller.add("Jesse");
//	Exception: Uncaught Error: type 'String' is not a subtype of type 'num' of 'other'.
//    Stack Trace:
//    #0      int.+ (dart:core-patch/integers.dart:15)

#inb4HaxeDoesntHaveThisProblem

The strong typing helps while coding, especially once you start merging/piping streams or using Composition. It also helps the compiler create faster JavaScript. That applies whether you’re writing Dart for the client or for the server (just like JavaScript in the browser and Node).

Streams Being ‘Done’ vs. Infinite

If you map your mind to Streams firing multiple times like an event, they are immediately familiar. Here’s Bacon creating an event stream (a Stream of Events, usually from something that spits out events via an addEventListener type deal):

$('#someButton').asEventStream('click').onValue(function(mouseEvent)
{
	console.log("mouseEvent:", mouseEvent);
});

“JQuery, find me this button hidden in the DOM soup, create a Stream from its click addEventListener, and let my anonymous function know about events as they are added to the Stream, kthxbai!”

Since Events work very similarly to Streams, most Streaming libraries have these kinds of converters to quickly make them work like Streams, yet retain the original Event interface. Lowers the barrier to entry.
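Dart’s dart:html takes the same idea further and exposes DOM events as Streams natively. A minimal sketch (the button id is assumed):

import 'dart:html';

main()
{
	// onClick is already a Stream<MouseEvent>; no converter needed
	querySelector('#someButton').onClick.listen((MouseEvent mouseEvent)
	{
		print("mouseEvent: $mouseEvent");
	});
}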

Filtering

As previously mentioned, unlike Events, you can filter them BEFORE your subscribe/listen function is fired. For example, when designing a game, I only want to know about events where a particular character is ready so I can redraw the UI:

initiativeStream.where((InitiativeEvent event)
{
	return event.type == InitiativeEvent.CHARACTER_READY;
})
.listen((event)
{
	print("character ready: ${event.character}");
});

You’ll notice I’m treating the initiativeStream here like an Array, using a where function. If you’re not familiar with uber functional Array stuff, check out Underscore’s definition of where, or try your hand at the low-level implementations tutorial to understand why you’d even need or want this stuff.
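If you’re rusty on that stuff, here’s where on a plain Dart List; the Stream version above is the exact same idea, just applied to values that arrive over time (the numbers are made up for illustration):

List<int> initiativeRolls = [3, 8, 15, 20];
Iterable<int> readyRolls = initiativeRolls.where((int roll)
{
	return roll >= 15; // hypothetical 'ready' threshold
});
print(readyRolls.toList()); // [15, 20]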

What if I didn’t have filtering abilities? You’d write crap like this:

initiativeStream
.listen((event)
{
	if(event.type == InitiativeEvent.CHARACTER_READY)
	{
		print("character ready: ${event.character}");
	}
});

Bottom line, every time an initiative event is fired, only let me know about the ready events. Cool. However, anyone who has played RPGs knows there is one issue with the above; I’ll hear about the Monster events as well. Again, without functional Array methods:

initiativeStream
.listen((event)
{
	if(event.character is Player)
	{
		if(event.type == InitiativeEvent.CHARACTER_READY)
		{
			print("character ready: ${event.character}");
		}
	}
});

Machete don’t write nested ifs.

With functional Arrays, you can filter on filters. We’ll ensure it’s the correct event type, only from Players, and convert it so we get the player out since that’s all we care about:

initiativeStream.where((InitiativeEvent event)
{
	return event.type == InitiativeEvent.CHARACTER_READY;
})
.where((InitiativeEvent event)
{
	return event.character is Player;
})
.map((InitiativeEvent event)
{
	return event.character;
})
.listen((Player player)
{
	print("player is ready: ${character}");
});

Good stuff. Remember, though, that like events, the listen callback will get called multiple times as more things come into the stream. The docs for the various libraries and runtimes are confusing in that many methods will say “when the stream is done”. Done is a strange word because if you never un-listen, is it REALLY done? The whole chain above will get executed again if, 5 seconds later, another InitiativeEvent is pumped through the system. Implementations like Dart’s can actually emit a ‘done’ type of event if you wish to know. However, for UI Streams like the Bacon one I gave above, it’s never really done because the user could click anytime they want.
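In Dart, for example, done only fires when whoever controls the stream closes it; a minimal sketch:

import 'dart:async';

main()
{
	StreamController controller = new StreamController();
	controller.stream.listen((data)
	{
		print("got: $data");
	},
	onDone: ()
	{
		print("stream is done, no more events");
	});

	controller.add("one");
	controller.add("two");
	controller.close(); // closing the controller is what triggers onDone
}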

Merging and Piping Streams

Many systems use Streams to link a series of inputs and outputs together with very little code. The most common example is chaining string operations on the command line in many Unix systems.

$ ps aux | grep conky | grep -v grep | awk '{print $2}' | xargs kill

To read that in Jesse Engrish: “List my running processes, throw that list to a search program called grep and look for ‘conky’ within it, exclude the grep command itself from the results, use a program called awk to snag out the process ID (called a PID) I need from the string, and throw that PID at xargs kill, which will stop the process from running.” Notice in this case the pipe is literally a pipe vs. a “Stream.pipe()” function. You can imagine what happens if someone starts changing ‘conky’ and keeps re-running ps aux… continuously streaming pipes in hot action, sucka!

It should be noted that Node already has a way to pipe streams together, which is more performant and shorter to write:

var fs = require('fs');
var errorsFromClient = getErrorPostStream(); // hypothetical readable stream of errors POSTed from clients
var errorLogFile = fs.createWriteStream('error.log');
errorsFromClient.pipe(errorLogFile);

You’ll often “pipe” the data from one command to the next, whether doing it manually like I showed, or using the platform’s or library’s built-in pipe command.
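Dart streams have a pipe method as well. Here’s a hypothetical sketch using dart:io to stream one file’s bytes into another (the file names are made up):

import 'dart:io';

main()
{
	Stream<List<int>> input = new File('error.log').openRead();
	IOSink output = new File('error-backup.log').openWrite();
	input.pipe(output); // every chunk read from one file flows into the other
}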

Streams can also represent a “series of events”. As previously mentioned, Node.js uses streams much in this fashion via EventEmitters. They’re basically Events that have start, progress, and stopping points and are called Streams (the API was in flux at the time of this writing).

Finally, Streams can be merged. One use case is building an orchestration on the Node server that turns 4 API calls into 1, so the client-side code is easier to write and doesn’t have to worry about authentication & parsing details. Another use case is flattening all the Events on a client that cause the same thing to happen.

For example, you know how when building a login form, either clicking the “Login” button or hitting Enter while within the password field will submit it? You can merge Streams to make that easier to read. Here’s a Bacon example:

var loginButton = $("#loginButton").asEventStream("click");
var loginField = $("#passwordField").asEventStream("keyup").filter(function(e)
{
	return e.keyCode === 13; // the Enter key
});
var loginStream = loginButton.merge(loginField);
loginStream.onValue(function()
{
	$.post('server.com/login');
});

The same technique can be used server-side or client side to flatten orchestrations when making many REST API calls. This example first loads the client’s location using a geoIP service (uses the server vs. navigator.geolocation) to identify the user’s latitude and longitude, ensures it’s not called more than every half a second, parses the result into a Point object, loads the weather passing in the latitude and longitude, and ensures the result isn’t the same as last time, else it ignores it:

GetLocation locationService = new GetLocation();
EventStream getWeatherFromGeoIP = new EventStream(locationService.asStream())
.debounce(new Duration(milliseconds: 500))
.map((HttpRequest request)
{
	return new Point(request.response.lat, request.response.lon);
})
.asyncMap((Point point)
{
	return getWeather(point.x, point.y);
})
.distinct();
getWeatherFromGeoIP.listen((num weather)
{
	print("It is $weather degrees.");
});

new Timer(new Duration(minutes: 1), ()
{
	locationService.getLocation();
});

If you didn’t catch it, check out the asyncMap call (the getWeather line) where the Stream supports asynchronous filters as well, specifically Futures (Dart’s Promises). This is also a spot where you could use Composition instead of merging Streams.

Here’s another example with a less linear approach where you can see different streams emitting events at different times, in this case, Monsters and Players attacking each other. One is controlled by AI, the other controlled by the user:

EventStream monsterAttacks = new EventStream(new StreamController().stream);
EventStream playerAttacks = new EventStream(new StreamController().stream);
EventStream attacks = monsterAttacks.merge(playerAttacks);
attacks.listen((AttackResult result)
{
	textDropper.addTextDrop(result.attackTarget, result.hitValue);
});

Chat applications are another case: they do request/response in the background, but switch to a real-time socket when you have the application active, like HipChat on your phone. Two different application modes, yet your data comes out the same:

EventStream ajaxStream = new EventStream(new StreamController().stream);
EventStream socketStream = new EventStream(new StreamController().stream);
EventStream messages = ajaxStream.merge(socketStream);
messages.listen((ChatMessage message)
{
	querySelector("#chatField").text = message.text;
});

Note On Observables

While not always mentioned by name in a Stream API, an Observable is often included when describing Stream APIs. All they’re referring to is the Observer pattern applied over an Array, like a Collection. When you do an arrayInstance.push(), there are no change events. No one else knows that the Array changed unless you use a CustomEvent or pub sub to inform others that it changed. Thus, many use the Observer pattern to wrap Arrays to create Collections; similar API, except you know what changed, where, and how. Backbone’s Model and Collection classes are perfect examples of this.
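A roll-your-own version of that in Dart might look like this; a hypothetical ObservableList that wraps a plain List and announces additions through a Stream:

import 'dart:async';

class ObservableList<T>
{
	final List<T> _items = [];
	final StreamController<T> _changes = new StreamController<T>.broadcast();

	// anyone can listen for additions, Observer pattern style
	Stream<T> get onAdd => _changes.stream;

	void add(T item)
	{
		_items.add(item);
		_changes.add(item); // let the world know the "Array" changed
	}
}

main()
{
	ObservableList<String> names = new ObservableList<String>();
	names.onAdd.listen((name) => print("added: $name"));
	names.add("Jesse");
}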

Streams, however, represent the data coming into their Array IN ORDER, even when you merge streams. This is why all the cray cray Functional Array stuff works on top, no matter how many Streams you use. Now, your listen function may fire only 1 time because your Array/Stream only has 1 data item. As that data “changes”, however, you’ll get more events. Hence why they call it an Observable; somewhere deep in the API is the Observer pattern watching a piece of data, usually passed by ref if possible to emulate Promise state encapsulation.

You’ll also sometimes see Properties. These are sometimes like Observables depending on the library. Other times they’re like Promises that cache data.

Be aware of the context of Observable. This is one of the reasons Streams are so hard to grok: people often cite Observable in many different ways without defining what the hell it actually is. For example, the RX family’s docs say:

RxJS = Observables + Operators + Schedulers.

Except it’s not. That quote from the docs is actually talking about RX, not Streams. You could easily be fooled (I was) into thinking Streams themselves are defined as Observables, Operators, and Schedulers.

Streams ARE an Observable. Like the Observer pattern: “This thing changed, let the world know.” When you change a Stream, it lets the world know. You use functional Array methods to “operate” on that data. Operators == Functional Array Methods. And putting Schedulers there makes it sound like Streams are one third made up of these scheduler things. Except they’re not. It’s 1 plugin out of the 11 RX has… and that’s only for some advanced unit testing/functional testing mojo. Most of us ain’t going to use that.

If you think of a Stream as an Array that emits events when it changes, and is thus an Observable, you’ll get it. There are 2 diverging extremes. One is “the Observable’s current value”, i.e. the latest value in the Stream. The other end of the spectrum is how I describe them: the Stream is just an Array of values, and you get them in order, so the Array itself is the Observable. Neither is technically correct because some Streaming libraries give you insane ballz control over Observables. Some, like Frappe, even go the Property route.

This is why specs exist after the innovation love has died down and developers have self-loathing when their toys start to get annoying.

Cold vs. Hot

When they start speaking really advanced about “cold vs. hot”, don’t sweat it. All this means is something is Cold when it doesn’t dispatch events until someone listens. In a cold stream, if I click the mouse 50 times on a button Stream, none of those Events are actually dispatched on the Stream until someone calls listen/subscribe. If it were a hot stream, she’d act like Events; the Stream would emit the events even if no one is listening.

Another example is reading text from a file. You won’t start receiving events for it until someone says, “Yo, read this text file, and lemme know about each line of text read as an event”. Cold covers on-demand type stuff like ajax calls, reading serialized data from window.localStorage, querying a Model/Collection for the first time, etc.

Hot is the opposite, and works like Events; they’re fired whether anyone’s listening or not. This is for things that are happening whether you’re there or not. If your listener is late to the party, he’ll start getting new events from that point forward. If I click the mouse 50 times on a button Stream, and you only add a listen/subscribe callback after the 25th has fired, you’ll only get 25 Events. Same as if you did 25 mouse clicks, then domElement.addEventListener("click", mouseCallback), then clicked 25 more times.

Dart’s broadcast streams are hot, while its plain single-subscription streams wait for a listener before producing events; RX Observables are cold unless you explicitly make them hot.
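A rough Dart illustration of the difference (a sketch; exact buffering rules vary by library). A single-subscription controller holds onto events until someone listens, while a broadcast controller just drops them if nobody’s there:

import 'dart:async';

main()
{
	// single-subscription: events added before a listener are buffered,
	// then delivered once someone finally calls listen
	StreamController cold = new StreamController();
	cold.add("click 1");
	cold.stream.listen((data) => print("cold got: $data")); // prints "click 1"

	// broadcast: acts like Events; with no listener yet, the event is just gone
	StreamController hot = new StreamController.broadcast();
	hot.add("click 1"); // nobody listening, so this one is dropped
	hot.stream.listen((data) => print("hot got: $data"));
	hot.add("click 2"); // only "click 2" prints
}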

Stream Pros

The core goal with Streams is to reduce the amount of code you have to write, centralize the event handling, and filter the events, all with Promise support. They do this by embracing functional Array filter methods, improving upon the Events API, supporting Promises, and giving you the ability to merge & pipe streams together.

Promises deal with 1 event, not many. Although you can chain and/or bubble the event forward, it’s still 1 specific event that starts each part of the chain. Streams deal with many events with many listeners.

Promises are usually a 1-time affair. With their caching, while you can restart the sequence with many Promises in a chain and it’s a lot faster, it’s still the same piece of data. Streams, on the other hand, represent new data each time an event occurs, unless combined with Promises or Observables/Properties. For cold observables, this helps solve the Event race condition problem by using the fact that Streams will not usually post events until someone has subscribed/listened. In the case of many listeners, you can utilize Promises to help.
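Dart’s types make the one-vs-many distinction obvious; a trivial sketch:

import 'dart:async';

main()
{
	// a Promise/Future: one value, one time, cached afterward
	Future<String> one = new Future.value("only value");
	one.then((value) => print("future: $value"));

	// a Stream: as many values as whoever controls it wants to send
	StreamController<String> many = new StreamController<String>();
	many.stream.listen((value) => print("stream: $value"));
	many.add("first");
	many.add("second");
	many.add("third");
}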

Most Event mechanisms are synchronous, whereas Streams can support asynchronous operations as well, yet, like Promises, be written in a synchronous-looking way. While not 100% solving race conditions, it helps. Some APIs such as Dart’s provide options allowing the Stream to be synchronous or asynchronous. Event emitters can’t be paused by default, whereas in some Stream implementations it’s assumed those who are listening to the streams can pause them. This means any consumer, anywhere, can pause the stream without having knowledge of what Stream they’re pausing, whether that Stream is merged or built via Composition. Since you can get multiple instances, you can have multiple people with varying levels of control. With great power comes great responsibility. Those who use Streams are now responsible for managing this subscription. This is an additional instance that you must store and later destroy. Here’s an RX example:

var stream = getAStream();
var subscription = stream
  .subscribe(
    function (hotness) {
      console.log('Droppin it: ' + hotness);
    });
subscription.dispose();
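Dart’s equivalent hands you a StreamSubscription back from listen, and that subscription is also how you pause, resume, and eventually cancel; a minimal sketch:

import 'dart:async';

main()
{
	StreamController controller = new StreamController();
	StreamSubscription subscription = controller.stream.listen((hotness)
	{
		print("Droppin it: $hotness");
	});

	subscription.pause();  // events queue up while paused
	controller.add("so hot");
	subscription.resume(); // queued events get delivered (asynchronously) once resumed

	// later, when you're done for good; storing and calling this is on you
	new Future.delayed(new Duration(seconds: 1), () => subscription.cancel());
}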

Events make this simpler by just forcing everyone to get the emitter instance and manage the listeners directly from the emitter. Except in larger code bases, finding this instance isn’t so simple. Some frameworks in more dynamic languages will secretly offload the listeners through the Decorator pattern onto those who call addEventListener so that garbage collection can more easily deal with the references.

For larger applications, flattening the complexity with Streams is very similar to flattening a complex architecture through Composition and Facades. The difference with Streams is a common API to do so, for both consumption of Events as well as the ability to wire Streams together. In heavy GUI applications in Flash/Flex, we’d do this with Events, or some other messaging system like Signals.

They also scale better than Events in larger systems. As Stream / pub sub / Event systems grow, you start to run into the cognitive load challenge of keeping track of all the different Event types and who dispatches them in your head, including the data and types of data they contain. The difference with Streams is you have the ability to query the data/Events you’re getting and operate on that data, all from a higher-level structure. Using CSS classes and a DOM query syntax allows a more flexible UI: you can change & refactor things under the surface API, and the surface API still works. Streams are the same way. No matter what you modify or move around underneath, you can still have flexible queries up top to ensure your data is in the format you need when it arrives. For game developers this may seem like something horrible; deeply nested, disparate structures IN SUPPORT OF allowing disparate parts to flexibly talk to each other.

For application developers on a deadline with ever-changing APIs across different domains, however, it’s a different story. You can quickly see how “querying” your data and composing it into something more useful from smaller parts, all in synchronous-looking code with a small line count, sounds hot.

To be clear; this isn’t a panacea; a large code base still has a heavy cognitive load. Streams just help. Helping is good.

For unit testing purposes, it’s slightly easier to write tests against cold Streams, specifically because many Streams will not dispatch events until the 1st listener is subscribed. Some of this behavior is predictable in some APIs, thus you can write better tests to ensure the listener actually received and reacted to the data, regardless of what transformations it went through along the way. This all continues to work regardless of how many Streams you merge.
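Here’s a rough Dart sketch of that idea; because the single-subscription stream won’t emit until listened to, you can wire the whole chain up first, then push known values through and check what arrived, in order (no test framework, just prints, to keep it short):

import 'dart:async';

main()
{
	StreamController<int> controller = new StreamController<int>();
	List<int> received = [];

	// wire up the transformations before any data exists
	controller.stream
		.where((n) => n.isEven)
		.map((n) => n * 10)
		.listen(received.add, onDone: ()
		{
			print("received in order: $received"); // expect [20, 40]
		});

	// push known values through, then close to trigger the check
	[1, 2, 3, 4].forEach(controller.add);
	controller.close();
}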

Stream Cons

There is no spec right now. As such, they are quite confusing to learn. This also inadvertently makes them quite intimidating. Unless you have a game development, animation, or networking hardware background, visualizing how streams scale in larger systems can be quite hard. Bottom line, the lack of a spec means a lack of consensus on how these things are implemented. That’s really the only valid con for Streams, right now. Everything below is just nitpicking.

Streams also assume you already have good experience with Functional Array programming, and can answer why you’d use a map, where, and zip on the same Array… yet applied to Events. Wait, what? If you don’t, it can be hard to extricate what is part of a Stream, and what’s part of the library. You have to “get” why lodash/underscore are awesome libraries first before you can see how they can “improve” events.

Many also assume you’re already comfortable with Promises. Many developers are comfortable with Promises, but still create nested Promise chains vs. flattening them. Most Streaming libraries embrace Promises to make dealing with asynchronous code across a multitude of events easier. This can be a real mind-bender. It takes time to get used to coding this way. If your teammates aren’t up to speed yet, they’ll get mad when they need you to debug the code.

Managing, or rather, choosing NOT to manage Stream subscriptions can get you into trouble memory-wise. Most Promise examples you see on the web don’t really show how to clean them up. Most Stream examples are the same way. Just be sure you realize they’re writing pseudo code to get the point across, and assume you will do your due diligence by disposing of your subscriptions.

Lastly is the merge/pipe mania. Many will use Stream features simply because they’re Stream features. New, fresh, shiny, hot. Once the novelty wears off, they’re just another way of doing events or async Promises. Be wary of people who want to merge/pipe all the things. If they go all Katamari Damacy with Streams, attempting to eat and consume MVC, you know they’ve jumped the shark. That said, I encourage you to do this in your spare time off of project/client work. The various libraries approach things differently, so it’s neat to learn those differences.

<< Part 5 – Promise Deferred | Part 7 – Conclusions >>