Commit 9afd7570d602488c21afc6d875eb9cc874a224fe

Frank Praznik 2021-02-20T13:33:12

audio: pipewire: Always buffer source audio

The latency of source nodes can change depending on the overall latency of the processing graph. Incoming audio must therefore always be buffered to ensure uninterrupted delivery.

The SDL_AudioStream path was removed from the input callback because its only purpose was buffering audio outside of Pipewire's min/max period sizes, and that case is now handled by the always-present buffer.

diff --git a/src/audio/pipewire/SDL_pipewire.c b/src/audio/pipewire/SDL_pipewire.c
index 220847e..71d6a18 100644
--- a/src/audio/pipewire/SDL_pipewire.c
+++ b/src/audio/pipewire/SDL_pipewire.c
@@ -829,6 +829,7 @@ input_callback(void *data)
   Uint8 *            src;
   _THIS                    = (SDL_AudioDevice *)data;
   struct pw_stream *stream = this->hidden->stream;
+  Uint32            offset, size;
 
   /* Shutting down, don't do anything */
   if (SDL_AtomicGet(&this->shutdown)) {
@@ -846,44 +847,31 @@ input_callback(void *data)
     return;
   }
 
-  if (SDL_AtomicGet(&this->enabled)) {
-    /* The first packet might be undersized, so pad the start with silence */
-    if (spa_buf->datas[0].chunk->size < this->spec.size) {
-      const Uint32 diff = this->spec.size - spa_buf->datas[0].chunk->size;
-      SDL_memmove(src + diff, src, diff);
-      SDL_memset(src, this->spec.silence, diff);
-    }
-  } else { /* Zero the buffer if the stream is disabled. */
-    SDL_memset(src, this->spec.silence, this->spec.size);
+  /* Calculate the offset and data size */
+  offset = SPA_MIN(spa_buf->datas[0].chunk->offset, spa_buf->datas[0].maxsize);
+  size   = SPA_MIN(spa_buf->datas[0].chunk->size, spa_buf->datas[0].maxsize - offset);
+
+  src += offset;
+
+  /* Fill the buffer with silence if the stream is disabled. */
+  if (!SDL_AtomicGet(&this->enabled)) {
+    SDL_memset(src, this->callbackspec.silence, size);
   }
 
-  if (!this->stream) {
-    /* No intermediate stream, call the application callback directly */
-    if (!SDL_AtomicGet(&this->paused)) {
+  /* Pipewire can vary the latency, so buffer all incoming data */
+  SDL_WriteToDataQueue(this->hidden->buffer, src, size);
+
+  if (!SDL_AtomicGet(&this->paused)) {
+    while (SDL_CountDataQueue(this->hidden->buffer) >= this->callbackspec.size) {
+      SDL_ReadFromDataQueue(this->hidden->buffer, this->work_buffer, this->callbackspec.size);
+
       SDL_LockMutex(this->mixer_lock);
-      this->callbackspec.callback(this->callbackspec.userdata, src, this->callbackspec.size);
+      this->callbackspec.callback(this->callbackspec.userdata, this->work_buffer, this->callbackspec.size);
       SDL_UnlockMutex(this->mixer_lock);
     }
-  } else {
-    SDL_AudioStreamPut(this->stream, src, spa_buf->datas[0].chunk->size);
-
-    if (!SDL_AtomicGet(&this->paused)) {
-      /* Fire the callback as long as we have enough data to do so. */
-      while (SDL_AudioStreamAvailable(this->stream) >= this->callbackspec.size) {
-        const Uint32 got = SDL_AudioStreamGet(this->stream, this->work_buffer, this->callbackspec.size);
-
-        SDL_assert(got == this->callbackspec.size);
-
-        SDL_LockMutex(this->mixer_lock);
-        this->callbackspec.callback(this->callbackspec.userdata, this->work_buffer, this->callbackspec.size);
-        SDL_UnlockMutex(this->mixer_lock);
-      }
-    } else {
-      /* Dummy loop to dump the buffered data while paused */
-      while (SDL_AudioStreamAvailable(this->stream) >= this->callbackspec.size) {
-        const Uint32 got = SDL_AudioStreamGet(this->stream, this->work_buffer, this->callbackspec.size);
-        SDL_assert(got == this->callbackspec.size);
-      }
+  } else { /* Keep data moving through the buffer while paused */
+    while (SDL_CountDataQueue(this->hidden->buffer) >= this->callbackspec.size) {
+      SDL_ReadFromDataQueue(this->hidden->buffer, this->work_buffer, this->callbackspec.size);
     }
   }
 
@@ -932,6 +920,7 @@ PIPEWIRE_OpenDevice(_THIS, void *handle, const char *devname, int iscapture)
   if (!stream_role || *stream_role == '\0') {
     stream_role = "Game";
   }
+
   /* Initialize the Pipewire stream info from the SDL audio spec */
   initialize_spa_info(&this->spec, &spa_info);
   params = spa_format_audio_raw_build(&b, SPA_PARAM_EnumFormat, &spa_info);
@@ -952,11 +941,22 @@ PIPEWIRE_OpenDevice(_THIS, void *handle, const char *devname, int iscapture)
   /* Size of a single audio frame in bytes */
   priv->stride = (SDL_AUDIO_BITSIZE(this->spec.format) >> 3) * this->spec.channels;
 
-  if (this->spec.samples != adjusted_samples) {
+  if (this->spec.samples != adjusted_samples && !iscapture) {
     this->spec.samples = adjusted_samples;
     this->spec.size    = this->spec.samples * priv->stride;
   }
 
+  /* The latency of source nodes can change, so buffering is required. */
+  if (iscapture) {
+    const size_t period_size = adjusted_samples * priv->stride;
+
+    /* A packet size of 4 periods should be more than is ever needed (no more than 2 should be queued in practice). */
+    priv->buffer = SDL_NewDataQueue(period_size * 4, period_size * 2);
+    if (priv->buffer == NULL) {
+      return SDL_SetError("Pipewire: Failed to allocate source buffer");
+    }
+  }
+
   SDL_snprintf(thread_name, sizeof(thread_name), "SDLAudio%c%ld", (iscapture) ? 'C' : 'P', (long)handle);
   priv->loop = PIPEWIRE_pw_thread_loop_new(thread_name, NULL);
   if (priv->loop == NULL) {
@@ -986,7 +986,7 @@ PIPEWIRE_OpenDevice(_THIS, void *handle, const char *devname, int iscapture)
   PIPEWIRE_pw_properties_set(props, PW_KEY_MEDIA_ROLE, stream_role);
   PIPEWIRE_pw_properties_set(props, PW_KEY_NODE_NAME, stream_name);
   PIPEWIRE_pw_properties_set(props, PW_KEY_NODE_DESCRIPTION, stream_name);
-  PIPEWIRE_pw_properties_setf(props, PW_KEY_NODE_LATENCY, "%u/%i", this->spec.samples, this->spec.freq);
+  PIPEWIRE_pw_properties_setf(props, PW_KEY_NODE_LATENCY, "%u/%i", adjusted_samples, this->spec.freq);
   PIPEWIRE_pw_properties_set(props, PW_KEY_NODE_ALWAYS_PROCESS, "true");
 
   /*
@@ -1042,6 +1042,10 @@ static void PIPEWIRE_CloseDevice(_THIS)
     PIPEWIRE_pw_thread_loop_destroy(this->hidden->loop);
   }
 
+  if (this->hidden->buffer) {
+    SDL_FreeDataQueue(this->hidden->buffer);
+  }
+
   SDL_free(this->hidden);
 }
 
diff --git a/src/audio/pipewire/SDL_pipewire.h b/src/audio/pipewire/SDL_pipewire.h
index dde70ae..289003b 100644
--- a/src/audio/pipewire/SDL_pipewire.h
+++ b/src/audio/pipewire/SDL_pipewire.h
@@ -28,13 +28,14 @@
 #include <pipewire/pipewire.h>
 
 /* Hidden "this" pointer for the audio functions */
-#define _THIS SDL_AudioDevice* this
+#define _THIS SDL_AudioDevice *this
 
 struct SDL_PrivateAudioData
 {
-  struct pw_thread_loop* loop;
-  struct pw_stream*      stream;
-  struct pw_context*     context;
+  struct pw_thread_loop *loop;    /* Thread loop created in PIPEWIRE_OpenDevice, destroyed in PIPEWIRE_CloseDevice */
+  struct pw_stream      *stream;  /* Stream handle read by input_callback */
+  struct pw_context     *context;
+  struct SDL_DataQueue  *buffer;  /* Allocated only for capture devices: queues incoming audio until a full callback-sized chunk is available */
 
   Sint32 stride; /* Bytes-per-frame */
 };