(module p3 mzscheme
(require "p3-msg.ss")
(provide protocol3:new
protocol3:new-exchange
protocol3:end-exchange
protocol3:close
protocol3:encode
protocol3:flush)
(provide stream:cons
stream:current
stream:next
stream:current+next
stream:done?
stream->list)
(define-struct protocol3 (inport outport stream ownerlock iolock))
(define (protocol3:new inport outport)
(make-protocol3 inport outport #f (make-semaphore 1) (make-semaphore 1)))
(define (protocol3:new-exchange protocol)
(semaphore-wait (protocol3-ownerlock protocol))
(stream:force-to-end (protocol3-stream protocol))
(let ([new-mg (stream:new protocol)])
(set-protocol3-stream! protocol new-mg)
new-mg))
(define (protocol3:end-exchange protocol)
(semaphore-post (protocol3-ownerlock protocol)))
(define (protocol3:close protocol)
(close-output-port (protocol3-outport protocol))
(close-input-port (protocol3-inport protocol)))
(define (protocol3:encode protocol message)
(write-message message (protocol3-outport protocol)))
(define (protocol3:flush protocol)
(flush-output (protocol3-outport protocol)))
(define (parse-message protocol)
(parse-server-message (protocol3-inport protocol)))
(define-struct stream (p3 promise done?))
(define (stream:current+next mg)
(let ([iolock (protocol3-iolock (stream-p3 mg))])
(dynamic-wind
(lambda () (semaphore-wait iolock))
(lambda () (stream:current+next/nosync mg))
(lambda () (semaphore-post iolock)))))
(define (stream:current+next/nosync mg)
(force (stream-promise mg)))
(define (stream:current mg)
(let-values [((current next) (stream:current+next mg))]
current))
(define (stream:next mg)
(let-values [((current next) (stream:current+next mg))]
next))
(define (stream:done? mg)
(or (not mg)
(stream-done? mg)))
(define (stream:new protocol)
(make-stream
protocol
(delay
(let [(next-message (parse-message protocol))]
(values next-message
(if (or (eof-object? next-message)
(end-of-exchange-message? next-message))
(make-stream protocol #f #t)
(stream:new protocol)))))
#f))
(define (stream:new/start-after protocol old-mg)
(if (stream:done? old-mg)
(stream:new protocol)
(make-stream protocol
(delay
(let-values ([(_msg next-old-mg)
(stream:current+next/nosync old-mg)])
(stream:current+next/nosync
(stream:new/start-after protocol next-old-mg))))
#f)))
(define (stream:cons r mg)
(make-stream
(stream-p3 mg)
(delay (values r mg))
#f))
(define (stream:force-to-end mg)
(unless (stream:done? mg)
(stream:force-to-end (stream:next mg))))
(define (stream->list mg)
(if (stream:done? mg)
null
(let-values ([(r mg) (stream:current+next mg)])
(cons r (stream->list mg)))))
(define (end-of-exchange-message? msg)
(ReadyForQuery? msg))
)