Commit 9cfb6c228753a752d9326a8b39a77b77f790263c

Thomas de Grivel 2019-04-05T14:08:37

enable epoll on linux; fix kqueue

diff --git a/package.lisp b/package.lisp
index d88baf3..bda29d4 100644
--- a/package.lisp
+++ b/package.lisp
@@ -23,6 +23,7 @@
    :bordeaux-threads
    :cffi
    :cffi-errno
+   #+openbsd :cffi-kqueue
    :cffi-stat
    :cl-debug
    :cl-stream
diff --git a/thot-kqueue.lisp b/thot-kqueue.lisp
index d391d29..9f77aca 100644
--- a/thot-kqueue.lisp
+++ b/thot-kqueue.lisp
@@ -5,6 +5,8 @@
 
 (in-package :thot)
 
+(use-package :kqueue)   ; NOTE(review): presumably what lets the forms below write +ev-add+, kevent-ident, etc. without a kqueue: prefix -- confirm against that package's exports
+
 ;;  kqueue infos
 
 (defclass kqueue-infos ()
@@ -22,7 +24,7 @@
        :reader agent-fd
        :type (unsigned-byte 31))))
 
-(defgeneric agent-kqueue-events (agent))
+(defgeneric agent-kqueue-filters (agent))   ; list of kevent filter constants to register for AGENT (see the methods on worker/acceptor/control)
 (defgeneric agent-error (kqueue agent))
 (defgeneric agent-in (kqueue agent))
 (defgeneric agent-out (kqueue agent))
@@ -34,29 +36,52 @@
 
 ;;  Adding an agent
 
-(defmacro get-agent (kqueue fd)
-  `(gethash ,fd (kqueue-agents ,kqueue)))
+(defmacro get-agent (kq fd)             ; agent stored under FD in KQ's agent table; used as a SETF place by kqueue-add
+  `(gethash ,fd (kqueue-agents ,kq)))   ; expands to a GETHASH form on (kqueue-agents kq)
 
-(defun remove-agent (kqueue fd)
-  (declare (type kqueue-infos kqueue))
-  (remhash fd (kqueue-agents kqueue)))
+(defun remove-agent (kq fd)             ; drop the agent table entry keyed by FD
+  (declare (type kqueue-infos kq))
+  (remhash fd (kqueue-agents kq)))       ; value is REMHASH's: true iff an entry was present
 
