Saturday, December 14, 2013

Go servers and clients using belts

In a previous post I described the new belt abstraction, something similar to a Go channel, but capable of propagating error information from producers to consumers and capable of notifying producers that the consumers are no longer interested on more data, so they could terminate gracefully.

In this post I describe an example server and client using belts.

First, the server.  This is the code from a a test of the net/srv package.
imsg := 0
smsg := ""
h := NewBeltConn(&imsg, &smsg)

// An echo server from inb to outb.
go func() {
inb := h.In
outb := h.Out
defer outb.Close()
for {
m, err := inb.Rcv()
if err == belt.ErrClosed {
break
}
if err != nil {
Printv("srv: err %v\n", err)
err = outb.Snd(err)
} else {
Printv("srv: msg %v\n", m)
err = outb.Snd(m)
}
if err != nil {
t.Fatalf("srv: send: %s", err)
}
}
}()

s := New("test", "8081", h)
if err := s.Serve(); err != nil {
t.Fatal(err)
}
Before looking at the different parts, just see how the echo server can simply keep on receiving from the input belt and can tell errors from the client apart from regular client data. In fact, we echo errors back to the client.
Furthermore, if at some point the server does not want to receive more data from the client, it would simply inb.Close()and that would cleanly stop the process reading from the underlying network connection and sending data to the input belt.

The function NewBeltConn is as follows:

func NewBeltConn(proto ...interface{}) BeltConn {
h := BeltConn{
In:  belt.New(),
Out: belt.New(),
}
h.In.SetProto(proto...)
h.Out.SetProto(proto...)
return h
}

It creates two belts, one for input and one for output. Furthermore, the concrete values given as arguments are used in SetProto to tell the belts which data types they should accept as valid conveyed values (besides errors, which are always valid).

The code 
s := New("test", "8081", h)
if err := s.Serve(); err != nil {
t.Fatal(err)
}
from the network server package creates a new server listening at the indicated port, named test. This server spawns two processes per client. One calls belt.PipeFrom to convey data from the network to the input server belt, and the other calls belt.PipeTo to convey messages sent to the output belt back to the network.

In the normal situation, when the client closes the connection, the PipeFrom process closes the input belt and the server loop notices, closing then the output belt, which leads to the server network connection (to the client) being closed.

In the abnormal situation that the server decides to stop, it may close its input belt (and not just its output belt) to cleanly stop the two auxiliary processes on the server side.

The nice thing is that the server code is exactly the same code that would be used if it were just echoing to the client within the same machine. Because errors are propagated along with data (a send on a channel won't fail, but a send on a network or a pipe might fail).

Going back to the client, it might be as follows. This one is also from the tests for the package.
// client: send an error and several int/string messages.
ch, err := DialBeltTCP("[::1]:8081", &imsg, &smsg)
if err != nil {
t.Fatal(err)
}
if err := ch.Out.Snd(errors.New("errmsg")); err != nil {
t.Fatalf("send: %s", err)
}
for i := 0; i < 10; i++ {
Printv("cli: send %v\n", i)

if i%2 == 0 {
if err := ch.Out.Snd("str"); err != nil {
t.Fatalf("send: %s", err)
}
continue
}
if err := ch.Out.Snd(i); err != nil {
t.Fatalf("send: %s", err)
}
}
ch.Out.Close()

The first line calls DialBeltTCP, which is a convenience function to dial a TCP address and link an input and output belt to the resulting connection. Like in the case of the server, it relies on a BeltConn and two processes using belt.PipeFrom and belt.PipeTo to relay between the belts and the connection.

The pointers to values following the dialled address specify the protocol for the belts, so they could check out which messages are sent as part of the protocol and could configure themselves to be able to marshal and un-marshal those. 

As expected, the client can send any of the types configured, plus error indications.
To receive replies, the client would simply call ch.In.Rcv, like shown here for sending.

The interesting bit is that,  considering the chain made out of "client sender", "pipe to network", "server pipe from network", server echo, "server pipe to network", "client pipe from network", "client receiver", if at any point one process decides to stop, it can close both its input and output and the entire chain is shutdown both to the left and to the right of the closing point, cleanly, without making the code more complex or adding extra channels to the mix.