#lang racket/base
(require racket/contract
racket/tcp
racket/match
racket/function
mzlib/os
racket/serialize)
;; Contract used for workunit keys throughout this module's exported
;; contracts. It is currently `any/c`, i.e. every value is accepted.
(define wu-key? any/c)
;; State of one tracker connection.
;;   out             - destination that `client-send` writes to and flushes
;;                     (the TCP output port when built by `connect-to-tracker`)
;;   lock            - semaphore held while `pending-actions` is read or updated
;;   pending-actions - list of expector procedures; each takes a datum and
;;                     returns true when it has consumed that datum
;; `#:mutable` makes every field settable (e.g. `set-client-pending-actions!`).
(struct client (out lock pending-actions) #:mutable)
;; Public, contract-checked API of the tracker client.
;;
;; Every function that takes or returns a workunit key uses the `wu-key?`
;; alias so that the key representation is described in exactly one place.
;; (The two workunit-info entries previously hard-coded `string?`, which was
;; stricter than the `wu-key?` values handed out by `client-add-workunit`.)
(provide/contract
 [client? (-> any/c boolean?)]
 ;; host, optional port and optional client name -> connected client
 [connect-to-tracker (->* (string?) (exact-integer? string?) client?)]
 [client-who-am-i (-> client? any/c)]
 ;; Blocking query: returns (list status wu-client result last-change)
 [client-workunit-info (-> client? wu-key?
                           (list/c symbol? any/c any/c any/c))]
 ;; Callback variant: callback receives status wu-client result last-change
 [client-call-with-workunit-info (-> client? wu-key?
                                     (-> symbol? any/c any/c any/c any/c)
                                     any/c)]
 ;; Blocking: returns (list key data)
 [client-wait-for-work (-> client?
                           (list/c wu-key? any/c))]
 ;; Callback variant: callback receives key and data
 [client-call-with-work (-> client?
                            (-> wu-key? any/c any/c) any/c)]
 ;; Blocking: returns the key of the newly added workunit
 [client-add-workunit (-> client? serializable? wu-key?)]
 ;; Callback variant: callback receives the new key
 [client-call-with-new-workunit (-> client? serializable?
                                    (-> wu-key? any/c) any/c)]
 ;; Blocking: returns (list key status client result)
 [client-wait-for-finished-workunit (-> client? wu-key?
                                        (list/c wu-key? symbol? any/c any/c))]
 ;; Callback variant: callback receives key status client result
 [client-call-with-finished-workunit (-> client? wu-key?
                                         (-> wu-key? symbol? any/c any/c any/c)
                                         any/c)]
 ;; Fire-and-forget: client, key, error flag, result
 [client-complete-workunit! (-> client? wu-key? boolean? serializable?
                                any/c)])
;; Hand one datum received from the server to the pending expectors.
;;
;; While holding the client's lock, the expectors are tried in list order;
;; the first one that returns a true value for `datum` is removed from the
;; pending list. If none accepts the datum an error is raised.
(define (client-react client datum)
  (define (dispatch!)
    (define waiting (client-pending-actions client))
    ;; memf stops at the first expector that answers true, so later
    ;; expectors are never invoked for this datum.
    (define hit (memf (λ (expector) (expector datum)) waiting))
    (if hit
        (set-client-pending-actions! client (remove (car hit) waiting))
        (error "Server sent us something we weren't expecting:" datum)))
  (call-with-semaphore (client-lock client) dispatch!))
;; Append `proc` to the end of the client's pending expectors, under the
;; client's lock. `proc` is later called with each incoming datum (see
;; `client-react`) and should return true when it has consumed the datum.
(define (client-register-expector client proc)
  (define (enqueue!)
    (define queued (client-pending-actions client))
    (set-client-pending-actions! client (append queued (list proc))))
  (call-with-semaphore (client-lock client) enqueue!))
