Commit bd3934ff9fe45fb640c4cb85c4016ef367a3a416

Thomas de Grivel 2017-06-21T11:26:28

queue-blocking

diff --git a/bordeaux-queue.lisp b/bordeaux-queue.lisp
index e66fba9..b1d13ab 100644
--- a/bordeaux-queue.lisp
+++ b/bordeaux-queue.lisp
@@ -20,20 +20,31 @@
    (length :initform 0
 	   :accessor queue-length
 	   :type fixnum+)
-   (lock :initform (make-lock 'queue)
+   (lock :initform (make-lock "queue")
 	 :reader queue-lock)))
 
+(defclass queue-blocking-read (queue)
+  ((blocking-read-cv :initform (make-condition-variable
+				:name "queue blocking read")
+		      :reader queue-blocking-read-cv)))
+
 (defclass queue-blocking-write (queue)
   ((blocking-write-cv :initform (make-condition-variable
 				 :name "queue blocking write")
 		      :reader queue-blocking-write-cv)))
 
-(defgeneric enqueue-full (queue))
+(defclass queue-blocking (queue-blocking-read queue-blocking-write)
+  ())
 
+(defgeneric queue-full (queue))
+(defgeneric on-enqueue (queue))
 (defgeneric enqueue (queue item &optional blocking))
 
+(defgeneric queue-empty (queue))
+(defgeneric queue-peek (queue &optional blocking))
+
 (defgeneric on-dequeue (queue))
-(defgeneric dequeue (queue))
+(defgeneric dequeue (queue &optional blocking))
 (defgeneric dequeue-all (queue))
 
 ;;  Initialization
@@ -51,18 +62,22 @@
   (let ((vector (make-queue-vector size element-type initial-element)))
     (apply #'call-next-method q (list* :vector vector initargs))))
 
-(defun make-queue (size)
-  (make-instance 'queue :size size))
-
 ;;  Enqueue
 
-(defmethod enqueue-full ((q queue))
+(defmethod queue-full ((q queue))
   nil)
 
-(defmethod enqueue-full ((q queue-blocking-write))
+(defmethod queue-full ((q queue-blocking-write))
   (with-accessors ((lock queue-lock)
 		   (blocking-write-cv queue-blocking-write-cv)) q
-    (condition-wait blocking-write-cv lock)))
+    (condition-wait blocking-write-cv lock)
+    t))
+
+(defmethod on-enqueue ((q queue))
+  nil)
+
+(defmethod on-enqueue ((q queue-blocking-read))
+  (condition-notify (queue-blocking-read-cv q)))
 
 (defmethod enqueue ((q queue) item &optional blocking)
   (assert (typep item (array-element-type (queue-vector q))))
@@ -71,44 +86,77 @@
 		   (length queue-length)
 		   (lock queue-lock)) q
     (with-lock-held (lock)
-      (labels ((fetch ()
+      (labels ((write-queue ()
 		 (let ((vector-length (length vector))
 		       (len length))
 		   (cond ((= len vector-length)
-			  (when blocking
-			    (enqueue-full q)
-			    (fetch)))
+			  (when (and blocking (queue-full q))
+			    (write-queue)))
 			 ((< len vector-length)
-			  (setf write-index (mod (1+ write-index) vector-length)
+			  (setf (aref vector write-index) item
+				write-index (mod (1+ write-index) vector-length)
 				length (1+ len))
+			  (on-enqueue q)
 			  t)
 			 (t
 			  (error "Invalid queue length"))))))
-	(fetch)))))
+	(write-queue)))))
 
 ;;  Dequeue
 
+(defmethod queue-empty ((q queue))
+  nil)
+
+(defmethod queue-empty ((q queue-blocking-read))
+  (with-accessors ((lock queue-lock)
+		   (blocking-read-cv queue-blocking-read-cv)) q
+    (condition-wait blocking-read-cv lock)
+    t))
+
 (defmethod on-dequeue ((q queue))
   nil)
 
 (defmethod on-dequeue ((q queue-blocking-write))
   (condition-notify (queue-blocking-write-cv q)))
 
-(defmethod dequeue ((q queue))
+(defmethod dequeue ((q queue) &optional blocking)
+  (with-accessors ((vector queue-vector)
+		   (read-index queue-read-index)
+		   (length queue-length)
+		   (lock queue-lock)) q
+    (with-lock-held (lock)
+      (labels ((read-queue ()
+		 (let ((vector-length (length vector))
+		       (len length))
+		   (cond ((= 0 len)
+			  (if (and blocking (queue-empty q))
+			      (read-queue)
+			      (values nil nil)))
+			 ((< 0 len)
+			  (let ((item (aref vector read-index)))
+			    (setf read-index (mod (1+ read-index) vector-length)
+				  length (1- len))
+			    (on-dequeue q)
+			    (values item t)))
+			 (t
+			  (error "Invalid queue length"))))))
+	(read-queue)))))
+
+(defmethod dequeue-all ((q queue))
   (with-accessors ((vector queue-vector)
 		   (read-index queue-read-index)
 		   (length queue-length)
 		   (lock queue-lock)) q
     (with-lock-held (lock)
-      (let ((vector-length (length vector))
-	    (len length))
-	(cond ((= 0 len)
-	       (values nil nil))
-	      ((< 0 len)
-	       (let ((item (aref vector read-index)))
-		 (setf read-index (mod (1+ read-index) vector-length)
-		       length (1- len))
-		 (on-dequeue q)
-		 (values item t)))
-	      (t
-	       (error "Invalid queue length")))))))
+      (let* ((vector-length (length vector))
+	     (len length)
+	     (result (make-array `(len)
+				 :element-type (array-element-type vector))))
+	(dotimes (i len)
+	  (setf read-index (mod (1+ read-index) vector-length)
+		(aref result i) (aref vector read-index)))
+	(decf length len)
+	result))))
+
+#+nil
+(untrace enqueue dequeue on-enqueue on-dequeue queue-full queue-empty)
diff --git a/package.lisp b/package.lisp
index 7f73173..27a33d7 100644
--- a/package.lisp
+++ b/package.lisp
@@ -5,6 +5,9 @@
   (:use :bordeaux-threads :common-lisp)
   (:export
    #:queue
+   #:queue-blocking-read
+   #:queue-blocking-write
+   #:queue-blocking
    #:enqueue
    #:dequeue
    #:dequeue-all))