SDL: audio: pipewire: Always buffer source audio

From 9afd7570d602488c21afc6d875eb9cc874a224fe Mon Sep 17 00:00:00 2001
From: Frank Praznik <[EMAIL REDACTED]>
Date: Sat, 20 Feb 2021 13:33:12 -0500
Subject: [PATCH] audio: pipewire: Always buffer source audio

The latency of source nodes can change depending on the overall latency of the processing graph. Incoming audio must therefore always be buffered to ensure uninterrupted delivery.

The SDL_AudioStream path in the input callback was removed because its only use was buffering audio outside of Pipewire's min/max period sizes, and that case is now handled by the always-present buffer.
---
 src/audio/pipewire/SDL_pipewire.c | 74 ++++++++++++++++---------------
 src/audio/pipewire/SDL_pipewire.h |  9 ++--
 2 files changed, 44 insertions(+), 39 deletions(-)

diff --git a/src/audio/pipewire/SDL_pipewire.c b/src/audio/pipewire/SDL_pipewire.c
index 220847e63..71d6a1837 100644
--- a/src/audio/pipewire/SDL_pipewire.c
+++ b/src/audio/pipewire/SDL_pipewire.c
@@ -829,6 +829,7 @@ input_callback(void *data)
   Uint8 *            src;
   _THIS                    = (SDL_AudioDevice *)data;
   struct pw_stream *stream = this->hidden->stream;
+  Uint32            offset, size;
 
   /* Shutting down, don't do anything */
   if (SDL_AtomicGet(&this->shutdown)) {
@@ -846,44 +847,31 @@ input_callback(void *data)
     return;
   }
 
