Commit 1a79c9bba21fc0e16bd1d9014a9c6a3ada974f6c

Thomas de Grivel 2018-06-20T15:35:31

epoll

diff --git a/thot-epoll.lisp b/thot-epoll.lisp
index 1795100..15e4a5e 100644
--- a/thot-epoll.lisp
+++ b/thot-epoll.lisp
@@ -20,7 +20,10 @@
 (defclass agent ()
   ((fd :initarg :fd
        :reader agent-fd
-       :type (unsigned-byte 31))))
+       :type (unsigned-byte 31))
+   (pending :initform nil
+            :accessor agent-pending
+            :type boolean)))
 
 (defgeneric agent-epoll-events (agent))
 (defgeneric agent-error (epoll agent))
@@ -50,8 +53,14 @@
                (agent-epoll-events agent)
                :data-fd fd)))
 
+(defun epoll-mod (epoll agent events)
+  (declare (type epoll-infos epoll))
+  (let ((fd (agent-fd agent)))
+    (epoll:mod (epoll-fd epoll) fd events :data-fd fd)))
+
 (defun epoll-del (epoll agent)
   (declare (type epoll-infos epoll))
+  (setf (agent-pending agent) t)
   (let ((fd (agent-fd agent)))
     (epoll:del (epoll-fd epoll) fd)
     (socket:shutdown fd t t)
@@ -87,39 +96,33 @@
 
 (defmethod agent-in ((epoll epoll-infos) (worker worker))
   (let ((reader-cont (worker-reader-cont worker)))
-    (when reader-cont
-      (let ((result (handler-case (funcall reader-cont)
-                      (warning (x) (format t "~A~%" x) :eof))))
-        (cond ((eq :eof result) (epoll-del epoll worker))
-              ((eq nil result)
-               (setf (worker-reader-cont worker) nil)
-               (stream-flush-output (reply-stream (worker-reply worker))))
-              ((eq :keep-alive result) (setf (worker-keep-alive worker) t
-                                             (worker-reader-cont worker) nil)
-               :keep-alive)
-              ((functionp result) (setf (worker-reader-cont worker) result))
-              (t (error "worker input error ~S" worker)))))))
+    (if reader-cont
+        (let ((result (funcall reader-cont)))
+          (cond ((null result) (setf (worker-reader-cont worker) nil))
+                ((eq :keep-alive result)
+                 (let ((request (worker-request worker))
+                       (reply (worker-reply worker)))
+                   (setf (worker-reader-cont worker)
+                         (request-reader (reset-request request)
+                                         (reset-reply reply)))))
+                ((functionp result) (setf (worker-reader-cont worker)
+                                          result))
+                ((eq :eof result) (epoll-del epoll worker))
+                (t (error "worker input error ~S" worker))))
+        (when (= 0 (stream-output-length
+                    (reply-stream (worker-reply worker))))
+          (epoll-del epoll worker)))))
 
 (defmethod agent-out ((epoll epoll-infos) (worker worker))
   (let* ((request (worker-request worker))
          (reply (worker-reply worker))
          (babel-stream (reply-stream reply))
          (stream (stream-underlying-stream babel-stream)))
-    (cond ((and (null (worker-reader-cont worker))
-                (= 0 (the integer (stream-output-length stream))))
-           (cond ((worker-keep-alive worker)
-                  ;; read request body
-                  (setf (worker-reader-cont worker)
-                        (request-reader (reset-request request)
-                                        (reset-reply reply))))
-                 (t
-                  (epoll-del epoll worker))))
-          (t
-           (case (stream-flush-output stream)
-             ((nil) nil)
-             ((:eof) (epoll-del epoll worker))
-             ((:non-blocking) :non-blocking)
-             (otherwise (error 'stream-output-error :stream stream)))))))
+    (case (stream-flush-output stream)
+      ((:eof) (epoll-del epoll worker)))
+    (unless (worker-reader-cont worker)
+      (when (= 0 (stream-output-length stream))
+        (epoll-del epoll worker)))))
 
 ;;  Acceptor agent
 
@@ -153,7 +156,8 @@
 (defmethod agent-in ((epoll epoll-infos) (acceptor acceptor))
   (multiple-value-bind (fd addr) (socket:accept (agent-fd acceptor))
     (unless (eq :non-blocking fd)
-      (epoll-add epoll (make-worker fd addr)))))
+      (let ((worker (make-worker fd addr)))
+        (epoll-add epoll worker)))))
 
 (defclass control (agent)
   ())
@@ -176,15 +180,17 @@
       (loop
          (when *stop*
            (return))
-         (epoll:wait (events fd epoll-fd)
+         (epoll:wait (events fd epoll-fd 10000 -1)
                      (let ((agent (get-agent epoll fd)))
                        (unless agent (error "bad epoll fd ~S" fd))
-                       (cond ((not (= 0 (logand epoll:+err+ events)))
-                              (agent-error epoll agent))
-                             ((not (= 0 (logand epoll:+in+ events)))
-                              (agent-in epoll agent))
-                             ((not (= 0 (logand epoll:+out+ events)))
-                              (agent-out epoll agent)))))))))
+                       (unless (= 0 (logand epoll:+err+ events))
+                         (agent-error epoll agent))
+                       (unless (or (agent-pending agent)
+                                   (= 0 (logand epoll:+in+ events)))
+                         (agent-in epoll agent))
+                       (unless (or (agent-pending agent)
+                                   (= 0 (logand epoll:+out+ events)))
+                         (agent-out epoll agent))))))))
 
 (defun maybe-configure-epoll ()
   (when (cffi:foreign-symbol-pointer "epoll_create")
@@ -193,4 +199,10 @@
 (eval-when (:load-toplevel :execute)
   (maybe-configure-epoll))
 
-;;(untrace socket:socket socket:bind socket:listen socket:accept unistd:close epoll:create epoll-add epoll-del acceptor-loop-epoll make-worker agent-in agent-out)
+#+nil
+(trace
+ epoll:create epoll-add epoll-del
+ acceptor-loop-epoll make-worker agent-in agent-out agent-error
+ stream-flush-output unistd:c-write
+ stream-output-index
+ )