#lang scheme/base
(require (prefix-in sqlite: (planet jaymccarthy/sqlite:4))
(prefix-in log: (planet synx/log))
(planet synx/util/unwind-protect)
(prefix-in finalize: (planet synx/util/finalize))
scheme/class
scheme/contract
scheme/match)
;; Build a re-entrant "run under lock" procedure.
;;
;; The returned procedure takes a thunk `next` and calls it while holding a
;; private binary semaphore.  A private parameter records that the lock is
;; already held in the current parameterization, so a nested call from
;; inside `next` runs the thunk directly instead of waiting on the
;; semaphore again.
;;
;; Returns: whatever `next` returns.
(define (make-with-semaphore)
  (let ([lock (make-semaphore 1)]
        [held? (make-parameter #f)])
    (lambda (next)
      (cond
        ;; Already inside the locked region: just run the thunk.
        [(held?) (next)]
        [else
         (dynamic-wind
          ;; Acquire on entry; the wait itself may be interrupted by a break.
          (lambda () (semaphore-wait/enable-break lock))
          (lambda ()
            (parameterize ([held? #t])
              (next)))
          ;; Release on every exit from the body, normal or not.
          (lambda () (semaphore-post lock)))]))))
;; A lazily prepared SQL statement.
;;
;; Init fields:
;;   get-context    - thunk returning the sqlite database handle
;;   sql            - SQL text of the statement
;;   with-semaphore - procedure that runs a thunk while holding the lock
;;
;; Public methods: load, finalize, reset, with-resetting, for-each, fold,
;; once, map.
(define statement%
  (class object%
    (init-field get-context sql with-semaphore)

    ;; Prepared statement handle, or #f before first use / after `finalize`.
    (define statement #f)
    ;; Most recently supplied non-empty parameter list, or #f if none yet.
    (define params #f)

    ;; Return the prepared statement, preparing it on first use and loading
    ;; any remembered parameters into the fresh handle.
    (define (get-statement)
      (or statement
          (with-semaphore
           (λ ()
             (set! statement (sqlite:prepare (get-context) sql))
             (when params
               (log:info "Loading params ~s ~s" sql params)
               ;; `statement` is set by now, so this inner call returns it.
               (apply sqlite:load-params (get-statement) params))
             statement))))

    ;; Remember `new-params` and, if the statement is already prepared, bind
    ;; them immediately.  An empty list leaves the previous parameters alone.
    (define (set-params! new-params)
      (when (not (null? new-params))
        (log:info "Setting params ~s" new-params)
        (set! params new-params)
        (when statement
          (log:info "Preloading params ~s" params)
          (apply sqlite:load-params statement params))))

    (super-new)

    ;; Bind `params` to the statement (no-op when called with no arguments).
    (define/public (load . params)
      (with-semaphore
       (λ () (set-params! params))))

    ;; Finalize the underlying statement if it is still open and forget it;
    ;; a later use prepares it again.
    (define/public (finalize)
      (with-semaphore
       (λ ()
         (when statement
           (when (sqlite:open-statement? statement)
             (sqlite:finalize statement))
           (set! statement #f)))))

    ;; Reset the statement if it has been prepared.
    (define (do-reset)
      (when statement
        (sqlite:reset statement)))

    ;; Under the lock: load `params`, then call the thunk `next` with the
    ;; statement reset both before and after it.  Returns what `next` returns.
    ;; Public because the module-level `with-resetting` macro reaches it
    ;; through `send`.
    (define/public (with-resetting params next)
      (with-semaphore
       (λ ()
         (set-params! params)
         (dynamic-wind
          do-reset
          next
          do-reset))))

    ;; Reset the statement under the lock.
    (define/public (reset)
      (with-semaphore do-reset))

    ;; Call `proc` once per result row, passing the row's columns as
    ;; separate arguments.  Optional `params` are bound first.
    (define/public (for-each proc . params)
      (with-resetting
       params
       (λ ()
         (let loop ()
           (log:info "foreach Stepping")
           (let ([row (sqlite:step (get-statement))])
             ;; Only keep stepping while rows are produced; looping
             ;; unconditionally would never terminate.
             (when row
               (apply proc (vector->list row))
               (loop)))))))

    ;; Left fold over result rows: `proc` receives the row vector and the
    ;; accumulator, starting from `init`.  Optional `params` are bound first.
    (define/public (fold proc init . params)
      (with-resetting
       params
       (λ ()
         (let loop ([result init])
           (let ([row (sqlite:step (get-statement))])
             (if row
                 (loop (proc row result))
                 result))))))

    ;; Step the statement once and return the result of that step.
    ;; Optional `params` are bound first.
    (define/public (once . params)
      (with-resetting
       params
       (λ () (sqlite:step (get-statement)))))

    ;; Return a list, in row order, of `proc` applied to each row's columns.
    ;; Optional `params` are bound first, as with `for-each` and `fold`.
    (define/public (map proc . params)
      (reverse
       (send/apply this fold
                   (λ (row result)
                     (cons (apply proc (vector->list row)) result))
                   null
                   params)))))
;; Call `proc` on every value stored in hash table `h`, ignoring the keys.
;; Walks the table with the hash-iterate-* position API.
;; Returns void.
(define (hash-for-each-value h proc)
  (let visit ([pos (hash-iterate-first h)])
    (cond
      [pos
       (proc (hash-iterate-value h pos))
       (visit (hash-iterate-next h pos))]
      ;; No more positions: finish with void, as `when` would.
      [else (void)])))
;; A lazily opened SQLite connection with a cache of prepared statements.
;;
;; Initialization arguments:
;;   path        - database location; a string is converted to a path, any
;;                 other value (default ':memory:) is handed to sqlite:open
;;                 unchanged
;;   close-delay - seconds between automatic close attempts made by a
;;                 background thread (default 5); #f disables the thread
(define connection%
  (class object%
    (init-field (path ':memory:))
    (init (close-delay 5))
    (super-new)

    ;; Open database handle, or #f while the connection is closed.
    (define context #f)

    (when (string? path)
      (set! path (string->path path)))

    ;; Return the database handle, opening the database on first use.
    (define (get-context)
      (or context
          (begin
            (set! context (sqlite:open path))
            context)))

    ;; Lock shared by this connection and every statement it creates.
    (define with-semaphore (make-with-semaphore))

    ;; Immutable hash from SQL text to its statement% object.
    (define statements (make-immutable-hash null))

    ;; Finalize every cached statement and close the database, if open.
    ;; The statement objects stay cached and re-prepare on next use.
    (define/public (close)
      (with-semaphore
       (λ () (when context
               (hash-for-each
                statements
                (λ (sql statement)
                  (send statement finalize)))
               (sqlite:close context)
               (set! context #f)))))

    ;; Close the connection and drop the statement cache.
    (define/public (clear)
      (with-semaphore
       (λ ()
         (close)
         (set! statements (make-immutable-hash null)))))

    ;; Background thread: every `close-delay` seconds try to close the
    ;; connection.  An exn:fail during close is retried after one second
    ;; while `retries` is below 10; once that limit is reached the handler
    ;; predicate no longer matches and the exception propagates in this
    ;; thread.  A garbage collection is requested after each round.
    ;; NOTE(review): the loop has no exit condition and closes over this
    ;; object's `close` method -- confirm that is intended.
    (when close-delay
      (thread
       (λ ()
         (let loop ()
           (sleep close-delay)
           (let retry ((retries 0))
             (when (with-handlers
                       (((λ (e)
                           (and (< retries 10)
                                (exn:fail? e)))
                         (λ (e) #t)))
                     (close)
                     #f)
               (log:info "Retrying close ~s" retries)
               (sleep 1)
               (retry (+ retries 1))))
           (collect-garbage)
           (loop)))))

    ;; Reset every cached statement; does nothing while closed.
    (define/public (reset)
      (with-semaphore
       (λ ()
         (when context
           (hash-for-each-value
            statements
            (λ (statement)
              (send statement reset)))))))

    ;; Return the cached statement% for `sql`, creating and caching one
    ;; if this SQL text has not been seen before.
    (define/public (prepare sql)
      (define (new-statement)
        (let ((stmt (make-object statement% get-context sql with-semaphore)))
          (set! statements (hash-set statements sql stmt))
          stmt))
      (with-semaphore
       (λ () (hash-ref statements sql new-statement))))

    ;; Nesting depth of with-transaction calls currently in progress.
    (define transaction-level 0)
    (define with-transaction-semaphore (make-with-semaphore))

    ;; Track transaction nesting.  `type` is 'begin, 'rollback or anything
    ;; else (treated as end).  SQL is only issued at the outermost level:
    ;; "BEGIN" when entering from level 0, "ROLLBACK" / "END" when the
    ;; level returns to 0.
    (define (adjust-transaction type)
      (with-transaction-semaphore
       (λ ()
         (when (not (eq? type 'begin))
           (set! transaction-level (- transaction-level 1)))
         (when (= transaction-level 0)
           (send (prepare
                  (case type
                    ((begin) "BEGIN")
                    ((rollback) "ROLLBACK")
                    (else "END")))
                 once))
         (when (eq? type 'begin)
           (set! transaction-level (+ transaction-level 1))))))

    ;; Run the thunk `body` inside a (possibly nested) transaction and
    ;; return its result.  If something is raised inside `body`, the
    ;; handler resets all statements, records a rollback and hands the
    ;; raised value on unchanged.
    (define/public (with-transaction body)
      (with-semaphore
       (λ ()
         (adjust-transaction 'begin)
         (begin0
           (call-with-exception-handler
            (λ (e)
              ;; `raise` accepts any value, so only call exn-message on
              ;; real exn structures; otherwise log the value itself.  An
              ;; error here would skip the rollback bookkeeping below.
              (log:info "rolling back boo ~s" (if (exn? e) (exn-message e) e))
              (send this reset)
              (adjust-transaction 'rollback)
              (log:info "rolled")
              e)
            body)
           (adjust-transaction 'end)))))

    ;; Adapter around `clear`; the optional argument is ignored.
    (define (do-clear (myself #f))
      (clear))
    (finalize:register this do-clear)

    ;; Run the thunk `body` through `rewind-protect` with `do-clear` as the
    ;; second argument.
    ;; NOTE(review): presumably this clears the connection around `body`;
    ;; confirm against the synx/util/unwind-protect package.
    (define/public (with-clearing body)
      (rewind-protect
       body
       do-clear))

    ;; Return sqlite's error message for this connection, opening the
    ;; database if it is not open.
    (define/public (errmsg)
      (with-semaphore
       (λ () (sqlite:errmsg (get-context)))))

    ;; Return sqlite:changes-count for the current handle.
    ;; NOTE(review): uses `context` directly, which is #f while the
    ;; connection is closed -- confirm callers only use this while open.
    (define/public (changes-count)
      (with-semaphore
       (λ ()
         (sqlite:changes-count context))))

    ;; Return sqlite:last-insert-rowid for the current handle.
    ;; NOTE(review): same caveat as `changes-count` about a #f `context`.
    (define/public (last-insert)
      (with-semaphore
       (λ () (sqlite:last-insert-rowid context))))

    ;; Convenience wrappers: prepare (or reuse) the statement for `sql`,
    ;; bind `params`, and delegate to the statement method of the same name.
    (define/public (map proc sql . params)
      (let ([stmt (prepare sql)])
        (send/apply stmt load params)
        (send stmt map proc)))

    (define/public (for-each proc sql . params)
      (let ([stmt (prepare sql)])
        (send/apply stmt load params)
        (send stmt for-each proc)))

    (define/public (fold proc init sql . params)
      (let ([stmt (prepare sql)])
        (send/apply stmt load params)
        (send stmt fold proc init)))

    (define/public (once sql . params)
      (let ([stmt (prepare sql)])
        (send/apply stmt once params)))
    ))
;; (with-transaction conn form ...)
;; Wraps the forms in a thunk and sends it to `conn` as its
;; `with-transaction` message.
(define-syntax with-transaction
  (syntax-rules ()
    [(_ conn form ...)
     (send conn with-transaction (lambda () form ...))]))
;; (with-resetting stmt form ...)
;; Wraps the forms in a thunk and sends it to `stmt` as its
;; `with-resetting` message, together with an empty parameter list.
(define-syntax with-resetting
  (syntax-rules ()
    [(_ target form ...)
     (send target with-resetting null (lambda () form ...))]))
;; (with-clearing conn form ...)
;; Wraps the forms in a thunk and sends it to `conn` as its
;; `with-clearing` message.
(define-syntax with-clearing
  (syntax-rules ()
    [(_ conn form ...)
     (send conn with-clearing (lambda () form ...))]))
(provide connection% with-transaction with-resetting with-clearing)
;; Manual smoke test against a database file (default "/tmp/test.sqlite"):
;; clears a new connection, creates table `foo` if needed inside
;; `with-clearing`, inserts one row with bar = "42", then folds every
;; (id, bar) row vector of `foo` with `cons` starting from null and
;; returns that result.
(define (test [path "/tmp/test.sqlite"])
  (let ([conn (new connection% (path path))])
    (send conn clear)
    (with-clearing conn
      (send conn once "CREATE TABLE IF NOT EXISTS foo (id INTEGER PRIMARY KEY, bar TEXT)"))
    (let ([insert (send conn prepare "INSERT INTO foo (bar) VALUES (?)")])
      (send insert once "42"))
    (let ([select (send conn prepare "SELECT id,bar FROM foo")])
      (send select fold cons null))))