#lang racket/base (require (prefix-in sqlite: (planet jaymccarthy/sqlite:4)) (prefix-in log: (planet synx/log)) (planet synx/util/unwind-protect) (prefix-in finalize: (planet synx/util/finalize)) racket/class racket/contract racket/match) (define (remote-caller) (define channel (make-channel)) (thread (λ () (let loop () (with-handlers ((exn:fail? (λ (e) (log:error (exn-message e))))) (let ((result (thread-receive))) (match result ((list return func args ...) (call-with-values (λ () (apply func args)) return)) (else (log:error "Can't call remote ~s?" result))))) (loop))))) (define-syntax define/remote (syntax-rules () ((define/remote define/how remote (name arg args ... . rest) body ...) (begin (define (direct arg args ... . rest) body ...) (define return-channel (make-channel)) (define (return . ret-args) (channel-put return-channel ret-args)) (define/how (name arg args ... . rest) (if (eq? (current-thread) remote) (apply direct arg args ... rest) (begin (thread-send remote (list* return direct arg args ... rest)) (apply values (channel-get return-channel))))))) ((define/remote define/how remote (name) body ...) (begin (define direct (λ () body ...)) (define return-channel (make-channel)) (define (return . ret-args) (channel-put return-channel ret-args)) (define/how (name) (if (eq? (current-thread) remote) (direct) (begin (thread-send remote (list return direct)) (apply values (channel-get return-channel))))))) ((define/remote define/how remote (name . rest) body ...) (begin (define (direct . rest) body ...) (define return-channel (make-channel)) (define (return . ret-args) (channel-put return-channel ret-args)) (define/how (name . rest) (if (eq? (current-thread) remote) (apply direct rest) (begin (thread-send remote (list* return direct rest)) (apply values (channel-get return-channel))))))))) (define statement% (class object% (init-field get-context sql remote) (define statement #f) (define params #f) (define (get-statement) (or statement (begin (set! statement (sqlite:prepare (get-context) sql)) (when params (log:info "Loading params ~s ~s" sql params) (apply sqlite:load-params (get-statement) params)) statement))) (define (set-params! new-params) (set! params new-params) (when statement (log:info "Preloading params ~s" params) (apply sqlite:load-params statement params))) (super-new) (define/remote define/public remote (load . params) (set-params! params)) (define/remote define/public remote (finalize) (when statement (when (sqlite:open-statement? statement) (sqlite:finalize statement)) (set! statement #f))) (define (with-resetting params next) (set-params! params) (dynamic-wind do-reset next do-reset)) (define/remote define remote (do-reset) (when statement (sqlite:reset statement))) (define/public (reset) (do-reset)) (define/remote define/public remote (for-each proc . params) (with-resetting params (λ () (let loop () (log:info "foreach Stepping") (let ((row (sqlite:step (get-statement)))) (when row (apply proc (vector->list row))) (loop)))))) (define/remote define/public remote (fold proc init . params) (with-resetting params (λ () (let loop ((result init)) (let ((row (sqlite:step (get-statement)))) (if row (loop (proc row result)) result)))))) (define/remote define/public remote (once . params) (with-resetting params (λ () (sqlite:step (get-statement))))) (define/public (map proc) (begin0 (reverse (fold (λ (row result) (cons (apply proc (vector->list row)) result)) null)))))) (define (hash-for-each-value h proc) (let loop ((i (hash-iterate-first h))) (when i (proc (hash-iterate-value h i)) (loop (hash-iterate-next h i))))) (define connection% (class object% (init-field (path ':memory:)) (init (close-delay 5)) (super-new) (define context #f) (when (string? path) (set! path (string->path path))) (define (get-context) (or context (begin (set! context (sqlite:open path)) context))) (define remote (remote-caller)) (define statements (make-immutable-hash null)) (define/remote define/public remote (close) (when context (hash-for-each statements (λ (sql statement) (send statement finalize))) (sqlite:close context) (set! context #f))) (define/public (clear) (close) (set! statements (make-immutable-hash null))) (when close-delay (thread (λ () (let loop () (sleep close-delay) (let retry ((retries 0)) (when (with-handlers (((λ (e) (and (< retries 10) (exn:fail? e))) (λ (e) #t))) (close) #f) (log:info "Retrying close ~s" retries) (sleep 1) (retry (+ retries 1)))) (collect-garbage) (loop))))) (define/remote define/public remote (reset) (when context (hash-for-each-value statements (λ (statement) (send statement reset))))) (define/public (prepare sql) (define (new-statement) (let ((stmt (make-object statement% get-context sql remote))) (set! statements (hash-set statements sql stmt)) stmt)) (hash-ref statements sql new-statement)) (define transaction-level 0) (define transaction-lock (make-semaphore 1)) (define (adjust-transaction type) (semaphore-wait transaction-lock) (when (not (eq? type 'begin)) (set! transaction-level (- transaction-level 1))) (when (= transaction-level 0) (send (prepare (case type ((begin) "BEGIN") ((rollback) "ROLLBACK") (else "END"))) once)) (when (eq? type 'begin) (set! transaction-level (+ transaction-level 1))) (semaphore-post transaction-lock)) (define/public (with-transaction body) (adjust-transaction 'begin) (begin0 (call-with-exception-handler (λ (e) (log:info "rolling back boo ~s" (exn-message e)) (send this reset) (adjust-transaction 'rollback) (log:info "rolled") e) body) (adjust-transaction 'end))) (define (do-clear (myself #f)) (clear)) (finalize:register this do-clear) (define/public (with-clearing body) (rewind-protect body do-clear)) (define/public (errmsg) (sqlite:errmsg (get-context))) (define/public (changes-count) (sqlite:changes-count context)) (define/public (last-insert) (sqlite:last-insert-rowid context)) (define/public (map proc sql . params) (let ((stmt (prepare sql))) (send/apply stmt load params) (send stmt map proc))) (define/public (for-each proc sql . params) (let ((stmt (prepare sql))) (send/apply stmt load params) (send stmt for-each proc))) (define/public (fold proc init sql . params) (let ((stmt (prepare sql))) (send/apply stmt load params) (send stmt fold proc init))) (define/public (once sql . params) (let ((stmt (prepare sql))) (send/apply stmt once params))))) (define-syntax-rule (with-transaction c body ...) (send c with-transaction (λ () body ...))) (define-syntax-rule (with-resetting stmt body ...) (send stmt with-resetting null (λ () body ...))) (define-syntax-rule (with-clearing c body ...) (send c with-clearing (λ () body ...))) (provide connection% with-transaction with-resetting with-clearing) (define (test (path ':memory:)) (define c (new connection% (path path))) (send c clear) (with-clearing c (send c once "CREATE TABLE IF NOT EXISTS foo (id INTEGER PRIMARY KEY, bar TEXT)")) (send (send c prepare "INSERT INTO foo (bar) VALUES (?)") once "42") (send (send c prepare "SELECT id,bar FROM foo") fold cons null))