Tuesday, March 11, 2014

Channels, pipes, connections, and multiplexors

This post is the second in a series describing how we adapted the CSP style of programming used in Go to external pipes, files, network connections, and related artefacts.
This was the first post. In what follows, "pipe" refers to any external file-descriptor-like artefact used to reach the external world (for input or output).


Connections
The nchan package defines a connection as

 type Conn struct {
  Tag string // debug
  In  <-chan []byte
  Out chan<- []byte
 }

This joins two channels to make a full-duplex connection. A process talking to an external entity relies on this structure to bridge the system pipe in use to a pair of channels. There are utilities that leverage the functions described in the previous section and build a channel interface to external pipes, for example:

 func NewConn(rw io.ReadWriteCloser, nbuf int, win, wout chan bool) Conn

The function creates processes to feed and drain the connection channels from and to the external pipe. Furthermore, if rw supports closing only for reading or only for writing, a close on the input or output channel closes the respective half of the pipe. Because of the message protocol explained in the previous section, errors are also propagated across the external pipe, and the process using the connection can largely ignore that the source/sink of data is external.
It is easy to build pipes where the Out channel sends elements through the In channel:

 func NewPipe(nbuf int) Conn

And, using this, we can create in-memory connections that do not leave the process space:

 func NewConnPipe(nbuf int) (Conn, Conn)

This has been very useful during testing, because this connection can be created with no buffering, which makes it easier to spot deadlocks that involve both ends of the connection. Once the program is ready, we can replace the connection-based pipe with an actual system-provided pipe.
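
One way to picture such an in-memory pair is the sketch below, written in standard Go. The names conn and connPair are hypothetical and the real NewConnPipe may differ; the point is only that two channels are cross-wired so that what one end sends on Out arrives on the other end's In. With nbuf set to 0 every send blocks until the peer receives, which is what makes deadlocks show up early.

```go
package main

import "fmt"

// conn mirrors the shape of Conn; it is a hypothetical stand-in.
type conn struct {
	In  <-chan []byte
	Out chan<- []byte
}

// connPair is a hypothetical helper that returns two cross-wired ends:
// the Out of each end is the In of the other.
func connPair(nbuf int) (conn, conn) {
	ab := make(chan []byte, nbuf)
	ba := make(chan []byte, nbuf)
	return conn{In: ba, Out: ab}, conn{In: ab, Out: ba}
}

func main() {
	a, b := connPair(0) // unbuffered: sends rendezvous with receives
	go func() {
		// The peer echoes every message and closes when its input ends.
		for m := range b.In {
			b.Out <- append([]byte("echo: "), m...)
		}
		close(b.Out)
	}()
	a.Out <- []byte("hi")
	fmt.Printf("%s\n", <-a.In) // prints "echo: hi"
	close(a.Out)
	_, ok := <-a.In
	fmt.Println("peer still open:", ok) // prints "peer still open: false"
}
```
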

Multiplexors
On top of the channel-based connections shown in the previous section, the nchan package provides multiplexors.

 type Mux struct {
  In chan Conn
  ...
 }
 func NewMux(c Conn, iscaller bool) *Mux
 func (m *Mux) Close(err error)
 func (m *Mux) Out() chan<- []byte
 func (m *Mux) Rpc() (outc chan<- []byte, repc <-chan []byte)

A program speaking a protocol usually creates a new Conn connection by dialing or accepting connections and then creates a Mux by calling NewMux to multiplex the connection among multiple requests.



The nice thing about the multiplexed connection is that requests may carry a series of messages (and not just one message per request) and may or may not have replies. Replies may also be a full series of messages. Both ends of a multiplexed connection (the process using the mux and its peer at the other end of the pipe) may issue requests. Thus, this is not a client-server interaction model, although it may be used as such.
To issue new outgoing requests through the multiplexor, the process calls Out (to issue requests with no expected reply):

 oc := mux.Out()
 oc <- []byte("no reply")
 oc <- []byte("expected")
 close(oc)

Or the process may call Rpc (to issue requests with an expected reply).

 rc, rr := mux.Rpc()
 rc <- []byte("another")
 rc <- []byte("request")
 close(rc)
 for m := range rr {
  Printf("got %v as part of the reply\n", m)
 }
 Printf("and the final error status is %v\n", cerror(rr))

In the first case, the multiplexor returns a Conn to the caller with just the Out channel. Of course, this can be done multiple times to issue several concurrent outgoing requests:



In the figure, the two connections on the left were built by two calls to mux.Out(), which returns a Conn with an Out chan to issue requests. The process using the Out channel may issue as many messages as desired and then close the channel.
If the request depicted below requires a reply, mux.Rpc() is called instead of mux.Out(), and the resulting picture is as shown.


The important part is that messages (and replies) sent as part of a request (or reply) may be streamed without affecting other requests and replies, other than by their use of the underlying connection. That is, an idle stream does not block other streams.
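
The post does not describe how the multiplexor keeps streams apart on the wire. A common approach, assumed here purely for illustration, is to tag each message with a stream identifier so that several streams can share one connection and be regrouped at the other end. The frame type and the send and collect helpers below are hypothetical and are not part of nchan; end-of-stream markers and error status are left out.

```go
package main

import (
	"fmt"
	"sync"
)

// frame is a hypothetical tagged message; id names the stream it belongs to.
type frame struct {
	id   int
	data string
}

// send writes msgs to the shared wire, in order, as frames of stream id.
func send(wire chan<- frame, id int, msgs ...string) {
	for _, m := range msgs {
		wire <- frame{id: id, data: m}
	}
}

// collect reads frames until wire is closed and regroups them per stream.
func collect(wire <-chan frame) map[int][]string {
	streams := make(map[int][]string)
	for f := range wire {
		streams[f.id] = append(streams[f.id], f.data)
	}
	return streams
}

func main() {
	wire := make(chan frame) // stands in for the underlying connection
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); send(wire, 1, "no reply", "expected") }()
	go func() { defer wg.Done(); send(wire, 2, "another", "request") }()
	go func() { wg.Wait(); close(wire) }()

	// Frames of the two streams may interleave on the wire in any order,
	// but each stream keeps its own message order once regrouped.
	got := collect(wire)
	fmt.Printf("stream 1: %q\n", got[1])
	fmt.Printf("stream 2: %q\n", got[2])
}
```

A stream that sends nothing puts no frames on the wire, so in this sketch it cannot hold up the others.
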
The interface for the receiving part of the multiplexor is a single In channel that conveys one Conn per incoming request. The request has only the In channel if no reply is expected, and has both the In and Out channels set if a reply is expected.


To receive requests from the other end of the pipe, the code might look like this:

 for call := range mux.In {
  // call is a Conn carrying one incoming request
  for m := range call.In {
   Printf("got %v as part of the request\n", m)
  }
  if call.Out != nil {
   // a reply is expected
   call.Out <- []byte("a reply")
   call.Out <- []byte("was expected, but...")
   // close the reply stream, reporting an error to the peer
   close(call.Out, "Oops!, failed")
  }
 }
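
To try the same receive loop without a multiplexor, the sketch below simulates two incoming requests in standard Go, one with no reply expected and one with a reply expected. The names conn, serve and request are hypothetical, the requests are built by hand instead of arriving through a Mux, and the reply is closed with the standard one-argument close, so no error status is reported.

```go
package main

import "fmt"

// conn mirrors the shape of Conn; it is a hypothetical stand-in.
type conn struct {
	In  <-chan []byte
	Out chan<- []byte
}

// serve drains each incoming request and, when a reply is expected
// (Out is not nil), sends a one-message reply and closes it.
func serve(in <-chan conn) {
	for call := range in {
		n := 0
		for range call.In {
			n++
		}
		if call.Out != nil {
			call.Out <- []byte(fmt.Sprintf("got %d messages", n))
			close(call.Out)
		}
	}
}

// request builds a hypothetical incoming call carrying msgs. The second
// result yields the reply, or is nil when no reply is expected.
func request(wantReply bool, msgs ...string) (conn, <-chan []byte) {
	in := make(chan []byte, len(msgs))
	for _, m := range msgs {
		in <- []byte(m)
	}
	close(in)
	if !wantReply {
		return conn{In: in}, nil
	}
	rep := make(chan []byte, 1)
	return conn{In: in, Out: rep}, rep
}

func main() {
	in := make(chan conn) // plays the role of mux.In
	go serve(in)

	oneWay, _ := request(false, "no reply", "expected")
	in <- oneWay

	rpc, rep := request(true, "another", "request")
	in <- rpc
	fmt.Printf("%s\n", <-rep) // prints "got 2 messages"
	close(in)
}
```

Here serve handles one request at a time to keep the sketch short; a server that must not let a slow request delay the others would start a process per incoming Conn.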


For example, if a process received two requests, one with no reply expected and another with a reply expected, the picture would be:



Here, the two connections on the left represent requests that were received through the In channel depicted on top of the multiplexor.

The important thing to note is that processes may now issue streams of requests, or replies, through channels, and these are fed to external pipes (or from them) as required. The interfaces shown have greatly simplified programming for the (networked) system services being written for the new system.