#lang scheme/base
(require "depend.ss" "port.ss")
;; make-pipe-helper : port -> (values input-port output-port)
;; Default pipe factory used when the caller supplies no make-pipe procedure.
;; Creates a pipe limited to 2^15 bytes whose input and output ends are both
;; named after PORT.
(define (make-pipe-helper port)
  (let ((name (object-name port)))
    (make-pipe (expt 2 15) name name)))
;; make-threaded-read : input-port output-port channel -> (bytes -> any)
;; Builds the read procedure for a filter-backed input port.
;; Each call waits on both IN and CH.  When the wait yields IN itself the
;; call is delegated to the reader produced by make-default-read (defined
;; outside this file); any other result -- i.e. a value received on CH --
;; is raised in the calling thread.
;; OUT is accepted for symmetry with the other constructors but is unused.
(define (make-threaded-read in out ch)
  (let ((do-read (make-default-read in)))
    (lambda (bytes)
      (let ((ready (sync in ch)))
        (cond ((eq? ready in) (do-read bytes))
              (else (raise ready)))))))
;; make-threaded-peek : input-port output-port channel
;;                      -> (bytes skip progress-evt -> any)
;; Builds the peek procedure for a filter-backed input port.
;; Mirrors make-threaded-read: waits on IN and CH, forwards to the peeker
;; produced by make-default-peek (defined outside this file) when IN is the
;; result, and otherwise raises whatever value arrived on CH.
;; OUT is unused.
(define (make-threaded-peek in out ch)
  (let ((do-peek (make-default-peek in)))
    (lambda (bytes skip progress-evt)
      (let ((ready (sync in ch)))
        (cond ((eq? ready in) (do-peek bytes skip progress-evt))
              (else (raise ready)))))))
