Commit 96a40bd0007e22e512edbafbf851e7457fc30421

Thomas de Grivel 2017-06-25T23:53:12

WIP epoll

diff --git a/thot-epoll.lisp b/thot-epoll.lisp
index 4deee4b..00e6b3d 100644
--- a/thot-epoll.lisp
+++ b/thot-epoll.lisp
@@ -9,13 +9,16 @@
        :type (unsigned-byte 31))))
 
 (defgeneric agent-epoll-events (agent))
-(defgeneric agent-error (agent))
-(defgeneric agent-in (agent))
-(defgeneric agent-out (agent))
+(defgeneric agent-error (agent epoll-fd))
+(defgeneric agent-in (agent epoll-fd))
+(defgeneric agent-out (agent epoll-fd))
 
-;;  Adding an agent
+(define-condition agent-error (error)
+  ((agent :initarg :agent
+          :reader agent-error-agent
+          :type agent)))
 
-(defvar *epoll-fd*)
+;;  Adding an agent
 
 (defvar *epoll-agents*
   (make-hash-table))
@@ -23,80 +26,115 @@
 (defmacro get-agent (fd)
   `(gethash ,fd *epoll-agents*))
 
-(defun set-nonblocking (fd)
-  (let ((flags (fcntl:getfl fd)))
-    (fcntl:setfl fd (logior +o-nonblock+ flags))))
+(defun remove-agent (fd)
+  (remhash fd *epoll-agents*))
 
-(defun epoll-add (agent)
+(defun epoll-add (epoll-fd agent)
   (let ((fd (agent-fd agent)))
     (set-nonblocking fd)
     (setf (get-agent fd) agent)
-    (epoll:add *epoll-fd* fd
+    (epoll:add epoll-fd fd
                (agent-epoll-events agent)
                :data-fd fd)))
 
+(defun epoll-del (epoll-fd agent)
+  (let ((fd (agent-fd agent)))
+    (epoll:del epoll-fd fd)
+    (socket:shutdown fd t t)
+    (unistd:close fd)
+    (remove-agent fd)))
+
 ;;  Worker agent
 
 (defclass worker (agent)
-  ((stream :reader worker-stream
-           :type stream)
-   (addr :initarg :addr
-            :reader worker-addr)
-   (request :initform nil
+  ((addr :initarg :addr
+         :reader worker-addr)
+   (keep-alive :initform nil
+               :accessor worker-keep-alive
+               :type boolean)
+   (reader-cont :initarg :reader-cont
+                :accessor worker-reader-cont
+                :type (or null function))
+   (reply :initarg :reply
+          :accessor worker-reply
+          :type 'reply)
+   (request :initarg :request
             :accessor worker-request
-            :type (or null request))
-   (reply :initform nil
-          :accessor worker-reply)))
+            :type 'request)
+   (stream :initarg :stream
+           :reader worker-stream
+           :type stream)))
 
-(defmethod agent-epoll-events ((agent worker))
+(defmethod agent-epoll-events ((worker worker))
   (logior epoll:+in+ epoll:+out+ epoll:+err+))
 
-(defmethod agent-error ((agent worker))
-  (error "worker"))
+(define-condition worker-error (agent-error)
+  ())
 
-(defmethod agent-in ((agent worker))
-  )
+(defmethod agent-error ((worker worker) (epoll-fd fixnum))
+  (error 'worker-error :agent worker))
 
-(defmethod agent-out ((agent worker))
-  )
+(defmethod agent-in ((worker worker) (epoll-fd fixnum))
+  (let ((reader-cont (worker-reader-cont worker)))
+    (when reader-cont
+      (let ((result (funcall reader-cont)))
+        (cond ((eq :eof result) (epoll-del worker))
+              ((eq nil result) (setf (worker-reader-cont worker) nil))
+              ((eq :keep-alive result) (setf (worker-keep-alive worker) t))
+              ((functionp result) (setf (worker-reader-cont worker) result))
+              (t (error "worker input error ~S" worker)))))))
+
+(defmethod agent-out ((worker worker) (epoll-fd fixnum))
+  (let ((reply (worker-reply worker)))
+    ))
 
 ;;  Acceptor agent
 
-(defclass acceptor (agent) ())
+(defclass acceptor (agent)
+  ())
 
 (defmethod agent-epoll-events ((agent acceptor))
   (logior epoll:+in+ epoll:+err+))
 
-(define-condition accept-error (error)
-  ((acceptor :initarg acceptor
-             :reader accept-error-acceptor
-             :type acceptor)))
-
-(defmethod agent-error ((agent acceptor))
-  (error 'accept-error :acceptor agent))
-
-(defmethod agent-in ((agent acceptor))
-  (multiple-value-bind (clientfd clientaddr)
-      (cffi-sockets:accept (agent-fd agent))
-    (let ((worker (make-instance 'worker :fd clientfd :addr clientaddr)))
-      (epoll-add worker))))
+(define-condition acceptor-error (agent-error)
+  ())
+
+(defmethod agent-error ((acceptor acceptor) (epoll-fd fixnum))
+  (error 'acceptor-error :agent acceptor))
+
+(defmethod agent-in ((acceptor acceptor) (epoll-fd fixnum))
+  (multiple-value-bind (fd addr) (socket:accept (agent-fd acceptor))
+    (unless (eq :non-blocking fd)
+      (let* ((stream (babel-io-stream (fd-io-stream fd)))
+             (request (make-instance 'request :stream stream))
+             (reply (make-instance 'reply))
+             (worker (make-instance 'worker
+                                    :addr addr
+                                    :fd fd
+                                    :reader-cont (request-reader
+                                                  request reply #'request-cont)
+                                    :request request
+                                    :reply reply
+                                    :stream stream)))
+        (epoll-add epoll-fd worker)))))
 
 ;;  Thread event loop
 
 (defun event-loop-epoll (acceptfd)
-  (epoll:with (*epoll-fd*)
+  (epoll:with (epoll-fd)
     (let ((acceptor (make-instance 'acceptor :fd acceptfd)))
-      (epoll-add acceptor))
-    (epoll:wait (events fd *epoll-fd*)
+      (epoll-add epoll-fd acceptor))
+    (epoll:wait (events fd epoll-fd)
       (let ((agent (get-agent fd)))
         (cond ((not (= 0 (logand epoll:+err+ events)))
-               (agent-error agent))
+               (agent-error agent epoll-fd))
               ((not (= 0 (logand epoll:+in+ events)))
-               (agent-in agent))
+               (agent-in agent epoll-fd))
               ((not (= 0 (logand epoll:+out+ events)))
-               (agent-out agent)))))))
+               (agent-out agent epoll-fd)))))))
+
+;;
 
-;;  
 (defun acceptor-loop-epoll (fd)
   (declare (type (unsigned-byte 31) fd))
   )