Commit b929b26b747faceb0e96d3ae4b6ab695b3bc3cef

Thomas de Grivel 2017-04-07T15:08:31

Threaded server

diff --git a/package.lisp b/package.lisp
index 92a48f1..b064139 100644
--- a/package.lisp
+++ b/package.lisp
@@ -18,7 +18,6 @@
    :bordeaux-threads
    :cffi
    :cffi-errno
-   :cffi-posix
    :cl-debug
    :common-lisp)
   (:export
diff --git a/thot.lisp b/thot.lisp
index b87fa92..126b1f6 100644
--- a/thot.lisp
+++ b/thot.lisp
@@ -253,30 +253,24 @@ The requested url ~S was not found on this server."
 (defun acceptor-loop (fd)
   (declare (type (unsigned-byte 31) fd))
   (loop
-     (let ((clientfd (cffi-sockets:accept fd)))
-       (unless clientfd
-	 (return))
-       (let ((socket (cffi-sockets-flexi:make-socket-stream clientfd)))
-	 (unwind-protect (request-loop socket)
-	   (close socket))))))
+     (cffi-sockets:with-accept (clientfd fd)
+       (cffi-sockets-flexi:with-socket-stream (socket clientfd)
+	 (request-loop socket)))))
 
 (defun start (&key (address "0.0.0.0") (port 8000))
-  (let ((fd (cffi-sockets:socket cffi-sockets:+af-inet+
-				 cffi-sockets:+sock-stream+
-				 0)))
-    (unwind-protect
-	 (progn
-	   (cffi-sockets:bind-inet fd address port)
-	   (cffi-sockets:listen-sock fd 128)
-	   (acceptor-loop fd))
-      (cffi-sockets:close-sock fd))))
-
-(defvar *pipe-in*)
-(defvar *pipe-out*)
+  (cffi-sockets:with-socket (fd cffi-sockets:+af-inet+
+				cffi-sockets:+sock-stream+
+				0)
+    (cffi-sockets:bind-inet fd address port)
+    (cffi-sockets:listen-sock fd 128)
+    (acceptor-loop fd)))
+
+(defvar *pipe-in* -1)
+(defvar *pipe-out* -1)
 
 (defun read-fd ()
   (with-foreign-object (fd :int)
-    (let ((r (c-read *pipe-in* fd (foreign-type-size :int))))
+    (let ((r (cffi-posix:c-read *pipe-in* fd (foreign-type-size :int))))
       (when (< r 0)
 	(error-errno "read"))
       (when (< 0 r)
@@ -285,32 +279,32 @@ The requested url ~S was not found on this server."
 (defun write-fd (fd)
   (with-foreign-object (fd% :int)
     (setf (mem-aref fd% :int) fd)
-    (let ((w (c-write *pipe-out* fd% (foreign-type-size :int))))
+    (let ((w (cffi-posix:c-write *pipe-out* fd% (foreign-type-size :int))))
       (when (< w 0)
 	(error-errno "write"))
       (when (< 0 w)
 	fd))))
 
-(trace read-fd write-fd)
-
 (defun worker-thread ()
+  ;(format t "~&WORKER THREAD~%")
   (loop
      (let ((sockfd (read-fd)))
        (unless sockfd
 	 (return))
-       (let ((socket (cffi-sockets-flexi:make-socket-stream sockfd)))
-	 (unwind-protect (request-loop socket)
-	   (close socket))))))
+       (cffi-sockets-flexi:with-socket-stream (socket sockfd)
+	 (request-loop socket)))))
 
-(defparameter *init-threads* 2)
+(defparameter *init-threads* 8)
 
 (defvar *threads*)
 
 (defun init-threads (n)
   (loop
-     (when (<= (length *threads*) n)
+     (when (<= n (length *threads*))
        (return))
-     (push (bordeaux-threads:make-thread #'worker-thread) *threads*)))
+     (let* ((*thread-id* (decf n))
+	    (thread (bordeaux-threads:make-thread 'worker-thread)))
+       (push thread *threads*))))
 
 (defun join-threads ()
   (loop
@@ -318,27 +312,30 @@ The requested url ~S was not found on this server."
        (return))
      (let ((thread (pop *threads*)))
        (bordeaux-threads:join-thread thread))))
-  
+
+(defmacro with-threads (n &body body)
+  `(let ((*threads* ()))
+     (init-threads ,n)
+     (unwind-protect (progn ,@body)
+       (join-threads))))
+
 (defun acceptor-loop-threaded (fd)
   (declare (type (unsigned-byte 31) fd))
   (loop
-     (let ((clientfd (ignore-errors (cffi-sockets:accept fd))))
-       (unless clientfd
-	 (return))
-       (format t "~&accept ~D~%" clientfd)
+     (let ((clientfd (cffi-sockets:accept fd)))
        (write-fd clientfd))))
 
 (defun start-threaded (&key (address "0.0.0.0") (port 8000))
-  (let ((fd (cffi-sockets:socket cffi-sockets:+af-inet+
-				 cffi-sockets:+sock-stream+
-				 0)))
-    (unwind-protect
-	 (progn
-	   (cffi-sockets:bind-inet fd address port)
-	   (cffi-sockets:listen-sock fd 128)
-	   (multiple-value-bind (*pipe-in* *pipe-out*) (pipe)
-	     (let ((*threads*))
-	       (init-threads *init-threads*)
-	       (unwind-protect (acceptor-loop-threaded fd)
-		 (join-threads)))))
-      (cffi-sockets:close-sock fd))))
+  (cffi-sockets:with-socket (fd cffi-sockets:+af-inet+
+				cffi-sockets:+sock-stream+
+				0)
+    (cffi-sockets:bind-inet fd address port)
+    (cffi-sockets:listen-sock fd 128)
+    (cffi-posix:with-pipe (in out)
+      (setq *pipe-in* in *pipe-out* out)
+      (with-threads *init-threads*
+	(acceptor-loop-threaded fd)))))
+
+(untrace read-fd write-fd worker-thread init-threads join-threads acceptor-loop-threaded)
+(untrace cffi-sockets:socket cffi-sockets:accept)
+(untrace cffi-posix:pipe cffi-posix:close)