Thursday, March 19, 2015

Limiting the number of concurrent processes

In some cases, a task can easily be split into multiple concurrent child tasks, but it is not a good idea to spawn each child task as a separate concurrent process.

For example, some commands in Clive walk a tree to synchronise it, or to poll each node for changes, and so on. At each node in the tree, the children can clearly be processed concurrently, which reduces the total latency of the whole operation. However, doing this without any limit would place a high load on the server at the other end of the line, and on the line itself.

In Go it is fairly trivial to limit the number of concurrent tasks while still exploiting concurrency up to that limit. The technique shown here is not new; it has been in use for decades, even in C for Plan 9, and probably in many other languages and by many other authors. Nevertheless, I think this tiny bit of code might help those who have never seen it before.

The idea is to send functions through a channel and keep a fixed set of workers at the other end of the channel. Each worker receives a function, calls it, and loops again. The number of workers limits how many tasks can run at the same time.

This creates the pool to run tasks:

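// call and Pool as inferred from their use in this post (not shown in it).
type call struct {
	fn    func()    // the task to run
	donec chan bool // receives true once fn has returned
}
type Pool struct{ calls chan call }

// NewPool starts n workers that share one unbuffered channel.
func NewPool(n int) *Pool {
	p := &Pool{calls: make(chan call)}
	for i := 0; i < n; i++ {
		go p.worker(i)
	}
	return p
}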

A worker is simply:

func (p *Pool) worker(id int) {
	// Run calls until p.calls is closed, reporting each completion.
	for c := range p.calls {
		c.fn()
		c.donec <- true
	}
}

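Functions are run with this method. Since calls is unbuffered, Go blocks until a worker takes the call. If donec is nil, Go allocates a channel with one buffer slot, so the worker can report completion without waiting for a reader:

// Go hands fn to the next free worker and returns the channel that
// will receive true once fn has returned.
func (p *Pool) Go(donec chan bool, fn func()) chan bool {
	if donec == nil {
		donec = make(chan bool, 1)
	}
	p.calls <- call{fn: fn, donec: donec}
	return donec
}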
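For reference, here is a minimal self-contained program that assembles the three pieces above. It assumes call and Pool are plain structs (the post does not show them), and peakConcurrency is a hypothetical helper that counts how many tasks are running at once, to illustrate that the pool never exceeds n.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// Assumed definitions: the post uses call and Pool but does not show them.
type call struct {
	fn    func()
	donec chan bool
}

type Pool struct{ calls chan call }

func NewPool(n int) *Pool {
	p := &Pool{calls: make(chan call)}
	for i := 0; i < n; i++ {
		go p.worker(i)
	}
	return p
}

func (p *Pool) worker(id int) {
	for c := range p.calls {
		c.fn()
		c.donec <- true
	}
}

func (p *Pool) Go(donec chan bool, fn func()) chan bool {
	if donec == nil {
		donec = make(chan bool, 1)
	}
	p.calls <- call{fn: fn, donec: donec}
	return donec
}

// peakConcurrency (hypothetical helper) runs ntasks short tasks on a pool
// of n workers and returns the most tasks observed running at once.
func peakConcurrency(n, ntasks int) int32 {
	p := NewPool(n)
	var running, peak int32
	var mu sync.Mutex
	dc := make(chan bool, ntasks)
	for i := 0; i < ntasks; i++ {
		p.Go(dc, func() {
			cur := atomic.AddInt32(&running, 1)
			mu.Lock()
			if cur > peak {
				peak = cur
			}
			mu.Unlock()
			time.Sleep(10 * time.Millisecond)
			atomic.AddInt32(&running, -1)
		})
	}
	// Receiving every completion orders all task writes before the read below.
	for i := 0; i < ntasks; i++ {
		<-dc
	}
	return peak
}

func main() {
	fmt.Println("peak concurrency:", peakConcurrency(5, 20))
}
```

Only n goroutines ever receive from calls, so the reported peak cannot exceed n; with tasks that sleep, it will usually reach n.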

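The pool is now easy to use. Because fn is a closure, it can capture whatever inputs it needs and store its results in captured variables, so this simple func() interface is enough for tasks with any inputs and outputs. When several calls share one donec, that channel needs enough buffer (or a concurrent reader) for all their completions; otherwise workers block while reporting.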
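As a sketch of passing inputs and collecting outputs through closures, the hypothetical helper squares below gives each task its own slot in a result slice. The pool code is repeated, with the same assumed call/Pool definitions, so the program compiles on its own.

```go
package main

import "fmt"

// Assumed definitions, as in the post's usage.
type call struct {
	fn    func()
	donec chan bool
}

type Pool struct{ calls chan call }

func NewPool(n int) *Pool {
	p := &Pool{calls: make(chan call)}
	for i := 0; i < n; i++ {
		go p.worker(i)
	}
	return p
}

func (p *Pool) worker(id int) {
	for c := range p.calls {
		c.fn()
		c.donec <- true
	}
}

func (p *Pool) Go(donec chan bool, fn func()) chan bool {
	if donec == nil {
		donec = make(chan bool, 1)
	}
	p.calls <- call{fn: fn, donec: donec}
	return donec
}

// squares (hypothetical) computes x*x for every element, at most as many
// at a time as the pool has workers.
func squares(p *Pool, xs []int) []int {
	out := make([]int, len(xs))
	dc := make(chan bool, len(xs))
	for i, x := range xs {
		i, x := i, x // per-iteration copies (required before Go 1.22)
		p.Go(dc, func() {
			out[i] = x * x // input captured from x, output written to out[i]
		})
	}
	for range xs {
		<-dc
	}
	return out
}

func main() {
	fmt.Println(squares(NewPool(3), []int{1, 2, 3, 4, 5})) // prints [1 4 9 16 25]
}
```

Each task writes a distinct element, so no locking is needed, and the results are read only after every completion has been received.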

For example:

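// Create a pool of 5 workers.
p := NewPool(5)
// Run 20 functions in the pool, all reporting on dc.
// dc has room for all 20 completions, so no worker blocks reporting.
dc := make(chan bool, 20)
for i := 0; i < 20; i++ {
	p.Go(dc, func() {
		dprintf("running this...\n") // dprintf: debug printf used in the post (not shown)
	})
}
// Wait until all of them are done.
for i := 0; i < 20; i++ {
	<-dc
}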
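The example above shares one buffered channel among all calls. A variant, sketched here under the same assumed call/Pool definitions, passes a nil donec instead, so that Go allocates a one-slot channel per call and returns it; this lets the caller wait for specific tasks. runAll is a hypothetical name.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Assumed definitions, as in the post's usage.
type call struct {
	fn    func()
	donec chan bool
}

type Pool struct{ calls chan call }

func NewPool(n int) *Pool {
	p := &Pool{calls: make(chan call)}
	for i := 0; i < n; i++ {
		go p.worker(i)
	}
	return p
}

func (p *Pool) worker(id int) {
	for c := range p.calls {
		c.fn()
		c.donec <- true
	}
}

func (p *Pool) Go(donec chan bool, fn func()) chan bool {
	if donec == nil {
		donec = make(chan bool, 1)
	}
	p.calls <- call{fn: fn, donec: donec}
	return donec
}

// runAll (hypothetical) runs ntasks counting tasks on n workers and waits
// for each one through the channel Go returned for it.
func runAll(n, ntasks int) int32 {
	p := NewPool(n)
	var count int32
	done := make([]chan bool, 0, ntasks)
	for i := 0; i < ntasks; i++ {
		// With a nil donec, Go returns a fresh channel with one buffer slot.
		done = append(done, p.Go(nil, func() {
			atomic.AddInt32(&count, 1)
		}))
	}
	// Wait for each call individually.
	for _, dc := range done {
		<-dc
	}
	return atomic.LoadInt32(&count)
}

func main() {
	fmt.Println("ran", runAll(5, 20), "tasks") // prints "ran 20 tasks"
}
```

Since each per-call channel has one buffer slot, the worker never waits on the report, even if the caller reads the channels late or in a different order.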

The testing code in this package plots a nice diagram of the execution as a side effect of testing.
This is one such plot, with time on the X axis and one task per row on the Y axis; at most five tasks overlap at any time:

trace: abcdeAfBCDEghijFkGHIJlmnoKpLMNOqrstPQRST
+----+                                  
 +-----+                                
  +-----+                               
   +-----+                              
    +-----+                             
      +--------+                        
           +-----+                      
            +-----+                     
             +-----+                    
              +-----+                   
                +--------+              
                     +-----+            
                      +-----+           
                       +-----+          
                        +-----+         
                          +--------+    
                               +----+   
                                +----+  
                                 +----+ 
                                  +----+

The tests run fake jobs like this one:

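// fakefn records the start of task r, sleeps for t, then records its end.
// ncalls (an int32) and calls are package-level test variables; each event
// atomically claims the next slot in calls.
func fakefn(r rune, t time.Duration) {
	n := atomic.AddInt32(&ncalls, 1)
	calls[n-1] = r // lower-case: task started
	dprintf("fake call %c\n", r)
	time.Sleep(t)
	r = unicode.ToUpper(r)
	n = atomic.AddInt32(&ncalls, 1)
	calls[n-1] = r // upper-case: task finished
}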

They record a trace of the sequence of calls in a string like

abcdeAfBCDEghijFkGHIJlmnoKpLMNOqrstPQRST

where a lower-case letter marks the start of a task and the matching upper-case letter its end.
Thus, checking that the scheduling is as expected is easy, and so is producing a plot for fun.
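As an illustration of such a check, here is a sketch of a hypothetical helper, maxConcurrent, that scans a trace in this format and reports the largest number of tasks running at once, rejecting traces where a task ends before it starts or never ends. It is not the package's actual test code.

```go
package main

import (
	"fmt"
	"unicode"
)

// maxConcurrent (hypothetical) scans a trace where a lower-case letter marks
// a task start and the matching upper-case letter its end. It returns the
// largest number of tasks running at the same time, and ok=false if an end
// appears before its start or some task never ends.
func maxConcurrent(trz string) (peak int, ok bool) {
	running := map[rune]bool{}
	for _, ev := range trz {
		switch {
		case unicode.IsLower(ev):
			running[ev] = true
			if len(running) > peak {
				peak = len(running)
			}
		case unicode.IsUpper(ev):
			st := unicode.ToLower(ev)
			if !running[st] {
				return peak, false
			}
			delete(running, st)
		}
	}
	return peak, len(running) == 0
}

func main() {
	n, ok := maxConcurrent("abcdeAfBCDEghijFkGHIJlmnoKpLMNOqrstPQRST")
	fmt.Println(n, ok) // prints 5 true
}
```

A test for a pool of 5 workers can then assert that the peak is exactly 5 and that the trace is well formed.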

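// plot draws one row per task: '+' at its start and end events,
// '-' while it is running, and ' ' otherwise. Here ncalls is the
// number of tasks, not the number of events in trz.
func plot(trz string, ncalls int) {
	c := 'a'
	dprintf("trace: %s\n", trz)
	ch := map[bool]string{true: "-", false: " "}
	for i := 0; i < ncalls; i++ {
		st := c + rune(i)          // start event for task i
		end := unicode.ToUpper(st) // matching end event
		sted := false              // true between start and end
		for _, ev := range trz {
			switch ev {
			case st, end:
				dprintf("+")
				sted = !sted
			default:
				dprintf(ch[sted])
			}
		}
		dprintf("\n")
	}
}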