riot: Distributed computing for the masses
Riot is a distributed computing system for Racket. With Riot, you can run parallel tasks on multiple machines across the network with minimal changes to your code.
You need a Racket that supports submodules. At the time of writing, only the nightly builds will work.
1 In which we construct a networked mapreduce cluster from scratch in about thirty seconds
- To get started, first start the tracker server. In a terminal, run:
$ racket -p gcr/riot/server
- Parallel code looks like this:
#lang racket
;; Save to simple-demo.rkt
(require (planet gcr/riot))

(define (run)
  (for/work ([i (in-range 10)])
    (sleep 3) ; or some big task
    (* 10 i)))

(module+ main
  (connect-to-riot-server! "localhost")
  (displayln (run))
  ;; Note that (run) must be in a separate function--see below
  (displayln "Complete"))
You just wrote the “master” program that hands out work and processes the results. The (for/work ...) form acts just like for/list, but it runs in parallel: for/work packages its body into “workunits” that will be run by other worker processes, generating one workunit for each iteration. (for/work is not the only way to control riot, and it has some restrictions and odd behavior, but it is the easiest.)
Go ahead and start your program:
$ racket simple-demo.rkt
The loop runs 10 times, so your program will register 10 units of work with the tracker. It will then appear to freeze because there aren’t any workers to run the work yet. Once we start some workers, the tracker will assign workunits to them and send the results back to your program as the workers finish. Workers can even run on other machines; in that case, the tracker simply sends workunits across the network. There’s no difference between local and networked workers, so commandeer your entire computer lab if you like.
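To make the tracker's role concrete, here is a toy, single-process model of it in plain Racket. This is a sketch under simplifying assumptions, not riot's implementation: workunits are thunks, "workers" are threads that share one queue, and there is no network, persistence, error handling, or reassignment of abandoned work. The name run-with-toy-tracker is hypothetical.

```racket
#lang racket
;; Toy, single-process model of a tracker (hypothetical; NOT riot's code).
;; Workunits are thunks. Worker threads repeatedly take one workunit from
;; the work queue, run it, and send back (id . result).
;; Errors are not handled: a failing thunk would leave the caller waiting.
(define (run-with-toy-tracker thunks #:workers [n-workers 4])
  (define work-queue (make-channel))   ; tracker -> workers
  (define result-queue (make-channel)) ; workers -> tracker
  ;; Each worker loops forever: take a workunit, run it, report the result.
  (define workers
    (for/list ([_ (in-range n-workers)])
      (thread
       (λ ()
         (let loop ()
           (define wu (channel-get work-queue))
           (channel-put result-queue (cons (car wu) ((cdr wu))))
           (loop))))))
  ;; Hand out workunits from a separate thread so results can be
  ;; collected while work is still being assigned.
  (thread
   (λ ()
     (for ([t (in-list thunks)] [id (in-naturals)])
       (channel-put work-queue (cons id t)))))
  ;; Results can arrive in any order, so index them by workunit id.
  (define results
    (for/hash ([_ (in-list thunks)])
      (define r (channel-get result-queue))
      (values (car r) (cdr r))))
  (for-each kill-thread workers)
  ;; Return results in submission order, as for/work does.
  (for/list ([id (in-range (length thunks))])
    (hash-ref results id)))

(module+ main
  (displayln
   (run-with-toy-tracker
    (for/list ([i (in-range 10)]) (λ () (* 10 i))))))
```

Because each worker takes whichever workunit is next in the queue, results can come back out of order; indexing them by workunit ID and reassembling them at the end is what lets the caller see them in iteration order.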
Let’s start some worker processes. If you want your workers to run on other machines, copy simple-demo.rkt there.
From the same directory that contains simple-demo.rkt, run:
$ racket -p gcr/riot/worker -- localhost
where localhost is the hostname of the tracker server you ran earlier. Once you start a worker, it will immediately start to process workunits. Once all workunits are finished, the master program will un-freeze and the for/work form will return the results of each workunit to the caller.
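Since for/work acts like for/list, a quick way to know what the master should print once all workunits finish is to run the same loop sequentially, with no tracker or workers. This sketch assumes for/work returns results in iteration order, as for/list does; run-sequentially is a hypothetical name, and the (sleep 3) is dropped so it runs instantly.

```racket
#lang racket
;; Sequential reference for simple-demo.rkt: the same loop with for/list,
;; so it needs no tracker or workers. (sleep 3) is omitted on purpose.
(define (run-sequentially)
  (for/list ([i (in-range 10)])
    (* 10 i)))

(module+ main
  ;; Prints (0 10 20 30 40 50 60 70 80 90)
  (displayln (run-sequentially)))
```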
Add as many workers as you like. The more workers you run, the faster your program goes. If you kill a worker with Ctrl+C or subject it to some other horrible fate, the tracker server should notice and will reassign abandoned workunits to other workers.
If one of the workers throws an exception, the tracker will forward the exception to for/work, which will in turn throw an exception with a message about which worker caused the problem. Don’t worry:
the tracker remembers completed workunits after your program exits, so if you run your program again, it will pick up where it left off. If you change your program, be sure to copy the new version to all of the workers and restart them all too. If you don’t, they might complain (throw exceptions) if you’re lucky, or they might return results generated from the older code if you’re unlucky.
2 In which we gain significant speedups through the copious application of spare machinery
Here’s a slightly more complicated example. Here, we find all compound dictionary words: words in /usr/share/dict/words that are made by concatenating two other dictionary words.
Here’s some simple code to do that:
#lang racket
;; dict.rkt
(require (planet gcr/riot))

(define dictionary ;; Set of words
  (for/set ([word (in-list (file->lines "/usr/share/dict/words"))]
            #:when (>= (string-length word) 3))
    word))

(define (word-combinations)
  (apply append ; This flattens the list
         (for/list ([first-word (in-set dictionary)])
           (for/list ([second-word (in-set dictionary)]
                      #:when (set-member? dictionary
                                          (string-append first-word second-word)))
             (cons first-word second-word)))))

(module+ main
  (define words (time (word-combinations)))
  (printf "There are ~a words.\n" (length words))
  ;; Print a random subset of the results.
  (write (take (shuffle words) 20))
  (newline))
This search is a good candidate for riot:
- We can split up the outer loop of this dictionary search into parts.
- Each iteration of the outer loop doesn’t depend on any other iteration; each is self-contained.
- There isn’t very much processing to do after we have the final word list.
Running this sequential version on one machine produces:
cpu time: 11233134 real time: 11231587 gc time: 103748
There are 17658 words.
(("for" . "going") ("tail" . "gating") ("minima" . "list's") ("wise" . "acres") ("mill" . "stone's") ("hare" . "brained") ("under" . "bids") ("Chi" . "lean") ("clod" . "hopper") ("reap" . "points") ("dis" . "missal's") ("scholars" . "hip's") ("over" . "load") ("kilo" . "watts") ("trash" . "cans") ("snaps" . "hot") ("lattice" . "work") ("mast" . "head") ("over" . "coming") ("whole" . "sales"))
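The independence of the outer iterations is easier to see when the outer-loop body is pulled out into its own function. The sketch below is a miniature of dict.rkt that runs on a tiny, made-up word list instead of /usr/share/dict/words; the names tiny-dictionary and compounds-starting-with are hypothetical. Each call to compounds-starting-with reads only its first-word and the read-only dictionary, which is what makes it safe to turn into a workunit.

```racket
#lang racket
;; Made-up word list so the example runs without /usr/share/dict/words.
(define tiny-dictionary
  (set "foot" "ball" "football" "base" "baseball" "bat" "batten" "ten"))

;; One outer-loop iteration: every compound word that starts with first-word.
(define (compounds-starting-with first-word dictionary)
  (for/list ([second-word (in-set dictionary)]
             #:when (set-member? dictionary
                                 (string-append first-word second-word)))
    (cons first-word second-word)))

(define (word-combinations dictionary)
  ;; Each call below is independent, so each could become its own workunit.
  (append*
   (for/list ([first-word (in-set dictionary)])
     (compounds-starting-with first-word dictionary))))

(module+ main
  (displayln (word-combinations tiny-dictionary)))
```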
To run this search on a riot cluster:
- Add a (connect-to-riot-server!) call in the main submodule.
- Replace the outer for/list with for/work.
- Start workers. For this example, I ran twenty workers in total across four spare lab machines and started the tracker server on "alfred".
#lang racket
;; dict.rkt
(require (planet gcr/riot))

(define dictionary ;; Set of words
  (for/set ([word (in-list (file->lines "/usr/share/dict/words"))]
            #:when (>= (string-length word) 3))
    word))

(define (word-combinations)
  (apply append ; This flattens the list
         (for/work ([first-word (in-set dictionary)])
           (for/list ([second-word (in-set dictionary)]
                      #:when (set-member? dictionary
                                          (string-append first-word second-word)))
             (cons first-word second-word)))))

(module+ main
  (connect-to-riot-server! "alfred")
  (define words (time (word-combinations)))
  (printf "There are ~a words.\n" (length words))
  ;; Print a random subset of the results.
  (write (take (shuffle words) 20))
  (newline))
$ ~/racket/bin/racket dict.rkt
cpu time: 51903 real time: 1121990 gc time: 1732
There are 17658 words.
(("nick" . "name's") ("head" . "lights") ("ran" . "sacks") ("disc" . "lose") ("build" . "ups") ("wind" . "breaks") ("hot" . "headed") ("god" . "parent") ("main" . "frame") ("fiddle" . "sticks") ("pro" . "verbs") ("Volta" . "ire") ("select" . "ions") ("trail" . "blazer") ("bat" . "ten's") ("sniff" . "led") ("over" . "joys") ("down" . "hill") ("panel" . "led") ("tempera" . "ting"))
This version took 18.7 minutes, down from about 187 minutes for the sequential version: roughly a tenfold improvement. Our cluster completed about 100 workunits (outer-loop iterations) per second. To make this more efficient, we would want to split the work into fewer, larger workunits to reduce the tracker’s network overhead.
$ ~/racket/bin/racket dict.rkt
cpu time: 30133 real time: 63214 gc time: 772
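One way to get fewer, larger workunits, as suggested above, is to split the words into chunks before looping over them. The helper below is a hypothetical sketch, not part of riot, and the chunk size is an arbitrary assumption to tune for your cluster.

```racket
#lang racket
;; Split lst into consecutive sublists of at most size elements.
;; Hypothetical helper; not part of riot.
(define (chunk lst size)
  (cond
    [(null? lst) '()]
    [(<= (length lst) size) (list lst)]
    [else
     (let-values ([(head tail) (split-at lst size)])
       (cons head (chunk tail size)))]))

(module+ main
  (displayln (chunk '(1 2 3 4 5) 2)))
```

In dict.rkt, the outer for/work could then iterate over something like (chunk (set->list dictionary) 1000), with each workunit running an ordinary loop over its chunk and returning one list of pairs; one more (apply append ...) would flatten the per-chunk results.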
3 In which we present an overview and clarity is achieved
A riot cluster has three parts:
- A master program
- A tracker server
- One or more worker processes
3.1 The master program
The master program sends workunits to the tracker and waits for the tracker to send results back. To do this, the master program uses for/work, which sends units of work to the tracker and returns the results. The program can also use do-work, do-work/call, and do-work/eval forms for lower-level control.
3.2 The tracker
To start the tracker, run:
$ racket -p gcr/riot/server
To listen on a different port, pass --port:
$ racket -p gcr/riot/server -- --port 12345
3.3 The workers
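To start a worker, run:
$ racket -p gcr/riot/worker -- server-host
To connect to a tracker on a different port, or to give the worker a specific name:
$ racket -p gcr/riot/worker -- --port 1234 --name worker-name server-host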
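The master and the workers may run from different directories, as long as each directory holds the same version of the code. For example, on the master:
$ cd /tmp/demo; racket simple-demo.rkt
and on a worker:
$ cd /home/worker/demo; racket -p gcr/riot/worker -- tracker-server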
3.4 In which we describe the peculiarities of for/work and do-work
(do-work body ...)
(for/work (for-clause ...) body ...)
The for/work form behaves like:
(let ([workunits (for/list (for-clause ...)
                   (do-work body ...))])
  (for/list ([p workunits])
    (wait-until-done p)))
...only refer to serializable? values. If a free variable is unserializable, the workunit cannot be packaged up for transmission across the network to workers.
...do not cause and do not depend on global side effects. Otherwise, each worker may have different state, causing unpredictable behavior.
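The for/work expansion can be modeled on a single machine by letting threads stand in for remote workers. In this sketch, do-work-local, wait-until-done-local, and for/work-local are hypothetical names, not riot's API: do-work-local starts the body and immediately returns a handle, and wait-until-done-local blocks on that handle and re-raises the body's exception if it failed, mirroring how an error in a workunit surfaces in the master.

```racket
#lang racket
;; Start body running in a thread and return a handle at once.
;; The handle pairs the thread with a box that receives the outcome.
(define-syntax-rule (do-work-local body ...)
  (let* ([outcome (box #f)]
         [t (thread
             (λ ()
               (set-box! outcome
                         (with-handlers ([exn:fail? (λ (e) (cons 'error e))])
                           (cons 'ok (let () body ...))))))])
    (cons t outcome)))

;; Block until the workunit finishes; re-raise its exception if it failed.
(define (wait-until-done-local handle)
  (thread-wait (car handle))
  (match (unbox (cdr handle))
    [(cons 'ok v) v]
    [(cons 'error e) (raise e)]))

;; Same shape as the for/work expansion: start everything, then wait in order.
(define-syntax-rule (for/work-local (clause ...) body ...)
  (let ([workunits (for/list (clause ...) (do-work-local body ...))])
    (for/list ([p workunits]) (wait-until-done-local p))))

(module+ main
  (displayln (for/work-local ([i (in-range 10)]) (* 10 i))))
```

Starting every workunit before waiting on any of them is what makes the loop parallel; waiting inside the first for/list would serialize the work.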
The do-work form works by wrapping all of the body ... expressions inside a serial-lambda with no arguments. This effectively makes each workunit its own closure. Free variables are serialized when the workunit is sent to the tracker, and the resulting value is serialized on the return trip.
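The idea of shipping free-variable values instead of code can be shown with plain data. riot itself uses serial-lambda for this; the sketch below deliberately hand-rolls the same idea without it so the moving parts are visible. It assumes master and worker load the same module, so both hold the same code-table, and only a key naming the code crosses the wire. code-table, package-workunit, and run-packaged-workunit are hypothetical names.

```racket
#lang racket
(require racket/serialize)

;; Shared by master and worker because both load this module.
;; The code itself is never transmitted, only its key.
(define code-table
  (hash 'format-greeting
        (λ (x master-number)
          (format "Workunit #~a: master chose ~a" x master-number))))

;; Master side: a code key plus free-variable values, serialized and
;; written as text, as if it were about to be sent over the network.
(define (package-workunit code-key . free-vars)
  (with-output-to-string
    (λ () (write (serialize (cons code-key free-vars))))))

;; Worker side: read and deserialize, look up the code, and apply it.
(define (run-packaged-workunit wire-text)
  (match-define (cons code-key free-vars)
    (deserialize (read (open-input-string wire-text))))
  (apply (hash-ref code-table code-key) free-vars))

(module+ main
  (displayln (run-packaged-workunit (package-workunit 'format-greeting 3 67))))
```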
Workunits created by do-work (and, by extension, for/work) can refer to free variables, like this:
#lang racket
(require (planet gcr/riot))

(define (run)
  (define master-random-number (round (* 100 (random))))
  (define running-workunits
    (for/list ([x (in-range 10)])
      (do-work (format "Workunit #~a: Master chose ~a, but we choose ~a."
                       x
                       master-random-number
                       (round (* 100 (random)))))))
  (for ([workunit (in-list running-workunits)])
    (displayln (wait-until-done workunit))))

(module+ main
  (connect-to-riot-server! "localhost")
  (run))
Running it prints something like:
Workunit #0: Master chose 67.0, but we choose 27.0.
Workunit #1: Master chose 67.0, but we choose 51.0.
Workunit #2: Master chose 67.0, but we choose 49.0.
Workunit #3: Master chose 67.0, but we choose 64.0.
Workunit #4: Master chose 67.0, but we choose 62.0.
Workunit #5: Master chose 67.0, but we choose 41.0.
Workunit #6: Master chose 67.0, but we choose 5.0.
Workunit #7: Master chose 67.0, but we choose 54.0.
Workunit #8: Master chose 67.0, but we choose 100.0.
Workunit #9: Master chose 67.0, but we choose 33.0.
To ensure that workers use the same version of the code that the master thinks they’re using, do-work signs the code with an md5 hash of the body ... expressions. If a worker’s hash of a do-work body does not match the master’s hash, the worker will complain. Always keep all worker code up to date.
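A version check of this kind can be sketched with Racket's file/md5 library: hash the printed form of a body on both sides and compare. This is a hypothetical illustration of the idea, not riot's code; in particular, hashing the written form of a quoted datum is an assumption about what gets hashed. code-signature and check-signature are hypothetical names.

```racket
#lang racket
(require file/md5)

;; Hash the printed form of a body datum.
;; md5 returns the hex digest as bytes; convert it to a string.
(define (code-signature body-datum)
  (bytes->string/utf-8 (md5 (string->bytes/utf-8 (format "~s" body-datum)))))

;; Worker side: refuse to run when its copy of the body hashes differently.
(define (check-signature master-signature worker-body-datum)
  (unless (equal? master-signature (code-signature worker-body-datum))
    (error 'worker "code hash mismatch: update this worker's copy of the program")))

(module+ main
  (displayln (code-signature '(* 10 i))))
```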
3.5 In which, with a heavy heart, we outline restrictions and limitations of for/work and do-work
The for/work and do-work forms don’t transmit their code; they only transmit the free variables that the bodies refer to. This creates a number of constraints:
for/work and do-work must be inside a module and this module must be manually copied to each worker. These forms won’t work from the REPL.
for/work and do-work do not currently work within submodules.
As mentioned earlier, all free variables that the body refers to must be serializable. This means using serializable-struct instead of normal structs and taking care to ensure that all required libraries do the same.
Be careful about referring to large variables. Top-level variables are OK, but if you dynamically generate a large variable in a function (say, our dictionary) outside of the workunit, the network overhead of transferring the entire dictionary will dwarf the computation time of the workunit.
In other words, this is perfectly fine because dictionary is in the top-level:
;; Acceptable!
(define dictionary (list->set (file->lines "/usr/share/dict/words")))
(define (word-combinations)
  (for/work ([word (in-set dictionary)])
    ... word ...))
Each worker will load the dictionary once when the file is required and use that copy for each workunit.
If the dictionary is instead built inside the function that calls for/work, it becomes a free variable of every workunit, and riot will send an entire copy of the dictionary along with each and every workunit.
When workers attempt to execute a workunit created by a do-work or for/work form, they require the module and search for the code to be executed. Be sure that workers won’t execute either of these forms when the module is required, or else your workers will try to create workunits of their own!
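Both constraints on free variables, serializability and size, can be checked in plain Racket. The sketch below uses racket/serialize directly; it is not riot's wire format, and wire-size and local-dictionary are hypothetical names, with local-dictionary a made-up stand-in for a large value built inside a function.

```racket
#lang racket
(require racket/serialize)

;; Declared with serializable-struct, so instances may be free variables.
(serializable-struct point (x y) #:transparent)
;; An ordinary struct: its instances cannot be serialized.
(struct plain-point (x y))

;; Rough cost model: length in characters of a value after serialize + write.
(define (wire-size v)
  (string-length (with-output-to-string (λ () (write (serialize v))))))

;; Made-up stand-in for a large dictionary built inside a function.
(define local-dictionary
  (for/list ([i (in-range 10000)]) (format "word~a" i)))

;; A payload carrying one word vs. one that drags the dictionary along.
(define lean-payload (wire-size (list "word42")))
(define heavy-payload (wire-size (list "word42" local-dictionary)))

(module+ main
  (printf "point serializable? ~a\n" (serializable? (point 1 2)))
  (printf "plain-point serializable? ~a\n" (serializable? (plain-point 1 2)))
  (printf "lean: ~a chars, heavy: ~a chars\n" lean-payload heavy-payload))
```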
3.6 In which we describe the numerous kinds of workunits and how to create them
There’s more to riot than for/work and do-work. If those forms seem too magical, you may be more comfortable with these functions instead.
(do-work/call module exported-fun arg ...) → any/c module : module-path? exported-fun : symbol? arg : serializable?
Returns immediately (after one round trip to the tracker) with a workunit ID, a string representing a promise to do the work. Pass this value to wait-until-done to get the value of the called function.
Example:
#lang racket
;; This is do-work-call.rkt
(require (planet gcr/riot))
(provide double)

(define (double x) (* x 2))

(define (run)
  (define running-workunits
    (for/list ([x (in-range 10)])
      (do-work/call "do-work-call.rkt" 'double x)))
  (map wait-until-done running-workunits))

(module+ main
  (connect-to-riot-server! "localhost")
  (run))
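On the worker side, a do-work/call workunit plausibly comes down to loading the named module, fetching the exported function, and applying it to the arguments. The sketch below shows that step with Racket's dynamic-require; it is an illustration, not riot's implementation, and run-call-workunit is a hypothetical name. It uses standard library modules so it runs anywhere.

```racket
#lang racket
;; Load module-path, fetch the exported function, and apply it to args.
;; Hypothetical worker-side step; not riot's code.
(define (run-call-workunit module-path exported-fun args)
  (apply (dynamic-require module-path exported-fun) args))

(module+ main
  (displayln (run-call-workunit 'racket/list 'last '((1 2 3))))
  (displayln (run-call-workunit 'racket/math 'sqr '(7))))
```

In this sketch, a relative module path such as "do-work-call.rkt" would be resolved against the worker's current directory, which fits the advice to start each worker from the directory that contains the code.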
(do-work/eval datum) → any/c datum : any/c
Returns immediately (after one round trip to the tracker) with a workunit ID, a string representing a promise to do the work. Pass this value to wait-until-done to get the evaluated value of datum.
This is the only function that does not require workers to share any code with the master.
Example:
#lang racket
(require (planet gcr/riot))
(connect-to-riot-server! "localhost")
(wait-until-done (do-work/eval '(+ 3 5)))
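The reason do-work/eval needs no shared code is that the datum carries everything it uses. A worker could evaluate such a datum in a fresh namespace, as in this sketch; it is an assumption about the general approach, not riot's implementation, and run-eval-workunit is a hypothetical name.

```racket
#lang racket
;; Evaluate datum in a fresh namespace with only racket/base available,
;; so it cannot depend on anything defined by the master's program.
(define (run-eval-workunit datum)
  (parameterize ([current-namespace (make-base-namespace)])
    (eval datum)))

(module+ main
  (displayln (run-eval-workunit '(+ 3 5))))
```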
(wait-until-done workunit-id) → any/c workunit-id : any/c
(call-when-done workunit-id thunk) → any/c workunit-id : any/c thunk : (-> boolean? any/c any/c any/c)
Riot will call (thunk error? client-name result). If the workunit succeeds, error? is #f, client-name is the ID of the client that finished the workunit, and result is the result of the workunit. If the workunit fails, error? is #t and result is the message of the exception that caused the workunit to fail.
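A callback for call-when-done takes the three arguments described above. The handler below is a hypothetical example that only builds a message string; with riot, it would presumably be passed as (call-when-done workunit-id handle-finished).

```racket
#lang racket
;; Callback with the (error? client-name result) shape used by call-when-done.
;; handle-finished is a hypothetical name.
(define (handle-finished error? client-name result)
  (if error?
      ;; On failure, result holds the exception's message.
      (format "workunit failed: ~a" result)
      (format "~a finished with ~a" client-name result)))

(module+ main
  (displayln (handle-finished #f "alfred" 42)))
```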
(connect-to-riot-server! hostname [ port client-name]) → any/c hostname : string? port : exact-integer? = 2355 client-name : exact-integer? = (gethostname)
(current-client) → client? (current-client client-obj) → void? client-obj : client?
4 In which we present a lower-level client API for communicating with the tracker
(connect-to-tracker hostname [ port client-name]) → client? hostname : string? port : exact-integer? = 2355 client-name : exact-integer? = (gethostname)
(client-who-am-i client) → any/c client : client?
(client-workunit-info client workunit-id) → (list/c symbol? any/c any/c any/c) client : client? workunit-id : any/c
(list status wu-client result last-change)
(client-call-with-workunit-info client workunit-id thunk) → any/c client : client? workunit-id : any/c thunk : (-> symbol? any/c any/c any/c any/c)
(client-wait-for-work client) → (list/c wu-key? any/c) client : client?
(client-add-workunit client data) → any/c client : client? data : serializable?
(client-call-with-new-workunit client data thunk) → any/c client : client? data : serializable? thunk : (-> any/c any/c)
(client-wait-for-finished-workunit client workunit) → (list/c wu-key? symbol? any/c any/c) client : client? workunit : any/c
(client-call-with-finished-workunit client workunit thunk) → any/c client : client? workunit : any/c thunk : (-> wu-key? symbol? any/c any/c any/c)
(client-complete-workunit! client workunit error? result) → any/c client : client? workunit : any/c error? : boolean? result : serializable?
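These calls can be combined into one step of a worker's main loop. The sketch below takes the tracker calls as procedure arguments so it can be tried without a tracker. It assumes client-wait-for-work returns (list key data), as its (list/c wu-key? any/c) contract suggests, and that this key is what client-complete-workunit! expects. process-one-workunit! and its run argument are hypothetical.

```racket
#lang racket
;; One worker step: fetch a workunit, run it, and report success or failure.
;; wait-for-work : -> (list key data)
;; complete!     : key boolean result -> any
;; run           : data -> result   (hypothetical workunit runner)
(define (process-one-workunit! wait-for-work complete! run)
  (match-define (list key data) (wait-for-work))
  ;; Catch failures so they are reported as errors instead of crashing the worker.
  (define outcome
    (with-handlers ([exn:fail? (λ (e) (list #t (exn-message e)))])
      (list #f (run data))))
  (complete! key (first outcome) (second outcome)))
```

Against a real tracker, one would presumably pass (λ () (client-wait-for-work c)) and (λ (key error? result) (client-complete-workunit! c key error? result)) for a client c returned by connect-to-tracker, and call process-one-workunit! in a loop.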
5 In which we outline licensing and copyrights
The code in this (planet gcr/riot) package and this documentation is under the zlib license, reproduced below.
Copyright (c) 2012 gcr

This software is provided 'as-is', without any express or implied
warranty. In no event will the authors be held liable for any damages
arising from the use of this software.

Permission is granted to anyone to use this software for any purpose,
including commercial applications, and to alter it and redistribute it
freely, subject to the following restrictions:

1. The origin of this software must not be misrepresented; you must not
claim that you wrote the original software. If you use this software
in a product, an acknowledgment in the product documentation would be
appreciated but is not required.

2. Altered source versions must be plainly marked as such, and must not be
misrepresented as being the original software.

3. This notice may not be removed or altered from any source
distribution.