Commit 40abbef24165af7602ed142971df21a63f64ecd5

Thomas de Grivel 2017-06-25T23:57:14

Allow for async reply.

diff --git a/thot.lisp b/thot.lisp
index bb7b1dd..60069a7 100644
--- a/thot.lisp
+++ b/thot.lisp
@@ -108,8 +108,14 @@
     `(labels ,(mapcar #'reader definitions)
        ,@body)))
 
-(defun request-reader (stream cont)
-  (let ((request (make-instance 'request :stream stream))
+(defun request-reader (request reply cont)
+  "Returns a HTTP request reader function which itself returns either :
+ :EOF if end of stream was reached, or
+ a closure if read would block, or
+ the return value of CONT, which should be either :
+  :KEEP-ALIVE if the connection is to be kept alive for next request, or
+  NIL if the request stream is to be closed after reply."
+  (let ((stream (request-stream request))
         (buffer (string-output-stream))
         (name "")
         (value ""))
@@ -172,7 +178,7 @@
                     (next-header))
                    (t (error "Missing header LF"))))
            (end-of-headers (char)
-             (cond ((char= #\Newline char) (funcall cont request))
+             (cond ((char= #\Newline char) (funcall cont request reply))
                    (t (error "Missing end of headers LF")))))
         #'method))))
 
@@ -185,7 +191,12 @@
             :accessor reply-headers%)
    (headers-sent :initform nil
                  :accessor reply-headers-sent%
-                 :type boolean)))
+                 :type boolean)
+   (cont-queue :initform (make-instance 'queue
+                                        :element-type 'function
+                                        :size 40)
+         :accessor reply-cont-queue%
+         :type queue)))
 
 (defvar *reply*)
 
@@ -203,8 +214,7 @@
 (defsetf reply-headers (&optional (reply '*reply*)) (value)
   `(setf (reply-headers% ,reply) ,value))
 
