Tuesday, March 11, 2014

Channels, pipes, connections, and multiplexors

This post is the second in a series describing how we adapted the CSP style of programming used in Go to external pipes, files, network connections, and related artefacts.
This was the first post. In what follows, "pipe" refers to any external file-descriptor-like artefact used to reach the external world (for input or output).


Connections
The nchan package defines a connection as

 type Conn struct {
  Tag string // debug
  In  <-chan []byte
  Out chan<- []byte
 }

This joins two channels to make a full-duplex connection. A process talking to an external entity relies on this structure to bridge the system pipe used to a pair of channels. There are utilities that leverage the functions described in the previous section and build a channel interface to external pipes, for example:

 func NewConn(rw io.ReadWriteCloser, nbuf int, win, wout chan bool) Conn

The function creates processes to feed and drain the connection channels from and to the external pipe. Furthermore, if rw supports closing only for reading or writing, a close on the input or output channels would close the respective halves of the pipe. Because of the message protocol explained in the previous section, errors are also propagated across the external pipe and the process using the connection can largely ignore that the source/sink of data is external.
It is easy to build pipes where the Out channel sends elements through the In channel:

        func NewPipe(nbuf int) Conn

And, using this, we can create in-memory connections that do not leave the process space:

 func NewConnPipe(nbuf int) (Conn, Conn)

This has been very useful during testing, because this connection can be created with no buffering and it is easier to spot dead-locks that involve both ends of the connection. Once the program is ready, we can replace the connection based pipe with an actual system provided pipe.
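
As a rough illustration of the idea, an in-memory pair of connections can be built in standard Go by cross-wiring two channels. This is only a sketch: newConnPipe and echoOnce are hypothetical names, and the real nchan.NewConnPipe (which relies on the modified channels) may differ.

```go
package main

import "fmt"

// Conn mirrors the structure shown in the post: a pair of channels
// forming one end of a full-duplex connection.
type Conn struct {
	Tag string // debug
	In  <-chan []byte
	Out chan<- []byte
}

// newConnPipe is a hypothetical stand-in for nchan.NewConnPipe: it returns
// two connected ends, so that data sent to one end's Out arrives on the
// other end's In.
func newConnPipe(nbuf int) (Conn, Conn) {
	ab := make(chan []byte, nbuf) // carries data from a to b
	ba := make(chan []byte, nbuf) // carries data from b to a
	a := Conn{Tag: "a", In: ba, Out: ab}
	b := Conn{Tag: "b", In: ab, Out: ba}
	return a, b
}

// echoOnce sends msg through one end and returns what the peer echoes back.
func echoOnce(msg string) string {
	a, b := newConnPipe(0) // unbuffered: every send needs a matching receive
	go func() {
		// The peer echoes everything it receives until its input is closed.
		for m := range b.In {
			b.Out <- m
		}
		close(b.Out)
	}()
	a.Out <- []byte(msg)
	reply := <-a.In
	close(a.Out)
	return string(reply)
}

func main() {
	fmt.Println(echoOnce("hello"))
}
```

With nbuf set to zero, a process that forgets to receive blocks its peer right away, which is what makes dead-locks easy to spot in tests.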

Multiplexors
On top of the channel-based connections shown in the previous section, the nchan package provides multiplexors.

 type Mux struct {
  In chan Conn
  ...
 }
 func NewMux(c Conn, iscaller bool) *Mux
 func (m *Mux) Close(err error)
 func (m *Mux) Out() chan<- []byte
 func (m *Mux) Rpc() (outc chan<- []byte, repc <-chan []byte)

A program speaking a protocol usually creates a new Conn connection by dialing or accepting connections and then creates a Mux by calling NewMux to multiplex the connection among multiple requests.



The nice thing about the multiplexed connection is that requests may carry a series of messages (and not just one message per request) and may or may not have replies. Replies may also be a full series of messages. Both ends of a multiplexed connection (the process using the mux and its peer at the other end of the pipe) may issue requests. Thus, this is not a client-server interaction model, although it may be used as such.
To issue new outgoing requests through the multiplexor, the process calls Out (to issue requests with no expected reply):

 oc := mux.Out()
 oc <- []byte("no reply")
 oc <- []byte("expected")
 close(oc)

Or the process may call Rpc (to issue requests with an expected reply).

 rc, rr := mux.Rpc()
 rc <- []byte("another")
 rc <- []byte("request")
 close(rc)
 for m := range rr {
  Printf("got %v as part of the reply\n", m)
 }
 Printf("and the final error status is %v\n", cerror(rr))

In the first case, the multiplexor returns a Conn to the caller with just the Out channel. Of course, this can be done multiple times to issue several concurrent outgoing requests:



In the figure, the two connections on the left were built by two calls to mux.Out(), which returns a Conn with an Out chan to issue requests. The process using the Out channel may issue as many messages as desired and then close the channel.
If the request depicted below requires a reply, mux.Rpc() is called instead of mux.Out() and the resulting picture is as shown.


The important part is that messages (and replies) sent as part of a request (or reply) may be streamed without affecting other requests and replies, other than by the usage of the underlying connection. That is, an idle stream does not block other streams.
The interface for the receiving part of the multiplexor is a single In channel that conveys one Conn per incoming request. The request has only the In channel if no reply is expected, and has both the In and Out channels set if a reply is expected.
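
To make the receiving side more concrete, here is a minimal sketch in standard Go of how messages tagged with a request identifier could be routed to one channel per request. The frame and request types, the demux function, and the use of buffering are all assumptions made for illustration; this is not the protocol or the code used by nchan.Mux.

```go
package main

import "fmt"

// frame is a hypothetical wire unit: a chunk of data tagged with the
// request it belongs to. Last marks the final chunk of that request.
type frame struct {
	ID   uint32
	Data []byte
	Last bool
}

// request is what the receiving side sees: one channel per incoming request.
type request struct {
	ID uint32
	In <-chan []byte
}

// demux routes frames to per-request channels and announces each new
// request on the returned channel. Channels are buffered (nbuf), so a
// reader lagging by up to nbuf messages does not stall the dispatcher;
// beyond that, this simplified version would block.
func demux(frames <-chan frame, nbuf int) <-chan request {
	reqs := make(chan request, nbuf)
	go func() {
		open := map[uint32]chan []byte{}
		for f := range frames {
			c, ok := open[f.ID]
			if !ok {
				c = make(chan []byte, nbuf)
				open[f.ID] = c
				reqs <- request{ID: f.ID, In: c}
			}
			c <- f.Data
			if f.Last {
				close(c)
				delete(open, f.ID)
			}
		}
		// The underlying connection is gone: end every pending request.
		for _, c := range open {
			close(c)
		}
		close(reqs)
	}()
	return reqs
}

// collect runs demux over a fixed list of frames and returns the
// concatenated payload seen for each request id.
func collect(fs []frame) map[uint32]string {
	frames := make(chan frame, len(fs))
	for _, f := range fs {
		frames <- f
	}
	close(frames)
	out := map[uint32]string{}
	for r := range demux(frames, 8) {
		for m := range r.In {
			out[r.ID] += string(m)
		}
	}
	return out
}

func main() {
	got := collect([]frame{
		{ID: 1, Data: []byte("a")},
		{ID: 2, Data: []byte("x")},
		{ID: 1, Data: []byte("b"), Last: true},
		{ID: 2, Data: []byte("y"), Last: true},
	})
	fmt.Println(got[1], got[2])
}
```

The two requests are interleaved on the shared stream, yet each consumer sees only its own messages, in order.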


To receive requests from the other end of the pipe, the code might look like this:

 for call := range mux.In {
  // call is a Conn
  for m := range call.In {
   Printf("got %v as part of the request\n", m)
  }
  if call.Out != nil {
   call.Out <- []byte("a reply")
   call.Out <- []byte("was expected, but...")
   close(call.Out, "Oops!, failed")
  }
 }


For example, if a process received two requests, one with no reply expected and another with a reply expected, the picture would be:



Here, the two connections on the left represent requests that were received through the In channel depicted on top of the multiplexor.

The important thing to note is that processes may now issue streams of requests, or replies, through channels and they are fed to external pipes (or from them) as required. The interfaces shown have greatly simplified programming for the (networked) system services being written for the new system.


Monday, March 10, 2014

Channels and pipes: close and errors

This post describes changes made to Go channels and tools built upon those to provide system and network-wide services for a new OS under construction. Further posts will follow and describe other related set of tools, but the post is already long enough. Yes, belts are gone, we use our modified channels directly now.

A channel is an artifact that can be used to send (typed) data through it. The Go language operations on channels include sending, receiving, and selection among a set of send and receive operations. Go also permits closing a channel after the last item has been sent.
On most systems, processes and applications talk through pipes, network connections, FIFOs, and related artifacts. In short, these are just file descriptors that, once open, permit the application to write data (for sending) and/or read data (for receiving). Some of these are duplex, but they can be considered to be a pair of devices (one for each direction). In what follows we will refer to all these artifacts as pipes (e.g., a network connection may be considered as a pair of simplex pipes).
There is a mismatch between channels and pipes, and this paper shows what we did to try to bridge the gap between both abstractions for a new system. The aim is to let applications leverage the CSP style of programming while, at the same time, letting them work across the network.
We assume that the reader is familiar with channels in the Go language, and describe only our modifications and additions.


Close and errors

When using pipes, each end of the pipe may close and the pipe implementation takes care of propagating the error to the other end. That is not the case with standard Go channels. Furthermore, upon errors, it is desirable for one end of the pipe to learn about the error that did happen at the other end.
We have modified the standard Go implementation to:
  1. Accept an optional error argument to close.
  2. Make the send operation return false when used on a closed channel (instead of panicking; the receive operation already behaves nicely in the case of a closed channel).
  3. Provide a primitive, cerror, that returns the error given when the channel was closed.
  4. Make a close of an already closed channel a no-operation (instead of a panic).
With this modified tool in hand, it is feasible to write the following code:

var inc, outc chan []byte
...
for data := range inc {
	ndata := modify(data)
	if ok := outc <- ndata; !ok {
		close(inc, cerror(outc))
		break
	}
}
close(outc, cerror(inc))


Here, a process consumes data from inc and produces new data through outc for another one. The image to have in mind is


where the code shown corresponds to the middle process. Perhaps the first process terminates normally (or abnormally), in which case it would close inc. In this case, our code closes outc as expected. But this time, the error given by the first process is known to the second process, and it can even forward such error to the third one.
A more interesting case is when the third process decides to cease consuming data from outc and calls close. Now, our middle process will notice that ok becomes false when it tries to send more data, and can break its loop cleanly, also closing the input channel to signal to the first process that there is no point in producing further data. In this second example, the last call to close is a no-operation because the output channel was already closed, and we don’t need to add unnecessary code to prevent the call.
The important point is that termination of the data stream is easy to handle for the program without resorting to exceptions (or panics), and we know which one is the error, so we can take whatever measures are convenient in that case.
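
For readers without the modified compiler, the same behavior can be approximated in standard Go with a small wrapper type. The sketch below is only an emulation under our own assumptions: errChan, its methods, and the demo function are hypothetical names, not part of Lsub Go or nchan.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errChan emulates the modified channel: either side may close it with an
// error, sends report failure instead of panicking, and a second close is
// a no-operation.
type errChan struct {
	data chan []byte
	done chan struct{} // closed exactly once, by Close
	once sync.Once
	err  error
}

func newErrChan(nbuf int) *errChan {
	return &errChan{data: make(chan []byte, nbuf), done: make(chan struct{})}
}

// Close records the error of the first call and wakes up blocked peers.
func (c *errChan) Close(err error) {
	c.once.Do(func() {
		c.err = err
		close(c.done)
	})
}

// Err plays the role of cerror: the error given to Close, or nil if the
// channel is still open or was closed without an error.
func (c *errChan) Err() error {
	select {
	case <-c.done:
		return c.err
	default:
		return nil
	}
}

// Send returns false if the channel is (or becomes) closed.
func (c *errChan) Send(b []byte) bool {
	select {
	case <-c.done:
		return false
	default:
	}
	select {
	case c.data <- b:
		return true
	case <-c.done:
		return false
	}
}

// Recv returns false once the channel is closed and its buffer is empty.
func (c *errChan) Recv() ([]byte, bool) {
	select {
	case b := <-c.data:
		return b, true
	case <-c.done:
		select {
		case b := <-c.data: // deliver what is still buffered
			return b, true
		default:
			return nil, false
		}
	}
}

// middle mirrors the loop shown in the post for the middle process.
func middle(inc, outc *errChan) {
	for {
		data, ok := inc.Recv()
		if !ok {
			break
		}
		if !outc.Send(data) {
			inc.Close(outc.Err())
			break
		}
	}
	outc.Close(inc.Err())
}

// demo closes the output side first and returns the error that the
// middle process forwards to the producer.
func demo() string {
	inc, outc := newErrChan(1), newErrChan(1)
	outc.Close(errors.New("consumer gone")) // the third process gave up
	inc.Send([]byte("x"))
	middle(inc, outc)
	return inc.Err().Error()
}

func main() {
	fmt.Println(demo())
}
```

Note that the final outc.Close call in middle is harmless when outc was already closed, just like the no-operation close described above.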


Channels and pipes
There are three big differences between channels and pipes (we are using pipe to refer to any "file descriptor" used to convey data, as stated before). One is that pipes may have errors when sending or receiving, but channels do not. Another one is that pipes carry only streams of bytes and not separate messages. Yet another is that channels convey a data type but pipes convey just bytes.
The first difference is mostly dealt with by the changes made to channels as described in the previous section. That is, channels may have errors while sending or receiving, given the changes made. Therefore, the code using a channel must consider errors in very much the same way it would if using a pipe.
To address the third difference we are going to consider channels of byte arrays for now.
The second difference can be dealt with by ensuring that applications using channels to speak through a pipe preserve message boundaries within the pipe. With this in mind, a new nchan package provides new channel tools to bridge the gap between the channel and the pipe domains.
The following function writes each message received from c into w as it arrives. If w preserves message boundaries, that is enough. The second function is its counterpart.

func WriteBytesTo(w io.Writer, c <-chan []byte) (int64, error)
func ReadBytesFrom(r io.Reader, c chan<- []byte) (int64, error)

However, in most cases, the transport does not preserve message boundaries. Thus, the next function writes all messages received from c into w, but precedes each such write with a header indicating the message length. The second function can rely on this to read one message at a time and forward it to the given channel.

func WriteMsgsTo(w io.Writer, c <-chan []byte) (int64, error)
func ReadMsgsFrom(r io.Reader, c chan<- []byte) (int64, error)


One interesting feature of WriteMsgsTo and ReadMsgsFrom is that when the channel is closed, its error status is checked and forwarded through the pipe. The other end notices that the message is an error indication and closes the channel with said error.
Thus, code like the excerpt shown for our middle process in the stream of processes would work correctly even if the input channel comes from a pipe and not from another process within the same program.
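
The length-prefix idea can be sketched in standard Go as follows. The 4-byte big-endian header is an assumption made for this example (the post does not describe the actual header used by nchan), the function names are hypothetical, and the forwarding of the close error is left out because it needs the modified channels.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// writeMsgs writes each message from c to w preceded by a 4-byte
// big-endian length header (an assumed layout, not nchan's format).
// It returns the number of payload bytes written.
func writeMsgs(w io.Writer, c <-chan []byte) (int64, error) {
	var tot int64
	for m := range c {
		var hdr [4]byte
		binary.BigEndian.PutUint32(hdr[:], uint32(len(m)))
		if _, err := w.Write(hdr[:]); err != nil {
			return tot, err
		}
		n, err := w.Write(m)
		tot += int64(n)
		if err != nil {
			return tot, err
		}
	}
	return tot, nil
}

// readMsgs reads length-prefixed messages from r and forwards them to c,
// one message per send, until EOF. It closes c when done.
func readMsgs(r io.Reader, c chan<- []byte) (int64, error) {
	defer close(c)
	var tot int64
	for {
		var hdr [4]byte
		if _, err := io.ReadFull(r, hdr[:]); err != nil {
			if err == io.EOF {
				return tot, nil // clean end between two messages
			}
			return tot, err
		}
		m := make([]byte, binary.BigEndian.Uint32(hdr[:]))
		n, err := io.ReadFull(r, m)
		tot += int64(n)
		if err != nil {
			return tot, err
		}
		c <- m
	}
}

// roundTrip pushes msgs through a byte buffer, which stands in for a
// transport that does not preserve message boundaries.
func roundTrip(msgs ...string) []string {
	var pipe bytes.Buffer
	in := make(chan []byte, len(msgs))
	for _, m := range msgs {
		in <- []byte(m)
	}
	close(in)
	writeMsgs(&pipe, in)

	out := make(chan []byte, len(msgs))
	readMsgs(&pipe, out)
	var got []string
	for m := range out {
		got = append(got, string(m))
	}
	return got
}

func main() {
	fmt.Printf("%q\n", roundTrip("ab", "", "cde"))
}
```

Even the empty message survives the trip, which would not be possible on a plain byte stream without the header.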

The next post in the series will be about channel connections and channel multiplexors.


Saturday, January 25, 2014

Tiny go debug tools

A recent post from Rob Pike in his blog reminded me that I didn't post about some tiny tools I use to debug programs by enabling traces and conditional prints. I think these were borrowed from or inspired by those written by him or by others using Go some time ago.

These are packaged in a tiny git.lsub.org/go.git/dbg.go package.

First, this is a well known idiom to trace function calls, only that I modified it to report also the file and line number in a convenient way.

It is used as in this excerpt

func ReadMsgsFrom(r io.Reader, c chan<- []byte) (int64, error) {
defer dbg.Trace(dbg.Call("ReadMsgsFrom"))
...

And produces messages like these ones:

bgn ReadMsgsFrom nchan.go:116
end ReadMsgsFrom nchan.go:116

The functions are as follows:

// For use as in defer dbg.Trace(dbg.Call("funcname"))
func Trace(s string) {
	fmt.Printf("end %s\n", s)
}

// For use as in defer dbg.Trace(dbg.Call("funcname"))
func Call(s string) string {
	if _, file, lno, ok := runtime.Caller(1); ok {
		rel, _ := filepath.Rel(cwd, file)
		s = fmt.Sprintf("%s %s:%d", s, rel, lno)
	}
	fmt.Printf("bgn %s\n", s)
	return s
}

The second tool is a couple of functions that enable prints only if a flag is set or if another function says so. They are used like in

var Printf = dbg.FuncPrintf(os.Stdout, testing.Verbose)

func TestChanSend(t *testing.T) {
...
Printf("receiver %v\n", msg)
}

The variable name should reflect that it is a conditional print, but in this case I took the code from a testing file, which prints only if verbose is set during the test. The idea is that you can declare as many print functions (variables) as you want, to print conditionally when certain flags are enabled.

This is the code from the debug package

type PrintFunc func(fmts string, arg ...interface{}) (int, error)


/*
Return a function that calls fmt.Fprintf on w only if fn returns true.
To be used like in
	var Printf = dbg.FuncPrintf(os.Stdout, testing.Verbose)
	...
	Printf(...)
*/
func FuncPrintf(w io.Writer, fn func() bool) PrintFunc {
	return func(fmts string, arg ...interface{}) (int, error) {
		if fn() {
			return fmt.Fprintf(w, fmts, arg...)
		}
		return 0, nil
	}
}

The function returns a function that prints, only that it prints only if the function given as an argument says so. Thus, when you declare your print function variable you can set the condition to trigger the print and the writer the print should use. The rest of the code is relieved from the burden of testing the condition or typing more to use a particular output stream.

There is a similar function (returning a conditional printing function) that takes a pointer to a boolean flag instead of a function, for those cases when checking a flag suffices.
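
The flag-based variant is not shown in the post, so the following is just a guess at what it might look like. The name flagPrintf and the demo function are hypothetical.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

type PrintFunc func(fmts string, arg ...interface{}) (int, error)

// flagPrintf (hypothetical name) returns a print function that writes to w
// only while *flag is true. The flag is checked on every call, so it can
// be toggled at run time.
func flagPrintf(w io.Writer, flag *bool) PrintFunc {
	return func(fmts string, arg ...interface{}) (int, error) {
		if flag != nil && *flag {
			return fmt.Fprintf(w, fmts, arg...)
		}
		return 0, nil
	}
}

// demo prints twice, once with the flag off and once with it on, and
// returns whatever reached the writer.
func demo() string {
	var buf bytes.Buffer
	debug := false
	dprintf := flagPrintf(&buf, &debug)
	dprintf("hidden %d\n", 1)
	debug = true
	dprintf("shown %d\n", 2)
	return buf.String()
}

func main() {
	fmt.Print(demo())
}
```

A typical use would be to bind the flag to a command line option, so that traces can be enabled without touching the code that prints them.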


Wednesday, January 22, 2014

New Go channels at lsub

After some experience with belts (see a previous post), we have changed the Go language to provide the functionality we were missing from its channel abstraction. We refer to the modified environment as Lsub Go.

NB: I'm editing this post to reflect the new interface used after some discussion and reading the comments. 

The modified compiler and runtime can be retrieved with

git clone git://git.lsub.org/golang.git

This required changes to both the compiler and the run-time.
We also tried to keep the core language as undisturbed as feasible yet be able to track the changes made.

  • The builtin close can now also be called on a chan<- data type (a channel to send things through). This no longer causes a panic as it did before.
  • A variant, close(chan, interface{}), has been added to both close a channel (for sending or receiving) and report the cause for the closing. close(c) is equivalent to close(c, nil).
  • A new builtin cerror(chan) error has been added to report the cause for a closed channel (or nil if it is not closed or was closed without indicating the reason).
  • The primitive send operation now returns a bool value indicating if the send could be made or not (the channel might be closed). Thus, ok := c <- 0 now indicates if we could send or the channel was closed and we couldn't send anymore. 
Not directly related to this change, but because we already changed the language and because we missed this construct a lot...

  • A new doselect construct has been added, which is equivalent to a for loop with a single select construct as the body. It is a looping select.
The doselect construct is very convenient. Many times a process would just loop serving a set of channels, and

  1. There is no actual reason for having to indent twice (the loop and the select)
  2. It would be desirable to be able to break the service loop directly or to continue with the next request.
For example:

doselect {
case x, ok := <-c1:
	if !ok {
		...
		break
	}
	...
case x, ok := <-c2:
	...
	if ... {
		continue
	}
default:
	...
}

Here, the break does break the entire loop and there is no need to add a label to the loop implied by the construct.

Also, continue would continue with the next request selected in the loop.

But this is just a convenience, and not really a change that would make a difference.
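
For comparison, this is roughly what the same service loop looks like in standard Go, where a label is needed to leave the loop from inside the select. The serve and demo functions are made up for the example.

```go
package main

import "fmt"

// serve adds up the values received from c1 and c2 until c1 is closed.
// Negative values from c2 are skipped and counted.
func serve(c1, c2 <-chan int) (sum, skipped int) {
loop:
	for {
		select {
		case x, ok := <-c1:
			if !ok {
				break loop // a plain break would only leave the select
			}
			sum += x
		case x, ok := <-c2:
			if !ok {
				c2 = nil // a nil channel is never selected again
				continue
			}
			if x < 0 {
				skipped++
				continue // go on with the next request
			}
			sum += x
		}
	}
	return sum, skipped
}

// demo feeds the loop from a single goroutine using unbuffered channels,
// so requests are served in the order they are sent.
func demo() (int, int) {
	c1, c2 := make(chan int), make(chan int)
	go func() {
		c2 <- 1
		c2 <- -1
		c1 <- 2
		close(c1)
	}()
	return serve(c1, c2)
}

func main() {
	sum, skipped := demo()
	fmt.Println(sum, skipped)
}
```

With doselect, the label and the extra level of indentation would both go away.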

Now, the important change is to be able to

for x := range inc {
	dosomethingto(x)
	if ok := outc <- x; !ok {
		close(inc, "not interested in your stream")
		break
	}
}

In this example, we loop receiving elements from an input channel,  inc, and after some processing, send the output to another process through outc. Now, if somehow that process is no longer interested in the stream we are piping to it, there is no point in
  • forcing it to drain the entire stream, or
  • forcing our interface to have another different channel to report the error, or
  • ignoring the sender and leaving it forever blocked trying to send
Instead, the receiver of outc might just close(outc) to indicate to any sender that nobody will ever be interested in more elements sent through there. 
After that, our attempt to send would return false (recorded in ok in this example), and we can check that value to stop sending.
Furthermore, because we are sending data received from an input channel, inc,  we can also tell the sender feeding us data that we are done, and also use close to report why.

Thus, an entire pipeline of processes can cleanly shut down and stop when it is necessary to do so. The same could have been done by recovering from panics, but this is not really a panic: it is not that different from detecting EOF when reading, and it should be processed by normal error checking code and not by panic/recover tricks. Moreover, such tricks would be inconvenient if the function is doing more than just sending through the channel, because a recover does not continue with the execution of the panicking function.
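
To show what the panic/recover trick looks like in standard Go, here is a small sketch. The trySend helper is a hypothetical name, and note that it has to wrap just the send: a recover placed in a larger function would abandon the rest of that function.

```go
package main

import "fmt"

// trySend emulates `ok := c <- x` in standard Go by recovering from the
// panic raised when sending on a closed channel. It swallows any panic
// raised by the send, which is part of what makes the trick unappealing.
func trySend(c chan<- int, x int) (ok bool) {
	defer func() {
		if recover() != nil {
			ok = false
		}
	}()
	c <- x
	return true
}

// demo sends once on an open channel and once after closing it.
func demo() (bool, bool) {
	c := make(chan int, 1)
	first := trySend(c, 1)
	close(c)
	second := trySend(c, 2)
	return first, second
}

func main() {
	first, second := demo()
	fmt.Println(first, second)
}
```

The helper reports that the send failed, but not why, which is the information that cerror provides.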

To continue with the motivating example, this code could report to the sender the possible cause that did lead to the closing of outc, by doing something like:
close(inc, cerror(outc))

Unlike before, this time the actual cause is reported through the pipeline of processes involved and anyone might now issue a reasonable error diagnostic more useful than "the channel was closed".

The modified compiler and runtime can be retrieved with

git clone git://git.lsub.org/golang.git

The repository there contains a full go distribution as retrieved from the Go mercurial, modified a few times to make these changes. The compiler packages from the standard library (used by go fmt among other things) are also updated to reflect the new changes.

To reduce the changes required, we did not modify the select cases to let you send with the ok := outc <- x construct. Thus, within select you can only try to send and it would just fail to send on a closed channel, but would not panic nor would it break your construct. You should probably check in the code that the send could be done if you have to do so. We might modify further the Lsub version of Go to let select handle this new case, but haven't done it yet.

Saturday, December 14, 2013

Go servers and clients using belts

In a previous post I described the new belt abstraction, something similar to a Go channel, but capable of propagating error information from producers to consumers and capable of notifying producers that the consumers are no longer interested in more data, so they could terminate gracefully.

In this post I describe an example server and client using belts.

First, the server. This is the code from a test of the net/srv package.
imsg := 0
smsg := ""
h := NewBeltConn(&imsg, &smsg)

// An echo server from inb to outb.
go func() {
	inb := h.In
	outb := h.Out
	defer outb.Close()
	for {
		m, err := inb.Rcv()
		if err == belt.ErrClosed {
			break
		}
		if err != nil {
			Printv("srv: err %v\n", err)
			err = outb.Snd(err)
		} else {
			Printv("srv: msg %v\n", m)
			err = outb.Snd(m)
		}
		if err != nil {
			t.Fatalf("srv: send: %s", err)
		}
	}
}()

s := New("test", "8081", h)
if err := s.Serve(); err != nil {
	t.Fatal(err)
}
Before looking at the different parts, just see how the echo server can simply keep on receiving from the input belt and can tell errors from the client apart from regular client data. In fact, we echo errors back to the client.
Furthermore, if at some point the server does not want to receive more data from the client, it would simply call inb.Close(), and that would cleanly stop the process reading from the underlying network connection and sending data to the input belt.

The function NewBeltConn is as follows:

func NewBeltConn(proto ...interface{}) BeltConn {
	h := BeltConn{
		In:  belt.New(),
		Out: belt.New(),
	}
	h.In.SetProto(proto...)
	h.Out.SetProto(proto...)
	return h
}

It creates two belts, one for input and one for output. Furthermore, the concrete values given as arguments are used in SetProto to tell the belts which data types they should accept as valid conveyed values (besides errors, which are always valid).

The code 
s := New("test", "8081", h)
if err := s.Serve(); err != nil {
t.Fatal(err)
}
from the network server package creates a new server listening at the indicated port, named test. This server spawns two processes per client. One calls belt.PipeFrom to convey data from the network to the input server belt, and the other calls belt.PipeTo to convey messages sent to the output belt back to the network.

In the normal situation, when the client closes the connection, the PipeFrom process closes the input belt and the server loop notices, closing then the output belt, which leads to the server network connection (to the client) being closed.

In the abnormal situation that the server decides to stop, it may close its input belt (and not just its output belt) to cleanly stop the two auxiliary processes on the server side.

The nice thing is that the server code is exactly the same code that would be used if it were just echoing to the client within the same machine. This works because errors are propagated along with data (a send on a channel won't fail, but a send on a network or a pipe might fail).

Going back to the client, it might be as follows. This one is also from the tests for the package.
// client: send an error and several int/string messages.
ch, err := DialBeltTCP("[::1]:8081", &imsg, &smsg)
if err != nil {
	t.Fatal(err)
}
if err := ch.Out.Snd(errors.New("errmsg")); err != nil {
	t.Fatalf("send: %s", err)
}
for i := 0; i < 10; i++ {
	Printv("cli: send %v\n", i)

	if i%2 == 0 {
		if err := ch.Out.Snd("str"); err != nil {
			t.Fatalf("send: %s", err)
		}
		continue
	}
	if err := ch.Out.Snd(i); err != nil {
		t.Fatalf("send: %s", err)
	}
}
ch.Out.Close()

The first line calls DialBeltTCP, which is a convenience function to dial a TCP address and link an input and output belt to the resulting connection. Like in the case of the server, it relies on a BeltConn and two processes using belt.PipeFrom and belt.PipeTo to relay between the belts and the connection.

The pointers to values following the dialled address specify the protocol for the belts, so they could check out which messages are sent as part of the protocol and could configure themselves to be able to marshal and un-marshal those. 

As expected, the client can send any of the types configured, plus error indications.
To receive replies, the client would simply call ch.In.Rcv, in the same way shown here for sending.

The interesting bit is that, considering the chain made out of "client sender", "pipe to network", "server pipe from network", "server echo", "server pipe to network", "client pipe from network", "client receiver", if at any point one process decides to stop, it can close both its input and output and the entire chain is shut down both to the left and to the right of the closing point, cleanly, without making the code more complex or adding extra channels to the mix.



Friday, December 13, 2013

More on Channels and belts

This is a second cut at channels done right, after a nice idea from Roger.
This time I prefer to link to a pdf version of the draft for the paper describing them,
so it is easier to read.

The source may be retrieved (as other lsub go public source) by cloning or go getting from git.lsub.org/go.git. (No web server there, only a git server).

Wednesday, December 11, 2013

Channels done right

This is from an early draft for some work we are doing on belts, a new abstraction for
a new system inheriting from nix and go.

[It seems the blogger editor mangled things a little bit when I pasted the text, which is
a shame. I'll post again when there's a TR and when the code is made public, so I simply
adjusted things a little bit by hand; sorry.]

Francisco J. Ballesteros
ABSTRACT
Channels in the style of CSP are a powerful abstraction. The Go language includes them, inheriting much from earlier languages from Plan 9 and Research UNIX. But, to use them as a system abstraction it is necessary to be able to send in-band errors along with data and to stop senders when receivers are no longer interested in the data being sent. This paper describes Belts, a new channel abstraction that includes such features.
Channels
The Go language includes channels as a builtin type. A channel includes in its type the data type for elements sent through it. For example,
var c chan int
declares a channel to send int values. They are created using make in either unbuffered or buffered variants. For example, this declares and creates a couple of channels:
             unbufc := make(chan int)
             bufc := make(chan int, 10)
Receiving from a channel blocks until an element can be received. Sending on an unbuffered channel blocks until another process receives from it. Sending on a buffered channel blocks only when the buffer is full. One operator is used to send or receive, depending on which side of the channel it is written. For example:
             bufc <- 0    // send 0, does not block
             x := <-bufc  // receive 0, copied to x.
             unbufc <- 0  // send 0, blocks (no proc receiving)
Here the first two statements proceed without blocking, because the message is buffered in the channel. The last blocks because nobody is receiving in this example.
There is a construct to select one among multiple send or receive operations. When no operation may proceed, the construct blocks. Once some may proceed, one is executed at random and the construct terminates. For example:
             select {
             case c1 <- 3:
                     // send 3 as soon as we can send to c1
             case c2 <- 5:
                     // send 5 as soon as we can send to c2
             case x := <-c3:
                     // receive x from c3 as soon as we can.
             }
This construct admits a default label to execute when no send or receive operation may be used, which leads to non-blocking variants of send and receive. In this example,

             x := 0
             select {
             case x = <-c:
                     // receive x from c
             default:
                     // didn’t receive, and didn’t block
             }
we pretend that we received a zero if we couldn’t receive anything.
Another feature of channels is that we may close a channel, to signal the receiver that no further data will be sent on it. In this example we send two values and close the channel, and later (or concurrently in another process) we receive from the channel until we note it is closed:
             // send something and close it.
             c <- 1
             c <- 2
             close(c)
             // receive from c until closed or a zero value is sent.
             for <-c != 0 {
             }
Once closed, a receive returns a zero value without blocking. But we can also check if the channel was closed or if a zero value was just sent:
             if x, ok := <-c; !ok {
                     // c was closed
             }
In this case, ok is set to false if we couldn’t receive because the channel was closed. Usually, to loop receiving, the range operator is used instead:
             for x := range c {
                     // use x as received from c
             }
It is aware of close and behaves nicely when the sender is done.
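
The fragments above can be put together into a small runnable program for standard Go. The drain function is a made-up name for this example.

```go
package main

import "fmt"

// drain sends two values, closes the channel, and receives with range
// until the close is noticed. It also reports what a further receive on
// the closed (and empty) channel yields.
func drain() (got []int, zero int, ok bool) {
	c := make(chan int, 2) // buffered, so the sends below do not block
	c <- 1
	c <- 2
	close(c)
	for x := range c { // stops once c is closed and drained
		got = append(got, x)
	}
	zero, ok = <-c // closed channel: zero value and ok == false
	return got, zero, ok
}

func main() {
	got, zero, ok := drain()
	fmt.Println(got, zero, ok)
}
```

Values sent before the close are still delivered; only after they are consumed does the receiver see the close.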
There are more features documented in the Go Programming Language Specification, but what has been said is enough to understand our motivation and the discussion that follows.
Going problems
There are problems with the behavior of channels as described in the previous section.
One problem is that, to use channels as the primary structure used to glue different processes in one application, we should be able to stop the sender when the receiver is no longer interested in data being sent.
It has been argued that a second channel can be used to convey a termination note from the receiver of data to the producer. However, this complicates the interfaces between the elements involved. If one process is a data producer and another is a consumer, we should be able to connect them in very much the same way we do with pipes:

producer | consumer
or in this case
             c := make(chan data)
             // start a producer
             go produce(c)
             // start a consumer
             go consume(c)
The code for the producer and the consumer should be as simple as follows:
             func produce(c chan data) {
                     for {
                             x := make a new item
                             if c <- x failed {
                                   break
                             }
                     }
             }
             func consume(c chan data) {
                     for x := range c {
                             // use x
                             if don’t want to use more {
                                     tell c we are done
                                     break
                              } 
                     }
             }
The argument against notifying the sender about the stop of the receiver is that a channel should convey data only from the sender to the receiver. But, to cleanly terminate the sender if the receiver decides to stop we have to complicate the code and do one of three things:
  1. Use a second channel to notify the sender that the receiver is done.
  2. Spawn a new process (or use the receiver process) to consume the rest of the stream of data without actually processing it.
  3. Let the sender block forever and forget about it.
The last two cases are a waste of resources, and thus should not be used in practice. In the first case, the code would be more complex:
c := make(chan data)
endc := make(chan bool)
// start a producer
go produce(c, endc)
// start a consumer
go consume(c, endc)

where 

func produce(c chan data, endc chan bool) {
        for {
             x := make a new item
             select {
             case c <- x:
             case <-endc:
                return 
             }
        }
}

and

func consume(c chan data, endc chan bool) {
        for {
                x, ok := <-c
                if !ok {
                        break
                }
                // use x
                if don’t want more {
                        close(endc)
                        break
                }
        }
}

Considering this code, the connection between the sender and the receiver is now two-way, just as it would be if we could use the channel itself to indicate that we are done receiving. So the argument against a backward flow of information does not seem sound at this point. We are still sending information backward, but the code is more complex, and what would be a single abstraction for message passing is now two separate structures.
But there are more problems. An important one is that if the sender fails to produce an item, the error indication is lost and cannot be sent to the receiver. The receiver should have a way to know that the sender had a problem, perhaps to propagate the error to others interested in the result.
To achieve this, we must further complicate the scheme and use a data structure that packs either data or an error indication, and then complicate the receiver code to unpack it. Or we must use a separate error channel to convey error messages, which is even more complex.
It should be easy to let the producer report an error to the consumer and terminate, and then let the consumer check at each reception whether it got an error or a regular data message.
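To make the cost of the plain-channel approach concrete, here is a minimal sketch of the "pack data or an error" scheme combined with a done channel. The names result, produce, and consume, and the use of int items, are assumptions made only for this illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// result packs either a data item or an error indication.
// It is a hypothetical type used only in this sketch.
type result struct {
	val int
	err error
}

// produce sends n items and then reports a failure through the same
// channel. It stops early if the receiver closes done.
func produce(n int, c chan<- result, done <-chan bool) {
	defer close(c)
	for i := 0; i < n; i++ {
		select {
		case c <- result{val: i}:
		case <-done:
			return
		}
	}
	select {
	case c <- result{err: errors.New("producer failed")}:
	case <-done:
	}
}

// consume adds up the items until it sees an error or the channel is
// closed, and tells the sender it is done when it returns.
func consume(c <-chan result, done chan<- bool) (int, error) {
	defer close(done)
	sum := 0
	for r := range c {
		if r.err != nil {
			return sum, r.err
		}
		sum += r.val
	}
	return sum, nil
}

func main() {
	c := make(chan result)
	done := make(chan bool)
	go produce(3, c, done)
	sum, err := consume(c, done)
	fmt.Println(sum, err)
}
```

Both ends must carry two channels and the receiver must unpack every message by hand, which is the complexity the text complains about.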
Belts: problems are gone
To fix these issues, a new abstraction, the belt channel, has been built. So far we have not modified the language to include it; instead we have written a package providing a data type for the new abstraction. A belt is defined as
     // An untyped belt channel.
     type Chan struct {
             Donec chan bool        // notify the sender that the receiver is done.
             Datac chan interface{} // send data or errors.
             /* other unexported fields */
     }
The Datac channel conveys data and the Donec channel conveys receiver-close indications to senders. We left these fields public to let clients use belts in select constructs involving more than one channel or belt, but in the future it is likely that all this will be hidden.
A belt can be used to send data or errors:
     func (b *Chan) Snd(d interface{}) error
Here, d is either a value of the data type to be sent or an error indication. Unlike with channels, if the receiver is done, the send operation returns an error to the caller, which decides what to do next (perhaps stop).
The receive operation tells errors apart from data:
     func (b *Chan) Rcv() (interface{}, error)
If the sender is done, an error indication is returned. In the same way, if the sender posts an error through the belt, the receiver will get an error indication instead of data. Otherwise, the data that was sent is received. The following operations can be used to close a belt for sending or for receiving:
     func (b *Chan) CloseSnd()
     func (b *Chan) CloseRcv()
The constructor functions create either an unbuffered or a buffered belt, as might be expected:
     func New() *Chan
     func NewBuffered(nbuf int) *Chan
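The following is a rough sketch of how operations like these could be built on the two exported channels. It is not the package's implementation: the names Belt, NewBelt, and ErrClosed are assumptions, and each close operation must be called at most once in this simplified version.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrClosed is a hypothetical error reported when the other end is done.
var ErrClosed = errors.New("belt closed")

// Belt has the two channels described in the text; the rest is assumed.
type Belt struct {
	Donec chan bool
	Datac chan interface{}
}

// NewBelt returns a belt with room for nbuf pending messages.
func NewBelt(nbuf int) *Belt {
	return &Belt{Donec: make(chan bool), Datac: make(chan interface{}, nbuf)}
}

// Snd sends d (data or an error) unless the receiver is done.
func (b *Belt) Snd(d interface{}) error {
	select {
	case <-b.Donec: // receiver already gone, even if the buffer has room
		return ErrClosed
	default:
	}
	select {
	case b.Datac <- d:
		return nil
	case <-b.Donec:
		return ErrClosed
	}
}

// Rcv returns the next data item, or an error if the sender posted an
// error or closed its end.
func (b *Belt) Rcv() (interface{}, error) {
	d, ok := <-b.Datac
	if !ok {
		return nil, ErrClosed
	}
	if err, isErr := d.(error); isErr {
		return nil, err
	}
	return d, nil
}

// CloseSnd and CloseRcv close one direction each (call at most once).
func (b *Belt) CloseSnd() { close(b.Datac) }
func (b *Belt) CloseRcv() { close(b.Donec) }

func main() {
	b := NewBelt(0)
	stopped := make(chan struct{})
	go func() {
		defer close(stopped)
		for i := 0; ; i++ {
			if b.Snd(i) != nil {
				return // the receiver is done
			}
		}
	}()
	for {
		x, err := b.Rcv()
		if err != nil {
			break
		}
		if x.(int) == 2 {
			b.CloseRcv()
			break
		}
	}
	<-stopped
	fmt.Println("sender stopped after the receiver closed the belt")
}
```

The select inside Snd plays the role of the endc channel from the earlier example, but the caller no longer sees it.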
Another interesting feature is that belts are polymorphic, unlike channels: any type can be sent. Type assertions and reflection can be used to keep type safety, but multiple different data items can be sent easily.
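As an illustration of how a receiver can keep type safety over untyped messages, the sketch below uses a type switch on values taken from a plain chan interface{}. The describe function and the message types chosen are assumptions for the example only.

```go
package main

import "fmt"

// describe classifies a value received from an untyped channel.
func describe(v interface{}) string {
	switch m := v.(type) {
	case error:
		return "error: " + m.Error()
	case []byte:
		return fmt.Sprintf("%d bytes", len(m))
	case string:
		return "text: " + m
	default:
		return fmt.Sprintf("unexpected %T", m)
	}
}

func main() {
	c := make(chan interface{}, 3)
	c <- "hello"
	c <- []byte{1, 2, 3}
	c <- fmt.Errorf("disk full")
	close(c)
	for v := range c {
		fmt.Println(describe(v))
	}
}
```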
A protocol can be defined in a belt so that only certain message types (plus error indications) are accepted (and other messages are rejected with an error when trying to be sent). To define a protocol, the next operation accepts an array (a slice actually) of example message values, each one being a pointer to a message instance.
     func (b *Chan) SetProto(msgptrs ...interface{})
In this case, the reflection interface in the language is used to accept or reject messages to be sent through the belt. This makes it easy to define protocols made by different message types without having to define facades for the set of messages.
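A check of this kind could look roughly like the sketch below. The proto type and its check method are hypothetical, and the sketch assumes that messages are sent as pointers of exactly the types given as examples; the real package may behave differently.

```go
package main

import (
	"fmt"
	"reflect"
)

// Example message types, invented for this sketch.
type ping struct{ N int }
type pong struct{ N int }

// proto records the message types that may be sent (hypothetical type).
type proto struct {
	types map[reflect.Type]bool
}

// newProto takes pointers to example messages and remembers their types.
func newProto(msgptrs ...interface{}) *proto {
	p := &proto{types: make(map[reflect.Type]bool)}
	for _, m := range msgptrs {
		if m == nil {
			continue
		}
		p.types[reflect.TypeOf(m)] = true
	}
	return p
}

// check accepts error indications and registered message types, and
// rejects everything else.
func (p *proto) check(v interface{}) error {
	if _, ok := v.(error); ok {
		return nil
	}
	if p.types[reflect.TypeOf(v)] {
		return nil
	}
	return fmt.Errorf("message type %T is not in the protocol", v)
}

func main() {
	p := newProto(&ping{}, &pong{})
	fmt.Println(p.check(&ping{N: 1}))
	fmt.Println(p.check("stray string"))
	fmt.Println(p.check(fmt.Errorf("boom")))
}
```

A send operation would call such a check before placing the message on the data channel and return its error to the caller.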
There are wrappers that adapt belts to traditional reader and writer interfaces, so it would be easy to write to a belt, read from it, or write a belt to a writer (to copy the data streamed to an external writer).
Further wrappers know how to pipe a belt to an external connection (a writer) and how to pipe a belt from an external connection (a reader). In this case, the connection carries messages encoded as Gobs that carry any of the types defined in the belt protocol or an error indication. This permits one belt to be conveyed through a network connection or through a system file or pipe.
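The sketch below shows one way such a wire format could work with encoding/gob, using a plain chan interface{} in place of a belt and a bytes.Buffer in place of the connection. The envelope and chunk types and the pipeTo, pipeFrom, and roundTrip functions are assumptions; errors are carried as text because arbitrary error values cannot be Gob-encoded directly.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"errors"
	"fmt"
	"io"
)

// envelope is a hypothetical wire format: a message or an error text.
type envelope struct {
	Msg interface{}
	Err string
}

// chunk is an example message type, registered so that Gob can carry
// it inside an interface value.
type chunk struct{ Data []byte }

func init() { gob.Register(chunk{}) }

var errSource = errors.New("source failed")

// pipeTo encodes everything received from c into w until c is closed.
func pipeTo(w io.Writer, c <-chan interface{}) error {
	enc := gob.NewEncoder(w)
	for v := range c {
		e := envelope{Msg: v}
		if err, ok := v.(error); ok {
			e = envelope{Err: err.Error()}
		}
		if err := enc.Encode(&e); err != nil {
			return err
		}
	}
	return nil
}

// pipeFrom decodes envelopes from r into c until EOF, then closes c.
func pipeFrom(r io.Reader, c chan<- interface{}) error {
	defer close(c)
	dec := gob.NewDecoder(r)
	for {
		var e envelope
		err := dec.Decode(&e)
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		if e.Err != "" {
			c <- errors.New(e.Err)
		} else {
			c <- e.Msg
		}
	}
}

// roundTrip sends msgs through an in-memory "connection" and back.
func roundTrip(msgs ...interface{}) ([]interface{}, error) {
	var wire bytes.Buffer
	out := make(chan interface{}, len(msgs))
	for _, m := range msgs {
		out <- m
	}
	close(out)
	if err := pipeTo(&wire, out); err != nil {
		return nil, err
	}
	in := make(chan interface{}, len(msgs))
	if err := pipeFrom(&wire, in); err != nil {
		return nil, err
	}
	var got []interface{}
	for v := range in {
		got = append(got, v)
	}
	return got, nil
}

func main() {
	got, err := roundTrip(chunk{Data: []byte("hi")}, errSource)
	fmt.Println(len(got), err)
}
```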
It would be hard to do this with standard channels, because of the problems mentioned. However, it is easy to maintain reasonable semantics that work fine in practice when writing clients and servers that use belts to and from the network. Each network connection requires two different belts if it is to be duplex.
Example
As an example, this is how our producer and consumer processes might be written:
func produce(c *belt.Chan) {
        for {
                // make a new x
                if err := c.Snd(x); err != nil {
                        // couldn't send. done.
                        break
                }
                // or to send an error...
                c.Snd(err)
        }
        c.CloseSnd()
}


func consume(c *belt.Chan) {
        for {
                x, err := c.Rcv()
                if err != nil {
                        // couldn't receive. done.
                        break
                }
                // use x
                if dontWantMore(x) { // placeholder for the receiver's stop condition
                        c.CloseRcv()
                }
        }
}

The result is quite similar to that of a UNIX pipe, although belts do not kill the sender process when the receiver is gone (because the sender might have to terminate cleanly or might decide to do other things).
Early evaluation
The following is the output from the package benchmarks, which send 512-byte slices by different mechanisms.


BenchmarkChan   20000000     134 ns/op
BenchmarkAlt     5000000     345 ns/op
BenchmarkTSend   5000000     418 ns/op
BenchmarkTWrite  5000000     723 ns/op
BenchmarkTRead   1000000    1394 ns/op
BenchmarkPipe    1000000    2476 ns/op
The first one uses a native channel without being able to stop the sender. That is the fastest. The second uses a select construct to let the receiver stop the sender. The third one uses a belt, and is not too slow when compared to the second one. We have yet to optimize the code, but it is fast enough to be used in practice as it stands, compared to using a select directly. The extra time taken is probably due to the use of reflection.
The last three report the performance of the adaptors used to write to the belt and to read from it, along with the performance of a standard Pipe as implemented by Go, as a reference for this case.
Future work
We will optimize and fine-tune the interfaces for the belts in the near future, and will probably experiment with modifying the language to make belts first-class citizens, so they can be used like channels in select and other constructs. We will also experiment with belts used for network communication and evaluate them further.