#lang racket/base
(require racket/contract
racket/class
"../private/generic/interfaces.rkt"
(only-in "../private/generic/functions.rkt" connection?))
;; kill-safe-connection% : class implementing connection<%>
;; Wraps an underlying connection so that every operation on it is executed
;; by a dedicated worker thread created with thread/suspend-to-kill.  Client
;; threads hand the worker a thunk over a channel and wait for its outcome,
;; so the operation itself never runs in the client thread.
(define kill-safe-connection%
  (class* object% (connection<%>)
    (init-private connection)

    ;; Channel carrying thunks from client threads to the worker thread.
    (define req-channel (make-channel))

    ;; Worker thread: forever receive a thunk and run it.
    (define safe-thread
      (thread/suspend-to-kill
       (lambda ()
         (let loop ()
           ((channel-get req-channel))
           (loop)))))

    ;; call : (connection -> any) -> any
    ;; Runs (proc connection) in the worker thread and delivers its outcome
    ;; in the calling thread: multiple values are returned as-is, and any
    ;; raised value is re-raised here.
    (define/private (call proc)
      ;; Pass the calling thread as benefactor: without one, thread-resume
      ;; has no effect on a thread that has lost all of its custodians, and
      ;; the channel-put below would then block forever.
      (thread-resume safe-thread (current-thread))
      (let ([result #f]
            [sema (make-semaphore 0)])
        (channel-put req-channel
                     (lambda ()
                       (set! result
                             ;; The predicate accepts every raised value, so
                             ;; nothing raised by proc escapes into the worker.
                             (with-handlers ([(lambda (e) #t)
                                              (lambda (e) (cons 'raise e))])
                               (cons 'values
                                     (call-with-values
                                      (lambda () (proc connection))
                                      list))))
                       (semaphore-post sema)))
        ;; Wait until the worker has stored the outcome.
        (semaphore-wait sema)
        (case (car result)
          ((values) (apply values (cdr result)))
          ((raise) (raise (cdr result))))))

    ;; Defines one public method per (method arg ...) spec; each forwards to
    ;; the same-named method of the wrapped connection through `call`.
    (define-syntax-rule (define-forward (method arg ...) ...)
      (begin
        (define/public (method arg ...)
          (call (lambda (obj) (send obj method arg ...)))) ...))

    (define-forward
      (connected?)
      (disconnect)
      (get-dbsystem)
      (query fsym stmt)
      (prepare fsym stmt close-on-exec?)
      (free-statement stmt)
      (transaction-status fsym)
      (start-transaction fsym isolation)
      (end-transaction fsym mode))

    (super-new)))
;; kill-safe-connection : connection -> connection
;; Returns a kill-safe-connection% instance wrapping the given connection.
(define (kill-safe-connection connection)
  (instantiate kill-safe-connection% ()
    [connection connection]))
;; virtual-connection% : class implementing connection<%> and no-cache-prepare<%>
;; Maps a key (obtained by calling get-key) to an actual connection that is
;; created on demand by calling connector.  The key=>conn and alarms tables
;; are only touched by thunks executed in the manager thread; client threads
;; send those thunks over req-channel.
(define virtual-connection%
  (class* object% (connection<%> no-cache-prepare<%>)
    (init-private connector   ;; called with no arguments to create a connection
                  get-key     ;; called with no arguments to obtain the current key
                  timeout)    ;; added to (current-inexact-milliseconds) for alarms
    (super-new)

    ;; Custodian current at object creation; new connections are created under it.
    (define custodian (current-custodian))

    ;; Channel carrying thunks from client threads to the manager thread.
    (define req-channel (make-channel))

    ;; key=>conn : key -> connection
    (define key=>conn (make-hasheq))

    ;; alarms : connection -> event whose synchronization result is the key
    (define alarms (make-hasheq))

    ;; get : key -> connection or #f
    ;; Looks up the connection for key; a hit also replaces its alarm.
    (define/private (get key)
      (let ([c (hash-ref key=>conn key #f)])
        (when c (hash-set! alarms c (fresh-alarm-for key)))
        c))

    ;; put! : key connection -> void
    ;; Registers c under key and installs an alarm for it.
    (define/private (put! key c)
      (hash-set! key=>conn key c)
      (hash-set! alarms c (fresh-alarm-for key)))

    ;; fresh-alarm-for : key -> evt
    ;; An alarm event `timeout` milliseconds from now whose result is key.
    (define/private (fresh-alarm-for key)
      (wrap-evt (alarm-evt (+ (current-inexact-milliseconds) timeout))
                (lambda (a) key)))

    ;; remove! : key boolean -> void
    ;; Drops and disconnects the connection registered under key, if any.
    ;; When timeout? is true and the connection reports a transaction status,
    ;; the connection is kept and its alarm is replaced instead.
    (define/private (remove! key timeout?)
      (let* ([c (hash-ref key=>conn key #f)]
             ;; A failing status query is treated as "not in a transaction".
             [in-trans? (with-handlers ([exn:fail? (lambda (e) #f)])
                          (and c (send c transaction-status 'virtual-connection)))])
        (cond [(not c) (void)]
              [(and timeout? in-trans?)
               ;; fresh-alarm-for takes only the key; it reads the timeout
               ;; field itself.
               (hash-set! alarms c (fresh-alarm-for key))]
              [else
               (hash-remove! key=>conn key)
               (hash-remove! alarms c)
               (send c disconnect)])))

    ;; manage : -> (does not return)
    ;; Manager loop.  Each iteration handles one of: a request thunk, a key
    ;; becoming ready (the keys themselves are used as events), or an alarm.
    (define/private (manage)
      (sync (handle-evt req-channel (lambda (proc) (proc)))
            (let ([keys (hash-map key=>conn (lambda (k v) k))])
              (handle-evt (apply choice-evt keys)
                          (lambda (key)
                            (dbdebug "virtual-connection: key expiration: ~e" key)
                            (remove! key #f))))
            (let ([alarm-evts (hash-map alarms (lambda (k v) v))])
              (handle-evt (apply choice-evt alarm-evts)
                          (lambda (key)
                            (dbdebug "virtual-connection: timeout")
                            (remove! key #t)))))
      (manage))

    (define manager-thread
      (thread/suspend-to-kill (lambda () (manage))))

    ;; get-connection : boolean -> connection or #f
    ;; Returns the live connection for the current key.  If there is none
    ;; (or it is no longer connected), creates and registers a new one when
    ;; create? is true, and returns #f otherwise.
    (define/private (get-connection create?)
      ;; The calling thread is passed as benefactor so the manager can be
      ;; resumed even after it has lost all of its own custodians.
      (thread-resume manager-thread (current-thread))
      (let* ([key (get-key)]
             [c #f]
             [sema (make-semaphore 0)])
        (channel-put req-channel
                     (lambda ()
                       (set! c (get key))
                       (semaphore-post sema)))
        (semaphore-wait sema)
        (cond [(and c (send c connected?)) c]
              [create?
               ;; The connector runs in the client thread, under the
               ;; custodian captured at object creation.
               (let ([c* (parameterize ((current-custodian custodian))
                           (connector))])
                 (channel-put req-channel
                              (lambda ()
                                (when c (remove! key #f))
                                (put! key c*)))
                 c*)]
              [else
               ;; c exists but is disconnected: ask the manager to drop it.
               ;; The request must be a thunk, because the manager applies
               ;; whatever it receives on req-channel.
               (when c
                 (channel-put req-channel
                              (lambda () (remove! key #f))))
               #f])))

    ;; Each spec is (req-con? no-con (method arg ...)): the method forwards to
    ;; the connection returned by (get-connection req-con?), or evaluates to
    ;; no-con when there is no connection.
    (define-syntax-rule (define-forward (req-con? no-con (method arg ...)) ...)
      (begin (define/public (method arg ...)
               (let ([c (get-connection req-con?)])
                 (if c
                     (send c method arg ...)
                     no-con)))
             ...))

    (define-forward
      (#f #f     (connected?))
      (#t '_     (get-dbsystem))
      (#t '_     (query fsym stmt))
      (#t '_     (start-transaction fsym isolation))
      (#f (void) (end-transaction fsym mode))
      (#f #f     (transaction-status fsym)))

    ;; disconnect : -> void
    ;; Disconnects the current key's connection, if any, and asks the manager
    ;; to forget it.
    (define/public (disconnect)
      (let ([c (get-connection #f)]
            [key (get-key)])
        (when c
          (send c disconnect)
          (thread-resume manager-thread (current-thread))
          (channel-put req-channel
                       ;; remove! takes exactly (key timeout?).
                       (lambda () (remove! key #f)))))
      (void))

    ;; prepare : symbol statement boolean -> prepared statement
    ;; Only close-on-exec statements may be prepared; otherwise an error is
    ;; raised under the name fsym.
    (define/public (prepare fsym stmt close-on-exec?)
      (unless close-on-exec?
        (error fsym "cannot prepare statement with virtual connection"))
      (send (get-connection #t) prepare fsym stmt close-on-exec?))

    ;; free-statement : statement -> (always raises)
    (define/public (free-statement stmt)
      (error 'free-statement
             "internal error: virtual connection does not own statements"))))
;; virtual-connection : (or/c (-> connection) connection-pool) [#:timeout real]
;;                      -> connection
;; Creates a virtual-connection%.  A connection pool is adapted into a
;; zero-argument connector that leases from the pool.  The key of the calling
;; thread is its thread-dead-evt.  The timeout is multiplied by 1000 before it
;; is handed to the class.
(define (virtual-connection connector
                            #:timeout [timeout +inf.0])
  (let ([connector
         (cond [(connection-pool? connector)
                (lambda () (connection-pool-lease connector))]
               [else connector])]
        [get-key (lambda () (thread-dead-evt (current-thread)))])
    (new virtual-connection%
         (connector connector)
         ;; Use the binding above rather than a second, identical lambda.
         (get-key get-key)
         (timeout (* 1000 timeout)))))
;; connection-pool% : class
;; Hands out proxy-connection% objects that wrap actual connections created
;; by connector.  The bookkeeping tables are only touched by thunks executed
;; in the manager thread; client threads send those thunks over req-channel.
(define connection-pool%
  (class* object% ()
    (init-private connector             ;; called with no arguments to create a connection
                  max-connections       ;; initial count of the lease semaphore, or +inf.0
                  max-idle-connections) ;; bound on the length of idle-list
    (super-new)

    ;; Event that gates leases: always ready when the pool is unbounded,
    ;; otherwise a semaphore holding the remaining lease slots.
    (define lease-evt
      (if (= max-connections +inf.0)
          always-evt
          (make-semaphore max-connections)))

    ;; Channel carrying thunks from client threads to the manager thread.
    (define req-channel (make-channel))

    ;; Counters and the actual=>number table only feed debug messages.
    (define proxy-counter 1)
    (define actual-counter 1)
    (define actual=>number (make-weak-hasheq))

    ;; proxy=>evt : proxy -> event; when it becomes ready the proxy is released
    (define proxy=>evt (make-hasheq))

    ;; idle-list : list of actual connections available for reuse
    (define idle-list null)

    ;; lease* : evt -> proxy-connection%
    ;; Takes an idle connection if there is one, otherwise creates a new one,
    ;; wraps it in a fresh proxy and registers the proxy under key.
    (define/private (lease* key)
      (let* ([take-idle? (pair? idle-list)]
             [raw-c
              (cond [take-idle?
                     (begin0 (car idle-list)
                       (set! idle-list (cdr idle-list)))]
                    [else (new-connection)])]
             [proxy-number (begin0 proxy-counter (set! proxy-counter (add1 proxy-counter)))]
             [c (new proxy-connection% (pool this) (connection raw-c) (number proxy-number))])
        (dbdebug "connection-pool: leasing connection #~a (~a @~a)"
                 proxy-number
                 (if take-idle? "idle" "new")
                 (hash-ref actual=>number raw-c "???"))
        (hash-set! proxy=>evt c key)
        c))

    ;; release* : proxy (or/c connection #f) string -> void
    ;; Unregisters proxy.  If raw-c is a connection, attempts a rollback, then
    ;; either keeps it on the idle list or disconnects it, and gives the lease
    ;; slot back.
    (define/private (release* proxy raw-c why)
      ;; The label below is computed before the rollback attempt, so it is a
      ;; prediction of what happens to raw-c, not a report.
      (dbdebug "connection-pool: releasing connection #~a (~a, ~a)"
               (send proxy get-number)
               (cond [(not raw-c) "no-op"]
                     [(< (length idle-list) max-idle-connections) "idle"]
                     [else "disconnect"])
               why)
      (hash-remove! proxy=>evt proxy)
      (when raw-c
        ;; Best-effort rollback; failures are deliberately ignored.
        (with-handlers ([exn:fail? void])
          (send raw-c end-transaction 'connection-pool 'rollback))
        ;; Only a connection that is still connected may be kept for reuse;
        ;; otherwise a dead connection would be handed to the next client.
        (cond [(and (< (length idle-list) max-idle-connections)
                    (send raw-c connected?))
               (set! idle-list (cons raw-c idle-list))]
              [else (send raw-c disconnect)])
        (when (semaphore? lease-evt) (semaphore-post lease-evt))))

    ;; new-connection : -> connection
    ;; Calls connector, numbers the result for debug output, and raises an
    ;; error if the result is already registered in proxy=>evt or idle-list.
    (define/private (new-connection)
      (let ([c (connector)]
            [actual-number (begin0 actual-counter (set! actual-counter (add1 actual-counter)))])
        (when (or (hash-ref proxy=>evt c #f) (memq c idle-list))
          (uerror 'connection-pool "connect function did not produce a fresh connection"))
        (hash-set! actual=>number c actual-number)
        c))

    ;; manage : -> (does not return)
    ;; Manager loop.  Each iteration either runs a request thunk or releases a
    ;; proxy whose registered event became ready.
    (define/private (manage)
      (sync (handle-evt req-channel (lambda (proc) (proc)))
            (let ([evts (hash-map proxy=>evt (lambda (k v) (wrap-evt v (lambda (e) k))))])
              (handle-evt (apply choice-evt evts)
                          (lambda (proxy)
                            (release* proxy (send proxy release-connection) "release-evt")))))
      (manage))

    (define manager-thread
      (thread/suspend-to-kill (lambda () (manage))))

    ;; lease : evt -> evt
    ;; Returns an event that becomes ready when a lease slot is available; its
    ;; synchronization result is a freshly leased proxy connection.  If leasing
    ;; fails with an exn:fail, the slot is returned and the exception is
    ;; re-raised in the synchronizing thread.
    (define/public (lease key)
      (wrap-evt lease-evt
                (lambda (_e)
                  ;; The calling thread is passed as benefactor so the manager
                  ;; can be resumed even after losing its own custodians.
                  (thread-resume manager-thread (current-thread))
                  (let* ([result #f]
                         [sema (make-semaphore 0)])
                    (channel-put req-channel
                                 (lambda ()
                                   ;; Capture failures (e.g. from connector)
                                   ;; rather than letting them terminate the
                                   ;; manager thread and strand the caller on
                                   ;; the semaphore below.
                                   (set! result
                                         (with-handlers ([exn:fail?
                                                          (lambda (e) (cons 'raise e))])
                                           (cons 'value (lease* key))))
                                   (semaphore-post sema)))
                    (semaphore-wait sema)
                    (case (car result)
                      [(value) (cdr result)]
                      [else
                       ;; No connection was handed out: restore the slot
                       ;; consumed by synchronizing on lease-evt.
                       (when (semaphore? lease-evt) (semaphore-post lease-evt))
                       (raise (cdr result))])))))

    ;; release : proxy -> void
    ;; Detaches the actual connection from proxy and asks the manager to
    ;; recycle or disconnect it.
    (define/public (release proxy)
      (thread-resume manager-thread (current-thread))
      (let ([raw-c (send proxy release-connection)])
        (channel-put req-channel (lambda () (release* proxy raw-c "proxy disconnect"))))
      (void))))
;; proxy-connection% : class implementing connection<%>, extending locking%
;; Stand-in handed out by a pool.  Operations are forwarded to the wrapped
;; connection under call-with-lock; once the wrapped connection has been
;; taken back (the field is #f), forwarded operations signal not-connected.
(define proxy-connection%
  (class* locking% (connection<%>)
    (init-private connection
                  pool
                  number)
    (inherit call-with-lock)
    (super-new)

    ;; Expands each (name formal ...) spec into a method, introduced with the
    ;; given definition form, that forwards to the wrapped connection.
    (define-syntax-rule (define-forward definer (name formal ...) ...)
      (begin
        (definer (name formal ...)
          (call-with-lock 'name
            (lambda ()
              (let ([target connection])
                (when (not target) (error/not-connected 'name))
                (send target name formal ...)))))
        ...))

    (define-forward define/public
      (get-dbsystem)
      (query fsym stmt)
      (prepare fsym stmt close-on-exec?)
      (free-statement stmt)
      (transaction-status fsym)
      (start-transaction fsym isolation)
      (end-transaction fsym mode))

    ;; connected? : -> boolean
    ;; #t exactly while a wrapped connection is still attached.
    (define/override (connected?)
      (if connection #t #f))

    ;; disconnect : -> any
    ;; Hands this proxy back to its pool.
    (define/public (disconnect)
      (send pool release this))

    ;; get-number : -> the number this proxy was created with
    (define/public (get-number) number)

    ;; release-connection : -> connection or #f
    ;; Detaches and returns the wrapped connection, leaving #f in its place.
    (define/public (release-connection)
      (let ([detached connection])
        (set! connection #f)
        detached))))
;; connection-pool : (-> connection) [#:max-connections n]
;;                   [#:max-idle-connections n] -> connection-pool%
;; Creates a pool that obtains connections by calling connector.  Defaults:
;; unbounded connections, at most 10 idle connections.
(define (connection-pool connector
                         #:max-connections [max-connections +inf.0]
                         #:max-idle-connections [max-idle-connections 10])
  (instantiate connection-pool% ()
    [connector connector]
    [max-connections max-connections]
    [max-idle-connections max-idle-connections]))
;; connection-pool? : any -> boolean
;; Recognizes instances of connection-pool%.
(define connection-pool?
  (lambda (x)
    (is-a? x connection-pool%)))
;; connection-pool-lease : connection-pool [key] -> connection
;; Leases a connection from pool.  A thread key is replaced by its
;; thread-dead-evt and a custodian key by a custodian box; any other key is
;; passed through unchanged.  Waits at most 0.1 seconds for the pool's lease
;; event and raises an error if no connection was obtained in that time.
(define (connection-pool-lease pool [key (current-thread)])
  (define release-evt
    (cond [(thread? key) (thread-dead-evt key)]
          [(custodian? key) (make-custodian-box key #t)]
          [else key]))
  (define leased
    (sync/timeout 0.1 (send pool lease release-evt)))
  (unless leased
    (uerror 'connection-pool-lease
            "cannot obtain connection; connection pool limit reached"))
  leased)
;; Contracted exports of this module.
(provide/contract
;; Wraps an existing connection; see kill-safe-connection above.
[kill-safe-connection
(-> connection? connection?)]
;; Accepts either a zero-argument connector or a connection pool.
;; NOTE(review): no optional keyword is listed here, although the function
;; takes #:timeout and the connection-generator alias below allows it --
;; confirm whether omitting #:timeout is intentional.
[virtual-connection
(->* ((or/c (-> connection?) connection-pool?))
() connection?)]
;; Same function exported under a second name; this contract accepts only a
;; connector procedure and allows a positive real #:timeout.
[rename virtual-connection connection-generator
(->* ((-> connection?))
(#:timeout (and/c real? positive?))
connection?)]
;; Both limits are an integer from 1 to 10000, or +inf.0.
[connection-pool
(->* ((-> connection?))
(#:max-connections (or/c (integer-in 1 10000) +inf.0)
#:max-idle-connections (or/c (integer-in 1 10000) +inf.0))
connection-pool?)]
[connection-pool?
(-> any/c boolean?)]
;; The optional key is a custodian or an event.
[connection-pool-lease
(->* (connection-pool?)
((or/c custodian? evt?))
connection?)])
(require "private/radsn.rkt")
(provide (all-from-out "private/radsn.rkt"))