-(defun kqueue-add (kqueue agent)
-  (declare (type kqueue-infos kqueue))
-  (let ((fd (agent-fd agent)))
+(defun kqueue-add (kq agent)            ; record AGENT in KQ's table and submit one +ev-add+ change per filter
+  (declare (type kqueue-infos kq))
+  (let* ((fd (agent-fd agent))
+         (filters (agent-kqueue-filters agent))   ; list of filter constants chosen by the agent's class
+         (filters-length (length filters)))
     (set-nonblocking fd)
-    (setf (get-agent kqueue fd) agent)
-    (kqueue:add (kqueue-fd kqueue) fd
-               (agent-kqueue-events agent)
-               :data-fd fd)))
-
-(defun kqueue-del (kqueue agent)
-  (declare (type kqueue-infos kqueue))
-  (let ((fd (agent-fd agent)))
-    (kqueue:del (kqueue-fd kqueue) fd)
+    (setf (get-agent kq fd) agent)       ; table entry is written before the kevent call below
+    (cffi:with-foreign-object (change '(:struct kevent) filters-length)   ; foreign array of FILTERS-LENGTH kevent structs
+      (dotimes (i filters-length)
+        (let ((kev (mem-aptr change '(:struct kevent) i)))   ; pointer to the I-th struct in CHANGE
+          (setf (kevent-ident kev) fd
+                (kevent-filter kev) (nth i filters)   ; every entry shares the same ident (the fd), one filter each
+                (kevent-flags kev) +ev-add+
+                (kevent-fflags kev) 0
+                (kevent-data kev) 0
+                (kevent-udata kev) (null-pointer))))   ; no user data; the event loop finds the agent via the ident
+      (kqueue:kevent (kqueue-fd kq) :changes change   ; whole changelist submitted in one call; its value is this function's result
+                     :n-changes filters-length))))
+
+(defun kqueue-del (kq agent)            ; deregister AGENT's filters, then shut down and close its fd
+  (declare (type kqueue-infos kq))
+  (let* ((fd (agent-fd agent))
+         (filters (agent-kqueue-filters agent))   ; same generic function kqueue-add uses to pick filters
+         (filters-length (length filters)))
+    (cffi:with-foreign-object (change '(:struct kevent) filters-length)
+      (dotimes (i filters-length)
+        (declare (type fixnum i))
+        (let ((kev (mem-aptr change '(:struct kevent) i)))
+          (setf (kevent-ident kev) fd
+                (kevent-filter kev) (nth i filters)
+                (kevent-flags kev) +ev-delete+   ; one +ev-delete+ change per filter
+                (kevent-fflags kev) 0
+                (kevent-data kev) 0
+                (kevent-udata kev) (cffi:null-pointer))))
+      (kqueue:kevent (kqueue-fd kq) :changes change   ; all deletions go out in a single call
+                     :n-changes filters-length))
     (socket:shutdown fd t t)
     (unistd:close fd)
-    (remove-agent kqueue fd)))
+    (remove-agent kq fd)))               ; last step: drop the fd -> agent table entry
 
 ;;  Worker agent
 
@@ -76,21 +101,15 @@
             :accessor worker-request
             :type request)))
 
-(defmethod agent-kqueue-events ((worker worker))
-  (logior kqueue:+in+ kqueue:+out+ kqueue:+err+))
-
-(define-condition worker-error (agent-error)
-  ())
-
-(defmethod agent-error ((kqueue kqueue-infos) (worker worker))
-  (error 'worker-error :agent worker))
+(defmethod agent-kqueue-filters ((worker worker))   ; filters registered for a worker's fd
+  (list +evfilt-read+ +evfilt-write+))   ; read and write; the event loop maps these to agent-in and agent-out
 
-(defmethod agent-in ((kqueue kqueue-infos) (worker worker))
+(defmethod agent-in ((kq kqueue-infos) (worker worker))
   (let ((reader-cont (worker-reader-cont worker)))
     (when reader-cont
       (let ((result (handler-case (funcall reader-cont)
                       (warning (x) (format t "~A~%" x) :eof))))
-        (cond ((eq :eof result) (kqueue-del kqueue worker))
+        (cond ((eq :eof result) (kqueue-del kq worker))
               ((eq nil result) (setf (worker-reader-cont worker) nil))
               ((eq :keep-alive result) (setf (worker-keep-alive worker) t
                                              (worker-reader-cont worker) nil)
@@ -98,7 +117,7 @@
               ((functionp result) (setf (worker-reader-cont worker) result))
               (t (error "worker input error ~S" worker)))))))
 
-(defmethod agent-out ((kqueue kqueue-infos) (worker worker))
+(defmethod agent-out ((kq kqueue-infos) (worker worker))
   (let* ((request (worker-request worker))
          (reply (worker-reply worker))
          (babel-stream (reply-stream reply))
@@ -110,13 +129,13 @@
                   (setf (worker-reader-cont worker)
                         (request-reader (reset-request request)
                                         (reset-reply reply)))
-                  (agent-in kqueue worker))
+                  (agent-in kq worker))
                  (t
-                  (kqueue-del kqueue worker))))
+                  (kqueue-del kq worker))))
           (t
            (case (stream-flush-output stream)
              ((nil) nil)
-             ((:eof) (kqueue-del kqueue worker))
+             ((:eof) (kqueue-del kq worker))
              ((:non-blocking) :non-blocking)
              (otherwise (error 'stream-output-error :stream stream)))))))
 
@@ -125,24 +144,18 @@
 (defclass acceptor (agent)
   ())
 
-(defmethod agent-kqueue-events ((agent acceptor))
-  (logior kqueue:+in+ kqueue:+err+))
-
-(define-condition acceptor-error (agent-error)
-  ())
-
-(defmethod agent-error ((kqueue kqueue-infos) (acceptor acceptor))
-  (error 'acceptor-error :agent acceptor))
+(defmethod agent-kqueue-filters ((agent acceptor))   ; filters registered for the acceptor's fd
+  (list +evfilt-read+))                  ; read only; the acceptor defines agent-in but no agent-out in this file section
 
 (defun make-worker (fd addr)
   (let* ((request-stream (babel-input-stream (unistd-input-stream fd)))
          (reply-stream (babel-output-stream
                         (multi-buffered-output-stream
                          (unistd-output-stream fd))))
-         (request (make-instance 'request :stream request-stream))
-         (reply (make-instance 'reply :stream reply-stream
-                               :remote-addr (socket:sockaddr-to-string
-                                             addr)))
+         (request (make-instance 'request :stream request-stream
+                                 :remote-addr (socket:sockaddr-to-string
+                                               addr)))
+         (reply (make-instance 'reply :stream reply-stream))
          (reader-cont (request-reader request reply)))
     (make-instance 'worker
                    :addr addr
@@ -151,41 +164,49 @@
                    :request request
                    :reply reply)))
 
-(defmethod agent-in ((kqueue kqueue-infos) (acceptor acceptor))
+(defmethod agent-in ((kq kqueue-infos) (acceptor acceptor))   ; read event on the acceptor fd: try to accept one connection
   (multiple-value-bind (fd addr) (socket:accept (agent-fd acceptor))
     (unless (eq :non-blocking fd)
-      (kqueue-add kqueue (make-worker fd addr)))))
+      (kqueue-add kq (make-worker fd addr)))))   ; wrap the accepted fd in a worker and register it, unless accept yielded :non-blocking
 
 (defclass control (agent)
   ())
 
-(defmethod agent-kqueue-events ((agent control))
-  kqueue:+in+)
+(defmethod agent-kqueue-filters ((agent control))   ; filters registered for the control fd
+  (list +evfilt-read+))                  ; read only
 
-(defmethod agent-in ((kqueue kqueue-infos) (agent control))
+(defmethod agent-in ((kq kqueue-infos) (agent control))   ; read event on the control fd: set *stop*, which acceptor-loop-kqueue tests each iteration
   (setq *stop* t))
 
 ;;  Thread event loop
 
 (defun acceptor-loop-kqueue (listenfd &optional pipe)
   (declare (type unistd:file-descriptor listenfd))
-  (kqueue:with-kqueue (kqueue-fd)
-    (let ((kqueue (make-instance 'kqueue-infos :fd kqueue-fd)))
-      (kqueue-add kqueue (make-instance 'acceptor :fd listenfd))
+  (kqueue:with-kqueue (kq-fd)            ; KQ-FD is the variable bound by with-kqueue for this body
+    (let ((kq (make-instance 'kqueue-infos :fd kq-fd)))   ; per-loop state: kqueue fd plus the fd -> agent table
+      (kqueue-add kq (make-instance 'acceptor :fd listenfd))   ; the listening fd is the first registered agent
       (when pipe
-        (kqueue-add kqueue (make-instance 'control :fd pipe)))
+        (kqueue-add kq (make-instance 'control :fd pipe)))   ; optional control fd; its agent-in sets *stop*
       (loop
          (when *stop*
            (return))
-         (kqueue:wait (events fd kqueue-fd)
-                     (let ((agent (get-agent kqueue fd)))
-                       (unless agent (error "bad kqueue fd ~S" fd))
-                       (cond ((not (= 0 (logand kqueue:+err+ events)))
-                              (agent-error kqueue agent))
-                             ((not (= 0 (logand kqueue:+in+ events)))
-                              (agent-in kqueue agent))
-                             ((not (= 0 (logand kqueue:+out+ events)))
-                              (agent-out kqueue agent)))))))))
+         (with-foreign-objects ((events '(:struct kqueue:kevent) 1000)   ; room for up to 1000 returned events per call
+                                (timeout '(:struct kqueue:timespec)))
+           (kqueue:seconds-to-timespec timeout 10)   ; fills TIMEOUT from 10 (seconds, going by the helper's name)
+           (let ((n-events (kqueue:kevent kq-fd :events events   ; no :changes here: this call only collects events
+                                          :n-events 1000
+                                          :timeout timeout)))
+             (declare (type fixnum n-events))
+             (dotimes (i n-events)       ; visit the first N-EVENTS entries of EVENTS
+               (let* ((event (mem-aptr events '(:struct kevent) i))
+                      (filter (kqueue:kevent-filter event))
+                      (fd (kqueue:kevent-ident event))   ; ident is the fd that kqueue-add stored as the table key
+                      (agent (get-agent kq fd)))
+                 (when agent             ; events for an fd with no table entry are silently skipped
+                   (cond ((= filter kqueue:+evfilt-read+)
+                          (agent-in kq agent))
+                         ((= filter kqueue:+evfilt-write+)
+                          (agent-out kq agent))))))))))))   ; any other filter value is ignored
 
 (defun maybe-configure-kqueue ()
   (when (cffi:foreign-symbol-pointer "kqueue")
diff --git a/thot.asd b/thot.asd
index 6700201..a06e508 100644
--- a/thot.asd
+++ b/thot.asd
@@ -17,7 +17,8 @@
                "bordeaux-threads"
                "babel-stream"
                "cffi-dirent"
-               #+linux1 "cffi-epoll"
+               #+linux "cffi-epoll"
+               #+openbsd "cffi-kqueue"
                "cffi-socket"
                "cffi-stat"
                "cl-debug"
@@ -25,6 +26,7 @@
                "html-entities"
                "rol-uri"
                "str"
+               "trivial-utf-8"
                "unistd-stream")
   :components
   ((:file "package")
@@ -33,4 +35,5 @@
    (:file "thot" :depends-on ("mime"))
    (:file "thot-select" :depends-on ("thot"))
    (:file "thot-threaded" :depends-on ("thot"))
-   #+linux1 (:file "thot-epoll" :depends-on ("thot"))))
+   #+linux (:file "thot-epoll" :depends-on ("thot"))
+   #+openbsd (:file "thot-kqueue" :depends-on ("thot"))))