diff --git a/thot-epoll.lisp b/thot-epoll.lisp
index 1795100..15e4a5e 100644
--- a/thot-epoll.lisp
+++ b/thot-epoll.lisp
@@ -20,7 +20,10 @@
(defclass agent ()
((fd :initarg :fd
:reader agent-fd
- :type (unsigned-byte 31))))
+ :type (unsigned-byte 31))
+ (pending :initform nil ; starts false; epoll-del sets it to T
+ :accessor agent-pending ; the event loop checks it before calling agent-in / agent-out
+ :type boolean)))
(defgeneric agent-epoll-events (agent))
(defgeneric agent-error (epoll agent))
@@ -50,8 +53,14 @@
(agent-epoll-events agent)
:data-fd fd)))
+(defun epoll-mod (epoll agent events)
+  "Call EPOLL:MOD on EPOLL's descriptor for AGENT's fd with EVENTS.
+The agent's fd is also passed as the :DATA-FD argument."
+  (declare (type epoll-infos epoll))
+  (let* ((target-fd (agent-fd agent))
+         (epoll-descriptor (epoll-fd epoll)))
+    (epoll:mod epoll-descriptor target-fd events :data-fd target-fd)))
+
(defun epoll-del (epoll agent)
(declare (type epoll-infos epoll))
+ (setf (agent-pending agent) t)
(let ((fd (agent-fd agent)))
(epoll:del (epoll-fd epoll) fd)
(socket:shutdown fd t t)
@@ -87,39 +96,33 @@
(defmethod agent-in ((epoll epoll-infos) (worker worker))
(let ((reader-cont (worker-reader-cont worker)))
- (when reader-cont
- (let ((result (handler-case (funcall reader-cont)
- (warning (x) (format t "~A~%" x) :eof))))
- (cond ((eq :eof result) (epoll-del epoll worker))
- ((eq nil result)
- (setf (worker-reader-cont worker) nil)
- (stream-flush-output (reply-stream (worker-reply worker))))
- ((eq :keep-alive result) (setf (worker-keep-alive worker) t
- (worker-reader-cont worker) nil)
- :keep-alive)
- ((functionp result) (setf (worker-reader-cont worker) result))
- (t (error "worker input error ~S" worker)))))))
+ (if reader-cont
+ (let ((result (funcall reader-cont))) ; run the stored continuation and dispatch on its result; NOTE(review): no handler-case here, unlike the removed version
+ (cond ((null result) (setf (worker-reader-cont worker) nil)) ; NIL: clear the continuation
+ ((eq :keep-alive result) ; :keep-alive: install a new request reader over the reset request and reply
+ (let ((request (worker-request worker))
+ (reply (worker-reply worker)))
+ (setf (worker-reader-cont worker)
+ (request-reader (reset-request request)
+ (reset-reply reply)))))
+ ((functionp result) (setf (worker-reader-cont worker) ; a function: store it as the next continuation
+ result))
+ ((eq :eof result) (epoll-del epoll worker)) ; :eof: call epoll-del on the worker
+ (t (error "worker input error ~S" worker)))) ; any other value signals an error
+ (when (= 0 (stream-output-length ; no continuation: call epoll-del when the reply stream's output length is 0
+ (reply-stream (worker-reply worker)))) ; NOTE(review): agent-out measures the underlying stream instead -- confirm which is intended
+ (epoll-del epoll worker)))))
(defmethod agent-out ((epoll epoll-infos) (worker worker))
+  "Flush WORKER's reply output. Call EPOLL-DEL when the flush returns :EOF, or when no reader continuation remains and the output length is 0."
(let* ((request (worker-request worker))
(reply (worker-reply worker))
(babel-stream (reply-stream reply))
(stream (stream-underlying-stream babel-stream)))
- (cond ((and (null (worker-reader-cont worker))
- (= 0 (the integer (stream-output-length stream))))
- (cond ((worker-keep-alive worker)
- ;; read request body
- (setf (worker-reader-cont worker)
- (request-reader (reset-request request)
- (reset-reply reply))))
- (t
- (epoll-del epoll worker))))
- (t
- (case (stream-flush-output stream)
- ((nil) nil)
- ((:eof) (epoll-del epoll worker))
- ((:non-blocking) :non-blocking)
- (otherwise (error 'stream-output-error :stream stream)))))))
+    ;; Flush first; a flush result of :eof removes the worker.
+    (case (stream-flush-output stream)
+      ((:eof) (epoll-del epoll worker)))
+    ;; epoll-del sets AGENT-PENDING, so checking it here prevents a second
+    ;; epoll-del on a worker the :eof branch above has already removed.
+    (unless (or (agent-pending worker)
+                (worker-reader-cont worker))
+      (when (= 0 (stream-output-length stream))
+        (epoll-del epoll worker)))))
;; Acceptor agent
@@ -153,7 +156,8 @@
(defmethod agent-in ((epoll epoll-infos) (acceptor acceptor))
(multiple-value-bind (fd addr) (socket:accept (agent-fd acceptor))
(unless (eq :non-blocking fd)
- (epoll-add epoll (make-worker fd addr)))))
+ (let ((worker (make-worker fd addr))) ; build a worker from the accepted fd and address
+ (epoll-add epoll worker))))) ; pass the new worker to epoll-add
(defclass control (agent)
())
@@ -176,15 +180,17 @@
(loop
(when *stop*
(return))
- (epoll:wait (events fd epoll-fd)
+ (epoll:wait (events fd epoll-fd 10000 -1)
(let ((agent (get-agent epoll fd)))
(unless agent (error "bad epoll fd ~S" fd))
- (cond ((not (= 0 (logand epoll:+err+ events)))
- (agent-error epoll agent))
- ((not (= 0 (logand epoll:+in+ events)))
- (agent-in epoll agent))
- ((not (= 0 (logand epoll:+out+ events)))
- (agent-out epoll agent)))))))))
+ (unless (= 0 (logand epoll:+err+ events))
+ (agent-error epoll agent))
+ (unless (or (agent-pending agent)
+ (= 0 (logand epoll:+in+ events)))
+ (agent-in epoll agent))
+ (unless (or (agent-pending agent)
+ (= 0 (logand epoll:+out+ events)))
+ (agent-out epoll agent))))))))
(defun maybe-configure-epoll ()
(when (cffi:foreign-symbol-pointer "epoll_create")
@@ -193,4 +199,10 @@
(eval-when (:load-toplevel :execute)
(maybe-configure-epoll))
-;;(untrace socket:socket socket:bind socket:listen socket:accept unistd:close epoll:create epoll-add epoll-del acceptor-loop-epoll make-worker agent-in agent-out)
+;; Debugging aid, disabled at read time.  #+(or) is a feature expression
+;; that is always false, so the reader skips the TRACE form regardless of
+;; what *FEATURES* contains (#+nil would read it if :NIL were present).
+#+(or)
+(trace
+ epoll:create epoll-add epoll-del
+ acceptor-loop-epoll make-worker agent-in agent-out agent-error
+ stream-flush-output unistd:c-write
+ stream-output-index
+ )