Goblins for number theory, part 4
Client and server Goblins
After having introduced the basic concepts of Goblins, and in particular promises; after having looked at parallelisation over the network; and after an excursion to persistence, it is now time to get to the main topic. We would like more flexibility in the behaviour of the clients, in the nature of the tasks they are handling and in the control flow of the server. Clients should be able to come and go, maybe completing only one task and never being seen again (we will not handle faults, however, that is, clients accepting a task and disappearing before completing it). Tasks could be heterogeneous, that is, take more or less time, or, equivalently, the clients could run on heterogeneous machines, and it would be nice to give out a new task to a client as soon as it finishes the previous one. And we would like the server to be able to work in rounds; in essence, distribute unrelated tasks corresponding to a loop, then gather the results and start with the next loop.
After David Thompson and Jessica Tallon had a look at my first solution,
which had performance problems I could not explain, they came up with a
much better idea, so I will present their solution without losing time
in explaining what went wrong. Suffice it to say that one should avoid
nesting with-vat expressions. In my experience, doing so with
the same vat leads to a deadlock; doing so with different vats seems to
work, but causes a severe performance penalty.
Queueing up clients, tasks and other promises
Our current solution already keeps a list of clients at the server, to
which clients can register in the background. Instead of waiting until
a fixed number of clients have arrived, we should be more dynamic and
implement the server as follows. As long as there are tasks to be
submitted and the client list is not empty, the server removes a client
from the client list and submits a task to this client. If the client list
is empty while there are still unsubmitted tasks, the server waits until a
new client registers. So far, this scheme uses each client for exactly one
task, and works if more clients register than there are tasks.
The trick is to let the client do its computation, and at the end
register itself again with the server as being available for the next task.
My first solution used the existing ^registry actor, added an
'unregister method used by the server to retrieve an available
client, and let the client call the 'register method after
finishing a task. The problem with this straightforward approach is that
one needs to have the server wait when no client is available, and this
risks stalling everything. In a sense, we are back to the problem discussed
in the first post:
Goblins work with promises, and waiting for their resolution is not
a Goblins concept; one should not try to master the time and write code
to be executed at a specific moment, but rather define call-backs that
are run when promises become true.
Jessica and David pointed out to me that since version 0.14 of Goblins,
a suitable module is available in the actor library: the inbox,
which is modelled after a post box that queues messages and delivers them
one by one on request. Actually it rather delivers parcels, since it is
a general first in, first out queue that can be filled with anything.
We will replace the current clients list in a ^cell actor by an inbox
that will contain client actors.
The crucial difference with my home brew solution is that the level of
an inbox can go below zero without blocking:
If there are no elements in the queue, it nevertheless returns a promise to
a future element, in our case a client actor that will register later.
Thanks to promise pipelining, we can pretend that this empty promise is
actually an actor and send messages to it using <-. Once a new actor
registers, the promise fulfills itself, and the new actor will receive
the message sent previously and act on it.
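The idea of a queue whose level may go below zero can be illustrated without Goblins at all. The following is only a toy sketch in plain Guile, not the implementation of the Goblins inbox: instead of promises it parks callbacks, and the names make-toy-inbox, make-client, record! and served are made up for this illustration. Requests that arrive while the queue is empty do not block; they are remembered and served as soon as a client registers.

```scheme
;; Toy FIFO queue whose level may go "below zero" (illustration only,
;; this is NOT the Goblins inbox).  Returns two procedures:
;; PUT! adds an item, GET! asks for an item and takes a callback.
(define (make-toy-inbox)
  (let ((items '())     ; queued items, oldest first
        (waiters '()))  ; parked callbacks, oldest first
    (define (put! item)
      (if (null? waiters)
          (set! items (append items (list item)))
          (let ((k (car waiters)))
            ;; Remove the waiter before calling it, since the callback
            ;; may itself call put! again.
            (set! waiters (cdr waiters))
            (k item))))
    (define (get! k)
      (if (null? items)
          (set! waiters (append waiters (list k)))
          (let ((item (car items)))
            (set! items (cdr items))
            (k item))))
    (values put! get!)))

(define events '())
(define (record! x) (set! events (cons x events)))

(define-values (put-client get-client) (make-toy-inbox))

;; A toy client: it squares its input, records the work done,
;; then registers itself again as available.
(define (make-client name)
  (define (self x)
    (record! (list name x (* x x)))
    (put-client self))
  self)

;; Four tasks are submitted before any client exists; nothing blocks.
(for-each (lambda (x)
            (get-client (lambda (client) (client x))))
          '(1 2 3 4))

;; A single client registers late and ends up handling all four tasks.
(put-client (make-client 'alice))

(define served (reverse events))
;; served is ((alice 1 1) (alice 2 4) (alice 3 9) (alice 4 16))
```

In this synchronous sketch the callbacks run immediately inside put!; in Goblins, the corresponding messages are delivered asynchronously through promises.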
The essential modifications occur in the ^worker type actor
in the client script:

(define-actor (^worker bcom server)
  #:self self
  (methods
   ((square x)
    (let ((res (* x x)))
      (format #t "square ~a\n" x)
      (sleep 3)
      (<- server self)
      res))
   ((finish)
    (signal-condition! end))))

After computing the result (and sleeping a little bit for testing purposes,
since the tasks are so short that otherwise one client would end up
grabbing all of them before we have a chance to start a second one), the
client sends itself back to the server.
To this purpose, it needs to know the server, which can be passed to it
upon spawning; and it needs to have a notion of itself. This is why we
have replaced the define by the more general define-actor, in which the
optional #:self self defines a formal parameter self to
later… speak to oneself!
Notice that we have also modified the client so that, as in the first blog
posts, it does not register with its name (which thus is not passed on the
command line anymore either): Using an inbox instead of the
custom registry function implies that we would need to encapsulate the
client actor into a composite data structure together with its name (for
instance, SRFI-9 records), which is more hassle than warranted for our
experimental code. To make up for it, we let the client itself print the
computing tasks it receives.
The complete client script, after some shuffling around so that things are
defined in the correct order, looks like this:

(use-modules (srfi srfi-1)
             (fibers conditions)
             (goblins)
             (goblins actor-lib methods)
             (goblins ocapn ids)
             (goblins ocapn captp)
             (goblins ocapn netlayer tcp-tls))

(define vat (spawn-vat))
(define net (spawn-vat))
(define end (make-condition))

(define-actor (^worker bcom server)
  #:self self
  (methods
   ((square x)
    (let ((res (* x x)))
      (format #t "square ~a\n" x)
      (sleep 3)
      (<- server self)
      res))
   ((finish)
    (signal-condition! end))))

(define capn
  (with-vat net
    (spawn-mycapn (spawn ^tcp-tls-netlayer "localhost"))))
(define server
  (with-vat vat
    (<- capn 'enliven (string->ocapn-id (second (command-line))))))
(define client
  (with-vat vat
    (spawn ^worker server)))

(with-vat net
  ($ capn 'register client 'tcp-tls))
(with-vat vat
  (<- server client))

(wait end)

In the server, we essentially replace the custom ^registry
by an inbox, which results in the following code:

(use-modules (srfi srfi-1)
             (srfi srfi-26)
             (goblins)
             (goblins actor-lib cell)
             (goblins actor-lib inbox)
             (goblins actor-lib joiners)
             (goblins ocapn ids)
             (goblins ocapn captp)
             (goblins ocapn netlayer tcp-tls)
             (goblins persistence-store syrup)
             (goblins vat))

(define persistence-vat (spawn-vat))
(define persistence-registry
  (with-vat persistence-vat
    (spawn ^persistence-registry)))

(define-values (net capn)
  (spawn-persistent-vat
   (make-persistence-env
    #:extends (list captp-env tcp-tls-netlayer-env))
   (lambda ()
     (spawn-mycapn (spawn ^tcp-tls-netlayer "localhost")))
   (make-syrup-store "ocapn.syrup")
   #:persistence-registry persistence-registry))

(define (print-id prefix id)
  (with-vat net
    (on id
        (lambda (sref)
          (format #t "~a ~a\n" prefix (ocapn-id->string sref))))))

(define-values (vat get-client put-client stop-clients)
  (spawn-persistent-vat
   (make-persistence-env #:extends inbox-env)
   (lambda ()
     (spawn-inbox))
   (make-syrup-store "registry.syrup")
   #:persist-on #f
   #:persistence-registry persistence-registry))

(let ((id (with-vat net
            ($ capn 'register put-client 'tcp-tls))))
  (print-id "Server ID" id))

(define all-clients
  (with-vat vat
    (spawn ^cell '())))

(define v '(1 2 3 4 5))

(with-vat vat
  (let ((clients (map (lambda (x) (<- get-client)) v)))
    ($ all-clients (append clients ($ all-clients)))
    (on (all-of* (map (cut <- <> 'square <>) clients v))
        (lambda (res)
          (format #t "~a\n" (sqrt (fold + 0 res)))
          (on (all-of* ($ all-clients))
              (lambda (c)
                (map (cut <- <> 'finish) (delete-duplicates c))))))))

(sleep 3600)

The spawn-inbox function does not return one actor, but
actually three at the same time: one for adding elements into the queue,
one for retrieving an element (or a promise thereof), and one for shutting
the inbox down (which we will not use).
The expression (map (lambda (x) (<- get-client)) v)
creates a list of (promises to) client actors that has as many elements
as the vector v. Then we send the tasks as before and use all-of*
to wait for their results. There is a little subtlety for sending the
'finish messages: Since the variable clients
in general does not contain a list of client actors any more, but a list
of promises, we also need to use (on (all-of* …)) to
retrieve the actual list of actors. We go further by memorising all clients
ever encountered (with multiplicities, actually) in a separate cell
all-clients. This is a bit convoluted at this point (since at
the end of the script, ($ all-clients) is the same as clients), but will
make things easier later.
Without any extra code for sending the 'finish signal to the
clients, the main part of the server script could be condensed into only
a few lines:

(define v '(1 2 3 4 5))

(with-vat vat
  (on (all-of* (map (lambda (x) (<- (<- get-client) 'square x)) v))
      (lambda (res)
        (format #t "~a\n" (sqrt (fold + 0 res))))))

Notice that this solution is strictly more general than that of the
previous posts:
If only one client registers, it runs all the squaring tasks; if a second
one arrives, it obtains every other task; and so on.
And… that's it! We have parallelised a for loop which may
contain tasks of differing (and a priori unknown) lengths, and it can
handle the situation where clients join at any time.
To handle clients that may leave after a task is completed, the
framework is essentially there: Instead of having the client
register again after each computing task, this could be
made dependent on a condition to be checked in the client.
For handling faults, that is, clients which disappear in the middle of
a task, one would need to add timeouts at the server level and requeue
tasks for which the result has not appeared after a reasonable waiting
time, which would depend on the application.
Then all-of* would not be a suitable joiner, but one could use race,
which resolves as soon as one of several promises resolves.
Or one could use all-of* on a list of promises created
by race from a computation promise and a timeout promise,
as given precisely as an example in the documentation of race.
We will not pursue the topic of faults in this post, but it is clear
that Goblins mechanisms could be used to solve the problem.
In any case, our current Goblins code is already more flexible than the MPI solution, which assumes that all clients are known at the beginning of the computation and do not change throughout, and which also breaks in the presence of faults.
Time for crochet: loops after loops!
A common situation is that after running one loop, one needs to start a second round that continues the computations with the intermediate results that have just been obtained. In what follows, we will modify the server script accordingly, while keeping the client script unmodified, which may be seen as a sign that the architecture developed so far makes sense.
For instance, the following sequential code computes the L4-norm of a vector, that is, the fourth root of the sum of the fourth powers of its entries:

(use-modules (srfi srfi-1))

(define (square x)
  (* x x))

(define v '(1 2 3 4 5))
(define w (map square v))
(define res (map square w))
(format #t "~a\n" (sqrt (sqrt (fold + 0 res))))

It consists of two loops, one for squaring each entry in v
and putting the results into w, and a second one for
squaring the entries in w (which effectively computes the
fourth powers of the entries in v).
This can be goblinified quite naturally by nesting the task submission
and (on (all-of* …)) handling of the results.
Without 'finish signals, this results in the following code:

(define v '(1 2 3 4 5))

(with-vat vat
  (on (all-of* (map (lambda (x) (<- (<- get-client) 'square x)) v))
      (lambda (w)
        (on (all-of* (map (lambda (x) (<- (<- get-client) 'square x)) w))
            (lambda (res)
              (format #t "~a\n" (sqrt (sqrt (fold + 0 res)))))))))

Including 'finish handling, the following server script is not
the shortest solution, but its symmetries will be helpful in the next
section. To make the code more readable, we have moved some (repetitive)
code into the functions submit-square-jobs and submit-finish-jobs.

(use-modules (srfi srfi-1)
             (srfi srfi-26)
             (goblins)
             (goblins actor-lib cell)
             (goblins actor-lib inbox)
             (goblins actor-lib joiners)
             (goblins ocapn ids)
             (goblins ocapn captp)
             (goblins ocapn netlayer tcp-tls)
             (goblins persistence-store syrup)
             (goblins vat))

(define persistence-vat (spawn-vat))
(define persistence-registry
  (with-vat persistence-vat
    (spawn ^persistence-registry)))

(define-values (net capn)
  (spawn-persistent-vat
   (make-persistence-env
    #:extends (list captp-env tcp-tls-netlayer-env))
   (lambda ()
     (spawn-mycapn (spawn ^tcp-tls-netlayer "localhost")))
   (make-syrup-store "ocapn.syrup")
   #:persistence-registry persistence-registry))

(define (print-id prefix id)
  (with-vat net
    (on id
        (lambda (sref)
          (format #t "~a ~a\n" prefix (ocapn-id->string sref))))))

(define-values (vat get-client put-client stop-clients)
  (spawn-persistent-vat
   (make-persistence-env #:extends inbox-env)
   (lambda ()
     (spawn-inbox))
   (make-syrup-store "registry.syrup")
   #:persist-on #f
   #:persistence-registry persistence-registry))

(let ((id (with-vat net
            ($ capn 'register put-client 'tcp-tls))))
  (print-id "Server ID" id))

(define all-clients
  (with-vat vat
    (spawn ^cell '())))

(define (submit-square-jobs v)
  (let ((clients (map (lambda (x) (<- get-client)) v)))
    ($ all-clients (append clients ($ all-clients)))
    (map (cut <- <> 'square <>) clients v)))

(define (submit-finish-jobs clients)
  (map (cut <- <> 'finish) (delete-duplicates clients)))

(define v '(1 2 3 4 5))

(with-vat vat
  (on (all-of* (submit-square-jobs v))
      (lambda (w)
        (on (all-of* (submit-square-jobs w))
            (lambda (res)
              (format #t "~a\n" (sqrt (sqrt (fold + 0 res))))
              (on (all-of* ($ all-clients))
                  submit-finish-jobs))))))

(sleep 3600)

Untangling the threads: macros to the rescue
The last block of the server code now clearly shows a recurring pattern:

(on (all-of* SUBMIT SOME JOBS)
    (lambda (VAR)
      DO SOMETHING WITH THE RESULT IN VAR))

which is actually nested, since handling the results of the first round requires running the same pattern for the second round of job submissions. Such a pattern can be captured by a Guile macro, for instance as follows:

(define-syntax submit-reduce
  (syntax-rules ()
    ((submit-reduce submit v reduce ...)
     (on (all-of* submit)
         (lambda (v)
           (begin reduce ...))))))

The line following the syntax-rules () contains a pattern
to be matched; the remainder of the macro is the Guile code above, with
placeholders replaced by parts of the matched pattern.
The first argument of the macro is a single expression corresponding to
SUBMIT SOME JOBS; if several expressions are needed, they can
be transformed into only one using let*, for instance.
The second argument is the (formal) variable name VAR.
All remaining arguments (of which there may be zero) correspond to
DO SOMETHING WITH THE RESULT IN VAR; these will in general
use the formal variable.
Using this macro, the main block of the server script can be compressed as follows:

(define v '(1 2 3 4 5))

(with-vat vat
  (submit-reduce (submit-square-jobs v) w
    (submit-reduce (submit-square-jobs w) res
      (format #t "~a\n" (sqrt (sqrt (fold + 0 res))))
      (on (all-of* ($ all-clients))
          submit-finish-jobs))))

It is also possible to let the macro itself handle the nesting as follows:

(define-syntax submit-reduce
  (syntax-rules ()
    ((submit-reduce reduce)
     reduce)
    ((submit-reduce submit v reduce ...)
     (on (all-of* submit)
         (lambda (v)
           (submit-reduce reduce ...))))))

If the macro is called with at least three arguments, then the second
pattern (submit-reduce submit v reduce ...) is matched.
The first argument (a single expression) is considered to be the job
submission phase, the second argument the variable name for the results
of the first jobs; then the macro is called recursively, and more
job submission phases, alternated with variable names, are expected;
in the end, when only one argument remains, the first pattern
(submit-reduce reduce) is matched, which corresponds to the
handling of the results of the final round of job submissions.
So to work, the macro requires an odd number of arguments, otherwise it
raises an error (using just one argument is possible, but makes no sense).
With this macro, the main server block looks as follows:

(define v '(1 2 3 4 5))

(with-vat vat
  (submit-reduce
   (submit-square-jobs v) w
   (submit-square-jobs w) res
   (begin
     (format #t "~a\n" (sqrt (sqrt (fold + 0 res))))
     (on (all-of* ($ all-clients))
         submit-finish-jobs))))

Notice that we needed to wrap the final reduction into (begin …)
since it consists of several expressions.
This macro, which avoids the additional nesting, makes the sequence of
submitting a series of tasks, submitting a new series of tasks depending
on the results of the previous series, and so on, until the final result
is handled through a side effect (to break out of the promises), quite
clear. When exactly three arguments are given, both macros are equivalent,
so that it is still possible to manually nest the macro invocations, and
the second macro is more powerful than the first one (except that the first
one admits several Guile expressions for the reduction phase).
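To see how the recursive macro unfolds without starting any vat or network connection, one can try it in plain Guile with synchronous stand-ins. The sketch below is only an experiment under that assumption: the definitions of on and all-of* given here are trivial substitutes and not the Goblins ones (they must not be loaded together with Goblins), submit-square-jobs simply squares locally, and the name l4-sum is made up.

```scheme
(use-modules (srfi srfi-1))

;; Synchronous stand-ins, NOT the Goblins procedures of the same name:
;; "all promises" are just the list itself, and the callback is
;; applied to it immediately.
(define (all-of* lst) lst)
(define (on val proc) (proc val))

;; The recursive macro, copied from the text above.
(define-syntax submit-reduce
  (syntax-rules ()
    ((submit-reduce reduce)
     reduce)
    ((submit-reduce submit v reduce ...)
     (on (all-of* submit)
         (lambda (v)
           (submit-reduce reduce ...))))))

;; Local replacement for the job submission: square every entry.
(define (square x) (* x x))
(define (submit-square-jobs v) (map square v))

;; Two rounds of squaring followed by the final reduction.
(define l4-sum
  (submit-reduce
   (submit-square-jobs '(1 2 3 4 5)) w
   (submit-square-jobs w) res
   (fold + 0 res)))
;; l4-sum is 979 = 1 + 16 + 81 + 256 + 625

(format #t "~a\n" (sqrt (sqrt l4-sum)))
```

With the real Goblins on, the callback runs later and the value cannot be captured by a define in this way, which is why the server script handles the final result through a side effect instead.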
To illustrate the simplicity with which the pattern continues, here is the complete server script for computing the L8-norm of a vector, that is, the eighth root of the sum of the eighth powers of its entries:

(use-modules (srfi srfi-1)
             (srfi srfi-26)
             (goblins)
             (goblins actor-lib cell)
             (goblins actor-lib inbox)
             (goblins actor-lib joiners)
             (goblins ocapn ids)
             (goblins ocapn captp)
             (goblins ocapn netlayer tcp-tls)
             (goblins persistence-store syrup)
             (goblins vat))

(define persistence-vat (spawn-vat))
(define persistence-registry
  (with-vat persistence-vat
    (spawn ^persistence-registry)))

(define-values (net capn)
  (spawn-persistent-vat
   (make-persistence-env
    #:extends (list captp-env tcp-tls-netlayer-env))
   (lambda ()
     (spawn-mycapn (spawn ^tcp-tls-netlayer "localhost")))
   (make-syrup-store "ocapn.syrup")
   #:persistence-registry persistence-registry))

(define (print-id prefix id)
  (with-vat net
    (on id
        (lambda (sref)
          (format #t "~a ~a\n" prefix (ocapn-id->string sref))))))

(define-values (vat get-client put-client stop-clients)
  (spawn-persistent-vat
   (make-persistence-env #:extends inbox-env)
   (lambda ()
     (spawn-inbox))
   (make-syrup-store "registry.syrup")
   #:persist-on #f
   #:persistence-registry persistence-registry))

(let ((id (with-vat net
            ($ capn 'register put-client 'tcp-tls))))
  (print-id "Server ID" id))

(define all-clients
  (with-vat vat
    (spawn ^cell '())))

(define (submit-square-jobs v)
  (let ((clients (map (lambda (x) (<- get-client)) v)))
    ($ all-clients (append clients ($ all-clients)))
    (map (cut <- <> 'square <>) clients v)))

(define (submit-finish-jobs clients)
  (map (cut <- <> 'finish) (delete-duplicates clients)))

(define-syntax submit-reduce
  (syntax-rules ()
    ((submit-reduce reduce)
     reduce)
    ((submit-reduce submit v reduce ...)
     (on (all-of* submit)
         (lambda (v)
           (submit-reduce reduce ...))))))

(define v '(1 2 3 4 5))

(with-vat vat
  (submit-reduce
   (submit-square-jobs v) w
   (submit-square-jobs w) t
   (submit-square-jobs t) res
   (begin
     (format #t "~a\n" (sqrt (sqrt (sqrt (fold + 0 res)))))
     (on (all-of* ($ all-clients))
         submit-finish-jobs))))

(sleep 3600)

(Preliminary) conclusion
At this point, the goal set out at the beginning of this series of blog posts is met. We have developed a client and server structure in which the clients register with the server and the server hands them computation tasks that correspond to a sequence of loops. As already said above, the result is even a bit more flexible than with MPI: The number of clients need not be known and communicated to the server in advance, but clients can come and go, as long as they do not vanish in the middle of a task. And Goblins make it possible to do so over the Internet, either with TCP/TLS or even through the Tor network.