;; (client-request-response client send pattern value)
;;
;; Registers an expector for a datum matching the `match` pattern `pattern`,
;; then sends `send` to the server and waits on a fresh channel. When a
;; matching datum arrives, `value` (which may use variables bound by
;; `pattern`) is evaluated and delivered through the channel; that is the
;; result of the whole form.
(define-syntax-rule (client-request-response client send pattern value)
  (let ([reply-chan (make-channel)])
    ;; The expector is registered before sending so the reply cannot arrive
    ;; while nobody is waiting for it.
    (client-register-expector
     client
     (λ (incoming)
       (match incoming
         [pattern (channel-put reply-chan value) #t]
         [_ #f])))
    (client-send client send)
    (channel-get reply-chan)))
;; (client-expect/callback client pattern value)
;;
;; Registers an expector for a datum matching the `match` pattern `pattern`.
;; When such a datum arrives, `value` (which may use variables bound by
;; `pattern`) is evaluated in a newly created thread and the datum counts as
;; consumed. Nothing is sent to the server by this form.
(define-syntax-rule (client-expect/callback client pattern value)
  (client-register-expector
   client
   (λ (incoming)
     (match incoming
       [pattern (thread (λ () value)) #t]
       [_ #f]))))
;; Write `datum` to the client's output with `write`, then flush it.
(define (client-send client datum)
  (let ([port (client-out client)])
    (write datum port)
    (flush-output port)))
;; Connect to a tracker and return a `client` for the connection.
;;
;;   host        - host name passed to `tcp-connect`
;;   port        - TCP port, defaults to 2355
;;   client-name - name announced in the initial `hello-from` message,
;;                 defaults to the result of `(gethostname)`
;;
;; After sending the greeting, a background thread reads data from the
;; connection and hands each one to `client-react`. When the connection
;; reaches end-of-file the thread closes both ports and stops, instead of
;; passing the eof object to `client-react` (which would report it as an
;; unexpected server message and leave the ports open).
(define (connect-to-tracker host [port 2355] [client-name (gethostname)])
  (define-values (in out) (tcp-connect host port))
  (define cl (client out (make-semaphore 1) '()))
  (client-send cl (list 'hello-from client-name))
  (thread
   (λ ()
     (let loop ()
       (define datum (read in))
       (cond
         [(eof-object? datum)
          ;; The server closed the connection: release the ports and
          ;; end the reader thread.
          (close-input-port in)
          (close-output-port out)]
         [else
          (client-react cl datum)
          (loop)]))))
  cl)
;; Send `(who-am-i)` and wait for a `(you-are name)` reply; returns `name`.
(define (client-who-am-i client)
  (client-request-response
   client
   '(who-am-i)
   `(you-are ,name)
   name))
;; Send `(workunit-info key)` and wait for a `workunit` reply whose key is
;; `equal?` to `key`. Returns (list status wu-client result last-change).
(define (client-workunit-info client key)
  (define (wanted-key? candidate) (equal? key candidate))
  (client-request-response
   client
   `(workunit-info ,key)
   `(workunit ,(? wanted-key?) ,status ,wu-client ,result ,last-change)
   (list status wu-client result last-change)))
;; Non-blocking variant of the workunit-info query: registers a callback for
;; the `workunit` reply whose key is `equal?` to `key`, then sends
;; `(workunit-info key)`. `thunk` is later called in a new thread with
;; status, wu-client, result and last-change.
(define (client-call-with-workunit-info client key thunk)
  (define (wanted-key? candidate) (equal? key candidate))
  (client-expect/callback
   client
   `(workunit ,(? wanted-key?) ,status ,wu-client ,result ,last-change)
   (thunk status wu-client result last-change))
  (client-send client `(workunit-info ,key)))
;; Send `(wait-for-work)` and wait for an `(assigned-workunit key data)`
;; reply. Returns (list key data).
(define (client-wait-for-work client)
  (client-request-response
   client
   '(wait-for-work)
   `(assigned-workunit ,key ,data)
   (list key data)))
;; Non-blocking variant of `client-wait-for-work`: registers a callback for
;; the next `(assigned-workunit key data)` message, then sends
;; `(wait-for-work)`. `thunk` is later called in a new thread with key and
;; data.
(define (client-call-with-work client thunk)
  (client-expect/callback
   client
   `(assigned-workunit ,key ,data)
   (thunk key data))
  (client-send client '(wait-for-work)))
;; Send `(add-workunit! data)` and wait for an `(added-workunit key)` reply;
;; returns `key`.
(define (client-add-workunit client data)
  (client-request-response
   client
   `(add-workunit! ,data)
   `(added-workunit ,key)
   key))
;; Non-blocking variant of `client-add-workunit`: registers a callback for
;; the next `(added-workunit key)` message, then sends
;; `(add-workunit! data)`. `thunk` is later called in a new thread with the
;; key.
(define (client-call-with-new-workunit client data thunk)
  (client-expect/callback
   client
   `(added-workunit ,key)
   (thunk key))
  (client-send client `(add-workunit! ,data)))
;; Send `(monitor-workunit-completion key)` and wait for a
;; `workunit-complete` reply whose key is `equal?` to `key`.
;; Returns (list wu-key status client result), where `client` here is the
;; value carried in the reply, not the connection argument.
(define (client-wait-for-finished-workunit client key)
  (define (wanted-key? candidate) (equal? key candidate))
  (client-request-response
   client
   `(monitor-workunit-completion ,key)
   ;; Note: the pattern variable `client` shadows the function argument
   ;; inside the result expression only.
   `(workunit-complete ,(? wanted-key? wu-key) ,status ,client ,result)
   (list wu-key status client result)))
;; Non-blocking variant of `client-wait-for-finished-workunit`: registers a
;; callback for the `workunit-complete` message whose key is `equal?` to
;; `key`, then sends `(monitor-workunit-completion key)`. `thunk` is later
;; called in a new thread with wu-key, status, client and result, where
;; `client` is the value carried in the message.
(define (client-call-with-finished-workunit client key thunk)
  (define (wanted-key? candidate) (equal? key candidate))
  (client-expect/callback
   client
   ;; The pattern variable `client` shadows the function argument inside
   ;; the callback expression only.
   `(workunit-complete ,(? wanted-key? wu-key) ,status ,client ,result)
   (thunk wu-key status client result))
  (client-send client `(monitor-workunit-completion ,key)))
;; Send `(complete-workunit! key error? result)` to the server. No reply is
;; awaited.
(define (client-complete-workunit! client key error? result)
  (client-send client `(complete-workunit! ,key ,error? ,result)))