diff --git a/package.lisp b/package.lisp
index b064139..4e82d73 100644
--- a/package.lisp
+++ b/package.lisp
@@ -15,11 +15,15 @@
(defpackage :thot
(:use
+ :babel-stream
:bordeaux-threads
:cffi
:cffi-errno
:cl-debug
+ :cl-stream
+ :fd-stream
:common-lisp)
+ #.(cl-stream:shadowing-import-from)
(:export
#:request
#:*request*
@@ -41,4 +45,4 @@
#:end-headers
#:content
#:start
- #:stop))
+ #:start-threaded))
diff --git a/thot-epoll.lisp b/thot-epoll.lisp
new file mode 100644
index 0000000..21aa89a
--- /dev/null
+++ b/thot-epoll.lisp
@@ -0,0 +1,102 @@
+
+(in-package :thot)
+
+;; Generic epoll agent class
+
+(defclass agent () ; base class for everything registered with the epoll loop
+ ((fd :initarg :fd ; file descriptor; EPOLL-ADD uses it as the key in *EPOLL-AGENTS*
+ :reader agent-fd
+ :type (unsigned-byte 31))))
+
+(defgeneric agent-epoll-events (agent)) ; event mask EPOLL-ADD passes to EPOLL:ADD for AGENT
+(defgeneric agent-error (agent)) ; run by EVENT-LOOP-EPOLL when the epoll:+err+ bit is set
+(defgeneric agent-in (agent)) ; run when epoll:+in+ is set and epoll:+err+ is not
+(defgeneric agent-out (agent)) ; run when epoll:+out+ is set and neither bit above is
+
+;; Adding an agent
+
+(defvar *epoll-fd*) ; no global value; named in the EPOLL:WITH form of EVENT-LOOP-EPOLL (presumably bound there)
+
+(defvar *epoll-agents* ; fd -> agent table, written by EPOLL-ADD and read through GET-AGENT
+ (make-hash-table))
+
+(defmacro get-agent (fd) ; SETF-able place: the agent stored under FD in *EPOLL-AGENTS*
+  (list 'gethash fd '*epoll-agents*))
+
+(defun set-nonblocking (fd)
+  ;; OR +O-NONBLOCK+ into the flags FCNTL:GETFL reports for FD and store them back.
+  (fcntl:setfl fd (logior +o-nonblock+ (fcntl:getfl fd))))
+
+(defun epoll-add (agent)
+  ;; Make AGENT's fd non-blocking, index AGENT by that fd in *EPOLL-AGENTS*,
+  ;; then register the fd with *EPOLL-FD* under the event mask AGENT asks for.
+  (let ((descriptor (agent-fd agent)))
+    (set-nonblocking descriptor)
+    (setf (gethash descriptor *epoll-agents*) agent)
+    (epoll:add *epoll-fd* descriptor (agent-epoll-events agent) :data-fd descriptor)))
+
+;; Worker agent
+
+(defclass worker (agent) ; agent for one accepted client; created by the acceptor's AGENT-IN
+ ((stream :reader worker-stream ; no initarg or initform: unbound after MAKE-INSTANCE
+ :type stream)
+ (addr :initarg :addr ; second value returned by the accept call
+ :reader worker-addr)
+ (request :initform nil ; NIL or a REQUEST object
+ :accessor worker-request
+ :type (or null request))
+ (reply :initform nil
+ :accessor worker-reply)))
+
+(defmethod agent-epoll-events ((agent worker)) ; workers ask for input, output and error events
+ (logior epoll:+in+ epoll:+out+ epoll:+err+))
+
+(defmethod agent-error ((agent worker)) ; signals an error whose message is just "worker"
+ (error "worker"))
+
+(defmethod agent-in ((agent worker)) ; stub: empty body, returns NIL
+ )
+
+(defmethod agent-out ((agent worker)) ; stub: empty body, returns NIL
+ )
+
+;; Acceptor agent
+
+(defclass acceptor (agent) ()) ; agent for the fd that clients are accepted on; adds no slots
+
+(defmethod agent-epoll-events ((agent acceptor)) ; input and error events only
+ (logior epoll:+in+ epoll:+err+))
+
+(define-condition accept-error (error) ; signaled by AGENT-ERROR on an acceptor
+  ((acceptor :initarg :acceptor ; keyword, as used in (error 'accept-error :acceptor agent)
+             :reader accept-error-acceptor ; the acceptor the error was reported for
+             :type acceptor)))
+
+(defmethod agent-error ((agent acceptor)) ; signal ACCEPT-ERROR, passing AGENT under :ACCEPTOR
+ (error 'accept-error :acceptor agent))
+
+(defmethod agent-in ((agent acceptor))
+  ;; The accepting fd is readable: accept one client and register it as a WORKER.
+  ;; CFFI-SOCKET (not the dropped cffi-sockets) is the package the system now uses.
+  (multiple-value-bind (clientfd clientaddr) (cffi-socket:accept (agent-fd agent))
+    (epoll-add (make-instance 'worker :fd clientfd :addr clientaddr))))
+
+;; Thread event loop
+
+(defun event-loop-epoll (acceptfd)
+  ;; Register an ACCEPTOR for ACCEPTFD, then hand each event EPOLL:WAIT reports
+  ;; to the agent stored under its fd.  Only the first matching condition is
+  ;; served per event, tested in the order error, input, output.
+  (epoll:with (*epoll-fd*)
+    (epoll-add (make-instance 'acceptor :fd acceptfd))
+    (epoll:wait (events fd *epoll-fd*)
+      (let ((agent (get-agent fd)))
+        (flet ((flagged-p (mask) (logtest mask events)))
+          (cond ((flagged-p epoll:+err+) (agent-error agent))
+                ((flagged-p epoll:+in+) (agent-in agent))
+                ((flagged-p epoll:+out+) (agent-out agent))))))))
+
+;;
+(defun acceptor-loop-epoll (fd) ; stub: no body beyond the declaration, returns NIL
+ (declare (type (unsigned-byte 31) fd))
+ )
diff --git a/thot-single.lisp b/thot-single.lisp
new file mode 100644
index 0000000..1c28085
--- /dev/null
+++ b/thot-single.lisp
@@ -0,0 +1,13 @@
+
+(in-package :thot)
+
+(defun acceptor-loop (fd)
+  ;; Serve clients one at a time on the calling thread until *STOP* is set:
+  ;; accept on FD, wrap the client fd with FD-IO-STREAM and BABEL-IO-STREAM, run REQUEST-LOOP.
+  (declare (type (unsigned-byte 31) fd))
+  (loop until *stop*
+        do (cffi-socket:with-accept (clientfd) fd
+             (with-stream (stream (babel-io-stream (fd-io-stream clientfd)))
+               (request-loop stream)))))
+
+(setq *acceptor-loop* 'acceptor-loop) ; load-time default for the loop START funcalls; thot-threaded.lisp may replace it
diff --git a/thot-threaded.lisp b/thot-threaded.lisp
new file mode 100644
index 0000000..7ddc2ee
--- /dev/null
+++ b/thot-threaded.lisp
@@ -0,0 +1,54 @@
+
+(in-package :thot)
+
+(defvar *listen-fd*) ; fd that workers accept on; set by ACCEPTOR-LOOP-THREADED, read by WORKER-THREAD
+
+(defun worker-thread ()
+  ;(format t "~&WORKER THREAD~%")
+  ;; Until *STOP* is set: accept one client on *LISTEN-FD*, wrap its fd with
+  ;; FD-IO-STREAM and BABEL-IO-STREAM, and run REQUEST-LOOP on that stream.
+  (loop until *stop*
+        do (cffi-socket:with-accept (clientfd) *listen-fd*
+             (with-stream (stream (babel-io-stream (fd-io-stream clientfd)))
+               (request-loop stream)))))
+
+(defparameter *init-threads* 8) ; thread count ACCEPTOR-LOOP-THREADED passes to WITH-WORKER-THREADS
+
+(defvar *worker-threads*) ; list of started threads; bound by WITH-WORKER-THREADS
+(defvar *worker-sockfds*) ; BORDEAUX-SET:SET bound by WITH-WORKER-THREADS; nothing in this file adds to it
+
+(defun init-worker-threads (n)
+  ;; Start threads named "worker" running WORKER-THREAD until *WORKER-THREADS*
+  ;; holds at least N of them.  Returns NIL.
+  (loop while (< (length *worker-threads*) n)
+        do (push (bordeaux-threads:make-thread 'worker-thread
+                                               :name "worker")
+                 *worker-threads*)))
+
+(defun join-worker-threads ()
+  ;; Shutdown sequence: raise *STOP*, shut down every fd in *WORKER-SOCKFDS*,
+  ;; then pop and join each thread until *WORKER-THREADS* is empty.
+  ;; Returns NIL.
+  (flet ((shutdown-sockfd (sockfd)
+           (cffi-socket:shutdown sockfd t t)))
+    (setq *stop* t)
+    (bordeaux-set:set-each #'shutdown-sockfd *worker-sockfds*)
+    (loop until (endp *worker-threads*)
+          do (bordeaux-threads:join-thread (pop *worker-threads*)))))
+
+(defmacro with-worker-threads (count &body body) ; run BODY after starting COUNT worker threads
+ `(let ((*worker-threads* ()) ; fresh thread list for the extent of BODY
+ (*worker-sockfds* (make-instance 'bordeaux-set:set)))
+ (init-worker-threads ,count) ; runs before the UNWIND-PROTECT is entered
+ (unwind-protect (progn ,@body)
+ (join-worker-threads)))) ; cleanup form: also runs on a non-local exit from BODY
+
+(defun acceptor-loop-threaded (fd)
+  ;; Clear *STOP*, publish FD as *LISTEN-FD*, start *INIT-THREADS* workers, then
+  ;; run WORKER-THREAD on the calling thread too; workers are joined on the way out.
+  (declare (type (unsigned-byte 31) fd))
+  (setf *stop* nil *listen-fd* fd)
+  (with-worker-threads *init-threads* (worker-thread)))
+
+(when bordeaux-threads:*supports-threads-p* ; replace the acceptor loop only where threads are supported
+ (setq *acceptor-loop* 'acceptor-loop-threaded))
diff --git a/thot.asd b/thot.asd
index a91c913..c828758 100644
--- a/thot.asd
+++ b/thot.asd
@@ -8,10 +8,16 @@
(defsystem "thot"
:depends-on ("babel"
+ "bordeaux-queue"
+ "bordeaux-set"
"bordeaux-threads"
- "cffi-posix"
- "cffi-sockets-flexi"
- "cl-debug")
+ "babel-stream"
+ "cffi-socket"
+ "cl-debug"
+ "cl-stream"
+ "fd-stream")
:components
((:file "package")
- (:file "thot" :depends-on ("package"))))
+ (:file "thot" :depends-on ("package"))
+ (:file "thot-single" :depends-on ("thot"))
+ (:file "thot-threaded" :depends-on ("thot-single"))))
diff --git a/thot.lisp b/thot.lisp
index 126b1f6..6658e3c 100644
--- a/thot.lisp
+++ b/thot.lisp
@@ -1,93 +1,50 @@
(in-package :thot)
-(setf (debug-p :thot) t)
+(setf (debug-p :thot) nil)
(eval-when (:compile-toplevel :load-toplevel :execute)
(unless (boundp '+crlf+)
(defconstant +crlf+
(coerce '(#\Return #\Newline) 'string))))
-(defun read-line-crlf (stream)
- (with-output-to-string (out)
- (let ((cr))
- (loop
- (let ((c (read-char stream)))
- (when cr
- (when (char= #\Newline c)
- (return))
- (write-char cr out)
- (setf cr nil))
- (if (char= #\Return c)
- (setf cr c)
- (write-char c out)))))))
-
-(defun read-until (end-char stream)
- "Reads stream into a string until END-CHAR is read.
-END-CHAR is not returned as part of the string."
- (with-output-to-string (out)
- (loop
- (let ((c (read-char stream)))
- (when (char= end-char c)
- (return))
- (write-char c out)))))
+;; Request
(defclass request ()
- ((socket :initarg :socket
- :reader request-socket%
+ ((stream :initarg :stream
+ :reader request-stream%
:type stream)
(method :initarg :method
- :reader request-method%
+ :accessor request-method%
:type symbol)
- (url :initarg :url
- :reader request-url%
- :type string)
- (uri :initform nil
- :accessor request-uri%
- :type string)
- (query :initform nil
- :accessor request-query%
- :type string)
+ (target :initarg :target
+ :accessor request-target%
+ :type string)
(http-version :initarg :http-version
- :reader request-http-version%
- :type string)
+ :accessor request-http-version%
+ :type string)
(headers :initform (make-hash-table :test 'equalp :size 32)
:reader request-headers%
:type hash-table)
- (data :initform nil
- :accessor request-data%
- :type string)))
+ (uri :accessor request-uri%
+ :type string)
+ (query :initform nil
+ :accessor request-query%
+ :type string)))
(defvar *request*)
-(defun request-socket (&optional (request *request*))
+(defun request-stream (&optional (request *request*))
(declare (type request request))
- (request-socket% request))
+ (request-stream% request))
(defun request-method (&optional (request *request*))
(declare (type request request))
(request-method% request))
-(defun request-url (&optional (request *request*))
+(defun request-target (&optional (request *request*))
(declare (type request request))
- (request-url% request))
-
-(defun split-uri-query (&optional (request *request*))
- (let* ((url (thot:request-url request))
- (url-? (position #\? url))
- (uri (if url-?
- (subseq url 0 url-?)
- url))
- (query (if url-?
- (subseq url url-?)
- nil)))
- (setf (request-uri% request) uri
- (request-query% request) query)))
-
-(defun request-uri (&optional (request *request*))
- (unless (request-uri% request)
- (split-uri-query request)
- (request-uri% request)))
+ (request-target% request))
(defun request-http-version (&optional (request *request*))
(declare (type request request))
@@ -97,50 +54,129 @@ END-CHAR is not returned as part of the string."
(declare (type request request))
(request-headers% request))
-(defun request-header (name &optional (request *request*))
- (gethash name (request-headers request)))
+(defun split-request-uri-and-query (request)
+  ;; Cut REQUEST's target at its first #\? and store both halves in the URI
+  ;; and QUERY slots.  The query keeps its leading #\?; when there is no #\?
+  ;; the URI is the target itself and the query is NIL.
+  ;; Returns the query.
+  (declare (type request request))
+  (let* ((target (request-target% request))
+         (mark (position #\? target)))
+    (setf (request-uri% request)
+          (if mark (subseq target 0 mark) target)
+          (request-query% request)
+          (and mark (subseq target mark)))))
+
+(defun request-uri (&optional (request *request*)) ; => target up to its first ?, split on first use
+  (declare (type request request))
+  (when (not (slot-boundp request 'uri)) ; URI has no initform: unbound until the split runs
+    (split-request-uri-and-query request))
+  (request-uri% request))
+
+(defun request-query (&optional (request *request*)) ; => query (from the first ? on) or NIL
+  (declare (type request request))
+  (unless (slot-boundp request 'uri)       ; QUERY always has a value (:initform nil), so
+    (split-request-uri-and-query request)) ; only URI tells whether the split has run yet.
+  (slot-value request 'query))
+
+(defun request-header (header-name &optional (request *request*)) ; => value, found-p
+  (declare (type request request))
+  (let ((table (request-headers% request))) (gethash header-name table))) ; table test is EQUALP
+
+(defsetf request-header (header-name &optional (request '*request*)) (value) ; makes (setf (request-header name [request]) value) work
+ `(setf (gethash ,header-name (request-headers ,request)) ,value)) ; REQUEST defaults to the form *REQUEST*
(defun request-content-length (&optional (request *request*))
(parse-integer (request-header "Content-Length" request)))
-(defun request-data (&optional (request *request*))
- (let* ((length (request-content-length request))
- (data (make-array length :element-type '(unsigned-byte 8)))
- (socket (request-socket request))
- (encoding (flexi-streams:flexi-stream-external-format socket)))
- (read-sequence data socket)
- (babel:octets-to-string data :encoding encoding)))
-
-(defun read-spaces (socket)
- (loop
- (unless (char= #\Space (peek-char nil socket))
- (return))
- (read-char socket)))
-
-(defun read-header (socket headers)
- (unless (and (char= #\Return (peek-char nil socket))
- (read-char socket)
- (char= #\Newline (read-char socket)))
- (let ((name (read-until #\: socket)))
- (read-spaces socket)
- (let ((value (read-line-crlf socket)))
- (when (debug-p (or :thot :http))
- (format t "~&thot: ~A: ~A~%" name value))
- (setf (gethash name headers) value)))))
-
-(defun read-request (socket)
- (let* ((method (find-symbol (read-until #\Space socket) :http-method))
- (url (read-until #\Space socket))
- (http-version (read-line-crlf socket))
- (request (make-instance 'request :socket socket :method method
- :url url :http-version http-version))
- (headers (request-headers request)))
- (when (debug-p (or :thot :http))
- (format t "~&thot: ~A ~A ~A~%" method url http-version))
- (loop
- (unless (read-header socket headers)
- (return)))
- request))
+;; HTTP parser
+
+(defmacro with-readers-for (stream definitions &body body) ; run BODY inside LABELS holding one reader per definition
+ (declare (type symbol stream))
+ (flet ((reader (definition) ; turn (NAME (ELEMENT) . FORMS) into a zero-argument LABELS binding
+ (destructuring-bind (name (element) &rest body) definition
+ (declare (type symbol name element))
+ (let ((state (gensym "STATE-")))
+ `(,name ()
+ (multiple-value-bind (,element ,state) (read ,stream) ; each call reads one element plus a state
+ (case ,state
+ ((nil) ,@body) ; state NIL: run the definition's forms with ELEMENT bound
+ ((:eof) :eof) ; state :EOF: return :EOF
+ ((:non-blocking) #',name) ; state :NON-BLOCKING: return this reader (presumably to be called again later)
+ (otherwise (error 'stream-input-error ; any other state is signaled as STREAM-INPUT-ERROR
+ :stream ,stream)))))))))
+ `(labels ,(mapcar #'reader definitions)
+ ,@body)))
+
+(defun request-reader (stream cont)
+  ;; Build the state machine that parses one HTTP request head from STREAM,
+  ;; one character per state call, and return its entry state #'METHOD.
+  ;; After the blank line ending the headers a state returns (FUNCALL CONT),
+  ;; called with *REQUEST* bound to the parsed request.
+  ;; The request is kept in a lexical variable: the states run after this
+  ;; function has returned, when a dynamic binding made here is already gone.
+  (let ((request (make-instance 'request))
+        (buffer (string-output-stream))
+        (name "")
+        (value ""))
+    (flet ((get-buffer () ; text accumulated so far; BUFFER is reset afterwards
+             (prog1 (string-output-stream-string buffer)
+               (sequence-output-stream-reset buffer))))
+      (with-readers-for stream
+        ((method (char)
+           (cond ((char= #\Space char)
+                  (setf (request-method% request) (get-buffer))
+                  (target))
+                 (t (write buffer char) (method))))
+         (target (char)
+           (cond ((char= #\Space char)
+                  (setf (request-target% request) (get-buffer))
+                  (version))
+                 (t (write buffer char) (target))))
+         (version (char)
+           (cond ((char= #\Return char)
+                  (setf (request-http-version% request) (get-buffer))
+                  (version-lf))
+                 (t (write buffer char) (version))))
+         (version-lf (char)
+           (cond ((char= #\Newline char)
+                  (when (debug-p (or :thot :http))
+                    (format t "~&thot: ~A ~A ~A~%"
+                            (request-method request)
+                            (request-target request)
+                            (request-http-version request)))
+                  (next-header))
+                 (t (error "Missing request line LF"))))
+         (next-header (char)
+           (cond ((char= #\Return char) (end-of-headers))
+                 (t (write buffer char) (header-name))))
+         (header-name (char)
+           (cond ((char= #\: char)
+                  (setq name (get-buffer))
+                  (header-spaces))
+                 (t (write buffer char) (header-name))))
+         (header-spaces (char)
+           (cond ((char= #\Space char) (header-spaces))
+                 (t (write buffer char) (header-value))))
+         (header-value (char)
+           (cond ((char= #\Return char)
+                  (setq value (get-buffer))
+                  (header-lf))
+                 (t (write buffer char) (header-value))))
+         (header-lf (char)
+           (cond ((char= #\Newline char)
+                  (when (debug-p (or :thot :http))
+                    (format t "~&thot: ~A: ~A~%" name value))
+                  (setf (request-header name request) value)
+                  (next-header))
+                 (t (error "Missing header LF"))))
+         (end-of-headers (char)
+           (cond ((char= #\Newline char)
+                  (let ((*request* request)) (funcall cont)))
+                 (t (error "Missing end of headers LF")))))
+        #'method))))
+
+;; Reply
(defclass reply ()
((status :initform nil
@@ -148,7 +184,8 @@ END-CHAR is not returned as part of the string."
(headers :initform nil
:accessor reply-headers%)
(headers-sent :initform nil
- :accessor reply-headers-sent%)))
+ :accessor reply-headers-sent%
+ :type boolean)))
(defvar *reply*)
@@ -188,7 +225,11 @@ END-CHAR is not returned as part of the string."
(when status
(error 'status-already-sent status line)))
(setf (reply-status) line)
- (format (request-socket) "~A ~A~A" (request-http-version) line +crlf+))
+ (let ((stream (request-stream)))
+ (write-sequence stream (request-http-version))
+ (write stream #\Space)
+ (write-sequence stream line)
+ (write-sequence stream +crlf+)))
(defun header (line)
(unless (reply-status)
@@ -201,7 +242,9 @@ END-CHAR is not returned as part of the string."
(setf (rest headers) (list line))
(return))
(pop headers))))
- (format (request-socket) "~A~A" line +crlf+))
+ (let ((stream (request-stream)))
+ (write-sequence stream line)
+ (write-sequence stream +crlf+)))
(defun end-headers ()
(unless (reply-headers-sent)
@@ -210,7 +253,7 @@ END-CHAR is not returned as part of the string."
(defun content (string)
(end-headers)
- (write-string string (request-socket)))
+ (write-sequence (request-stream) string))
(defun 404-not-found ()
(status "404 Not found")
@@ -218,7 +261,7 @@ END-CHAR is not returned as part of the string."
(content (format nil "404 Not found
The requested url ~S was not found on this server."
- (request-url *request*))))
+ (request-target))))
(defun 404-not-found-handler ()
'404-not-found)
@@ -226,9 +269,8 @@ The requested url ~S was not found on this server."
(defvar *url-handlers*
'(404-not-found-handler))
-(defun handle-request (request)
+(defun handle-request ()
(let ((handlers *url-handlers*)
- (*request* request)
(*reply* (make-instance 'reply)))
(loop
(when (endp handlers)
@@ -239,103 +281,28 @@ The requested url ~S was not found on this server."
(when (debug-p (or :thot :http))
(format t "~&~S -> ~S~%" handler-func handler))
(funcall handler)
- (return))))))
+ (return))))
+ (if (string-equal "keep-alive" (request-header 'connection))
+ :keep-alive
+ nil)))
-(defun request-loop (socket)
- (loop
- (let ((request (read-request socket)))
- (unless request
+(defvar *stop* nil) ; when true, REQUEST-LOOP and the accept loops return at their next check
+
+(defun request-loop (stream)
+ (ignore-errors
+ (loop
+ (when *stop*
(return))
- (handle-request request)
- (unless (string-equal "keep-alive" (request-header 'connection request))
- (return)))))
+ (unless (eq :keep-alive
+ (funcall (request-reader stream #'handle-request)))
+ (return)))))
-(defun acceptor-loop (fd)
- (declare (type (unsigned-byte 31) fd))
- (loop
- (cffi-sockets:with-accept (clientfd fd)
- (cffi-sockets-flexi:with-socket-stream (socket clientfd)
- (request-loop socket)))))
+(defvar *acceptor-loop*) ; function designator START funcalls with the listening fd; no value until a loop file sets it
(defun start (&key (address "0.0.0.0") (port 8000))
- (cffi-sockets:with-socket (fd cffi-sockets:+af-inet+
- cffi-sockets:+sock-stream+
- 0)
- (cffi-sockets:bind-inet fd address port)
- (cffi-sockets:listen-sock fd 128)
- (acceptor-loop fd)))
-
-(defvar *pipe-in* -1)
-(defvar *pipe-out* -1)
-
-(defun read-fd ()
- (with-foreign-object (fd :int)
- (let ((r (cffi-posix:c-read *pipe-in* fd (foreign-type-size :int))))
- (when (< r 0)
- (error-errno "read"))
- (when (< 0 r)
- (mem-aref fd :int)))))
-
-(defun write-fd (fd)
- (with-foreign-object (fd% :int)
- (setf (mem-aref fd% :int) fd)
- (let ((w (cffi-posix:c-write *pipe-out* fd% (foreign-type-size :int))))
- (when (< w 0)
- (error-errno "write"))
- (when (< 0 w)
- fd))))
-
-(defun worker-thread ()
- ;(format t "~&WORKER THREAD~%")
- (loop
- (let ((sockfd (read-fd)))
- (unless sockfd
- (return))
- (cffi-sockets-flexi:with-socket-stream (socket sockfd)
- (request-loop socket)))))
-
-(defparameter *init-threads* 8)
-
-(defvar *threads*)
-
-(defun init-threads (n)
- (loop
- (when (<= n (length *threads*))
- (return))
- (let* ((*thread-id* (decf n))
- (thread (bordeaux-threads:make-thread 'worker-thread)))
- (push thread *threads*))))
-
-(defun join-threads ()
- (loop
- (when (endp *threads*)
- (return))
- (let ((thread (pop *threads*)))
- (bordeaux-threads:join-thread thread))))
-
-(defmacro with-threads (n &body body)
- `(let ((*threads* ()))
- (init-threads ,n)
- (unwind-protect (progn ,@body)
- (join-threads))))
-
-(defun acceptor-loop-threaded (fd)
- (declare (type (unsigned-byte 31) fd))
- (loop
- (let ((clientfd (cffi-sockets:accept fd)))
- (write-fd clientfd))))
-
-(defun start-threaded (&key (address "0.0.0.0") (port 8000))
- (cffi-sockets:with-socket (fd cffi-sockets:+af-inet+
- cffi-sockets:+sock-stream+
- 0)
- (cffi-sockets:bind-inet fd address port)
- (cffi-sockets:listen-sock fd 128)
- (cffi-posix:with-pipe (in out)
- (setq *pipe-in* in *pipe-out* out)
- (with-threads *init-threads*
- (acceptor-loop-threaded fd)))))
-
-(untrace read-fd write-fd worker-thread init-threads join-threads acceptor-loop-threaded)
-(untrace cffi-sockets:socket cffi-sockets:accept)
-(untrace cffi-posix:pipe cffi-posix:close)
+ (cffi-socket:with-socket (fd cffi-socket:+af-inet+
+ cffi-socket:+sock-stream+
+ 0)
+ (cffi-socket:bind-inet fd address port)
+ (cffi-socket:listen fd 128)
+ (funcall *acceptor-loop* fd)))