util/connect.rkt
;; Copyright 2011 Ryan Culpepper
;; Released under the terms of the LGPL version 3 or later.
;; See the file COPYRIGHT for details.

#lang racket/base
(require racket/contract
         racket/class
         "../private/generic/interfaces.rkt"
         (only-in "../private/generic/functions.rkt" connection?))

;; Kill-safe wrapper

;; Note: wrapper protects against kill-thread, but not from
;; custodian-shutdown of ports, etc.

(define kill-safe-connection%
  (class* object% (connection<%>)
    (init-private connection)

    (define req-channel (make-channel))

    (define safe-thread
      (thread/suspend-to-kill
       (lambda ()
         (let loop ()
           ((channel-get req-channel))
           (loop)))))

    (define/private (call proc)
      (thread-resume safe-thread)
      (let ([result #f]
            [sema (make-semaphore 0)])
        (channel-put req-channel
                     (lambda ()
                       (set! result
                             (with-handlers ([(lambda (e) #t)
                                              (lambda (e) (cons 'raise e))])
                               (cons 'values
                                     (call-with-values
                                         (lambda () (proc connection))
                                       list))))
                       (semaphore-post sema)))
        (semaphore-wait sema)
        (case (car result)
          ((values) (apply values (cdr result)))
          ((raise)  (raise (cdr result))))))

    (define-syntax-rule (define-forward (method arg ...) ...)
      (begin
        (define/public (method arg ...)
          (call (lambda (obj) (send obj method arg ...)))) ...))

    (define-forward
      (connected?)
      (disconnect)
      (get-dbsystem)
      (query fsym stmt)
      (prepare fsym stmt close-on-exec?)
      (free-statement stmt)
      (transaction-status fsym)
      (start-transaction fsym isolation)
      (end-transaction fsym mode))

    (super-new)))

;; ----

(define (kill-safe-connection connection)
  (new kill-safe-connection%
       (connection connection)))

;; ========================================

;; Virtual connection

(define virtual-connection%
  (class* object% (connection<%> no-cache-prepare<%>)
    (init-private connector     ;; called from client thread
                  get-key       ;; called from client thread
                  timeout)
    (super-new)

    (define custodian (current-custodian))
    (define req-channel (make-channel))

    ;; == methods called in manager thread ==

    ;; key=>conn : hasheq[key => connection]
    (define key=>conn (make-hasheq))

    ;; alarms : hasheq[connection => evt] (alarm wrapped to return key)
    (define alarms (make-hasheq))

    (define/private (get key) ;; also refreshes alarm
      (let ([c (hash-ref key=>conn key #f)])
        (when c (hash-set! alarms c (fresh-alarm-for key)))
        c))

    (define/private (put! key c)
      (hash-set! key=>conn key c)
      (hash-set! alarms c (fresh-alarm-for key)))

    (define/private (fresh-alarm-for key)
      (wrap-evt (alarm-evt (+ (current-inexact-milliseconds) timeout))
                (lambda (a) key)))

    (define/private (remove! key timeout?)
      ;; timeout? = if connection open, then wait longer
      (let* ([c (hash-ref key=>conn key #f)]
             [in-trans? (with-handlers ([exn:fail? (lambda (e) #f)])
                          (and c (send c transaction-status 'virtual-connection)))])
        (cond [(not c) (void)]
              [(and timeout? in-trans?)
               (hash-set! alarms c (fresh-alarm-for key timeout))]
              [else
               (hash-remove! key=>conn key)
               (hash-remove! alarms c)
               (send c disconnect)])))

    (define/private (manage)
      (sync (handle-evt req-channel (lambda (proc) (proc)))
            (let ([keys (hash-map key=>conn (lambda (k v) k))])
              (handle-evt (apply choice-evt keys)
                          ;; Assignment to key has expired: move to idle or disconnect.
                          (lambda (key)
                            (dbdebug "virtual-connection: key expiration: ~e" key)
                            (remove! key #f))))
            (let ([alarm-evts (hash-map alarms (lambda (k v) v))])
              (handle-evt (apply choice-evt alarm-evts)
                          ;; Disconnect idle connection.
                          (lambda (key)
                            (dbdebug "virtual-connection: timeout")
                            (remove! key #t)))))
      (manage))

    (define manager-thread
      (thread/suspend-to-kill (lambda () (manage))))

    ;; == methods called in client thread ==

    (define/private (get-connection create?)
      (thread-resume manager-thread)
      (let* ([key (get-key)]
             [c #f]
             [sema (make-semaphore 0)])
        (channel-put req-channel
                     (lambda ()
                       (set! c (get key))
                       (semaphore-post sema)))
        (semaphore-wait sema)
        (cond [(and c (send c connected?)) c]
              [create?
               (let ([c* (parameterize ((current-custodian custodian))
                           (connector))])
                 (channel-put req-channel
                              (lambda ()
                                (when c (remove! key #f))
                                (put! key c*)))
                 c*)]
              [else
               (when c  ;; got disconnected connection!
                 (channel-put req-channel (remove! key #f)))
               #f])))

    ;; ----

    (define-syntax-rule (define-forward (req-con? no-con (method arg ...)) ...)
      (begin (define/public (method arg ...)
               (let ([c (get-connection req-con?)])
                 (if c
                     (send c method arg ...)
                     no-con)))
             ...))

    (define-forward
      (#f #f     (connected?))
      (#t '_     (get-dbsystem))
      (#t '_     (query fsym stmt))
      (#t '_     (start-transaction fsym isolation))
      (#f (void) (end-transaction fsym mode))
      (#f #f     (transaction-status fsym)))

    (define/public (disconnect)
      (let ([c (get-connection #f)]
            [key (get-key)])
        (when c
          (send c disconnect)
          (thread-resume manager-thread)
          (channel-put req-channel
                       (lambda () (remove! key #f #f)))))
      (void))

    (define/public (prepare fsym stmt close-on-exec?)
      (unless close-on-exec?
        (error fsym "cannot prepare statement with virtual connection"))
      (send (get-connection #t) prepare fsym stmt close-on-exec?))

    (define/public (free-statement stmt)
      (error 'free-statement
             "internal error: virtual connection does not own statements"))))

;; ----

(define (virtual-connection connector
                            #:timeout [timeout +inf.0])
  (let ([connector
         (cond [(connection-pool? connector)
                (lambda () (connection-pool-lease connector))]
               [else connector])]
        [get-key (lambda () (thread-dead-evt (current-thread)))])
    (new virtual-connection%
         (connector connector)
         (get-key (lambda () (thread-dead-evt (current-thread))))
         (timeout (* 1000 timeout)))))

;; ========================================

;; Connection pool

(define connection-pool%
  (class* object% ()
    (init-private connector              ;; called from manager thread
                  max-connections
                  max-idle-connections)
    (super-new)

    ;; max-connections is either in [1, 10000] or +inf.0,
    ;; if leave-evt is sema, then counter = (max-connections - assigned connections)
    ;; ie, includes idle connections
    (define lease-evt
      (if (= max-connections +inf.0)
          always-evt
          (make-semaphore max-connections)))

    (define req-channel (make-channel))

    (define proxy-counter 1) ;; for debugging
    (define actual-counter 1) ;; for debugging
    (define actual=>number (make-weak-hasheq))

    ;; == methods called in manager thread ==

    ;; proxy=>evt : hasheq[proxy-connection => evt]
    (define proxy=>evt (make-hasheq))

    ;; idle-list : (listof raw-connection)
    (define idle-list null)

    (define/private (lease* key)
      (let* ([take-idle? (pair? idle-list)]
             [raw-c
              (cond [take-idle?
                     (begin0 (car idle-list)
                       (set! idle-list (cdr idle-list)))]
                    [else (new-connection)])]
             [proxy-number (begin0 proxy-counter (set! proxy-counter (add1 proxy-counter)))]
             [c (new proxy-connection% (pool this) (connection raw-c) (number proxy-number))])
        (dbdebug "connection-pool: leasing connection #~a (~a @~a)"
                 proxy-number
                 (if take-idle? "idle" "new")
                 (hash-ref actual=>number raw-c "???"))
        (hash-set! proxy=>evt c key)
        c))

    (define/private (release* proxy raw-c why)
      (dbdebug "connection-pool: releasing connection #~a (~a, ~a)"
               (send proxy get-number)
               (cond [(not raw-c) "no-op"]
                     [(< (length idle-list) max-idle-connections) "idle"]
                     [else "disconnect"])
               why)
      (hash-remove! proxy=>evt proxy)
      (when raw-c
        (with-handlers ([exn:fail? void])
          (send raw-c end-transaction 'connection-pool 'rollback))
        (cond [(< (length idle-list) max-idle-connections)
               (set! idle-list (cons raw-c idle-list))]
              [else (send raw-c disconnect)])
        (when (semaphore? lease-evt) (semaphore-post lease-evt))))

    (define/private (new-connection)
      (let ([c (connector)]
            [actual-number (begin0 actual-counter (set! actual-counter (add1 actual-counter)))])
        (when (or (hash-ref proxy=>evt c #f) (memq c idle-list))
          (uerror 'connection-pool "connect function did not produce a fresh connection"))
        (hash-set! actual=>number c actual-number)
        c))

    (define/private (manage)
      (sync (handle-evt req-channel (lambda (proc) (proc)))
            (let ([evts (hash-map proxy=>evt (lambda (k v) (wrap-evt v (lambda (e) k))))])
              (handle-evt (apply choice-evt evts)
                          (lambda (proxy)
                            (release* proxy (send proxy release-connection) "release-evt")))))
      (manage))

    (define manager-thread
      (thread/suspend-to-kill (lambda () (manage))))

    ;; == methods called in client thread ==

    (define/public (lease key)
      (wrap-evt lease-evt
                (lambda (_e)
                  (thread-resume manager-thread)
                  (let* ([result #f]
                         [sema (make-semaphore 0)])
                    (channel-put req-channel
                                 (lambda ()
                                   (set! result (lease* key))
                                   (semaphore-post sema)))
                    (semaphore-wait sema)
                    result))))

    (define/public (release proxy)
      (thread-resume manager-thread)
      (let ([raw-c (send proxy release-connection)])
        (channel-put req-channel (lambda () (release* proxy raw-c "proxy disconnect"))))
      (void))))

;; --

(define proxy-connection%
  (class* locking% (connection<%>)
    (init-private connection
                  pool
                  number)
    (inherit call-with-lock)
    (super-new)

    (define-syntax-rule (define-forward defmethod (method arg ...) ...)
      (begin
        (defmethod (method arg ...)
          (call-with-lock 'method
            (lambda ()
              (let ([c connection])
                (unless c (error/not-connected 'method))
                (send c method arg ...)))))
        ...))

    (define-forward define/public
      (get-dbsystem)
      (query fsym stmt)
      (prepare fsym stmt close-on-exec?)
      (free-statement stmt)
      (transaction-status fsym)
      (start-transaction fsym isolation)
      (end-transaction fsym mode))

    ;; (define-forward define/override (connected?))
    (define/override (connected?) (and connection #t))

    (define/public (disconnect)
      (send pool release this))

    (define/public (get-number) number)

    (define/public (release-connection)
      (begin0 connection
        (set! connection #f)))))

;; ----

(define (connection-pool connector
                         #:max-connections [max-connections +inf.0]
                         #:max-idle-connections [max-idle-connections 10])
  (new connection-pool%
       (connector connector)
       (max-connections max-connections)
       (max-idle-connections max-idle-connections)))

(define (connection-pool? x)
  (is-a? x connection-pool%))

(define (connection-pool-lease pool [key (current-thread)])
  (let* ([key
          (cond [(thread? key) (thread-dead-evt key)]
                [(custodian? key) (make-custodian-box key #t)]
                [else key])]
         [result (sync/timeout 0.1 (send pool lease key))])
    (unless result
      (uerror 'connection-pool-lease
              "cannot obtain connection; connection pool limit reached"))
    result))

;; ========================================

(provide/contract
 [kill-safe-connection
  (-> connection? connection?)]

 [virtual-connection
  (->* ((or/c (-> connection?) connection-pool?))
       () ;; (#:timeout (and/c real? positive?))
       connection?)]
 [rename virtual-connection connection-generator
  (->* ((-> connection?))
       (#:timeout (and/c real? positive?))
       connection?)]

 [connection-pool
  (->* ((-> connection?))
       (#:max-connections (or/c (integer-in 1 10000) +inf.0)
        #:max-idle-connections (or/c (integer-in 1 10000) +inf.0))
       connection-pool?)]
 [connection-pool?
  (-> any/c boolean?)]
 [connection-pool-lease
  (->* (connection-pool?)
       ((or/c custodian? evt?))
       connection?)])

(require "private/radsn.rkt")
(provide (all-from-out "private/radsn.rkt"))