diff --git a/thot-threaded.lisp b/thot-threaded.lisp
index a13b90b..8ac5880 100644
--- a/thot-threaded.lisp
+++ b/thot-threaded.lisp
@@ -5,20 +5,23 @@
(defparameter *init-threads* 8)
-(defun make-worker-threads (fd n)
+(defun make-worker-threads (fd n pipe-in)
(let ((threads ())
(listen-fds ()))
(dotimes (i n)
(let ((thread-fd (unistd:dup fd)))
(push thread-fd listen-fds)
(push (bordeaux-threads:make-thread
- (funcall *worker-thread-for-fd* thread-fd)
+ (funcall *worker-thread-for-fd* thread-fd pipe-in)
:name "worker")
threads)))
(values threads listen-fds)))
-(defun join-worker-threads (threads listen-fds)
+(defun join-worker-threads (threads listen-fds pipe-out)
(setq *stop* t)
+ (cffi:with-foreign-object (out :char)
+ (setf (cffi:mem-aref out :char) 0)
+ (unistd:write pipe-out out 1))
(dolist (fd listen-fds)
(unistd:close fd))
(dolist (thread threads)
@@ -26,11 +29,14 @@
(defmacro with-worker-threads ((fd count) &body body)
(let ((threads (gensym "THREADS-"))
- (listen-fds (gensym "LISTEN-FDS-")))
- `(multiple-value-bind (,threads ,listen-fds)
- (make-worker-threads ,fd ,count)
- (unwind-protect (progn ,@body)
- (join-worker-threads ,threads ,listen-fds)))))
+ (listen-fds (gensym "LISTEN-FDS-"))
+ (pipe-in (gensym "PIPE-IN-"))
+ (pipe-out (gensym "PIPE-OUT-")))
+ `(unistd:with-pipe (,pipe-in ,pipe-out)
+ (multiple-value-bind (,threads ,listen-fds)
+ (make-worker-threads ,fd ,count ,pipe-in)
+ (unwind-protect (progn ,@body)
+ (join-worker-threads ,threads ,listen-fds ,pipe-out))))))
(defun acceptor-loop-threaded (fd)
(declare (type (unsigned-byte 31) fd))