;; make-input-filter-port/1 :
;;   input-port (or #f filter) (or #f make-pipe) boolean -> input-port
;; Wraps INPUT-PORT in a single filter stage.
;;   filter    - procedure of (input-port output-port); #f selects copy-port.
;;   make-pipe - procedure of (port) returning an input/output pair;
;;               #f selects make-pipe-helper.
;;   close?    - when true, INPUT-PORT is handed to the filter through
;;               call-with-input-port and is also closed when the returned
;;               port is closed.
;; A background thread runs the filter from INPUT-PORT into the write end of
;; the pipe; the returned port reads from the pipe's read end.  An exn raised
;; by the filter is sent over a channel and re-raised in the reading thread
;; by the read/peek procedures.
(define (make-input-filter-port/1 input-port filter make-pipe close?)
  (define (filter-helper)
    (if (not filter) copy-port filter))
  (define (pipe-helper)
    (if (not make-pipe) make-pipe-helper make-pipe))
  (let-values (((in out) ((pipe-helper) input-port)))
    (let* ((ch (make-channel))
           (thd (thread
                 (lambda ()
                   (with-handlers ((exn? (lambda (e)
                                           ;; Deliver the exception BEFORE closing the
                                           ;; pipe.  Closing first makes the read end
                                           ;; ready (EOF), so a reader waiting on both
                                           ;; the pipe and the channel could see EOF
                                           ;; and never observe the failure.
                                           (channel-put ch e)
                                           (close-output-port out))))
                     (cond ((not close?)
                            ((filter-helper) input-port out)
                            (close-output-port out))
                           (else
                            (call-with-input-port input-port
                              (lambda (source)
                                ((filter-helper) source out)
                                (close-output-port out))))))))))
      (make-input-port (object-name in)
                       (make-threaded-read in out ch)
                       (make-threaded-peek in out ch)
                       ;; Close procedure: stop the filter thread, then release
                       ;; both pipe ends (and the source when requested).
                       (lambda ()
                         (kill-thread thd)
                         (close-output-port out)
                         (when close?
                           (close-input-port input-port))
                         (close-input-port in))))))
;; make-threaded-write : output-port input-port channel
;;                       -> (bytes start end block? break? -> any)
;; Builds the write procedure for a filter-backed output port.
;;   out - write end of the pipe feeding the filter thread.
;;   in  - read end of the same pipe; closed here when a failure is reported.
;;   ch  - channel on which the filter thread sends an exception.
;; Writes are delegated to the writer produced by make-default-write (defined
;; outside this file).  A value received on CH is raised in the caller.
(define (make-threaded-write out in ch)
  (define write-helper (make-default-write out))
  (lambda (bytes-out start end block? break?)
    ;; Poll CH first: when both OUT and CH are ready, `sync` picks one
    ;; pseudo-randomly, so without the poll a failed filter could keep
    ;; accepting writes for an arbitrary number of calls.
    (let ((v (or (sync/timeout 0 ch) (sync out ch))))
      (if (eq? v out)
          (write-helper bytes-out start end block? break?)
          (begin
            (close-input-port in)
            (raise v))))))
;; make-output-filter-port/1 :
;;   output-port (or #f filter) (or #f make-pipe) -> output-port
;; Wraps OUTPUT-PORT in a single filter stage.
;;   filter    - procedure of (input-port output-port); #f selects copy-port.
;;   make-pipe - procedure of (port) returning an input/output pair;
;;               #f selects make-pipe-helper.
;; Bytes written to the returned port go into a pipe; a background thread
;; runs the filter from the pipe's read end into OUTPUT-PORT (handed over via
;; call-with-output-port).  An exn raised by the filter is sent over a
;; channel and re-raised by the next write, or by close if no write
;; collected it.
(define (make-output-filter-port/1 output-port filter make-pipe)
  (define (filter-helper)
    (if (not filter) copy-port filter))
  (define (pipe-helper)
    (if (not make-pipe) make-pipe-helper make-pipe))
  (let-values (((in out) ((pipe-helper) output-port)))
    (let* ((ch (make-channel))
           (thd (thread (lambda ()
                          (with-handlers ((exn? (lambda (e)
                                                  (close-input-port in)
                                                  (channel-put ch e))))
                            (call-with-output-port output-port
                              (lambda (sink)
                                ((filter-helper) in sink)
                                (close-input-port in)))))))
           (thd-dead? (thread-dead-evt thd)))
      (make-output-port (object-name in)
                        always-evt
                        (make-threaded-write out in ch)
                        ;; Close procedure: signal EOF to the filter, then wait
                        ;; for it to finish.  Also listen on CH: a filter that
                        ;; failed is parked in `channel-put` until someone
                        ;; receives, so waiting on the dead-event alone would
                        ;; hang forever.
                        (lambda ()
                          (close-output-port out)
                          (let ((v (sync thd-dead? ch)))
                            (unless (eq? v thd-dead?)
                              ;; Let the filter thread finish before reporting.
                              (sync thd-dead?)
                              (raise v))))))))
;; make-input-filter-port :
;;   input-port (or #f filter) (or #f make-pipe) [#:close? boolean]
;;   (or #f filter) (or #f make-pipe) ... -> input-port
;; Builds a chain of filter stages over IN.  REST holds further
;; filter/make-pipe pairs; the last pair is applied directly to IN and the
;; first FILTER/MAKE-PIPE pair forms the outermost stage.
;; Raises an error when REST contains an odd number of elements.
;; CLOSE? (default #t) controls whether IN itself is closed along with the
;; returned port.
(define (make-input-filter-port in filter make-pipe #:close? (close? #t) . rest)
  (define (helper in filter make-pipe rest)
    (cond ((null? rest)
           ;; Innermost stage: the only one that touches the caller's port,
           ;; so it alone honours CLOSE?.
           (make-input-filter-port/1 in filter make-pipe close?))
          ((null? (cdr rest))
           (error 'make-input-filter-port "uneven filter/pipe pairs: ~a" rest))
          (else
           ;; Outer stages wrap ports created here, never visible to the
           ;; caller.  Always close them; otherwise with #:close? #f the
           ;; inner stages are never shut down and their threads keep
           ;; consuming from IN after the returned port is closed.
           (make-input-filter-port/1 (helper in (car rest) (cadr rest) (cddr rest))
                                     filter make-pipe
                                     #t))))
  (helper in filter make-pipe rest))
;; make-output-filter-port :
;;   output-port (or #f filter) (or #f make-pipe)
;;   (or #f filter) (or #f make-pipe) ... -> output-port
;; Builds a chain of filter stages over OUT.  REST holds further
;; filter/make-pipe pairs; the last pair is applied directly to OUT and the
;; first FILTER/MAKE-PIPE pair forms the outermost stage.
;; Raises an error when REST contains an odd number of elements.
(define (make-output-filter-port out filter make-pipe . rest)
  (let build ((stage-filter filter)
              (stage-pipe make-pipe)
              (remaining rest))
    (cond ((null? remaining)
           (make-output-filter-port/1 out stage-filter stage-pipe))
          ((null? (cdr remaining))
           (error 'make-output-filter-port "uneven filter/pipe pairs: ~a" remaining))
          (else
           (let ((inner (build (car remaining)
                               (cadr remaining)
                               (cddr remaining))))
             (make-output-filter-port/1 inner stage-filter stage-pipe))))))
;; Contract for a pipe factory: takes a port and returns two values,
;; an input port and an output port.
(define make-pipe/c
  (-> port?
      (values input-port? output-port?)))
;; Contract for a filter procedure: takes an input port and an output port;
;; its result is unconstrained.
(define port-filter/c
  (-> input-port?
      output-port?
      any))
;; Public interface.  Both constructors take a port, an optional filter and
;; an optional pipe factory (#f selects the default for either), followed by
;; any number of further arguments, each #f or a procedure.
(provide/contract
 (make-input-filter-port
  (->* (input-port? (or/c #f port-filter/c) (or/c #f make-pipe/c))
       (#:close? boolean?)
       #:rest (listof (or/c #f procedure?))
       input-port?))
 (make-output-filter-port
  (->* (output-port? (or/c #f port-filter/c) (or/c #f make-pipe/c))
       ()
       #:rest (listof (or/c #f procedure?))
       output-port?)))
;; The contracts themselves are exported for use by client code.
(provide make-pipe/c port-filter/c)