From 8b9a9384132e39a49a14cf41b292018ce4f4f251 Mon Sep 17 00:00:00 2001
From: "Ryan C. Gordon" <[EMAIL REDACTED]>
Date: Thu, 2 Mar 2023 16:27:32 -0500
Subject: [PATCH] dataqueue: Make thread safe.
Each data queue gets its own mutex, and the clear, write, peek, read, and size functions lock it while they access the queue.
Fixes #7390.
---
src/SDL_dataqueue.c | 46 +++++++++++++++++++++++++++++++++++++++------
src/SDL_dataqueue.h | 1 +
2 files changed, 41 insertions(+), 6 deletions(-)
diff --git a/src/SDL_dataqueue.c b/src/SDL_dataqueue.c
index 67dccd8f09eb..4596101ba85f 100644
--- a/src/SDL_dataqueue.c
+++ b/src/SDL_dataqueue.c
@@ -32,6 +32,7 @@ typedef struct SDL_DataQueuePacket
struct SDL_DataQueue
{
+ SDL_mutex *lock;
SDL_DataQueuePacket *head; /* device fed from here. */
SDL_DataQueuePacket *tail; /* queue fills to here. */
SDL_DataQueuePacket *pool; /* these are unused packets. */
@@ -48,24 +49,26 @@ static void SDL_FreeDataQueueList(SDL_DataQueuePacket *packet)
}
}
-/* this all expects that you managed thread safety elsewhere. */
-
SDL_DataQueue *
SDL_CreateDataQueue(const size_t _packetlen, const size_t initialslack)
{
- SDL_DataQueue *queue = (SDL_DataQueue *)SDL_malloc(sizeof(SDL_DataQueue));
+ SDL_DataQueue *queue = (SDL_DataQueue *)SDL_calloc(1, sizeof(SDL_DataQueue));
if (queue == NULL) {
SDL_OutOfMemory();
- return NULL;
} else {
const size_t packetlen = _packetlen ? _packetlen : 1024;
const size_t wantpackets = (initialslack + (packetlen - 1)) / packetlen;
size_t i;
- SDL_zerop(queue);
queue->packet_size = packetlen;
+ queue->lock = SDL_CreateMutex();
+ if (!queue->lock) {
+ SDL_free(queue);
+ return NULL;
+ }
+
for (i = 0; i < wantpackets; i++) {
SDL_DataQueuePacket *packet = (SDL_DataQueuePacket *)SDL_malloc(sizeof(SDL_DataQueuePacket) + packetlen);
if (packet) { /* don't care if this fails, we'll deal later. */
@@ -85,6 +88,7 @@ void SDL_DestroyDataQueue(SDL_DataQueue *queue)
if (queue) {
SDL_FreeDataQueueList(queue->head);
SDL_FreeDataQueueList(queue->pool);
+ SDL_DestroyMutex(queue->lock);
SDL_free(queue);
}
}
@@ -101,6 +105,8 @@ void SDL_ClearDataQueue(SDL_DataQueue *queue, const size_t slack)
return;
}
+ SDL_LockMutex(queue->lock);
+
packet = queue->head;
/* merge the available pool and the current queue into one list. */
@@ -128,9 +134,12 @@ void SDL_ClearDataQueue(SDL_DataQueue *queue, const size_t slack)
queue->pool = NULL;
}
+ SDL_UnlockMutex(queue->lock);
+
SDL_FreeDataQueueList(packet); /* free extra packets */
}
+/* You must hold queue->lock before calling this! */
static SDL_DataQueuePacket *AllocateDataQueuePacket(SDL_DataQueue *queue)
{
SDL_DataQueuePacket *packet;
@@ -177,6 +186,8 @@ int SDL_WriteToDataQueue(SDL_DataQueue *queue, const void *_data, const size_t _
return SDL_InvalidParamError("queue");
}
+ SDL_LockMutex(queue->lock);
+
orighead = queue->head;
origtail = queue->tail;
origlen = origtail ? origtail->datalen : 0;
@@ -200,6 +211,7 @@ int SDL_WriteToDataQueue(SDL_DataQueue *queue, const void *_data, const size_t _
queue->tail = origtail;
queue->pool = NULL;
+ SDL_UnlockMutex(queue->lock);
SDL_FreeDataQueueList(packet); /* give back what we can. */
return SDL_OutOfMemory();
}
@@ -213,6 +225,8 @@ int SDL_WriteToDataQueue(SDL_DataQueue *queue, const void *_data, const size_t _
queue->queued_bytes += datalen;
}
+ SDL_UnlockMutex(queue->lock);
+
return 0;
}
@@ -228,6 +242,8 @@ SDL_PeekIntoDataQueue(SDL_DataQueue *queue, void *_buf, const size_t _len)
return 0;
}
+ SDL_LockMutex(queue->lock);
+
for (packet = queue->head; len && packet; packet = packet->next) {
const size_t avail = packet->datalen - packet->startpos;
const size_t cpy = SDL_min(len, avail);
@@ -238,6 +254,8 @@ SDL_PeekIntoDataQueue(SDL_DataQueue *queue, void *_buf, const size_t _len)
len -= cpy;
}
+ SDL_UnlockMutex(queue->lock);
+
return (size_t)(ptr - buf);
}
@@ -253,6 +271,8 @@ SDL_ReadFromDataQueue(SDL_DataQueue *queue, void *_buf, const size_t _len)
return 0;
}
+ SDL_LockMutex(queue->lock);
+
while ((len > 0) && ((packet = queue->head) != NULL)) {
const size_t avail = packet->datalen - packet->startpos;
const size_t cpy = SDL_min(len, avail);
@@ -278,12 +298,26 @@ SDL_ReadFromDataQueue(SDL_DataQueue *queue, void *_buf, const size_t _len)
queue->tail = NULL; /* in case we drained the queue entirely. */
}
+ SDL_UnlockMutex(queue->lock);
+
return (size_t)(ptr - buf);
}
size_t
SDL_GetDataQueueSize(SDL_DataQueue *queue)
{
- return queue ? queue->queued_bytes : 0;
+ size_t retval = 0; /* a NULL queue reports zero queued bytes. */
+ if (queue) {
+ SDL_LockMutex(queue->lock); /* hold the queue's lock while reading queued_bytes. */
+ retval = queue->queued_bytes;
+ SDL_UnlockMutex(queue->lock);
+ }
+ return retval;
+}
+
+SDL_mutex *
+SDL_GetDataQueueMutex(SDL_DataQueue *queue)
+{
+ return queue ? queue->lock : NULL; /* NULL queue yields NULL; the queue owns this mutex and destroys it in SDL_DestroyDataQueue. */
}
diff --git a/src/SDL_dataqueue.h b/src/SDL_dataqueue.h
index cb9b9aa7eaaa..51b3641e3ec0 100644
--- a/src/SDL_dataqueue.h
+++ b/src/SDL_dataqueue.h
@@ -33,5 +33,6 @@ int SDL_WriteToDataQueue(SDL_DataQueue *queue, const void *data, const size_t le
size_t SDL_ReadFromDataQueue(SDL_DataQueue *queue, void *buf, const size_t len);
size_t SDL_PeekIntoDataQueue(SDL_DataQueue *queue, void *buf, const size_t len);
size_t SDL_GetDataQueueSize(SDL_DataQueue *queue);
+SDL_mutex *SDL_GetDataQueueMutex(SDL_DataQueue *queue); /* don't destroy this, obviously. */
#endif /* SDL_dataqueue_h_ */