diff --git a/thot-threaded.lisp b/thot-threaded.lisp
index 5666039..99bcaf3 100644
--- a/thot-threaded.lisp
+++ b/thot-threaded.lisp
@@ -1,54 +1,63 @@
(in-package :thot)
-(defvar *listen-fd*)
-
-(defun worker-thread ()
- ;(format t "~&WORKER THREAD~%")
- (loop
- (when *stop*
- (return))
- (cffi-socket:with-accept (clientfd) *listen-fd*
- (with-stream (stream (babel-io-stream (fd-io-stream clientfd)))
- (request-loop stream)))))
+(defvar *listen-fds*)
+
+(defun worker-thread (fd &optional dup)
+ (let ((listen-fd (if dup (unistd:dup fd) fd)))
+ (when dup
+ (pushnew listen-fd *listen-fds*))
+ (labels ((worker-thread-fd ()
+ ;;(format t "~&WORKER THREAD~%")
+ (unwind-protect
+ (loop
+ (when *stop*
+ (return))
+ (ignore-errors
+ (unistd:with-selected (`(,listen-fd) () () 1)
+ (readable writable errors)
+ (when readable
+ (socket:with-accept (clientfd) listen-fd
+ (with-stream (stream (babel-io-stream
+ (fd-io-stream clientfd)))
+ (request-loop stream)))))))
+ (when dup
+ (unistd:close listen-fd)))))
+ #'worker-thread-fd)))
(defparameter *init-threads* 8)
-(defvar *worker-threads*)
-(defvar *worker-sockfds*)
-
-(defun init-worker-threads (n)
- (loop
- (when (<= n (length *worker-threads*))
- (return))
- (let ((thread (bordeaux-threads:make-thread 'worker-thread
- :name "worker")))
- (push thread *worker-threads*))))
+(defun make-worker-threads (fd n)
+ (let ((threads ()))
+ (dotimes (i n)
+ (push (bordeaux-threads:make-thread
+ (worker-thread fd t)
+ :name "worker")
+ threads))
+ threads))
-(defun join-worker-threads ()
+(defun join-worker-threads (threads)
(setq *stop* t)
- (bordeaux-set:set-each (lambda (sockfd)
- (cffi-socket:shutdown sockfd t t))
- *worker-sockfds*)
- (loop
- (when (endp *worker-threads*)
- (return))
- (let ((thread (pop *worker-threads*)))
- (bordeaux-threads:join-thread thread))))
-
-(defmacro with-worker-threads (count &body body)
- `(let ((*worker-threads* ())
- (*worker-sockfds* (make-instance 'bordeaux-set:set)))
- (init-worker-threads ,count)
- (unwind-protect (progn ,@body)
- (join-worker-threads))))
+ (dolist (thread threads)
+ (bordeaux-threads:join-thread thread)))
+
+(defmacro with-worker-threads ((fd count) &body body)
+ (let ((threads (gensym "THREADS-")))
+ `(let* ((*listen-fds* ())
+ (,threads (make-worker-threads ,fd ,count)))
+ (unwind-protect (progn ,@body)
+ (join-worker-threads ,threads)))))
(defun acceptor-loop-threaded (fd)
(declare (type (unsigned-byte 31) fd))
- (setq *stop* nil
- *listen-fd* fd)
- (with-worker-threads (1- *init-threads*)
- (worker-thread)))
+ (set-nonblocking fd)
+ (with-worker-threads (fd (1- *init-threads*))
+ (funcall (worker-thread fd))))
(when bordeaux-threads:*supports-threads-p*
(setq *acceptor-loop* 'acceptor-loop-threaded))
+
+(untrace start acceptor-loop-threaded request-loop read write
+ set-nonblocking
+ socket:socket socket:bind socket:listen socket:accept
+ unistd:close unistd:select)