Commit d6c465c3a48dae7976bc77c24da02a989297450a

Thomas de Grivel 2017-06-28T02:36:46

WIP epoll

diff --git a/thot-epoll.lisp b/thot-epoll.lisp
index 91e312f..28caa6d 100644
--- a/thot-epoll.lisp
+++ b/thot-epoll.lisp
@@ -75,15 +75,35 @@
   (let ((reader-cont (worker-reader-cont worker)))
     (when reader-cont
       (let ((result (funcall reader-cont)))
-        (cond ((eq :eof result) (epoll-del worker))
+        (cond ((eq :eof result) (epoll-del epoll-fd worker))
               ((eq nil result) (setf (worker-reader-cont worker) nil))
-              ((eq :keep-alive result) (setf (worker-keep-alive worker) t))
+              ((eq :keep-alive result) (setf (worker-keep-alive worker) t
+                                             (worker-reader-cont worker) nil)
+               :keep-alive)
               ((functionp result) (setf (worker-reader-cont worker) result))
               (t (error "worker input error ~S" worker)))))))
 
 (defmethod agent-out ((worker worker) (epoll-fd fixnum))
-  (let ((reply (worker-reply worker)))
-    ))
+  (let* ((request (worker-request worker))
+         (reply (worker-reply worker))
+         (babel-stream (reply-stream reply))
+         (stream (stream-underlying-stream babel-stream))) ; flush operates on the stream beneath the babel stream
+    (cond ((= 0 (stream-output-length stream)) ; presumably: no output left to send -- TODO confirm
+           (cond ((worker-keep-alive worker) ; NOTE(review): flag is not cleared here -- confirm it is reset elsewhere
+                  ;; keep-alive: reset request and reply, then install a fresh request reader
+                  (setf (worker-reader-cont worker)
+                        (request-reader (reset-request request)
+                                        (reset-reply reply)
+                                        #'request-cont))
+                  (agent-in worker epoll-fd)) ; run the new reader continuation right away
+                 (t
+                  (epoll-del epoll-fd worker)))) ; no keep-alive: unregister the worker
+          (t
+           (case (stream-flush-output-buffer stream) ; otherwise flush and dispatch on the result
+             ((nil) nil)
+             ((:eof) (epoll-del epoll-fd worker))
+             ((:non-blocking) :non-blocking)
+             (otherwise (error 'stream-output-error :stream stream)))))))
 
 ;;  Acceptor agent
 
@@ -99,39 +119,65 @@
 (defmethod agent-error ((acceptor acceptor) (epoll-fd fixnum))
   (error 'acceptor-error :agent acceptor))
 
+(defun make-worker (fd addr)
+  "Return a new WORKER for FD and ADDR with its own request and reply."
+  (let* ((in (babel-input-stream (fd-input-stream fd)))
+         (out (babel-output-stream
+               (multi-buffered-output-stream (fd-output-stream fd))))
+         (request (make-instance 'request :stream in))
+         (reply (make-instance 'reply :stream out)))
+    ;; The reader continuation is built once both objects exist.
+    (make-instance 'worker
+                   :addr addr
+                   :fd fd
+                   :reader-cont (request-reader request reply #'request-cont)
+                   :request request
+                   :reply reply)))
+
 (defmethod agent-in ((acceptor acceptor) (epoll-fd fixnum))
   (multiple-value-bind (fd addr) (socket:accept (agent-fd acceptor))
     (unless (eq :non-blocking fd)
-      (let* ((stream (babel-io-stream (fd-io-stream fd)))
-             (request (make-instance 'request :stream stream))
-             (reply (make-instance 'reply))
-             (worker (make-instance 'worker
-                                    :addr addr
-                                    :fd fd
-                                    :reader-cont (request-reader
-                                                  request reply #'request-cont)
-                                    :request request
-                                    :reply reply
-                                    :stream stream)))
-        (epoll-add epoll-fd worker)))))
+      (epoll-add epoll-fd (make-worker fd addr))))) ; register a worker built from the accepted fd and addr
+
+(defclass control (agent) ; agent class instantiated for the optional control pipe fd
+  ())
+
+(defmethod agent-epoll-events ((agent control))
+  epoll:+in+) ; event mask for control agents: input only
+
+(defmethod agent-in ((agent control) (epoll-fd fixnum))
+  (setq *stop* t)) ; input on the control fd only raises *STOP*; nothing is read from the fd here
 
 ;;  Thread event loop
 
-(defun event-loop-epoll (acceptfd)
-  (epoll:with (epoll-fd)
-    (let ((acceptor (make-instance 'acceptor :fd acceptfd)))
-      (epoll-add epoll-fd acceptor))
-    (epoll:wait (events fd epoll-fd)
-      (let ((agent (get-agent fd)))
-        (cond ((not (= 0 (logand epoll:+err+ events)))
-               (agent-error agent epoll-fd))
-              ((not (= 0 (logand epoll:+in+ events)))
-               (agent-in agent epoll-fd))
-              ((not (= 0 (logand epoll:+out+ events)))
-               (agent-out agent epoll-fd)))))))
-
-;;
-
-(defun acceptor-loop-epoll (fd)
-  (declare (type (unsigned-byte 31) fd))
-  )
+(defun acceptor-loop-epoll (listenfd &optional pipe)
+  "Return a closure running an epoll loop on LISTENFD and optional PIPE."
+  (declare (type (unsigned-byte 31) listenfd))
+  (flet ((run-loop ()
+           (epoll:with (epoll-fd)
+             (epoll-add epoll-fd (make-instance 'acceptor :fd listenfd))
+             (when pipe
+               (epoll-add epoll-fd (make-instance 'control :fd pipe)))
+             ;; Wait again until *STOP* is set; first matching event bit wins.
+             (loop until *stop* do
+               (epoll:wait (events fd epoll-fd)
+                 (let ((agent (or (get-agent fd)
+                                  (error "bad epoll fd ~S" fd))))
+                   (cond ((logtest epoll:+err+ events)
+                          (agent-error agent epoll-fd))
+                         ((logtest epoll:+in+ events)
+                          (agent-in agent epoll-fd))
+                         ((logtest epoll:+out+ events)
+                          (agent-out agent epoll-fd)))))))))
+    #'run-loop))
+
+(when (cffi:foreign-symbol-pointer "epoll_create")
+  ;; epoll_create was found: hook ACCEPTOR-LOOP-EPOLL into the current setup.
+  (case *acceptor-loop*
+    (acceptor-loop-simple (setq *acceptor-loop* 'acceptor-loop-epoll))
+    (acceptor-loop-threaded (setq *worker-thread-for-fd* 'acceptor-loop-epoll))))
+
+(trace socket:socket socket:bind socket:listen socket:accept ; NOTE(review): tracing enabled at load time -- looks like a WIP debugging aid, confirm before release
+       unistd:close
+       epoll-add epoll-del
+       acceptor-loop-epoll make-worker agent-in agent-out)
diff --git a/thot.asd b/thot.asd
index 74cfc99..cca886c 100644
--- a/thot.asd
+++ b/thot.asd
@@ -21,4 +21,5 @@
   ((:file "package")
    (:file "thot" :depends-on ("package"))
    (:file "thot-simple" :depends-on ("thot"))
-   (:file "thot-threaded" :depends-on ("thot-simple"))))
+   (:file "thot-threaded" :depends-on ("thot-simple"))
+   (:file "thot-epoll" :depends-on ("thot-threaded"))))
diff --git a/thot.lisp b/thot.lisp
index 119baf2..6f8c03f 100644
--- a/thot.lisp
+++ b/thot.lisp
@@ -32,6 +32,16 @@
           :accessor request-query%
           :type string)))
 
+(defun reset-request (&optional (request *request*))
+  "Set REQUEST's parsed fields to NIL, empty its header table, return it."
+  (declare (type request request))
+  (prog1 request
+    ;; Fields first, then the header table, as two separate steps.
+    (setf (request-method% request) nil (request-target% request) nil
+          (request-http-version% request) nil (request-uri% request) nil
+          (request-query% request) nil)
+    (clrhash (request-headers% request))))
+
 (defvar *request*)
 
 (defun request-stream (&optional (request *request*))
@@ -196,6 +206,13 @@
            :accessor reply-stream%
            :type buffered-output-stream)))
 
+(defun reset-reply (&optional (reply *reply*))
+  "Set REPLY's status, headers and headers-sent values to NIL; return REPLY."
+  (declare (type reply reply))
+  (prog1 reply
+    (setf (reply-status% reply) nil (reply-headers% reply) nil
+          (reply-headers-sent% reply) nil)))
+
 (defvar *reply*)
 
 (defun reply-status (&optional (reply *reply*))