dataqueue: Make thread safe.

Each data queue gets its own mutex and each function obtains it.

Fixes #7390.

(cherry picked from commit 8b9a9384132e39a49a14cf41b292018ce4f4f251)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168
diff --git a/src/SDL_dataqueue.c b/src/SDL_dataqueue.c
index 8abe7f7..4f63fca 100644
--- a/src/SDL_dataqueue.c
+++ b/src/SDL_dataqueue.c
@@ -33,6 +33,7 @@ typedef struct SDL_DataQueuePacket
struct SDL_DataQueue
{
+ SDL_mutex *lock;
SDL_DataQueuePacket *head; /* device fed from here. */
SDL_DataQueuePacket *tail; /* queue fills to here. */
SDL_DataQueuePacket *pool; /* these are unused packets. */
@@ -49,24 +50,26 @@ static void SDL_FreeDataQueueList(SDL_DataQueuePacket *packet)
}
}
-/* this all expects that you managed thread safety elsewhere. */
-
SDL_DataQueue *
SDL_NewDataQueue(const size_t _packetlen, const size_t initialslack)
{
- SDL_DataQueue *queue = (SDL_DataQueue *)SDL_malloc(sizeof(SDL_DataQueue));
+ SDL_DataQueue *queue = (SDL_DataQueue *)SDL_calloc(1, sizeof(SDL_DataQueue));
if (queue == NULL) {
SDL_OutOfMemory();
- return NULL;
} else {
const size_t packetlen = _packetlen ? _packetlen : 1024;
const size_t wantpackets = (initialslack + (packetlen - 1)) / packetlen;
size_t i;
- SDL_zerop(queue);
queue->packet_size = packetlen;
+ queue->lock = SDL_CreateMutex();
+ if (!queue->lock) {
+ SDL_free(queue);
+ return NULL;
+ }
+
for (i = 0; i < wantpackets; i++) {
SDL_DataQueuePacket *packet = (SDL_DataQueuePacket *)SDL_malloc(sizeof(SDL_DataQueuePacket) + packetlen);
if (packet) { /* don't care if this fails, we'll deal later. */
@@ -86,6 +89,7 @@ void SDL_FreeDataQueue(SDL_DataQueue *queue)
if (queue) {
SDL_FreeDataQueueList(queue->head);
SDL_FreeDataQueueList(queue->pool);
+ SDL_DestroyMutex(queue->lock);
SDL_free(queue);
}
}
@@ -102,6 +106,8 @@ void SDL_ClearDataQueue(SDL_DataQueue *queue, const size_t slack)
return;
}
+ SDL_LockMutex(queue->lock);
+
packet = queue->head;
/* merge the available pool and the current queue into one list. */
@@ -129,9 +135,12 @@ void SDL_ClearDataQueue(SDL_DataQueue *queue, const size_t slack)
queue->pool = NULL;
}
+ SDL_UnlockMutex(queue->lock);
+
SDL_FreeDataQueueList(packet); /* free extra packets */
}
+/* You must hold queue->lock before calling this! */
static SDL_DataQueuePacket *AllocateDataQueuePacket(SDL_DataQueue *queue)
{
SDL_DataQueuePacket *packet;
@@ -178,6 +187,8 @@ int SDL_WriteToDataQueue(SDL_DataQueue *queue, const void *_data, const size_t _
return SDL_InvalidParamError("queue");
}
+ SDL_LockMutex(queue->lock);
+
orighead = queue->head;
origtail = queue->tail;
origlen = origtail ? origtail->datalen : 0;
@@ -201,6 +212,7 @@ int SDL_WriteToDataQueue(SDL_DataQueue *queue, const void *_data, const size_t _
queue->tail = origtail;
queue->pool = NULL;
+ SDL_UnlockMutex(queue->lock);
SDL_FreeDataQueueList(packet); /* give back what we can. */
return SDL_OutOfMemory();
}
@@ -214,6 +226,8 @@ int SDL_WriteToDataQueue(SDL_DataQueue *queue, const void *_data, const size_t _
queue->queued_bytes += datalen;
}
+ SDL_UnlockMutex(queue->lock);
+
return 0;
}
@@ -229,6 +243,8 @@ SDL_PeekIntoDataQueue(SDL_DataQueue *queue, void *_buf, const size_t _len)
return 0;
}
+ SDL_LockMutex(queue->lock);
+
for (packet = queue->head; len && packet; packet = packet->next) {
const size_t avail = packet->datalen - packet->startpos;
const size_t cpy = SDL_min(len, avail);
@@ -239,6 +255,8 @@ SDL_PeekIntoDataQueue(SDL_DataQueue *queue, void *_buf, const size_t _len)
len -= cpy;
}
+ SDL_UnlockMutex(queue->lock);
+
return (size_t)(ptr - buf);
}
@@ -254,6 +272,8 @@ SDL_ReadFromDataQueue(SDL_DataQueue *queue, void *_buf, const size_t _len)
return 0;
}
+ SDL_LockMutex(queue->lock);
+
while ((len > 0) && ((packet = queue->head) != NULL)) {
const size_t avail = packet->datalen - packet->startpos;
const size_t cpy = SDL_min(len, avail);
@@ -279,13 +299,27 @@ SDL_ReadFromDataQueue(SDL_DataQueue *queue, void *_buf, const size_t _len)
queue->tail = NULL; /* in case we drained the queue entirely. */
}
+ SDL_UnlockMutex(queue->lock);
+
return (size_t)(ptr - buf);
}
size_t
SDL_CountDataQueue(SDL_DataQueue *queue)
{
- return queue ? queue->queued_bytes : 0;
+ size_t retval = 0;
+ if (queue) {
+ SDL_LockMutex(queue->lock);
+ retval = queue->queued_bytes;
+ SDL_UnlockMutex(queue->lock);
+ }
+ return retval;
+}
+
+SDL_mutex *
+SDL_GetDataQueueMutex(SDL_DataQueue *queue)
+{
+ return queue ? queue->lock : NULL;
}
/* vi: set ts=4 sw=4 expandtab: */
diff --git a/src/SDL_dataqueue.h b/src/SDL_dataqueue.h
index 45ef607..aee2cda 100644
--- a/src/SDL_dataqueue.h
+++ b/src/SDL_dataqueue.h
@@ -33,6 +33,7 @@ int SDL_WriteToDataQueue(SDL_DataQueue *queue, const void *data, const size_t le
size_t SDL_ReadFromDataQueue(SDL_DataQueue *queue, void *buf, const size_t len);
size_t SDL_PeekIntoDataQueue(SDL_DataQueue *queue, void *buf, const size_t len);
size_t SDL_CountDataQueue(SDL_DataQueue *queue);
+SDL_mutex *SDL_GetDataQueueMutex(SDL_DataQueue *queue); /* don't destroy this, obviously. */
#endif /* SDL_dataqueue_h_ */