Hash :
66c3a546
Author :
Thomas de Grivel
Date :
2018-06-03T23:04:09
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
(in-package :bordeaux-queue)
(deftype fixnum+ (&optional (low 0))
`(integer ,low ,most-positive-fixnum))
(deftype fixnum* (&optional (low 1))
`(integer ,low ,most-positive-fixnum))
(defclass queue ()
((vector :initarg :vector
:reader queue-vector
:type simple-vector)
(read-index :initform 0
:accessor queue-read-index
:type fixnum+)
(write-index :initform 0
:accessor queue-write-index
:type fixnum+)
(length :initform 0
:accessor queue-length
:type fixnum+)
(lock :initform (make-lock "queue")
:reader queue-lock)))
(defclass queue-blocking-read (queue)
((blocking-read-cv :initform (make-condition-variable
:name "queue blocking read")
:reader queue-blocking-read-cv)))
(defclass queue-blocking-write (queue)
((blocking-write-cv :initform (make-condition-variable
:name "queue blocking write")
:reader queue-blocking-write-cv)))
(defclass queue-blocking (queue-blocking-read queue-blocking-write)
())
(defgeneric queue-full (queue))
(defgeneric on-enqueue (queue))
(defgeneric enqueue (queue item &optional blocking))
(defgeneric queue-empty (queue))
(defgeneric queue-peek (queue &optional blocking))
(defgeneric on-dequeue (queue))
(defgeneric dequeue (queue &optional blocking))
(defgeneric dequeue-all (queue))
;; Initialization
(defun make-queue-vector (size &optional (element-type 't) initial-element)
(declare (type fixnum* size))
(assert (typep initial-element element-type))
(make-array `(,size)
:element-type element-type
:initial-element initial-element))
(defmethod initialize-instance ((q queue) &rest initargs
&key size (element-type 't)
initial-element &allow-other-keys)
(let ((vector (make-queue-vector size element-type initial-element)))
(apply #'call-next-method q (list* :vector vector initargs))))
;; Enqueue
(defmethod queue-full ((q queue))
nil)
(defmethod queue-full ((q queue-blocking-write))
(with-accessors ((lock queue-lock)
(blocking-write-cv queue-blocking-write-cv)) q
(condition-wait blocking-write-cv lock)
t))
(defmethod on-enqueue ((q queue))
nil)
(defmethod on-enqueue ((q queue-blocking-read))
(condition-notify (queue-blocking-read-cv q)))
(defmethod enqueue ((q queue) item &optional blocking)
(assert (typep item (array-element-type (queue-vector q))))
(with-accessors ((vector queue-vector)
(write-index queue-write-index)
(length queue-length)
(lock queue-lock)) q
(with-lock-held (lock)
(labels ((write-queue ()
(let ((vector-length (length vector))
(len length))
(cond ((= len vector-length)
(when (and blocking (queue-full q))
(write-queue)))
((< len vector-length)
(setf (aref vector write-index) item
write-index (mod (1+ write-index) vector-length)
length (1+ len))
(on-enqueue q)
t)
(t
(error "Invalid queue length"))))))
(write-queue)))))
;; Dequeue
(defmethod queue-empty ((q queue))
nil)
(defmethod queue-empty ((q queue-blocking-read))
(with-accessors ((lock queue-lock)
(blocking-read-cv queue-blocking-read-cv)) q
(condition-wait blocking-read-cv lock)
t))
(defmethod on-dequeue ((q queue))
nil)
(defmethod on-dequeue ((q queue-blocking-write))
(condition-notify (queue-blocking-write-cv q)))
(defmethod dequeue ((q queue) &optional blocking)
(with-accessors ((vector queue-vector)
(read-index queue-read-index)
(length queue-length)
(lock queue-lock)) q
(with-lock-held (lock)
(labels ((read-queue ()
(let ((vector-length (length vector))
(len length))
(cond ((= 0 len)
(if (and blocking (queue-empty q))
(read-queue)
(values nil nil)))
((< 0 len)
(let ((item (aref vector read-index)))
(setf read-index (mod (1+ read-index) vector-length)
length (1- len))
(on-dequeue q)
(values item t)))
(t
(error "Invalid queue length"))))))
(read-queue)))))
(defmethod dequeue-all ((q queue))
(with-accessors ((vector queue-vector)
(read-index queue-read-index)
(length queue-length)
(lock queue-lock)) q
(with-lock-held (lock)
(let* ((vector-length (length vector))
(len length)
(result (make-array `(,len)
:element-type (array-element-type vector))))
(dotimes (i len)
(setf read-index (mod (1+ read-index) vector-length)
(aref result i) (aref vector read-index)))
(decf length len)
result))))
#+nil
(untrace enqueue dequeue on-enqueue on-dequeue queue-full queue-empty)