#lang scheme/base
(require "../base.ss")
(require scheme/async-channel
scheme/class
scheme/dict
scheme/match
(planet untyped/unlib:3/match)
(planet untyped/unlib:3/log)
"database.ss")
(define-syntax-rule (add1! id)
(set! id (add1 id)))
(define-syntax-rule (sub1! id)
(set! id (sub1 id)))
(define-struct acquisition (id dead-evt start-time))
(define connection-pooled-database<%>
(interface ()))
(define connection-pool-mixin
(mixin (database<%>) (connection-pooled-database<%>)
(inherit reset-connection)
(init-field min-connections)
(init-field max-connections)
(field [manager-channel (make-async-channel 1)])
(field [available-connections (make-async-channel max-connections)])
(field [acquired-count 0])
(field [available-count 0])
(super-new)
(field [manager-thread (thread (cut manage-connections))])
(define initialized? #f)
(define/private (manage-connections)
(define gain 0.5)
(define (clip x)
(let* ([current (+ available-count acquired-count)]
[goal (+ x current)])
(cond
[(< goal min-connections) (- min-connections current)]
[(< max-connections goal) (- max-connections current)]
[else x])))
(define (create-connection)
(async-channel-put available-connections (super connect))
(add1! available-count))
(define (free-connection)
(let ([conn (async-channel-try-get available-connections)])
(if conn
(begin (super disconnect conn) (sub1! available-count))
(log-debug* "Snooze connection pool attempted to free connection but none were available"
available-count acquired-count))))
(define (initialise)
(with-handlers
([exn? (lambda (exn)
(log-error*
"Snooze connection pool manager caused exception on initialisation" exn)
(raise exn))])
(unless initialized?
(for ([index (in-range min-connections)])
(create-connection))
(set! initialized? #t)
(log-debug* "Snooze connection pool initialised"))))
(define (free-connections n)
(for ([i (in-range n)])
(free-connection)))
(define (create-connections n)
(for ([i (in-range n)])
(create-connection)))
(let loop ([acquired-connections null]
[waiting 0])
(let* ([demand (+ waiting acquired-count)]
[total (+ available-count acquired-count)]
[error (- demand total)]
[change (clip (round (* error gain)))])
(log-debug* "Snooze connection pool decision variables"
demand total waiting available-count acquired-count)
(cond
[(zero? change)
#t]
[(< change 0)
(let ([n-to-free (- change)])
(log-debug* "Snooze connection pool decided to free connections" n-to-free)
(free-connections n-to-free))]
[(> change 0)
(let ([n-to-create change])
(log-debug* "Snooze connection pool decided to create connections" n-to-create)
(create-connections n-to-create))]))
(match (sync manager-channel
(wrap-evt
(apply choice-evt (dict-map acquired-connections
(lambda (conn acq)
(acquisition-dead-evt acq))))
(lambda (evt)
(list 'unclaim evt))))
[(list 'connect time thd)
(unless initialized?
(initialise))
(let ([id (gensym)])
(with-handlers ([exn? (lambda (exn) (loop acquired-connections waiting))])
(thread-send thd id)
(log-debug* "Snooze connection pool connection attempt started"
id time (add1 waiting) available-count acquired-count))
(loop acquired-connections (add1 waiting)))]
[(list 'connected time thd id conn)
(sub1! available-count)
(add1! acquired-count)
(log-debug* "Snooze connection pool connection attempt succeeded"
id time (sub1 waiting) available-count acquired-count)
(loop (dict-set acquired-connections conn
(make-acquisition id (thread-dead-evt thd) time))
(sub1 waiting))]
[(list 'disconnect conn time)
(let ([acq (dict-ref acquired-connections conn #f)])
(if acq
(begin
(async-channel-put available-connections conn)
(add1! available-count)
(sub1! acquired-count)
(log-debug* "Snooze connection pool accepted disconnect"
(acquisition-id acq) (current-inexact-milliseconds)
waiting available-count acquired-count)
(loop (dict-remove acquired-connections conn)
waiting))
(begin
(log-warning* "Snooze connection pool refused disconnect"
waiting available-count acquired-count)
(loop acquired-connections
waiting))))]
[(list 'unclaim evt)
(let-values ([(conn acq)
(for/fold ([conn #f] [acq #f])
([(the-conn the-acq) (in-dict acquired-connections)])
(if (equal? evt (acquisition-dead-evt the-acq))
(values the-conn the-acq)
(values conn acq)))])
(if conn
(begin
(reset-connection conn)
(async-channel-put available-connections conn)
(add1! available-count)
(sub1! acquired-count)
(log-debug* "Snooze connection pool retrieved connection on thread death"
(acquisition-id acq) (current-inexact-milliseconds)
waiting available-count acquired-count)
(loop (dict-remove acquired-connections conn)
waiting))
(begin
(log-warning* "Snooze connection pool had thread death event for thread we aren't monitoring"
waiting available-count acquired-count)
(loop acquired-connections
waiting))))])))
(define/override (connect)
(let* ([me (current-thread)]
[start (current-inexact-milliseconds)]
[id (begin
(async-channel-put
manager-channel
(list 'connect start me))
(thread-receive))]
[conn (async-channel-get available-connections)]
[stop (current-inexact-milliseconds)])
(async-channel-put
manager-channel
(list 'connected stop me id conn))
(log-info* "PROFILE" "Connection pool connect" (- stop start))
conn))
(define/override (disconnect conn)
(let ([now (current-inexact-milliseconds)])
(async-channel-put manager-channel (list 'disconnect conn now))
(log-info* "PROFILE" "Connection pool disconnect" (- (current-inexact-milliseconds) now))))))
(provide connection-pooled-database<%>
connection-pool-mixin)