(module buffer mzscheme
(require (lib "plt-match.ss")
"require.ss"
"action.ss"
"action-util.ss"
"pool.ss")
(require-contracts)
(require-mz:class)
(require-list)
;; A buffer groups the handles created by empty-buffer:
;;   pool            - the value returned by make-pool
;;   thread          - the thread that services the two channels below
;;   action-channel  - channel on which buffer-add! sends specs
;;   request-channel - channel on which buffer-update sends subscriptions
(define-struct buffer (pool thread action-channel request-channel))
;; A subscription is one reader's view of a buffer:
;;   buffer         - the buffer being read
;;   count          - how many actions this subscription has already been
;;                    handed; starts at 0 and is overwritten by the buffer's
;;                    thread each time it serves a request
;;   return-channel - channel on which the buffer's thread sends its reply
(define-struct subscription (buffer count return-channel))
;; Named flat contracts wrapping the struct predicates.
(define buffer/c (flat-named-contract "Sequence Trace Buffer" buffer?))
(define subscription/c (flat-named-contract "Buffer Subscription" subscription?))
;; empty-buffer : -> buffer
;; Creates a buffer with no recorded actions and starts the thread that owns
;; its state.  All mutation of the history happens inside that thread, which
;; alternates between accepting new specs and answering subscription requests.
(define (empty-buffer)
  (define incoming-specs (make-channel))
  (define incoming-requests (make-channel))
  (define the-pool (make-pool))
  ;; History is kept newest-first; `total` is always its length.
  (define history '())
  (define total 0)
  ;; Converts a spec into an action (given the previous action, or #f when
  ;; there is none) and pushes it onto the history.
  (define (record-action! spec)
    (let* ([latest (if (pair? history) (car history) #f)]
           [new-action (spec->action the-pool latest spec)])
      (set! total (add1 total))
      (set! history (cons new-action history))))
  ;; Snapshots the current history, marks the subscription as caught up, and
  ;; sends back a thunk that yields the not-yet-seen actions, oldest first.
  ;; The list work is deferred to the thunk so it runs in the caller's thread.
  ;; NOTE(review): channel-put is a synchronous rendezvous, so this thread
  ;; serves nothing else until the subscriber takes the reply -- confirm that
  ;; is acceptable if a subscriber can die between its put and its get.
  (define (answer-request! sub)
    (let ([already-seen (subscription-count sub)]
          [reply-channel (subscription-return-channel sub)]
          [snapshot history]
          [snapshot-total total])
      (set-subscription-count! sub snapshot-total)
      (channel-put
       reply-channel
       (lambda ()
         (reverse
          (srfi1:take snapshot (- snapshot-total already-seen)))))))
  (make-buffer the-pool
               (thread
                (lambda ()
                  ;; Serve one event at a time, forever.
                  (let serve ()
                    (sync (wrap-evt incoming-specs record-action!)
                          (wrap-evt incoming-requests answer-request!))
                    (serve))))
               incoming-specs
               incoming-requests))
;; buffer-add! : buffer spec -> void
;; Hands `spec` to the buffer's thread over the action channel.  The buffer
;; thread is first resumed with the calling thread as its benefactor.
(define (buffer-add! buffer spec)
  (let ([manager (buffer-thread buffer)]
        [specs-out (buffer-action-channel buffer)])
    (thread-resume manager (current-thread))
    (channel-put specs-out spec)))
;; buffer-subscribe : buffer -> subscription
;; Builds a new subscription on `buffer` with a count of 0 and its own fresh
;; reply channel.
(define (buffer-subscribe buffer)
  (let ([reply-channel (make-channel)])
    (make-subscription buffer 0 reply-channel)))
;; buffer-update : subscription -> (listof action)
;; Sends the subscription to its buffer's thread, waits for the reply thunk on
;; the subscription's return channel, and returns the result of calling it:
;; the actions recorded since this subscription's previous count, oldest first.
(define (buffer-update subscription)
  (let ([owner (subscription-buffer subscription)]
        [reply-channel (subscription-return-channel subscription)])
    ;; Resume the buffer thread with the caller as benefactor before talking to it.
    (thread-resume (buffer-thread owner) (current-thread))
    (channel-put (buffer-request-channel owner) subscription)
    ;; The reply is a thunk; applying it here builds the list in this thread.
    ((channel-get reply-channel))))
;; Exports of this module, each guarded by the contract shown.
;; (The last parenthesis on the final line closes the enclosing module form.)
(provide/contract
[buffer? (any/c . -> . boolean?)]
;; empty-buffer is exported under the name make-buffer.
[rename empty-buffer make-buffer (-> buffer/c)]
[buffer-pool (buffer/c . -> . pool?)]
[buffer-add! (buffer/c spec/c . -> . void?)]
[buffer-subscribe (buffer/c . -> . subscription/c)]
[buffer-update (subscription/c . -> . (listof action?))]))