diff --git a/bordeaux-queue.lisp b/bordeaux-queue.lisp
index e66fba9..b1d13ab 100644
--- a/bordeaux-queue.lisp
+++ b/bordeaux-queue.lisp
@@ -20,20 +20,31 @@
(length :initform 0
:accessor queue-length
:type fixnum+)
- (lock :initform (make-lock 'queue)
+ (lock :initform (make-lock "queue")
:reader queue-lock)))
+(defclass queue-blocking-read (queue) ; QUEUE subclass that adds a condition variable for readers
+ ((blocking-read-cv :initform (make-condition-variable ; waited on by QUEUE-EMPTY, notified by ON-ENQUEUE
+ :name "queue blocking read")
+ :reader queue-blocking-read-cv)))
+
(defclass queue-blocking-write (queue)
((blocking-write-cv :initform (make-condition-variable
:name "queue blocking write")
:reader queue-blocking-write-cv)))
-(defgeneric enqueue-full (queue))
+(defclass queue-blocking (queue-blocking-read queue-blocking-write) ; inherits from both blocking variants
+ ()) ; declares no slots of its own
+(defgeneric queue-full (queue)) ; hook ENQUEUE calls on a full queue when BLOCKING is true; a true result makes it retry
+(defgeneric on-enqueue (queue)) ; hook ENQUEUE calls right after it has stored an item
(defgeneric enqueue (queue item &optional blocking))
+(defgeneric queue-empty (queue)) ; hook DEQUEUE calls on an empty queue when BLOCKING is true; a true result makes it retry
+(defgeneric queue-peek (queue &optional blocking)) ; NOTE(review): no method for this generic is visible in this patch -- confirm one exists
+
(defgeneric on-dequeue (queue))
-(defgeneric dequeue (queue))
+(defgeneric dequeue (queue &optional blocking)) ; returns (values item t), or (values nil nil) when nothing was taken
(defgeneric dequeue-all (queue))
;; Initialization
@@ -51,18 +62,22 @@
(let ((vector (make-queue-vector size element-type initial-element)))
(apply #'call-next-method q (list* :vector vector initargs))))
-(defun make-queue (size)
- (make-instance 'queue :size size))
-
;; Enqueue
-(defmethod enqueue-full ((q queue))
+(defmethod queue-full ((q queue)) ; default hook: does not wait; NIL tells ENQUEUE not to retry
nil)
-(defmethod enqueue-full ((q queue-blocking-write))
+(defmethod queue-full ((q queue-blocking-write)) ; wait on the blocking-write condition variable, then return T
(with-accessors ((lock queue-lock)
(blocking-write-cv queue-blocking-write-cv)) q
- (condition-wait blocking-write-cv lock)))
+ (condition-wait blocking-write-cv lock) ; ON-DEQUEUE notifies this condition variable
+ t)) ; true => ENQUEUE runs its length check again
+
+(defmethod on-enqueue ((q queue)) ; default hook: does nothing and returns NIL
+ nil)
+
+(defmethod on-enqueue ((q queue-blocking-read)) ; notify the condition variable that QUEUE-EMPTY waits on
+ (condition-notify (queue-blocking-read-cv q)))
(defmethod enqueue ((q queue) item &optional blocking)
(assert (typep item (array-element-type (queue-vector q))))
@@ -71,44 +86,77 @@
(length queue-length)
(lock queue-lock)) q
(with-lock-held (lock)
- (labels ((fetch ()
+ (labels ((write-queue ()
(let ((vector-length (length vector))
(len length))
(cond ((= len vector-length)
- (when blocking
- (enqueue-full q)
- (fetch)))
+ (when (and blocking (queue-full q))
+ (write-queue)))
((< len vector-length)
- (setf write-index (mod (1+ write-index) vector-length)
+ (setf (aref vector write-index) item
+ write-index (mod (1+ write-index) vector-length)
length (1+ len))
+ (on-enqueue q)
t)
(t
(error "Invalid queue length"))))))
- (fetch)))))
+ (write-queue)))))
;; Dequeue
+(defmethod queue-empty ((q queue)) ; default hook: does not wait; NIL makes DEQUEUE return (values nil nil)
+ nil)
+
+(defmethod queue-empty ((q queue-blocking-read))
+  "Wait on Q's blocking-read condition variable using Q's lock; return T."
+  (condition-wait (queue-blocking-read-cv q)
+                  (queue-lock q))
+  t)
+
(defmethod on-dequeue ((q queue))
nil)
(defmethod on-dequeue ((q queue-blocking-write))
(condition-notify (queue-blocking-write-cv q)))
-(defmethod dequeue ((q queue))
+(defmethod dequeue ((q queue) &optional blocking)
+  "Take the item at Q's read index: (values item t), or (values nil nil) when
+Q is empty.  With BLOCKING true, QUEUE-EMPTY is called on an empty queue and
+the attempt is repeated for as long as it returns true."
+  (with-accessors ((buffer queue-vector)
+                   (head queue-read-index)
+                   (used queue-length)
+                   (mutex queue-lock)) q
+    (with-lock-held (mutex)
+      (loop
+        (let ((capacity (length buffer))
+              (pending used))
+          (cond ((plusp pending)
+                 (let ((item (aref buffer head)))
+                   (setf head (mod (1+ head) capacity)
+                         used (1- pending))
+                   (on-dequeue q) ; hook: blocking-write queues notify here
+                   (return (values item t))))
+                ((minusp pending)
+                 (error "Invalid queue length"))
+                ((not (and blocking (queue-empty q)))
+                 (return (values nil nil)))))))))
+
+(defmethod dequeue-all ((q queue)) ; drain Q under its lock; returns a fresh vector of the items in read order
(with-accessors ((vector queue-vector)
(read-index queue-read-index)
(length queue-length)
(lock queue-lock)) q
(with-lock-held (lock)
- (let ((vector-length (length vector))
- (len length))
- (cond ((= 0 len)
- (values nil nil))
- ((< 0 len)
- (let ((item (aref vector read-index)))
- (setf read-index (mod (1+ read-index) vector-length)
- length (1- len))
- (on-dequeue q)
- (values item t)))
- (t
- (error "Invalid queue length")))))))
+      (let* ((vector-length (length vector))
+             (len length) ; number of items to drain, fixed before the loop
+             (result (make-array len ; plain dimension: `(len) was a list holding the symbol LEN
+                                 :element-type (array-element-type vector))))
+        (dotimes (i len result)
+          (setf (aref result i) (aref vector read-index) ; read the slot first ...
+                read-index (mod (1+ read-index) vector-length) ; ... then advance, as DEQUEUE does
+                length (1- length))
+          (on-dequeue q)))))) ; one ON-DEQUEUE call per freed slot, as DEQUEUE does
+
+#+nil
+(untrace enqueue dequeue on-enqueue on-dequeue queue-full queue-empty) ; debugging aid; the #+nil above makes the reader skip it
diff --git a/package.lisp b/package.lisp
index 7f73173..27a33d7 100644
--- a/package.lisp
+++ b/package.lisp
@@ -5,6 +5,9 @@
(:use :bordeaux-threads :common-lisp)
(:export
#:queue
+ #:queue-blocking-read
+ #:queue-blocking-write
+ #:queue-blocking
#:enqueue
#:dequeue
#:dequeue-all))