(module concurrent mzscheme
(require (planet "test.ss" ("schematics" "schemeunit.plt" 2 7))
(lib "list.ss")
(lib "etc.ss")
(lib "class.ss")
"config.ss"
"../spgsql.ss")
(provide concurrent-test)
(define (make-slow-output-port out pause? limit)
(make-output-port 'slow-port
out
(lambda (buf start end ok-buffer? ok-break?)
(when pause? (sleep 0.01))
(let ([end (min end (+ start limit))])
(write-bytes-avail buf out start end)))
(lambda () (close-output-port out))))
(define (((mk-worker c iterations) tid))
(define insert
(send c prepare-exec "insert into play_numbers (n) values ($1)"))
(define (add-to-max n)
(insert (+ n (send c query-value "select max(n) from play_numbers"))))
(for-each insert (build-list iterations add1))
(for-each add-to-max (build-list iterations add1))
(printf "~s: ~s\n"
tid
(send c query-value "select max(n) from play_numbers"))
(send c query "select * from pg_type"))
(define concurrent-test
(test-suite "Concurrency"
(test-case "lots of threads"
(call-with-connection
(lambda (c)
(send c exec "create temporary table play_numbers (n integer)")
(for-each thread-wait
(map thread
(map (mk-worker c 100) (build-list 20 add1)))))))
(test-case "threads with pausing ports"
(parameterize ((testing-connection-mixin
(lambda (%)
(class %
(define/override (attach-to-ports in out)
(super attach-to-ports
in
(make-slow-output-port out #t 256)))
(super-new)))))
(call-with-connection
(lambda (c)
(send c exec "create temporary table play_numbers (n integer)")
(for-each thread-wait
(map thread
(map (mk-worker c 5) (build-list 4 add1))))))))
(test-case "threads with small-chunk ports"
(parameterize ((testing-connection-mixin
(lambda (%)
(class %
(define/override (attach-to-ports in out)
(super attach-to-ports
in
(make-slow-output-port out #f 1)))
(super-new)))))
(call-with-connection
(lambda (c)
(send c exec "create temporary table play_numbers (n integer)")
(for-each thread-wait
(map thread
(map (mk-worker c 5) (build-list 4 add1)))))))))))