Goblins for number theory, part 4

Client and server Goblins

After having introduced the basic concepts of Goblins, and in particular promises; after having looked at parallelisation over the network; and after an excursion to persistence, it is now time to get to the main topic. We would like more flexibility, both in the behaviour of the clients and in the nature of the tasks they are handling and the control flow in the server. Clients should be able to come and go, maybe completing only one task never to be seen again (we will not, however, handle faults, that is, clients accepting a task and disappearing before completing it). Tasks could be heterogeneous, that is, take more or less time, or, equivalently, the clients could run on heterogeneous machines, and it would be nice to hand out a new task to a client as soon as it finishes the previous one. And we would like the server to be able to work in rounds: in essence, distribute independent tasks corresponding to one loop, then gather the results and start with the next loop.

After David Thompson and Jessica Tallon had a look at my first solution, which had performance problems I could not explain, they came up with a much better idea, so I will present their solution without spending time on explaining what went wrong. Suffice it to say that one should avoid nesting with-vat expressions. In my experience, doing so with the same vat leads to a deadlock; doing so with different vats seems to work, but causes a severe performance penalty.

Queueing up clients, tasks and other promises

Our current solution already keeps a list of clients at the server, to which clients can add themselves in the background. Instead of waiting until a fixed number of clients have arrived, we should be more dynamic and implement the server as follows. As long as there are tasks to be submitted and the client list is not empty, the server removes a client from the client list and submits a task to this client. If the client list is empty while there are still unsubmitted tasks, the server waits until a new client registers. So far, this scheme uses each client for exactly one task, and only works if at least as many clients register as there are tasks. The trick is to let the client do its computation, and at the end register itself again with the server as being available for the next task. My first solution used the existing ^registry actor, added an 'unregister method used by the server to retrieve an available client, and let the client call the 'register method after finishing a task. The problem with this straightforward approach is that the server has to wait when no client is available, and this risks stalling everything. In a sense, we are back to the problem discussed in the first post: Goblins work with promises, and waiting for their resolution is not a Goblins concept; one should not try to control time and write code to be executed at a specific moment, but rather define callbacks that are run when promises are fulfilled.

Jessica and David pointed out to me that since version 0.14 of Goblins, a suitable module is available in the actor library: the inbox, which is modelled after a post box that queues messages and delivers them one by one on request. Actually it rather delivers parcels, since it is a general first in, first out queue that can be filled with anything. We will replace the current client list, stored in a ^cell actor, by an inbox that will contain client actors. The crucial difference with my home-brew solution is that the level of an inbox can go below zero without blocking: If there are no elements in the queue, it nevertheless returns a promise for a future element, in our case a client actor that will register later. Thanks to promise pipelining, we can pretend that this unresolved promise is actually an actor and send messages to it using <-. Once a new actor registers, the promise is fulfilled, and the new actor receives the message sent previously and acts on it.
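To make the "level below zero" behaviour concrete, here is a small model in plain Guile, without Goblins. It is only a sketch of the queueing discipline, not the implementation of (goblins actor-lib inbox): since plain Guile has no promises, a retrieval request is modelled by a callback that runs once an element is available, and make-inbox-model, put-client!, get-client! and inbox-level are hypothetical names of our own.

```scheme
;; A plain-Guile model of an inbox (NOT the Goblins implementation).
;; Retrieval requests are callbacks, standing in for promises.
(define (make-inbox-model)
  (let ((items '())     ; elements waiting to be retrieved, oldest first
        (waiters '()))  ; callbacks waiting for an element, oldest first
    (define (enqueue! item)
      (if (null? waiters)
          (set! items (append items (list item)))   ; nobody waiting: store it
          (let ((k (car waiters)))                  ; hand it to the oldest request
            (set! waiters (cdr waiters))
            (k item))))
    (define (dequeue! k)
      (if (null? items)
          (set! waiters (append waiters (list k)))  ; queue empty: park the request
          (let ((item (car items)))
            (set! items (cdr items))
            (k item))))
    (define (level)                                 ; negative = pending requests
      (- (length items) (length waiters)))
    (values enqueue! dequeue! level)))

(define-values (put-client! get-client! inbox-level) (make-inbox-model))

(define served '())                                 ; (task client) pairs, newest first
(define (request-client task)
  (get-client! (lambda (client)
                 (set! served (cons (list task client) served)))))

(request-client 'task-1)            ; no client yet: the request is parked
(request-client 'task-2)
(define level-before-clients (inbox-level))         ; -2
(put-client! 'alice)                ; fulfils the oldest request, task-1
(put-client! 'bob)                  ; fulfils task-2
(put-client! 'carol)                ; nobody waiting: carol stays in the queue
(request-client 'task-3)            ; served immediately by carol
(display (reverse served))
(newline)
```

The printed pairing is ((task-1 alice) (task-2 bob) (task-3 carol)): requests made before any client exists are not lost, they are simply served in order once clients arrive.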

The essential modifications occur in the ^worker type actor in the client script:

(define-actor (^worker bcom server) #:self self
  (methods
    ((square x)
     (let ((res (* x x)))
          (format #t "square ~a\n" x)
          (sleep 3)
          (<- server self)
          res))
    ((finish)
     (signal-condition! end))))

After computing the result (and sleeping a little for testing purposes, since the tasks are so short that otherwise one client would end up grabbing all of them before we have a chance to start a second one), the client sends itself back to the server. To this end, it needs to know the server, which can be passed to it upon spawning; and it needs to have a notion of itself. This is why we have replaced define by the more general define-actor, in which the optional #:self self defines a formal parameter self to later… speak to oneself! Notice that we have also modified the client so that, as in the first blog posts, it does not register with its name (which thus is no longer passed on the command line either): Using an inbox instead of the custom ^registry actor implies that we would need to encapsulate the client actor into a composite data structure together with its name (for instance, an SRFI-9 record), which is more hassle than warranted for our experimental code. To make up for it, we let the client itself print the computing tasks it receives. The complete client script, after some shuffling around so that things are defined in the correct order, looks like this:

(use-modules (srfi srfi-1)
             (fibers conditions)
             (goblins)
             (goblins actor-lib methods)
             (goblins ocapn ids)
             (goblins ocapn captp)
             (goblins ocapn netlayer tcp-tls))

(define vat (spawn-vat))
(define net (spawn-vat))
(define end (make-condition))

(define-actor (^worker bcom server) #:self self
  (methods
    ((square x)
     (let ((res (* x x)))
          (format #t "square ~a\n" x)
          (sleep 3)
          (<- server self)
          res))
    ((finish)
     (signal-condition! end))))

(define capn
  (with-vat net (spawn-mycapn (spawn ^tcp-tls-netlayer "localhost"))))

(define server
  (with-vat vat
    (<- capn 'enliven (string->ocapn-id (second (command-line))))))

(define client
  (with-vat vat (spawn ^worker server)))
(with-vat net ($ capn 'register client 'tcp-tls))

(with-vat vat
  (<- server client))

(wait end)

In the server, we essentially replace the custom ^registry by an inbox, which results in the following code:

(use-modules (srfi srfi-1)
             (srfi srfi-26)
             (goblins)
             (goblins actor-lib cell)
             (goblins actor-lib inbox)
             (goblins actor-lib joiners)
             (goblins ocapn ids)
             (goblins ocapn captp)
             (goblins ocapn netlayer tcp-tls)
             (goblins persistence-store syrup)
             (goblins vat))

(define persistence-vat (spawn-vat))
(define persistence-registry
  (with-vat persistence-vat
    (spawn ^persistence-registry)))

(define-values (net capn)
  (spawn-persistent-vat
    (make-persistence-env #:extends (list captp-env tcp-tls-netlayer-env))
    (lambda ()
      (spawn-mycapn (spawn ^tcp-tls-netlayer "localhost")))
    (make-syrup-store "ocapn.syrup")
    #:persistence-registry persistence-registry))

(define (print-id prefix id)
  (with-vat net
    (on id
      (lambda (sref)
        (format #t "~a ~a\n"
                   prefix (ocapn-id->string sref))))))

(define-values (vat get-client put-client stop-clients)
  (spawn-persistent-vat
    (make-persistence-env
      #:extends inbox-env)
    (lambda ()
      (spawn-inbox))
    (make-syrup-store "registry.syrup")
    #:persist-on #f
    #:persistence-registry persistence-registry))

(let ((id (with-vat net ($ capn 'register put-client 'tcp-tls))))
  (print-id "Server ID" id))

(define all-clients (with-vat vat (spawn ^cell '())))
(define v '(1 2 3 4 5))
(with-vat vat
  (let
    ((clients (map (lambda (x) (<- get-client)) v)))
    ($ all-clients (append clients ($ all-clients)))
    (on (all-of* (map (cut <- <> 'square <>) clients v))
      (lambda (res)
        (format #t "~a\n" (sqrt (fold + 0 res)))
        (on (all-of* ($ all-clients))
          (lambda (c)
            (map (cut <- <> 'finish) (delete-duplicates c))))))))

(sleep 3600)

The spawn-inbox procedure does not return one actor, but actually three at the same time: one for retrieving an element (or a promise thereof), one for adding elements into the queue, and one for shutting the inbox down (which we will not use); in the define-values form above, they are bound to get-client, put-client and stop-clients, respectively. The expression

(map (lambda (x) (<- get-client)) v)

creates a list of (promises for) client actors that is as long as the list v. Then we send the tasks as before and use all-of* to wait for their results. There is a small subtlety when sending the 'finish messages: Since the variable clients in general no longer contains a list of client actors, but a list of promises, we also need to use (on (all-of* …)) to retrieve the actual list of actors. We go further by memorising all clients ever encountered (with multiplicities, actually) in a separate cell all-clients; delete-duplicates then makes sure that each client receives 'finish only once. This is a bit convoluted at this point (since at the end of the script, ($ all-clients) is the same as clients), but will make things easier later. Without any extra code for sending the 'finish signal to the clients, the main part of the server script could be condensed into only a few lines:

(define v '(1 2 3 4 5))
(with-vat vat
  (on (all-of* (map (lambda (x) (<- (<- get-client) 'square x)) v))
    (lambda (res)
      (format #t "~a\n" (sqrt (fold + 0 res))))))
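The two list idioms used in the full server script, a two-list map over (cut <- <> 'square <>) and delete-duplicates, can be tried in a plain Guile REPL. In this sketch the symbols alice and bob are hypothetical stand-ins for client actors, and list stands in for <-, so that we can inspect the messages that would be sent instead of sending them.

```scheme
(use-modules (srfi srfi-1)    ; delete-duplicates
             (srfi srfi-26))  ; cut

;; Hypothetical stand-ins: symbols for client actors, `list` for `<-`.
(define clients '(alice bob alice))  ; alice served two of the three tasks
(define v '(1 2 3))

;; (cut list <> 'square <>) is (lambda (c x) (list c 'square x));
;; map walks both lists in parallel, pairing each client with one entry.
(define messages (map (cut list <> 'square <>) clients v))

;; Each client should receive 'finish exactly once.
(define finish-targets (delete-duplicates clients))

(display messages) (newline)        ; ((alice square 1) (bob square 2) (alice square 3))
(display finish-targets) (newline)  ; (alice bob)
```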

Notice that this solution is strictly more general than that of the previous posts: If only one client registers, it runs all the squaring tasks; if a second one arrives, it obtains (with tasks of equal length) every other remaining task; and so on. And… that's it! We have parallelised a for loop which may contain tasks of differing (and a priori unknown) lengths, and it can handle the situation where clients join at any time. To handle clients that may leave after a task is completed, the framework is essentially there: Instead of having the client register again after each computing task, this could be made dependent on a condition checked in the client. For handling faults, that is, clients which disappear in the middle of a task, one would need to add timeouts at the server level and requeue tasks whose results have not appeared after a reasonable waiting time, which would depend on the application. Then all-of* would not be a suitable joiner, but one could use race, which resolves as soon as one of several promises resolves. Or one could use all-of* on a list of promises, each created by race from a computation promise and a timeout promise, precisely as in an example in the documentation of race. We will not pursue the topic of faults in this post, but Goblins clearly provides the mechanisms needed to solve the problem.

In any case, our current Goblins code is already more flexible than the MPI solution, which assumes that all clients are known at the beginning of the computation and do not change throughout, and which also breaks in the presence of faults.
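To see how tasks end up distributed among clients, one can simulate the pairing of get-client requests with client registrations. The following sketch rests on simplifying assumptions of our own: no network latency, a fixed duration per task for each client, every client re-enqueuing itself immediately after finishing, and all get-client requests issued at the start, as in the server script. Under these assumptions, the first-in, first-out inbox hands task i to the i-th registration in time order. The procedures enqueue-times and assign-tasks are hypothetical.

```scheme
(use-modules (srfi srfi-1))  ; first, second, third, iota, append-map, take

;; A client is described as (name join-time seconds-per-task).
;; Times at which it (re-)enqueues itself, assuming it is kept busy:
(define (enqueue-times client n)
  (let ((name (first client))
        (join (second client))
        (duration (third client)))
    (map (lambda (k) (cons (+ join (* k duration)) name))
         (iota n))))

;; Task i goes to the i-th enqueue event in time order; stable-sort keeps
;; the listing order of clients for simultaneous events.
(define (assign-tasks tasks clients)
  (let* ((n (length tasks))
         (events (stable-sort
                  (append-map (lambda (c) (enqueue-times c n)) clients)
                  (lambda (a b) (< (car a) (car b))))))
    ;; Result: (task client start-time) for each task.
    (map (lambda (task event) (list task (cdr event) (car event)))
         tasks
         (take events n))))

;; alice is there from the start; bob joins at t = 4; both need 3 s per task.
(display (assign-tasks '(1 2 3 4 5) '((alice 0 3) (bob 4 3))))
(newline)
;; ((1 alice 0) (2 alice 3) (3 bob 4) (4 alice 6) (5 bob 7))
```

Once bob has joined, the two clients alternate. With heterogeneous clients such as (fast 0 1) and (slow 0 4), the fast one obtains four of the five tasks. Ties in time are broken here in the order in which the clients are listed; in reality, their order would depend on message delivery.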

Time for crochet: loops after loops!

A common situation is that after running one loop, one needs to start a second round that continues the computations with the intermediate results that have just been obtained. In what follows, we will modify the server script accordingly, while keeping the client script unmodified, which may be seen as a sign that the architecture developed so far makes sense.

For instance, the following sequential code computes the L4-norm of a vector, that is, the fourth root of the sum of the fourth powers of its entries:

(use-modules (srfi srfi-1))
(define (square x) (* x x))
(define v '(1 2 3 4 5))
(define w (map square v))
(define res (map square w))
(format #t "~a\n" (sqrt (sqrt (fold + 0 res))))

It consists of two loops, one for squaring each entry in v and putting the results into w, and a second one for squaring the entries in w (which effectively computes the fourth powers of the entries in v).
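The two loops are an instance of a general pattern: k rounds of squaring followed by k square roots of the sum yield the L^(2^k)-norm, so that k = 1 gives the usual Euclidean norm, k = 2 the L4-norm above, and k = 3 the L8-norm. Here is a sequential sketch; norm-2^k is a helper of our own naming, not part of the scripts in this post.

```scheme
(use-modules (srfi srfi-1))  ; fold

(define (square x) (* x x))

;; Hypothetical helper: k rounds of squaring (k loops over the vector),
;; then k square roots of the sum, giving the L^(2^k)-norm.
(define (norm-2^k v k)
  (let loop ((w v) (rounds k))
    (if (> rounds 0)
        (loop (map square w) (- rounds 1))   ; one more squaring loop
        (let root ((s (fold + 0 w)) (rounds k))
          (if (> rounds 0)
              (root (sqrt s) (- rounds 1))   ; undo one squaring
              s)))))

(define v '(1 2 3 4 5))
(format #t "~a\n" (norm-2^k v 2))  ; same value as the L4-norm code above
(format #t "~a\n" (norm-2^k v 3))  ; L8-norm, eighth root of 462979
```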

This can be goblinified quite naturally by nesting the task submission and (on (all-of* …)) handling of the results. Without 'finish signals, this results in the following code:

(define v '(1 2 3 4 5))
(with-vat vat
  (on (all-of* (map (lambda (x) (<- (<- get-client) 'square x)) v))
    (lambda (w)
      (on (all-of* (map (lambda (x) (<- (<- get-client) 'square x)) w))
        (lambda (res)
          (format #t "~a\n" (sqrt (sqrt (fold + 0 res)))))))))

Including 'finish handling, the following server script is not the shortest solution, but its symmetries will be helpful in the next section. To make the code more readable, we have moved some (repetitive) code into the functions submit-square-jobs and submit-finish-jobs.

(use-modules (srfi srfi-1)
             (srfi srfi-26)
             (goblins)
             (goblins actor-lib cell)
             (goblins actor-lib inbox)
             (goblins actor-lib joiners)
             (goblins ocapn ids)
             (goblins ocapn captp)
             (goblins ocapn netlayer tcp-tls)
             (goblins persistence-store syrup)
             (goblins vat))

(define persistence-vat (spawn-vat))
(define persistence-registry
  (with-vat persistence-vat
    (spawn ^persistence-registry)))

(define-values (net capn)
  (spawn-persistent-vat
    (make-persistence-env #:extends (list captp-env tcp-tls-netlayer-env))
    (lambda ()
      (spawn-mycapn (spawn ^tcp-tls-netlayer "localhost")))
    (make-syrup-store "ocapn.syrup")
    #:persistence-registry persistence-registry))

(define (print-id prefix id)
  (with-vat net
    (on id
      (lambda (sref)
        (format #t "~a ~a\n"
                   prefix (ocapn-id->string sref))))))

(define-values (vat get-client put-client stop-clients)
  (spawn-persistent-vat
    (make-persistence-env
      #:extends inbox-env)
    (lambda ()
      (spawn-inbox))
    (make-syrup-store "registry.syrup")
    #:persist-on #f
    #:persistence-registry persistence-registry))

(let ((id (with-vat net ($ capn 'register put-client 'tcp-tls))))
  (print-id "Server ID" id))

(define all-clients (with-vat vat (spawn ^cell '())))

(define (submit-square-jobs v)
  (let ((clients (map (lambda (x) (<- get-client)) v)))
    ($ all-clients (append clients ($ all-clients)))
    (map (cut <- <> 'square <>) clients v)))

(define (submit-finish-jobs clients)
  (map (cut <- <> 'finish) (delete-duplicates clients)))

(define v '(1 2 3 4 5))
(with-vat vat
  (on (all-of* (submit-square-jobs v))
    (lambda (w)
      (on (all-of* (submit-square-jobs w))
        (lambda (res)
          (format #t "~a\n" (sqrt (sqrt (fold + 0 res))))
          (on (all-of* ($ all-clients))
            submit-finish-jobs))))))

(sleep 3600)

Untangling the threads: macros to the rescue

The last block of the server code now clearly shows a recurring pattern:

(on (all-of* SUBMIT SOME JOBS)
  (lambda (VAR)
    DO SOMETHING WITH THE RESULT IN VAR))

which is actually nested, since handling the results of the first round requires running the same pattern for the second round of job submissions. Such a recurring pattern can be captured by a Guile macro, for instance as follows:

(define-syntax submit-reduce
  (syntax-rules ()
    ((submit-reduce submit v reduce ...)
     (on (all-of* submit)
       (lambda (v)
         (begin reduce ...))))))

The line following syntax-rules () contains a pattern to be matched; the remainder of the macro is a template, namely the Guile code above with the placeholders replaced by the corresponding parts of the matched pattern. The first argument of the macro is a single expression corresponding to SUBMIT SOME JOBS; if several expressions are needed, they can be combined into one using let* or begin, for instance. The second argument is the (formal) variable name VAR. All remaining arguments (of which there may be zero) correspond to DO SOMETHING WITH THE RESULT IN VAR; these will in general use the formal variable.
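To watch the macro at work without a network, one can pair it with synchronous stand-ins for on and all-of*. These two definitions are hypothetical and have nothing to do with the real Goblins procedures: here a "promise" is just its value, and on calls the callback at once and returns its result, whereas the real on registers the callback to run once the promise is fulfilled. Local squaring with map replaces the remote jobs.

```scheme
(use-modules (srfi srfi-1))  ; fold

;; Hypothetical synchronous stand-ins, NOT the Goblins procedures:
;; a "promise" is just its value, and `on` calls the callback at once
;; and returns its result.
(define (all-of* values-list) values-list)
(define (on value callback) (callback value))

(define-syntax submit-reduce
  (syntax-rules ()
    ((submit-reduce submit v reduce ...)
     (on (all-of* submit)
       (lambda (v)
         (begin reduce ...))))))

(define (square x) (* x x))

;; Local squaring with map replaces the remote 'square jobs;
;; the inner reduction consists of two expressions.
(define result
  (submit-reduce (map square '(1 2 3 4 5)) w
    (submit-reduce (map square w) res
      (format #t "sum of fourth powers: ~a\n" (fold + 0 res))
      (sqrt (sqrt (fold + 0 res))))))
```

Running this prints "sum of fourth powers: 979", and result holds the L4-norm of the vector.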

Using this macro, the main block of the server script can be compressed as follows:

(define v '(1 2 3 4 5))
(with-vat vat
  (submit-reduce (submit-square-jobs v) w
    (submit-reduce (submit-square-jobs w) res
      (format #t "~a\n" (sqrt (sqrt (fold + 0 res))))
      (on (all-of* ($ all-clients))
        submit-finish-jobs))))

It is also possible to let the macro itself handle the nesting as follows:

(define-syntax submit-reduce
  (syntax-rules ()
    ((submit-reduce reduce)
     reduce)
    ((submit-reduce submit v reduce ...)
     (on (all-of* submit)
       (lambda (v)
         (submit-reduce reduce ...))))))

If the macro is called with two or more arguments, the second pattern (submit-reduce submit v reduce ...) is matched: The first argument (a single expression) is considered to be the job submission phase, and the second argument the variable name for the results of these jobs; then the macro is called recursively on the remaining arguments, so that more job submission phases, alternated with variable names, are expected. In the end, when only one argument remains, the first pattern (submit-reduce reduce) is matched, which corresponds to the handling of the results of the final round of job submissions. So the macro requires an odd number of arguments; with an even number, the recursion eventually reaches (submit-reduce) without arguments, which matches neither pattern, and macro expansion fails with a syntax error (using just one argument is possible, but makes no sense). With this macro, the main server block looks as follows:

(define v '(1 2 3 4 5))
(with-vat vat
  (submit-reduce
    (submit-square-jobs v) w
    (submit-square-jobs w) res
    (begin
      (format #t "~a\n" (sqrt (sqrt (fold + 0 res))))
      (on (all-of* ($ all-clients))
        submit-finish-jobs))))

Notice that we needed to wrap the final reduction into (begin …) since it consists of several expressions. Since it avoids the explicit nesting, this macro makes the sequence quite clear: submitting a series of tasks, submitting a new series of tasks depending on the results of the previous series, and so on, until the final result is handled through a side effect (to break out of the promises). When exactly three arguments are given, both macros are equivalent, so that it is still possible to nest the macro invocations manually; the second macro is thus more powerful than the first one (except that the first one admits several Guile expressions for the reduction phase).
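The recursive version can be tried in the same offline fashion. As before, on and all-of* below are hypothetical synchronous stand-ins rather than the Goblins procedures, and map replaces the remote jobs; the chain of three rounds computes the L8-norm of the vector.

```scheme
(use-modules (srfi srfi-1))  ; fold

;; Hypothetical synchronous stand-ins (NOT Goblins): a "promise" is its value.
(define (all-of* values-list) values-list)
(define (on value callback) (callback value))

(define-syntax submit-reduce
  (syntax-rules ()
    ((submit-reduce reduce)            ; last argument: final reduction
     reduce)
    ((submit-reduce submit v reduce ...)
     (on (all-of* submit)              ; one round of jobs...
       (lambda (v)                     ; ...whose results are bound to v
         (submit-reduce reduce ...)))))) ; then the remaining rounds

(define (square x) (* x x))

(define l8
  (submit-reduce
    (map square '(1 2 3 4 5)) w
    (map square w) t
    (map square t) res
    (sqrt (sqrt (sqrt (fold + 0 res))))))

;; An even number of arguments, e.g. (submit-reduce (map square '(1 2)) w),
;; would be rejected at expansion time: after binding w, the recursion
;; reaches (submit-reduce), which matches neither pattern.
```

Here res is (1 256 6561 65536 390625), whose sum is 462979.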

To illustrate the simplicity with which the pattern continues, here is the complete server script for computing the L8-norm of a vector, that is, the eighth root of the sum of the eighth powers of its entries:

(use-modules (srfi srfi-1)
             (srfi srfi-26)
             (goblins)
             (goblins actor-lib cell)
             (goblins actor-lib inbox)
             (goblins actor-lib joiners)
             (goblins ocapn ids)
             (goblins ocapn captp)
             (goblins ocapn netlayer tcp-tls)
             (goblins persistence-store syrup)
             (goblins vat))

(define persistence-vat (spawn-vat))
(define persistence-registry
  (with-vat persistence-vat
    (spawn ^persistence-registry)))

(define-values (net capn)
  (spawn-persistent-vat
    (make-persistence-env #:extends (list captp-env tcp-tls-netlayer-env))
    (lambda ()
      (spawn-mycapn (spawn ^tcp-tls-netlayer "localhost")))
    (make-syrup-store "ocapn.syrup")
    #:persistence-registry persistence-registry))

(define (print-id prefix id)
  (with-vat net
    (on id
      (lambda (sref)
        (format #t "~a ~a\n"
                   prefix (ocapn-id->string sref))))))

(define-values (vat get-client put-client stop-clients)
  (spawn-persistent-vat
    (make-persistence-env
      #:extends inbox-env)
    (lambda ()
      (spawn-inbox))
    (make-syrup-store "registry.syrup")
    #:persist-on #f
    #:persistence-registry persistence-registry))

(let ((id (with-vat net ($ capn 'register put-client 'tcp-tls))))
  (print-id "Server ID" id))

(define all-clients (with-vat vat (spawn ^cell '())))

(define (submit-square-jobs v)
  (let ((clients (map (lambda (x) (<- get-client)) v)))
    ($ all-clients (append clients ($ all-clients)))
    (map (cut <- <> 'square <>) clients v)))

(define (submit-finish-jobs clients)
  (map (cut <- <> 'finish) (delete-duplicates clients)))

(define-syntax submit-reduce
  (syntax-rules ()
    ((submit-reduce reduce)
     reduce)
    ((submit-reduce submit v reduce ...)
     (on (all-of* submit)
       (lambda (v)
         (submit-reduce reduce ...))))))

(define v '(1 2 3 4 5))
(with-vat vat
  (submit-reduce
    (submit-square-jobs v) w
    (submit-square-jobs w) t
    (submit-square-jobs t) res
    (begin
      (format #t "~a\n" (sqrt (sqrt (sqrt (fold + 0 res)))))
      (on (all-of* ($ all-clients))
        submit-finish-jobs))))

(sleep 3600)

(Preliminary) conclusion

At this point, the goal set out at the beginning of this series of blog posts is met. We have developed a client and server structure in which the clients register with the server and the server hands them computation tasks that correspond to a sequence of loops. As already said above, the result is even a bit more flexible than with MPI: The number of clients need not be known and communicated to the server in advance, but clients can come and go, as long as they do not vanish in the middle of a task. And Goblins make it possible to do so over the Internet, either with TCP/TLS or even through the Tor network.