Commit 543bad90f236960d44c632092e0b22df64cfbfd1

Thomas de Grivel 2017-06-25T23:58:09

rework thot-threaded

diff --git a/thot-threaded.lisp b/thot-threaded.lisp
index 5666039..99bcaf3 100644
--- a/thot-threaded.lisp
+++ b/thot-threaded.lisp
@@ -1,54 +1,102 @@
 
 (in-package :thot)
 
-(defvar *listen-fd*)
-
-(defun worker-thread ()
-  ;(format t "~&WORKER THREAD~%")
-  (loop
-     (when *stop*
-       (return))
-     (cffi-socket:with-accept (clientfd) *listen-fd*
-       (with-stream (stream (babel-io-stream (fd-io-stream clientfd)))
-         (request-loop stream)))))
+(defvar *listen-fds* ()
+  "Listening descriptors duplicated by WORKER-THREAD when DUP is true.
+Bound to the empty list so WORKER-THREAD also works when called outside
+WITH-WORKER-THREADS, which rebinds it.")
+
+(defun worker-thread (fd &optional dup)
+  "Return a function of no arguments that runs a worker loop on FD.
+
+When DUP is true, FD is duplicated with UNISTD:DUP at the time this
+function is called, the duplicate is added to *LISTEN-FDS*, and the
+duplicate is closed when the returned function exits.  When DUP is
+false, FD itself is used and is not closed here.
+
+The returned function loops until *STOP* is true.  Each iteration waits
+on the listening descriptor with UNISTD:WITH-SELECTED and, when it is
+reported readable, accepts one client and runs REQUEST-LOOP on a stream
+built from the client descriptor."
+  (let ((listen-fd (if dup (unistd:dup fd) fd)))
+    (when dup
+      (pushnew listen-fd *listen-fds*))
+    (labels ((worker-thread-fd ()
+               ;; UNWIND-PROTECT closes the duplicated descriptor however
+               ;; the loop is left.
+               (unwind-protect
+                    (loop
+                       (when *stop*
+                         (return))
+                       ;; IGNORE-ERRORS discards any error signalled by the
+                       ;; select, the accept or the request, so the loop
+                       ;; keeps running.
+                       ;; NOTE(review): the trailing 1 is presumably a
+                       ;; timeout in seconds, bounding how long a worker
+                       ;; takes to notice *STOP* -- confirm.
+                       (ignore-errors
+                         (unistd:with-selected (`(,listen-fd) () () 1)
+                             (readable writable errors)
+                           (when readable
+                             (socket:with-accept (clientfd) listen-fd
+                               (with-stream (stream (babel-io-stream
+                                                     (fd-io-stream clientfd)))
+                                 (request-loop stream)))))))
+                 (when dup
+                   (unistd:close listen-fd)))))
+      #'worker-thread-fd)))
 
 (defparameter *init-threads* 8)
 
-(defvar *worker-threads*)
-(defvar *worker-sockfds*)
-
-(defun init-worker-threads (n)
-  (loop
-     (when (<= n (length *worker-threads*))
-       (return))
-     (let ((thread (bordeaux-threads:make-thread 'worker-thread
-                                                 :name "worker")))
-       (push thread *worker-threads*))))
+(defun make-worker-threads (fd n)
+  "Start N threads named \"worker\" and return them as a list.
+Each thread runs a function obtained from (WORKER-THREAD FD T), so every
+thread works on its own duplicate of FD.  The newest thread comes first
+in the returned list."
+  (let ((threads ()))
+    (dotimes (i n)
+      (push (bordeaux-threads:make-thread
+             (worker-thread fd t)
+             :name "worker")
+            threads))
+    threads))
 
-(defun join-worker-threads ()
+(defun join-worker-threads (threads)
+  "Set *STOP* to T, then join each thread in THREADS in list order."
   (setq *stop* t)
-  (bordeaux-set:set-each (lambda (sockfd)
-                           (cffi-socket:shutdown sockfd t t))
-                         *worker-sockfds*)
-  (loop
-     (when (endp *worker-threads*)
-       (return))
-     (let ((thread (pop *worker-threads*)))
-       (bordeaux-threads:join-thread thread))))
-
-(defmacro with-worker-threads (count &body body)
-  `(let ((*worker-threads* ())
-         (*worker-sockfds* (make-instance 'bordeaux-set:set)))
-     (init-worker-threads ,count)
-     (unwind-protect (progn ,@body)
-       (join-worker-threads))))
+  (dolist (thread threads)
+    (bordeaux-threads:join-thread thread)))
+
+(defmacro with-worker-threads ((fd count) &body body)
+  "Run BODY while COUNT worker threads serve FD, then stop and join them.
+*LISTEN-FDS* is rebound to the empty list for the extent of the form.
+The threads are created with MAKE-WORKER-THREADS before BODY runs and
+are passed to JOIN-WORKER-THREADS when BODY exits, normally or not."
+  (let ((threads (gensym "THREADS-")))
+    `(let* ((*listen-fds* ())
+            (,threads (make-worker-threads ,fd ,count)))
+       (unwind-protect (progn ,@body)
+         (join-worker-threads ,threads)))))
 
 (defun acceptor-loop-threaded (fd)
   (declare (type (unsigned-byte 31) fd))
-  (setq *stop* nil
-        *listen-fd* fd)
-  (with-worker-threads (1- *init-threads*)
-    (worker-thread)))
+  "Serve connections on listening descriptor FD with *INIT-THREADS* workers.
+Calls SET-NONBLOCKING on FD, starts (1- *INIT-THREADS*) worker threads on
+duplicates of FD, and runs one more worker loop on FD itself in the
+calling thread.  Returns after that loop ends and the threads are joined."
+  ;; JOIN-WORKER-THREADS leaves *STOP* set to T, so clear it here;
+  ;; otherwise a second run would make every worker return at once.
+  (setq *stop* nil)
+  (set-nonblocking fd)
+  (with-worker-threads (fd (1- *init-threads*))
+    (funcall (worker-thread fd))))
 
 (when bordeaux-threads:*supports-threads-p*
   (setq *acceptor-loop* 'acceptor-loop-threaded))
+
+;; NOTE(review): this looks like a debugging leftover.  It runs every time
+;; the file is loaded; consider removing it once tracing is no longer
+;; needed.
+(untrace start acceptor-loop-threaded request-loop read write
+       set-nonblocking
+       socket:socket socket:bind socket:listen socket:accept
+       unistd:close unistd:select)