diff --git a/thot-epoll.lisp b/thot-epoll.lisp
index 91e312f..28caa6d 100644
--- a/thot-epoll.lisp
+++ b/thot-epoll.lisp
@@ -75,15 +75,35 @@
(let ((reader-cont (worker-reader-cont worker)))
(when reader-cont
(let ((result (funcall reader-cont)))
- (cond ((eq :eof result) (epoll-del worker))
+ (cond ((eq :eof result) (epoll-del epoll-fd worker))
((eq nil result) (setf (worker-reader-cont worker) nil))
- ((eq :keep-alive result) (setf (worker-keep-alive worker) t))
+ ((eq :keep-alive result) (setf (worker-keep-alive worker) t
+ (worker-reader-cont worker) nil)
+ :keep-alive)
((functionp result) (setf (worker-reader-cont worker) result))
(t (error "worker input error ~S" worker)))))))
(defmethod agent-out ((worker worker) (epoll-fd fixnum))
- (let ((reply (worker-reply worker)))
- ))
+  ;; Push buffered reply bytes to the socket.  When nothing is left to
+  ;; write, either start reading the next request or drop the worker.
+  (let* ((request (worker-request worker))
+         (reply (worker-reply worker))
+         (out (stream-underlying-stream (reply-stream reply))))
+    (if (zerop (stream-output-length out))
+        (cond ((not (worker-keep-alive worker))
+               (epoll-del epoll-fd worker))
+              (t
+               ;; Keep-alive: reset both objects and install a fresh reader.
+               (setf (worker-reader-cont worker)
+                     (request-reader (reset-request request)
+                                     (reset-reply reply)
+                                     #'request-cont))
+               (agent-in worker epoll-fd)))
+        (case (stream-flush-output-buffer out)
+          ((nil) nil)
+          ((:eof) (epoll-del epoll-fd worker))
+          ((:non-blocking) :non-blocking)
+          (otherwise (error 'stream-output-error :stream out))))))
;; Acceptor agent
@@ -99,39 +119,65 @@
(defmethod agent-error ((acceptor acceptor) (epoll-fd fixnum))
(error 'acceptor-error :agent acceptor))
+(defun make-worker (fd addr)
+  "Build a WORKER for FD and ADDR with request/reply objects streaming over FD."
+  (let* ((in (babel-input-stream (fd-input-stream fd)))
+         (out (babel-output-stream
+               (multi-buffered-output-stream (fd-output-stream fd))))
+         (req (make-instance 'request :stream in))
+         (rep (make-instance 'reply :stream out)))
+    ;; The reader continuation is created last, once both objects exist.
+    (make-instance 'worker
+                   :addr addr
+                   :fd fd
+                   :reader-cont (request-reader req rep #'request-cont)
+                   :request req
+                   :reply rep)))
+
(defmethod agent-in ((acceptor acceptor) (epoll-fd fixnum))
(multiple-value-bind (fd addr) (socket:accept (agent-fd acceptor))
(unless (eq :non-blocking fd)
- (let* ((stream (babel-io-stream (fd-io-stream fd)))
- (request (make-instance 'request :stream stream))
- (reply (make-instance 'reply))
- (worker (make-instance 'worker
- :addr addr
- :fd fd
- :reader-cont (request-reader
- request reply #'request-cont)
- :request request
- :reply reply
- :stream stream)))
- (epoll-add epoll-fd worker)))))
+ (epoll-add epoll-fd (make-worker fd addr))))) ; wrap the accepted fd/addr in a worker and register it; skipped when accept returned :non-blocking
+
+(defclass control (agent) ()
+  (:documentation "Agent whose AGENT-IN method sets *STOP*."))
+(defmethod agent-epoll-events ((agent control))
+  "Control agents subscribe to EPOLL:+IN+ only."
+  epoll:+in+)
+(defmethod agent-in ((agent control) (epoll-fd fixnum))
+  "Raise the *STOP* flag; nothing is read from the descriptor here."
+  (setf *stop* t))
;; Thread event loop
-(defun event-loop-epoll (acceptfd)
- (epoll:with (epoll-fd)
- (let ((acceptor (make-instance 'acceptor :fd acceptfd)))
- (epoll-add epoll-fd acceptor))
- (epoll:wait (events fd epoll-fd)
- (let ((agent (get-agent fd)))
- (cond ((not (= 0 (logand epoll:+err+ events)))
- (agent-error agent epoll-fd))
- ((not (= 0 (logand epoll:+in+ events)))
- (agent-in agent epoll-fd))
- ((not (= 0 (logand epoll:+out+ events)))
- (agent-out agent epoll-fd)))))))
-
-;;
-
-(defun acceptor-loop-epoll (fd)
- (declare (type (unsigned-byte 31) fd))
- )
+(defun acceptor-loop-epoll (listenfd &optional pipe)
+  "Return a closure that polls LISTENFD with epoll, checking *STOP* before
+each wait.  PIPE, when non-NIL, is registered as a CONTROL agent."
+  (declare (type (unsigned-byte 31) listenfd))
+  (flet ((acceptor-loop-epoll-fun ()
+           (epoll:with (epoll-fd)
+             (epoll-add epoll-fd (make-instance 'acceptor :fd listenfd))
+             (when pipe
+               (epoll-add epoll-fd (make-instance 'control :fd pipe)))
+             (loop until *stop* do
+               (epoll:wait (events fd epoll-fd)
+                 (let ((agent (or (get-agent fd)
+                                  (error "bad epoll fd ~S" fd))))
+                   (cond ((logtest epoll:+err+ events)
+                          (agent-error agent epoll-fd))
+                         ((logtest epoll:+in+ events)
+                          (agent-in agent epoll-fd))
+                         ((logtest epoll:+out+ events)
+                          (agent-out agent epoll-fd)))))))))
+    #'acceptor-loop-epoll-fun))
+
+(when (cffi:foreign-symbol-pointer "epoll_create")
+  ;; Switch to the epoll loop only when the epoll_create symbol is found.
+  (case *acceptor-loop*
+    (acceptor-loop-simple (setq *acceptor-loop* 'acceptor-loop-epoll))
+    (acceptor-loop-threaded (setq *worker-thread-for-fd* 'acceptor-loop-epoll))))
+
+#+thot-trace ; debugging aid: only traced when :THOT-TRACE is in *FEATURES*
+(trace socket:socket socket:bind socket:listen socket:accept
+       unistd:close
+       epoll-add epoll-del acceptor-loop-epoll make-worker agent-in agent-out)
diff --git a/thot.asd b/thot.asd
index 74cfc99..cca886c 100644
--- a/thot.asd
+++ b/thot.asd
@@ -21,4 +21,5 @@
((:file "package")
(:file "thot" :depends-on ("package"))
(:file "thot-simple" :depends-on ("thot"))
- (:file "thot-threaded" :depends-on ("thot-simple"))))
+ (:file "thot-threaded" :depends-on ("thot-simple"))
+ (:file "thot-epoll" :depends-on ("thot-threaded"))))
diff --git a/thot.lisp b/thot.lisp
index 119baf2..6f8c03f 100644
--- a/thot.lisp
+++ b/thot.lisp
@@ -32,6 +32,16 @@
:accessor request-query%
:type string)))
(defvar *request*)
+
+;; Defined after the DEFVAR so that *REQUEST* is already known to be
+;; special when the optional default below is compiled.
+(defun reset-request (&optional (request *request*))
+  "Clear REQUEST's parsed fields and its headers table; return REQUEST."
+  (declare (type request request))
+  ;; NOTE(review): at least the query slot is declared :type string while
+  ;; NIL is stored here -- confirm the slot types allow this.
+  (setf (request-method% request) nil
+        (request-target% request) nil
+        (request-http-version% request) nil
+        (request-uri% request) nil
+        (request-query% request) nil)
+  (clrhash (request-headers% request))
+  request)
(defun request-stream (&optional (request *request*))
@@ -196,6 +206,13 @@
:accessor reply-stream%
:type buffered-output-stream)))
(defvar *reply*)
+
+;; Defined after the DEFVAR so that *REPLY* is already known to be
+;; special when the optional default below is compiled.
+(defun reset-reply (&optional (reply *reply*))
+  "Clear REPLY's status, headers and headers-sent flag; return REPLY."
+  (declare (type reply reply))
+  (setf (reply-status% reply) nil
+        (reply-headers% reply) nil
+        (reply-headers-sent% reply) nil)
+  reply)
(defun reply-status (&optional (reply *reply*))