
@aredington
Created June 14, 2013 13:32
Add (multiplex <readports>) and (broadcast <writeports>) to core.async

What

Add two functions, multiplex and broadcast, to core.async.

Multiplex is a variadic function of n arguments, returning a new single core.async read port which produces values by multiplexing together all of the provided read ports.

Broadcast is a variadic function of n arguments, returning a new single core.async write port which writes each value it receives to every one of the provided write ports.

What'

Multiplex returns a read port which, when read from, parks until at least one of the read ports it was created with produces a value. When one of those source ports has a value available, that value is returned from the read on the new port. If more than one source port is ready, the read returns a value from one of the ready ports, chosen non-deterministically.
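The read behavior described above can be approximated with tools core.async already ships. The following is only a sketch under stated assumptions: `multiplex-sketch` is a hypothetical name, and instead of implementing ReadPort directly it relays values through an intermediate channel from a go block, so it takes one value from a source before any reader asks for it. Dropping closed source ports and closing the output once every source is closed are also assumptions of this sketch; the proposal does not specify them.

```clojure
(require '[clojure.core.async :refer [chan go-loop alts! >! <!! >!! close!]])

(defn multiplex-sketch
  "Hypothetical approximation of the proposed multiplex: returns a channel
  that yields values taken from whichever of read-ports is ready."
  [& read-ports]
  (let [out (chan)]
    (go-loop [ports (vec read-ports)]
      (if (empty? ports)
        ;; Assumption: once every source is closed, close the output too.
        (close! out)
        ;; alts! parks until one port is ready; when several are ready it
        ;; picks among them non-deterministically.
        (let [[v port] (alts! ports)]
          (if (nil? v)
            ;; nil means `port` was closed: stop selecting on it.
            (recur (vec (remove #(= % port) ports)))
            (do (>! out v)
                (recur ports))))))
    out))

;; Usage: only `a` has a value, so the read returns it.
(let [a (chan 1)
      b (chan 1)
      m (multiplex-sketch a b)]
  (>!! a :from-a)
  (<!! m))
;; => :from-a
```

A direct ReadPort implementation, as the proposal suggests, would avoid both the extra channel and the eager take.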

Broadcast returns a write port which, when written to, writes to each of the ports it was created with. If writing to any of those ports parks, the write to the broadcast port parks until the write has completed on every port. Writes complete as the underlying write ports become ready; if multiple write ports are ready at once, the order in which their writes complete is non-deterministic.

Why

There are many use cases where it is desirable either to aggregate multiple streams of data or to broadcast one value to many receivers. With the tools present in core.async today, it is not particularly difficult for consumers of the library to implement this behavior themselves. However, the finer details of how core.async's channels are implemented may not be readily apparent to all users of the library. A sub-optimal but easily apparent solution many developers might reach for is to spawn individual threads to do the work of multiplexing and broadcasting, undermining some of the value core.async offers. Adding 'canonical' implementations of these cases to core.async, which make effective use of the implementation details, can offer solutions with lower run-time cost, as well as giving the library a more "batteries included" feel for its users.

Is this too complex? I like to think it is not; the interfaces and return types are quite simple. The implementation of both multiplex and broadcast as described should not make any assumptions about the nature of the underlying ports beyond what is implied by the ReadPort and WritePort interfaces.

Is this as simple as possible? I think perhaps not. The functionality is already available by implementing the ReadPort and WritePort protocols with reifications or user-defined types, and offering nothing that specifically addresses these use cases is arguably simpler than offering an implementation which does.

How

Implement the ReadPort and WritePort protocols against types defined as collections of ReadPorts and WritePorts. Reading from a multiplex-created ReadPort will use alt! to consume one value from its collection of ReadPorts and return it. Writing to a broadcast-created WritePort will use alt! to write to one of its collection of WritePorts, remove that WritePort from a collection of pending WritePorts, and recur until the collection of pending WritePorts is exhausted.
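The write loop described above might look roughly like the following. This is a minimal sketch, not the proposed implementation: `broadcast-write` is a hypothetical helper rather than a WritePort, and it uses alts! (the function counterpart of alt!) because the set of ports is only known at run time. It assumes the ports are distinct and the value is non-nil.

```clojure
(require '[clojure.core.async :refer [chan go alts! <!! >!!]])

(defn broadcast-write
  "Hypothetical helper: puts v on every port in write-ports. Returns a
  channel that yields :done once all of the puts have completed."
  [write-ports v]
  (go
    (loop [pending (set write-ports)]
      (if (empty? pending)
        :done
        ;; Offer v to every pending port at once; alts! completes exactly
        ;; one put, on whichever port is ready, and reports which it was.
        (let [[_ port] (alts! (mapv (fn [p] [p v]) pending))]
          ;; Discard the port that accepted the value and continue.
          (recur (disj pending port)))))))

;; Usage: both ports have buffer space, so the write completes promptly.
(let [a (chan 1)
      b (chan 1)]
  (<!! (broadcast-write [a b] :hello))
  [(<!! a) (<!! b)])
;; => [:hello :hello]
```

Selecting across all pending ports, rather than writing to them in a fixed order, means one slow port delays completion of the whole write but does not hold up delivery to the ports that are ready.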

Writes against unbuffered write ports

An unbuffered write port participating as a member of a broadcast set will cause writes to that set to park until a value is read from its corresponding read port. While this can potentially bog down a broadcast write load, doing otherwise raises a thorny set of questions about how each case should be handled. I think it would be appropriate to mention this concern in the docstring for broadcast.
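The stall can be observed with plain channels. In this sketch the go block is only a stand-in for a broadcast write (it puts on two ports in sequence), and `stalled-write-demo` and the 100 ms timeout are illustrative choices, not part of the proposal.

```clojure
(require '[clojure.core.async :refer [chan go >! <!! alts!! timeout]])

(defn stalled-write-demo
  "Hypothetical demo: a write to two ports cannot finish until someone
  reads from the unbuffered one."
  []
  (let [buffered   (chan 1)
        unbuffered (chan)
        ;; Stand-in for a broadcast write to both ports.
        done       (go (>! buffered :v)
                       (>! unbuffered :v)
                       :done)
        ;; Nobody is reading `unbuffered` yet, so the timeout wins.
        [_ port]   (alts!! [done (timeout 100)])
        stalled?   (not= port done)]
    ;; Reading from the unbuffered port lets the parked write complete.
    (<!! unbuffered)
    [stalled? (<!! done)]))

(stalled-write-demo)
;; => [true :done]
```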