-(defun reply-header (name &optional (reply *reply*))
-  (declare (type reply reply))
+(defun reply-header (name &optional (reply *reply*))  (declare (type reply reply))
   (let ((header (assoc name (reply-headers reply) :test #'string-equal)))
     (when header
       (rest header))))
@@ -216,20 +226,64 @@
 (defsetf reply-headers-sent (&optional (reply '*reply*)) (value)
   `(setf (reply-headers-sent% ,reply) ,value))
 
+(defun reply-cont-queue (&optional (reply *reply*))
+  (declare (type reply reply))
+  (reply-cont-queue% reply))
+
+(defsetf reply-cont-queue (&optional (reply '*reply*)) (value)
+  `(setf (reply-cont-queue% ,reply) ,value))
+
 (defun reply-content-length (&optional (reply *reply*))
   (declare (type reply reply))
   (parse-integer (reply-header 'content-length reply)))
 
+(defun reply-stream ()
+  (request-stream))
+
 (defun status (line)
   (let ((status (reply-status)))
     (when status
       (error 'status-already-sent status line)))
   (setf (reply-status) line)
-  (let ((stream (request-stream)))
-    (write-sequence stream (request-http-version))
-    (write stream #\Space)
-    (write-sequence stream line)
-    (write-sequence stream +crlf+)))
+  (let ((stream (reply-stream)))
+    (labels ((version (&optional (start 0))
+               (multiple-value-bind (count state)
+                   (write-sequence stream (request-http-version) :start start)
+                 (case state
+                   ((nil) (space))
+                   ((:eof) (error 'stream-end-error :stream stream))
+                   ((:non-blocking) (write (reply-cont-queue)
+                                           (lambda ()
+                                             (version (+ start count)))))
+                   (otherwise (error 'stream-output-error :stream stream)))))
+             (space ()
+               (case (write stream #\Space)
+                 ((nil) (line))
+                 ((:eof) (error 'stream-end-error :stream stream))
+                 ((:non-blocking) (write (reply-cont-queue) #'space))
+                 (otherwise (error 'stream-output-error :stream stream))))
+             (line (&optional (start 0))
+               (multiple-value-bind (count state)
+                   (write-sequence stream line :start start)
+                 (case state
+                   ((nil) (crlf))
+                   ((:eof) (error 'stream-end-error :stream stream))
+                   ((:non-blocking) (write (reply-cont-queue)
+                                           (lambda () (line (+ start count)))))
+                   (otherwise (error 'stream-output-error :stream stream)))))
+             (crlf (&optional (start 0))
+               (multiple-value-bind (count state)
+                   (write-sequence stream +crlf+)
+                 (case state
+                   ((nil) nil)
+                   ((:eof) (error 'stream-end-error :stream stream))
+                   ((:non-blocking) (write (reply-cont-queue)
+                                           (lambda () (crlf (+ start count)))))
+                   (otherwise (error 'stream-output-error
+                                     :stream stream))))))
+      (if (= 0 (queue-length (reply-cont-queue)))
+          (version)
+          (write (reply-cont-queue) #'version)))))
 
 (defun header (line)
   (unless (reply-status)
@@ -243,8 +297,29 @@
              (return))
            (pop headers))))
   (let ((stream (request-stream)))
-    (write-sequence stream line)
-    (write-sequence stream +crlf+)))
+    (labels ((line (&optional (start 0))
+               (multiple-value-bind (count state)
+                   (write-sequence stream line)
+                 (case state
+                   ((nil) (crlf))
+                   ((:eof) (error 'stream-end-error :stream stream))
+                   ((:non-blocking) (write (reply-cont-queue)
+                                           (lambda () (line (+ start count)))))
+                   (otherwise (error 'stream-output-error
+                                     :stream stream)))))
+             (crlf (&optional (start 0))
+               (multiple-value-bind (count state)
+                   (write-sequence stream +crlf+)
+                 (case state
+                   ((nil) nil)
+                   ((:eof) (error 'stream-end-error :stream stream))
+                   ((:non-blocking) (write (reply-cont-queue)
+                                           (lambda () (crlf (+ start count)))))
+                   (otherwise (error 'stream-output-error
+                                     :stream stream))))))
+      (if (= 0 (queue-length (reply-cont-queue)))
+          (line)
+          (write (reply-cont-queue) #'line)))))
 
 (defun end-headers ()
   (unless (reply-headers-sent)
@@ -253,13 +328,27 @@
 
 (defun content (string)
   (end-headers)
-  (write-sequence (request-stream) string))
+  (let ((stream (reply-stream)))
+    (labels ((content-string (&optional (start 0))
+               (multiple-value-bind (count state)
+                   (write-sequence (request-stream) string)
+                 (case state
+                   ((nil) nil)
+                   (:eof) (error 'stream-end-error :stream stream)
+                   ((:non-blocking) (write (reply-cont-queue)
+                                           (lambda ()
+                                             (content-string (+ start count)))))
+                   (otherwise (error 'stream-output-error
+                                     :stream stream))))))
+      (if (= 0 (queue-length (reply-cont-queue)))
+          (content-string)
+          (write (reply-cont-queue) #'content-string)))))
 
 (defun 404-not-found ()
   (status "404 Not found")
   (header "Content-Type: text/plain")
   (content (format nil "404 Not found
-  
+
 The requested url ~S was not found on this server."
                    (request-target))))
 
@@ -269,10 +358,10 @@ The requested url ~S was not found on this server."
 (defvar *url-handlers*
   '(404-not-found-handler))
 
-(defun request-cont (request)
+(defun request-cont (request reply)
   (let ((handlers *url-handlers*)
         (*request* request)
-        (*reply* (make-instance 'reply)))
+        (*reply* reply))
     (loop
        (when (endp handlers)
          (return))
@@ -294,9 +383,11 @@ The requested url ~S was not found on this server."
   (loop
      (when *stop*
        (return))
-     (unless (eq :keep-alive
-                 (funcall (request-reader stream #'request-cont)))
-       (return))))
+     (let* ((request (make-instance 'request :stream stream))
+            (reply (make-instance 'reply))
+            (result (funcall (request-reader request reply #'request-cont))))
+       (unless (eq :keep-alive result)
+         (return)))))
 
 (defvar *acceptor-loop*)