-  if (SDL_AtomicGet(&this->enabled)) {
-    /* The first packet might be undersized, so pad the start with silence */
-    if (spa_buf->datas[0].chunk->size < this->spec.size) {
-      const Uint32 diff = this->spec.size - spa_buf->datas[0].chunk->size;
-      SDL_memmove(src + diff, src, diff);
-      SDL_memset(src, this->spec.silence, diff);
-    }
-  } else { /* Zero the buffer if the stream is disabled. */
-    SDL_memset(src, this->spec.silence, this->spec.size);
+  /* Calculate the offset and data size */
+  offset = SPA_MIN(spa_buf->datas[0].chunk->offset, spa_buf->datas[0].maxsize);
+  size   = SPA_MIN(spa_buf->datas[0].chunk->size, spa_buf->datas[0].maxsize - offset);
+
+  src += offset;
+
+  /* Fill the buffer with silence if the stream is disabled. */
+  if (!SDL_AtomicGet(&this->enabled)) {
+    SDL_memset(src, this->callbackspec.silence, size);
   }
 
-  if (!this->stream) {
-    /* No intermediate stream, call the application callback directly */
-    if (!SDL_AtomicGet(&this->paused)) {
+  /* Pipewire can vary the latency, so buffer all incoming data */
+  SDL_WriteToDataQueue(this->hidden->buffer, src, size);
+
+  if (!SDL_AtomicGet(&this->paused)) {
+    while (SDL_CountDataQueue(this->hidden->buffer) >= this->callbackspec.size) {
+      SDL_ReadFromDataQueue(this->hidden->buffer, this->work_buffer, this->callbackspec.size);
+
       SDL_LockMutex(this->mixer_lock);
-      this->callbackspec.callback(this->callbackspec.userdata, src, this->callbackspec.size);
+      this->callbackspec.callback(this->callbackspec.userdata, this->work_buffer, this->callbackspec.size);
       SDL_UnlockMutex(this->mixer_lock);
     }
-  } else {
-    SDL_AudioStreamPut(this->stream, src, spa_buf->datas[0].chunk->size);
-
-    if (!SDL_AtomicGet(&this->paused)) {
-      /* Fire the callback as long as we have enough data to do so. */
-      while (SDL_AudioStreamAvailable(this->stream) >= this->callbackspec.size) {
-        const Uint32 got = SDL_AudioStreamGet(this->stream, this->work_buffer, this->callbackspec.size);
-
-        SDL_assert(got == this->callbackspec.size);
-
-        SDL_LockMutex(this->mixer_lock);
-        this->callbackspec.callback(this->callbackspec.userdata, this->work_buffer, this->callbackspec.size);
-        SDL_UnlockMutex(this->mixer_lock);
-      }
-    } else {
-      /* Dummy loop to dump the buffered data while paused */
-      while (SDL_AudioStreamAvailable(this->stream) >= this->callbackspec.size) {
-        const Uint32 got = SDL_AudioStreamGet(this->stream, this->work_buffer, this->callbackspec.size);
-        SDL_assert(got == this->callbackspec.size);
-      }
+  } else { /* Keep data moving through the buffer while paused */
+    while (SDL_CountDataQueue(this->hidden->buffer) >= this->callbackspec.size) {
+      SDL_ReadFromDataQueue(this->hidden->buffer, this->work_buffer, this->callbackspec.size);
     }
   }
 
@@ -932,6 +920,7 @@ PIPEWIRE_OpenDevice(_THIS, void *handle, const char *devname, int iscapture)
   if (!stream_role || *stream_role == '\0') {
     stream_role = "Game";
   }
+
   /* Initialize the Pipewire stream info from the SDL audio spec */
   initialize_spa_info(&this->spec, &spa_info);
   params = spa_format_audio_raw_build(&b, SPA_PARAM_EnumFormat, &spa_info);
@@ -952,11 +941,22 @@ PIPEWIRE_OpenDevice(_THIS, void *handle, const char *devname, int iscapture)
   /* Size of a single audio frame in bytes */
   priv->stride = (SDL_AUDIO_BITSIZE(this->spec.format) >> 3) * this->spec.channels;
 
-  if (this->spec.samples != adjusted_samples) {
+  if (this->spec.samples != adjusted_samples && !iscapture) {
     this->spec.samples = adjusted_samples;
     this->spec.size    = this->spec.samples * priv->stride;
   }
 
+  /* The latency of source nodes can change, so buffering is required. */
+  if (iscapture) {
+    const size_t period_size = adjusted_samples * priv->stride;
+
+    /* A packet size of 4 periods should be more than is ever needed (no more than 2 should be queued in practice). */
+    priv->buffer = SDL_NewDataQueue(period_size * 4, period_size * 2);
+    if (priv->buffer == NULL) {
+      return SDL_SetError("Pipewire: Failed to allocate source buffer");
+    }
+  }
+
   SDL_snprintf(thread_name, sizeof(thread_name), "SDLAudio%c%ld", (iscapture) ? 'C' : 'P', (long)handle);
   priv->loop = PIPEWIRE_pw_thread_loop_new(thread_name, NULL);
   if (priv->loop == NULL) {
@@ -986,7 +986,7 @@ PIPEWIRE_OpenDevice(_THIS, void *handle, const char *devname, int iscapture)
   PIPEWIRE_pw_properties_set(props, PW_KEY_MEDIA_ROLE, stream_role);
   PIPEWIRE_pw_properties_set(props, PW_KEY_NODE_NAME, stream_name);
   PIPEWIRE_pw_properties_set(props, PW_KEY_NODE_DESCRIPTION, stream_name);
-  PIPEWIRE_pw_properties_setf(props, PW_KEY_NODE_LATENCY, "%u/%i", this->spec.samples, this->spec.freq);
+  PIPEWIRE_pw_properties_setf(props, PW_KEY_NODE_LATENCY, "%u/%i", adjusted_samples, this->spec.freq);
   PIPEWIRE_pw_properties_set(props, PW_KEY_NODE_ALWAYS_PROCESS, "true");
 
   /*
@@ -1042,6 +1042,10 @@ static void PIPEWIRE_CloseDevice(_THIS)
     PIPEWIRE_pw_thread_loop_destroy(this->hidden->loop);
   }
 
+  if (this->hidden->buffer) {
+    SDL_FreeDataQueue(this->hidden->buffer);
+  }
+
   SDL_free(this->hidden);
 }
 
diff --git a/src/audio/pipewire/SDL_pipewire.h b/src/audio/pipewire/SDL_pipewire.h
index dde70aed9..289003b86 100644
--- a/src/audio/pipewire/SDL_pipewire.h
+++ b/src/audio/pipewire/SDL_pipewire.h
@@ -28,13 +28,14 @@
 #include <pipewire/pipewire.h>
 
 /* Hidden "this" pointer for the audio functions */
-#define _THIS SDL_AudioDevice* this
+#define _THIS SDL_AudioDevice *this
 
 struct SDL_PrivateAudioData
 {
-  struct pw_thread_loop* loop;
-  struct pw_stream*      stream;
-  struct pw_context*     context;
+  struct pw_thread_loop *loop;
+  struct pw_stream      *stream;
+  struct pw_context     *context;
+  struct SDL_DataQueue  *buffer;
 
   Sint32 stride; /* Bytes-per-frame */
 };