diff --git a/package.lisp b/package.lisp
index d88baf3..bda29d4 100644
--- a/package.lisp
+++ b/package.lisp
@@ -23,6 +23,7 @@
:bordeaux-threads
:cffi
:cffi-errno
+ #+openbsd :cffi-kqueue
:cffi-stat
:cl-debug
:cl-stream
diff --git a/thot-kqueue.lisp b/thot-kqueue.lisp
index d391d29..9f77aca 100644
--- a/thot-kqueue.lisp
+++ b/thot-kqueue.lisp
@@ -5,6 +5,8 @@
(in-package :thot)
+(use-package :kqueue)
+
;; kqueue infos
(defclass kqueue-infos ()
@@ -22,7 +24,7 @@
:reader agent-fd
:type (unsigned-byte 31))))
-(defgeneric agent-kqueue-events (agent))
+(defgeneric agent-kqueue-filters (agent)) ; list of kqueue filter constants to register for AGENT's fd
(defgeneric agent-error (kqueue agent))
(defgeneric agent-in (kqueue agent))
(defgeneric agent-out (kqueue agent))
@@ -34,29 +36,52 @@
;; Adding an agent
-(defmacro get-agent (kqueue fd)
- `(gethash ,fd (kqueue-agents ,kqueue)))
+(defmacro get-agent (kq fd) ; place form for the agent stored under FD; kqueue-add uses it with SETF
+ `(gethash ,fd (kqueue-agents ,kq))) ; expands to a GETHASH on KQ's agent table, keyed by fd
-(defun remove-agent (kqueue fd)
- (declare (type kqueue-infos kqueue))
- (remhash fd (kqueue-agents kqueue)))
+(defun remove-agent (kq fd) ; drop the agent-table entry for FD; the descriptor itself is not touched here
+ (declare (type kqueue-infos kq))
+ (remhash fd (kqueue-agents kq))) ; the function's value is REMHASH's result
-(defun kqueue-add (kqueue agent)
- (declare (type kqueue-infos kqueue))
- (let ((fd (agent-fd agent)))
+(defun kqueue-add (kq agent)
+  "Store AGENT in KQ's agent table and add its filters to the kernel queue."
+  (declare (type kqueue-infos kq))
+  (let* ((fd (agent-fd agent))
+         (filters (agent-kqueue-filters agent))
+         (n-filters (length filters)))
(set-nonblocking fd)
- (setf (get-agent kqueue fd) agent)
- (kqueue:add (kqueue-fd kqueue) fd
- (agent-kqueue-events agent)
- :data-fd fd)))
-
-(defun kqueue-del (kqueue agent)
- (declare (type kqueue-infos kqueue))
- (let ((fd (agent-fd agent)))
- (kqueue:del (kqueue-fd kqueue) fd)
+    (setf (get-agent kq fd) agent) ; recorded before the kernel registration below
+    (cffi:with-foreign-object (change '(:struct kevent) n-filters)
+      (loop for i of-type fixnum from 0 for filter in filters ; one change record per filter
+            do (let ((kev (cffi:mem-aptr change '(:struct kevent) i)))
+                 (setf (kevent-ident kev) fd
+                       (kevent-filter kev) filter
+                       (kevent-flags kev) +ev-add+
+                       (kevent-fflags kev) 0
+                       (kevent-data kev) 0
+                       (kevent-udata kev) (cffi:null-pointer))))
+      (kqueue:kevent (kqueue-fd kq) :changes change :n-changes n-filters))))
+
+(defun kqueue-del (kq agent)
+  "Delete AGENT's filters from KQ, then shut down and close its fd and forget it."
+  (declare (type kqueue-infos kq))
+  (let* ((fd (agent-fd agent))
+         (filters (agent-kqueue-filters agent))
+         (n-filters (length filters)))
+    (cffi:with-foreign-object (change '(:struct kevent) n-filters)
+      ;; One +ev-delete+ change record per filter the agent asks for.
+      (loop for i of-type fixnum from 0 for filter in filters
+            do (let ((kev (cffi:mem-aptr change '(:struct kevent) i)))
+                 (setf (kevent-ident kev) fd
+                       (kevent-filter kev) filter
+                       (kevent-flags kev) +ev-delete+
+                       (kevent-fflags kev) 0
+                       (kevent-data kev) 0
+                       (kevent-udata kev) (cffi:null-pointer))))
+      (kqueue:kevent (kqueue-fd kq) :changes change :n-changes n-filters))
(socket:shutdown fd t t)
(unistd:close fd)
- (remove-agent kqueue fd)))
+    (remove-agent kq fd))) ; last step: drop the table entry so later lookups for FD find nothing
;; Worker agent
@@ -76,21 +101,15 @@
:accessor worker-request
:type request)))
-(defmethod agent-kqueue-events ((worker worker))
- (logior kqueue:+in+ kqueue:+out+ kqueue:+err+))
-
-(define-condition worker-error (agent-error)
- ())
-
-(defmethod agent-error ((kqueue kqueue-infos) (worker worker))
- (error 'worker-error :agent worker))
+(defmethod agent-kqueue-filters ((worker worker)) ; filters registered for a worker's fd
+ (list +evfilt-read+ +evfilt-write+)) ; both the read and the write filter
-(defmethod agent-in ((kqueue kqueue-infos) (worker worker))
+(defmethod agent-in ((kq kqueue-infos) (worker worker))
(let ((reader-cont (worker-reader-cont worker)))
(when reader-cont
(let ((result (handler-case (funcall reader-cont)
(warning (x) (format t "~A~%" x) :eof))))
- (cond ((eq :eof result) (kqueue-del kqueue worker))
+ (cond ((eq :eof result) (kqueue-del kq worker))
((eq nil result) (setf (worker-reader-cont worker) nil))
((eq :keep-alive result) (setf (worker-keep-alive worker) t
(worker-reader-cont worker) nil)
@@ -98,7 +117,7 @@
((functionp result) (setf (worker-reader-cont worker) result))
(t (error "worker input error ~S" worker)))))))
-(defmethod agent-out ((kqueue kqueue-infos) (worker worker))
+(defmethod agent-out ((kq kqueue-infos) (worker worker))
(let* ((request (worker-request worker))
(reply (worker-reply worker))
(babel-stream (reply-stream reply))
@@ -110,13 +129,13 @@
(setf (worker-reader-cont worker)
(request-reader (reset-request request)
(reset-reply reply)))
- (agent-in kqueue worker))
+ (agent-in kq worker))
(t
- (kqueue-del kqueue worker))))
+ (kqueue-del kq worker))))
(t
(case (stream-flush-output stream)
((nil) nil)
- ((:eof) (kqueue-del kqueue worker))
+ ((:eof) (kqueue-del kq worker))
((:non-blocking) :non-blocking)
(otherwise (error 'stream-output-error :stream stream)))))))
@@ -125,24 +144,18 @@
(defclass acceptor (agent)
())
-(defmethod agent-kqueue-events ((agent acceptor))
- (logior kqueue:+in+ kqueue:+err+))
-
-(define-condition acceptor-error (agent-error)
- ())
-
-(defmethod agent-error ((kqueue kqueue-infos) (acceptor acceptor))
- (error 'acceptor-error :agent acceptor))
+(defmethod agent-kqueue-filters ((agent acceptor)) ; an acceptor registers only the read filter
+ (list +evfilt-read+))
(defun make-worker (fd addr)
(let* ((request-stream (babel-input-stream (unistd-input-stream fd)))
(reply-stream (babel-output-stream
(multi-buffered-output-stream
(unistd-output-stream fd))))
- (request (make-instance 'request :stream request-stream))
- (reply (make-instance 'reply :stream reply-stream
- :remote-addr (socket:sockaddr-to-string
- addr)))
+ (request (make-instance 'request :stream request-stream
+ :remote-addr (socket:sockaddr-to-string
+ addr)))
+ (reply (make-instance 'reply :stream reply-stream))
(reader-cont (request-reader request reply)))
(make-instance 'worker
:addr addr
@@ -151,41 +164,49 @@
:request request
:reply reply)))
-(defmethod agent-in ((kqueue kqueue-infos) (acceptor acceptor))
+(defmethod agent-in ((kq kqueue-infos) (acceptor acceptor)) ; read event on the acceptor's fd: one SOCKET:ACCEPT call
(multiple-value-bind (fd addr) (socket:accept (agent-fd acceptor))
(unless (eq :non-blocking fd)
- (kqueue-add kqueue (make-worker fd addr)))))
+ (kqueue-add kq (make-worker fd addr))))) ; wrap the accepted fd in a worker and register it with KQ
(defclass control (agent)
())
-(defmethod agent-kqueue-events ((agent control))
- kqueue:+in+)
+(defmethod agent-kqueue-filters ((agent control)) ; the control agent registers only the read filter
+ (list +evfilt-read+))
-(defmethod agent-in ((kqueue kqueue-infos) (agent control))
+(defmethod agent-in ((kq kqueue-infos) (agent control)) ; read event on the control fd: set *stop*, which the event loop tests
(setq *stop* t))
;; Thread event loop
(defun acceptor-loop-kqueue (listenfd &optional pipe)
(declare (type unistd:file-descriptor listenfd))
- (kqueue:with-kqueue (kqueue-fd)
- (let ((kqueue (make-instance 'kqueue-infos :fd kqueue-fd)))
- (kqueue-add kqueue (make-instance 'acceptor :fd listenfd))
+ (kqueue:with-kqueue (kq-fd) ; body runs with KQ-FD naming the kqueue descriptor
+ (let ((kq (make-instance 'kqueue-infos :fd kq-fd))) ; object passed to kqueue-add and get-agent below
+ (kqueue-add kq (make-instance 'acceptor :fd listenfd)) ; watch the listening fd through an acceptor agent
(when pipe
- (kqueue-add kqueue (make-instance 'control :fd pipe)))
+ (kqueue-add kq (make-instance 'control :fd pipe))) ; optional control fd: a read event on it sets *stop*
(loop
(when *stop*
(return))
- (kqueue:wait (events fd kqueue-fd)
- (let ((agent (get-agent kqueue fd)))
- (unless agent (error "bad kqueue fd ~S" fd))
- (cond ((not (= 0 (logand kqueue:+err+ events)))
- (agent-error kqueue agent))
- ((not (= 0 (logand kqueue:+in+ events)))
- (agent-in kqueue agent))
- ((not (= 0 (logand kqueue:+out+ events)))
- (agent-out kqueue agent)))))))))
+ (with-foreign-objects ((events '(:struct kqueue:kevent) 1000) ; buffer for up to 1000 events per call
+ (timeout '(:struct kqueue:timespec)))
+ (kqueue:seconds-to-timespec timeout 10) ; presumably a 10-second wait limit -- confirm against cffi-kqueue
+ (let ((n-events (kqueue:kevent kq-fd :events events ; no changes submitted here, only events collected
+ :n-events 1000 ; must not exceed the size of EVENTS above
+ :timeout timeout)))
+ (declare (type fixnum n-events))
+ (dotimes (i n-events) ; handle each returned event in order
+ (let* ((event (mem-aptr events '(:struct kevent) i))
+ (filter (kqueue:kevent-filter event))
+ (fd (kqueue:kevent-ident event)) ; kqueue-add stored the fd in the ident field
+ (agent (get-agent kq fd)))
+ (when agent ; events whose fd has no registered agent are skipped silently
+ (cond ((= filter kqueue:+evfilt-read+) ; read filter -> agent-in
+ (agent-in kq agent))
+ ((= filter kqueue:+evfilt-write+) ; write filter -> agent-out
+ (agent-out kq agent))))))))))))
(defun maybe-configure-kqueue ()
(when (cffi:foreign-symbol-pointer "kqueue")
diff --git a/thot.asd b/thot.asd
index 6700201..a06e508 100644
--- a/thot.asd
+++ b/thot.asd
@@ -17,7 +17,8 @@
"bordeaux-threads"
"babel-stream"
"cffi-dirent"
- #+linux1 "cffi-epoll"
+ #+linux "cffi-epoll"
+ #+openbsd "cffi-kqueue"
"cffi-socket"
"cffi-stat"
"cl-debug"
@@ -25,6 +26,7 @@
"html-entities"
"rol-uri"
"str"
+ "trivial-utf-8"
"unistd-stream")
:components
((:file "package")
@@ -33,4 +35,5 @@
(:file "thot" :depends-on ("mime"))
(:file "thot-select" :depends-on ("thot"))
(:file "thot-threaded" :depends-on ("thot"))
- #+linux1 (:file "thot-epoll" :depends-on ("thot"))))
+ #+linux (:file "thot-epoll" :depends-on ("thot"))
+ #+openbsd (:file "thot-kqueue" :depends-on ("thot"))))