#lang scheme/base
(require scheme/contract
scheme/match
scheme/async-channel
scheme/list)
(define-struct target (dep-count elt) #:mutable)
(define-struct tqueue (satisfied-deps dep-to-targets ready worker-channel worker-thread) #:mutable)
(define (new-tqueue)
(let ([a-tqueue
(make-tqueue (make-hash) (make-hash) (make-async-channel) (make-channel) #f )])
(set-tqueue-worker-thread!
a-tqueue
(thread (lambda () (worker-thread-loop a-tqueue))))
a-tqueue))
(define-struct cmd ())
(define-struct (cmd:add! cmd) (elt deps done))
(define-struct (cmd:satisfy! cmd) (dep done))
(define (tqueue-add! a-tqueue an-elt deps)
(keep-worker-thread-alive! a-tqueue)
(let ([sema (make-semaphore)])
(channel-put (tqueue-worker-channel a-tqueue)
(make-cmd:add! an-elt deps sema))
(sync sema)
(void)))
(define (tqueue-satisfy! a-tqueue a-dep)
(keep-worker-thread-alive! a-tqueue)
(let ([sema (make-semaphore)])
(channel-put (tqueue-worker-channel a-tqueue)
(make-cmd:satisfy! a-dep sema))
(sync sema)
(void)))
(define (tqueue-get a-tqueue)
(keep-worker-thread-alive! a-tqueue)
(async-channel-get (tqueue-ready a-tqueue)))
(define (tqueue-try-get a-tqueue)
(keep-worker-thread-alive! a-tqueue)
(async-channel-try-get (tqueue-ready a-tqueue)))
(define (tqueue-ready-channel a-tqueue)
(keep-worker-thread-alive! a-tqueue)
(tqueue-ready a-tqueue))
(define (keep-worker-thread-alive! a-tqueue)
(thread-resume (tqueue-worker-thread a-tqueue) (current-thread)))
(define (worker-thread-loop a-tqueue)
(let ([cmd (channel-get (tqueue-worker-channel a-tqueue))])
(match cmd
[(struct cmd:add! (elt deps done))
(internal-add! a-tqueue elt deps)
(semaphore-post done)]
[(struct cmd:satisfy! (dep done))
(internal-satisfy! a-tqueue dep)
(semaphore-post done)]))
(worker-thread-loop a-tqueue))
(define (internal-add! a-tqueue an-elt deps)
(let ([a-target (make-target (length deps) an-elt)])
(cond
[(empty? deps)
(when (target-can-execute? a-target)
(add-to-ready! a-tqueue a-target))]
[else
(for-each (lambda (a-dep)
(cond
[(dependency-satisifed? a-tqueue a-dep)
(target-decrement-dep-count!/maybe-add-to-ready a-tqueue a-target)]
[else
(register-dependency! a-tqueue a-target a-dep)]))
deps)])))
(define (internal-satisfy! a-tqueue a-dep)
(for-each (lambda (a-target)
(target-decrement-dep-count!/maybe-add-to-ready a-tqueue a-target))
(hash-ref (tqueue-dep-to-targets a-tqueue) a-dep '()))
(hash-remove! (tqueue-dep-to-targets a-tqueue) a-dep)
(hash-set! (tqueue-satisfied-deps a-tqueue) a-dep #t))
(define (dependency-satisifed? a-tqueue a-dep)
(hash-ref (tqueue-satisfied-deps a-tqueue) a-dep #f))
(define (target-decrement-dep-count!/maybe-add-to-ready a-tqueue a-target)
(unless (> (target-dep-count a-target) 0)
(error 'target-decrement-dep-count!
"Impossible to decrement past zero."))
(set-target-dep-count! a-target (sub1 (target-dep-count a-target)))
(when (target-can-execute? a-target)
(add-to-ready! a-tqueue a-target)))
(define (add-to-ready! a-tqueue a-target)
(async-channel-put (tqueue-ready a-tqueue) (target-elt a-target)))
(define (target-can-execute? a-target)
(= (target-dep-count a-target) 0))
(define (register-dependency! a-tqueue a-target a-dep)
(let ([ht (tqueue-dep-to-targets a-tqueue)])
(hash-set! ht a-dep (cons a-target (hash-ref ht a-dep '())))))
(define elt/c any/c)
(define dep/c any/c)
(provide/contract [new-tqueue (-> tqueue?)]
[tqueue? (any/c . -> . boolean?)]
[tqueue-add! (tqueue? elt/c (listof dep/c) . -> . any)]
[tqueue-satisfy! (tqueue? dep/c . -> . any)]
[tqueue-get (tqueue? . -> . elt/c)]
[tqueue-try-get (tqueue? . -> . (or/c elt/c false/c))]
[tqueue-ready-channel (tqueue? . -> . async-channel?)])