(module rpc-server mzscheme
(require (planet "roos.scm" ("oesterholt" "roos.plt" 1 0)))
(require "rpc-function-definer.scm")
(require "rpc-io.scm")
(require "rpc-marshal.scm")
(require "rpc-log.scm")
(provide rpc-server
(all-from (planet "roos.scm" ("oesterholt" "roos.plt" 1 0))))
;; Greeting written to every client right after it connects (see the
;; client handler's `run` method, which sends it followed by a newline).
;; NOTE(review): presumably protocol name plus major/minor version -- confirm.
(define WELCOME (list "mzrpc" 0 1))
;; client-handler
;;
;; Serves a single connected RPC client. One instance is created per accepted
;; connection with:
;;   rpc-server  - the owning server object (login, authorization, bookkeeping)
;;   from-client - input port the requests are read from
;;   to-client   - output port the results and notifications are written to
;;
;; `run` registers the handler with the server, sends WELCOME and then loops in
;; `handle-rpc` until the client ends the session, the stream reaches eof or a
;; write fails.
(def-class
  (this (client-handler rpc-server from-client to-client))
  (supers)
  (private
    ;; Client number handed out by the server in `run`; #f until then.
    (define _nr #f)
    ;; #t once `end-connection` has closed both ports.
    (define _connection-ended #f)
    ;; Result of the last login attempt: #f, or the value returned by the
    ;; server's login (compared against 'admin for the shutdown calls).
    (define _logged-in #f)
    ;; Last challenge produced by `rpc-chalenge`; 0 until one was requested.
    (define _last-chalenge 0)
    ;; The server's `autorized?` method.
    ;; NOTE(review): presumably `->>` yields the method as a procedure -- confirm.
    (define _autorized? (->> rpc-server autorized?))
    ;; Level handed to `autorized?` for every call; kept equal to `_logged-in`.
    (define _login-level #f)
    )
  (public
    ;; Sends an unsolicited '%rpc-notify% message carrying `argument`.
    ;; Returns #t when the write succeeded, #f when it gave an i/o error.
    (define (notify-client argument)
      (let ((R (write* (list '%rpc-notify% (rpc-marshal argument)) to-client )))
        (not (io-error? R))))

    ;; Generates, remembers and returns a fresh login challenge.
    (define (rpc-chalenge)
      (set! _last-chalenge (+ (random 2000000000) 10000))
      _last-chalenge)

    ;; Closes both ports exactly once. Always returns 'done.
    (define (end-connection)
      (if _connection-ended
          'done
          (begin
            (set! _connection-ended #t)
            (close-input-port from-client)
            (close-output-port to-client)
            'done)))

    ;; Returns the server object this handler belongs to.
    (define (server)
      rpc-server)

    ;; Tells the client its input could not be read as a request.
    ;; Returns 'connection-ended when the connection is already closed,
    ;; 'continue when the error was delivered, or the i/o error otherwise.
    (define (handle-wrong-input)
      (if _connection-ended
          'connection-ended
          (let ((R (write* (list 'error (rpc-marshal "Wrong input")) to-client)))
            (if (io-error? R)
                (begin
                  ;; Fixed: the message used to name call-function (copy-paste).
                  (rpc-log "client: " _nr ":handle-wrong-input: i/o error writing result to client.")
                  R)
                'continue))))

    ;; Asks the server to log `user` in with `pass` against the last challenge.
    ;; Records the outcome and returns it (#f means the login was refused).
    (define (login user pass)
      (let ((L (-> rpc-server login user pass _last-chalenge)))
        (set! _logged-in L)
        ;; Keep the authorization level in step with the login state, also when
        ;; `login` is called directly rather than through the 'rpc-login call.
        (set! _login-level L)
        (rpc-log-info "client: " _nr ":login:user=" user ", result: " L)
        L))

    ;; Executes the call `f-sym` with the already de-marshaled `_args`.
    ;; Built-in calls are handled here; anything else goes to `rpc-fcall`.
    (define (my-rpc-call f-sym _args)
      (rpc-log-debug "my-rpc-call:" f-sym _args)
      (cond
        ((eq? f-sym 'rpc-connected) (-> rpc-server connected))
        ((eq? f-sym 'rpc-shutdown)
         (rpc-log-info "client: " _nr " _logged-in: " _logged-in " rpc-shutdown")
         (if (eq? _logged-in 'admin)
             (-> rpc-server shutdown)
             'forbidden))
        ((eq? f-sym 'rpc-force-shutdown)
         ;; Fixed: this branch used to log itself as " rpc-shutdown".
         (rpc-log-info "client: " _nr " _logged-in: " _logged-in " rpc-force-shutdown")
         (if (eq? _logged-in 'admin)
             (-> rpc-server force-shutdown)
             'forbidden))
        ((eq? f-sym 'rpc-login)
         (let ((result (apply login _args)))
           (set! _login-level result)
           result))
        ((eq? f-sym 'rpc-chalenge) (rpc-chalenge))
        (else (rpc-fcall this f-sym _args))))

    ;; #t when `f-sym` may be called at `level`. The session calls rpc-login,
    ;; rpc-end and rpc-chalenge are always allowed; the rest is up to the server.
    (define (autorized? f-sym level)
      (or (eq? f-sym 'rpc-login) (eq? f-sym 'rpc-end)
          (eq? f-sym 'rpc-chalenge) (_autorized? f-sym level)))

    ;; Handles one request `rpc`, expected to be a non-empty list
    ;; (function-symbol marshaled-arg ...), and writes the reply to the client.
    ;; The reply is (rpc <marshaled result>) or (error <marshaled message>).
    ;; Returns 'continue to keep serving, 'stop after rpc-end, or the i/o error
    ;; when the reply could not be written.
    (define (call-function rpc)
      (let* ((call? (and (list? rpc) (not (null? rpc))))
             (f-sym (if call? (car rpc) '%rpc-wrong-input%))
             (args (if call? (cdr rpc) (list)))
             (_error #f)
             ;; #f signals that de-marshaling failed; the reason is in _error.
             (_args (with-handlers ((exn:fail? (lambda (exn)
                                                 (set! _error (format "~a" (exn-message exn)))
                                                 #f)))
                      (map rpc-de-marshal args)))
             (_result
               (cond
                 ((eq? _args #f)
                  (rpc-log-warn "client: " _nr ":call: " rpc " has marshaling problem: " _error)
                  (list 'error (rpc-marshal "Marshaling problem.")))
                 ;; Before login only the session calls are accepted.
                 ((not (or (eq? f-sym 'rpc-login) (eq? f-sym 'rpc-end)
                           (eq? f-sym 'rpc-chalenge) (not (eq? _logged-in #f))))
                  (rpc-log-warn "client: " _nr " not logged in:trying to execute function: " rpc)
                  (list 'error (rpc-marshal "Login first")))
                 ((eq? f-sym '%rpc-wrong-input%)
                  (rpc-log-warn "client: " _nr ": this is no function call: " rpc)
                  (list 'error (rpc-marshal "This is no function call.")))
                 ((not (autorized? f-sym _login-level))
                  (rpc-log-warn rpc " : client not autorized to call this function.")
                  (list 'error (rpc-marshal "You are not autorized to call this function.")))
                 (else
                  ;; rpc-check yields #t when the call is acceptable; any other
                  ;; value is reported to the client as the error.
                  (let ((C (rpc-check f-sym _args)))
                    (if (eq? C #t)
                        (list 'rpc (rpc-marshal (my-rpc-call f-sym _args)))
                        (begin
                          (rpc-log-warn (format "~s" rpc) " : error : " C)
                          (list 'error (rpc-marshal C))
                          )
                        )
                    )
                  )
                 )
               )
             )
        (let ((R (write* _result to-client)))
          (if (io-error? R)
              (begin
                (rpc-log "client: " _nr ":call-function: i/o error writing result to client.")
                R)
              (if (eq? f-sym 'rpc-end)
                  'stop
                  'continue)))))

    ;; Request loop: reads requests until eof or until a handler reports
    ;; anything other than 'continue, then closes the connection.
    (define (handle-rpc)
      (let ((rpc (read* from-client)))
        (cond
          ((eof-object? rpc) (end-connection))
          ((eq? (if (wrong-input? rpc)
                    (handle-wrong-input)
                    (call-function rpc))
                'continue)
           (handle-rpc))
          (else (end-connection)))))

    ;; Client number assigned by the server (#f before `run`).
    (define (number)
      _nr)

    ;; Same value as `number`.
    (define (id)
      _nr)

    ;; Serves the client from registration to deregistration.
    (define (run)
      (set! _nr (-> rpc-server start-client this))
      (write* WELCOME to-client)
      (newline* to-client)
      (-> rpc-server client-handler-started this)
      ;; An exception escaping the request loop used to end the thread before
      ;; the client was deregistered, leaving the server's connection count too
      ;; high and the ports open. Log it and close the connection instead, so
      ;; the bookkeeping below always runs.
      (with-handlers ((exn:fail? (lambda (exn)
                                   (rpc-log-warn "client: " _nr ":run: unexpected error: " (exn-message exn))
                                   (end-connection))))
        (handle-rpc))
      (-> rpc-server client-handler-ended this)
      (-> rpc-server end-client this)
      )
    )
  (constructor)
  )
;; rpc-server
;;
;; TCP server that accepts clients on `port` and serves each one in its own
;; thread through a client-handler. `_login-provider` decides logins; functions
;; are made callable with `add`. User-facing documentation is in the roos-doc
;; form below.
(def-class
  (roos-doc (sp "This class provides a rpc server. It is initialized with a port number <port> "
                "and a login provider <_login-provider>. The login provider is a function of 3 arguments: "
                (s% "(lambda (user pass chalenge) ...)") ". The login provider must determine if the given "
                "user <user> can login with the given password <pass>, given the given <chalenge>. It must "
                "return #f, if the user cannot login and a symbol indicating the autorisation context otherwise.")
            (s== "Example")
            (sverb "(require (planet \"roos.scm\" (\"oesterholt\" \"roos.plt\" 1 0)))"
                   "(require \"mzrpc.scm\")"
                   ""
                   ";; define an function that can becalled over rpc"
                   "(rpc-define (plus (a number?) (b number?)) (+ a b))"
                   ""
                   ";; define an overridden class for rpc-server."
                   "(def-class"
                   " (this (my-rpc-server . args))"
                   " (supers (apply rpc-server args))"
                   " (private"
                   " (define _clients (make-hash-table))"
                   " (define (notifier c i)"
                   " (sleep 1)"
                   " (-> c notify-client i)"
                   " (notifier c (+ i 1)))"
                   " )"
                   " (public"
                   " (define (client-handler-started client)"
                   " (display \"client-handler-started called\\n\")"
                   " (hash-table-put! _clients client (thread (lambda ()"
                   " (notifier client 0)))))"
                   ""
                   " (define (client-handler-ended client)"
                   " (display \"client-handler-ended called\\n\")"
                   " (let ((t (hash-table-get _clients client)))"
                   " (kill-thread t)))"
                   " )"
                   " (constructor)"
                   " )"
                   ""
                   ";; Define and call the server."
                   "(define S (my-rpc-server 4002 (lambda (user pass chalenge) 'admin)))"
                   "(-> S add '(admin) plus)"
                   "(-> S run)"
                   )
            )
  (this (rpc-server port _login-provider))
  (supers)
  (private
    ;; function symbol -> (list rpc-function levels), filled by `add`.
    (define _rpc-functions (make-hash-table))
    ;; Set by shutdown/force-shutdown; makes the accept loop stop.
    (define _stop-server #f)
    ;; Currently registered client handlers (handler -> handler).
    (define _clients (make-hash-table))
    ;; Number of clients being handled right now.
    (define _connected 0)
    ;; Total number of clients started so far; also used as client number.
    (define _nclients 0)
    ;; TCP listener; #f until `run` creates it.
    (define _listener #f)

    ;; Ends every registered client connection and closes the listener.
    (define (cleanup)
      (hash-table-for-each _clients
                           (lambda (K C)
                             (-> C end-connection)))
      (tcp-close _listener)
      #t)

    ;; Opens and immediately closes a connection to this server's own port so
    ;; that a blocking tcp-accept in `do-run` returns and sees `_stop-server`.
    (define (connect-to-server)
      (call-with-values
        (lambda () (tcp-connect "localhost" port))
        (lambda (from-server to-server)
          (close-input-port from-server)
          (close-output-port to-server))))
    )
  (public
    ;; Registers client handler C and returns its client number.
    (define (start-client C)
      (hash-table-put! _clients C C)
      (set! _connected (+ 1 _connected))
      (set! _nclients (+ 1 _nclients))
      (rpc-log-info "Client rpc handler " _nclients " started, currently handling " _connected " clients.")
      _nclients)

    ;; Deregisters client handler C.
    (define (end-client C)
      (hash-table-remove! _clients C)
      (set! _connected (- _connected 1))
      (rpc-log-info "Client rpc handler, ended client " (-> C number) " currently handling " _connected " clients.")
      )

    ;; Number of clients currently being handled.
    (define (connected)
      _connected)

    ;; Stops the server only when exactly one client is connected.
    ;; Returns 'ok, or 'clients-connected when the count is anything else.
    (define (shutdown)
      (if (= _connected 1)
          (begin
            (set! _stop-server #t)
            (connect-to-server)
            'ok)
          'clients-connected))

    ;; Stops the server regardless of connected clients. Returns 'ok.
    (define (force-shutdown)
      (set! _stop-server #t)
      (connect-to-server)
      'ok)

    ;; #t when function `fsym` was added for authorization level `type`;
    ;; #f when it is unknown or the level is not among its levels.
    (define (autorized? fsym type)
      (let ((C (hash-table-get _rpc-functions fsym (lambda () #f))))
        (if (and C (memq type (cadr C)))
            #t
            #f)))

    ;; Delegates the login decision to the login provider and returns its result.
    (define (login user pass chalenge)
      (_login-provider user pass chalenge))

    ((define (sp "Adds the given rpc-functions, which have been defined using " (s% "rpc-define")
                 " to this server."))
     (add levels . rpc-functions)
     (for-each (lambda (rpc-f)
                 (let ((f-symbol (rpc-get-sym rpc-f)))
                   (if (eq? f-symbol #f)
                       (error (format "Function ~s is not defined as an RPC function." rpc-f))
                       (hash-table-put! _rpc-functions f-symbol (list rpc-f levels)))))
               rpc-functions))

    ;; Accept loop: starts one handler thread per accepted connection until
    ;; `_stop-server` is set, then cleans up.
    (define (do-run)
      (if _stop-server
          (cleanup)
          (begin
            (call-with-values
              (lambda () (tcp-accept _listener))
              (lambda (from-client to-client)
                (if _stop-server
                    ;; Accepted after shutdown was requested (normally the
                    ;; wake-up connection from connect-to-server). No handler
                    ;; will own these ports, so close them here; they used to
                    ;; be left open.
                    (begin
                      (close-input-port from-client)
                      (close-output-port to-client))
                    (thread (lambda ()
                              (let ((obj (client-handler this from-client to-client)))
                                (-> obj run)))))))
            (do-run))))

    ((define (sp "Run this server. It will start handling clients."))
     (run)
     (set! _listener (tcp-listen port 1000 #t))
     (do-run))

    ((define (sp "This function can be overridden. It is called when a client connects with "
                 "the client object that handles the client."))
     (client-handler-started client)
     #t)

    ((define (sp "This function can be overridden. It is called when a client disconnects with "
                 "the client object that handles the client."))
     (client-handler-ended client)
     #t)
    )
  (constructor
    )
  )
)