Commit 53091e36a3b418e33133bae2f018954c006f86b8

Frank Praznik 2022-02-02T12:30:34

audio: pipewire: Remove the hard upper bound on rates and buffer sizes Remove the hard upper limit of 8192 samples and instead use the buffer sizes provided by Pipewire to determine the size of the intermediate input buffer and whether double buffering is required for output streams. This allows for higher latency streams to potentially avoid double-buffering in the output case, and we can guarantee that the intermediate input buffer will always be large enough to handle whatever Pipewire may deliver. As the buffer size calculations occur in a callback in the Pipewire processing thread itself, the stream readiness check has been modified to wait on two distinct flags set when the buffers have been configured and when the stream is ready and running.

diff --git a/src/audio/pipewire/SDL_pipewire.c b/src/audio/pipewire/SDL_pipewire.c
index 9c3d0b2..637b904 100644
--- a/src/audio/pipewire/SDL_pipewire.c
+++ b/src/audio/pipewire/SDL_pipewire.c
@@ -54,19 +54,22 @@
 #endif
 
 /*
- * These seem to be sane limits as Pipewire
- * uses them in several of it's own modules.
- *
- * NOTE: 8192 is a hard upper limit in Pipewire and
- * increasing this value can lead to buffer overflows.
+ * This seems to be a sane lower limit as Pipewire
+ * uses it in several of it's own modules.
  */
 #define PW_MIN_SAMPLES     32   /* About 0.67ms at 48kHz */
-#define PW_MAX_SAMPLES     8192 /* About 170.6ms at 48kHz */
 #define PW_BASE_CLOCK_RATE 48000
 
 #define PW_POD_BUFFER_LENGTH         1024
 #define PW_THREAD_NAME_BUFFER_LENGTH 128
 
+enum PW_READY_FLAGS
+{
+    PW_READY_FLAG_BUFFER_ADDED = 0x1,
+    PW_READY_FLAG_STREAM_READY = 0x2,
+    PW_READY_FLAG_ALL_BITS     = 0x3
+};
+
 #define PW_ID_TO_HANDLE(x) (void *)((uintptr_t)x)
 #define PW_HANDLE_TO_ID(x) (uint32_t)((uintptr_t)x)
 
@@ -997,9 +1000,9 @@ input_callback(void *data)
             this->callbackspec.callback(this->callbackspec.userdata, this->work_buffer, this->callbackspec.size);
             SDL_UnlockMutex(this->mixer_lock);
         }
-    } else { /* Flush the buffer when paused */
+    } else if (this->hidden->buffer) { /* Flush the buffer when paused */
         if (SDL_CountDataQueue(this->hidden->buffer) != 0) {
-            SDL_ClearDataQueue(this->hidden->buffer, this->hidden->buffer_period_size * 2);
+            SDL_ClearDataQueue(this->hidden->buffer, this->hidden->input_buffer_packet_size);
         }
     }
 
@@ -1007,21 +1010,57 @@ input_callback(void *data)
 }
 
 static void
+stream_add_buffer_callback(void *data, struct pw_buffer *buffer)
+{
+    _THIS = data;
+
+    if (this->iscapture == SDL_FALSE) {
+        /*
+         * Clamp the output spec samples and size to the max size of the Pipewire buffer.
+         * If they exceed the maximum size of the Pipewire buffer, double buffering will be used.
+         */
+        if (this->spec.size > buffer->buffer->datas[0].maxsize) {
+           this->spec.samples = buffer->buffer->datas[0].maxsize / this->hidden->stride;
+           this->spec.size    = buffer->buffer->datas[0].maxsize;
+        }
+    } else if (this->hidden->buffer == NULL) {
+        /*
+         * The latency of source nodes can change, so buffering is always required.
+         *
+         * Ensure that the intermediate input buffer is large enough to hold the requested
+         * application packet size or a full buffer of data from Pipewire, whichever is larger.
+         *
+         * A packet size of 2 periods should be more than is ever needed.
+         */
+        this->hidden->input_buffer_packet_size = SPA_MAX(this->spec.size, buffer->buffer->datas[0].maxsize) * 2;
+        this->hidden->buffer                   = SDL_NewDataQueue(this->hidden->input_buffer_packet_size, this->hidden->input_buffer_packet_size);
+    }
+
+    this->hidden->stream_init_status |= PW_READY_FLAG_BUFFER_ADDED;
+    PIPEWIRE_pw_thread_loop_signal(this->hidden->loop, false);
+}
+
+static void
 stream_state_changed_callback(void *data, enum pw_stream_state old, enum pw_stream_state state, const char *error)
 {
     _THIS = data;
 
+    if (state == PW_STREAM_STATE_STREAMING) {
+        this->hidden->stream_init_status |= PW_READY_FLAG_STREAM_READY;
+    }
+
     if (state == PW_STREAM_STATE_STREAMING || state == PW_STREAM_STATE_ERROR) {
-        this->hidden->stream_initialized = 1;
         PIPEWIRE_pw_thread_loop_signal(this->hidden->loop, false);
     }
 }
 
 static const struct pw_stream_events stream_output_events = { PW_VERSION_STREAM_EVENTS,
                                                               .state_changed = stream_state_changed_callback,
+                                                              .add_buffer    = stream_add_buffer_callback,
                                                               .process       = output_callback };
 static const struct pw_stream_events stream_input_events  = { PW_VERSION_STREAM_EVENTS,
                                                              .state_changed = stream_state_changed_callback,
+                                                             .add_buffer    = stream_add_buffer_callback,
                                                              .process       = input_callback };
 
 static int
@@ -1044,14 +1083,12 @@ PIPEWIRE_OpenDevice(_THIS, const char *devname)
     struct SDL_PrivateAudioData *priv;
     struct pw_properties *       props;
     const char *                 app_name, *stream_name, *stream_role, *error;
-    const Uint32                 node_id = this->handle == NULL ? PW_ID_ANY : PW_HANDLE_TO_ID(this->handle);
-    enum pw_stream_state         state;
+    const Uint32                 node_id   = this->handle == NULL ? PW_ID_ANY : PW_HANDLE_TO_ID(this->handle);
     SDL_bool                     iscapture = this->iscapture;
     int                          res;
 
     /* Clamp the period size to sane values */
-    const int min_period       = PW_MIN_SAMPLES * SPA_MAX(this->spec.freq / PW_BASE_CLOCK_RATE, 1);
-    const int adjusted_samples = SPA_CLAMP(this->spec.samples, min_period, PW_MAX_SAMPLES);
+    const int min_period = PW_MIN_SAMPLES * SPA_MAX(this->spec.freq / PW_BASE_CLOCK_RATE, 1);
 
     /* Get the hints for the application name, stream name and role */
     app_name = SDL_GetHint(SDL_HINT_AUDIO_DEVICE_APP_NAME);
@@ -1090,22 +1127,11 @@ PIPEWIRE_OpenDevice(_THIS, const char *devname)
     /* Size of a single audio frame in bytes */
     priv->stride = (SDL_AUDIO_BITSIZE(this->spec.format) >> 3) * this->spec.channels;
 
-    if (this->spec.samples != adjusted_samples && !iscapture) {
-        this->spec.samples = adjusted_samples;
+    if (this->spec.samples < min_period) {
+        this->spec.samples = min_period;
         this->spec.size    = this->spec.samples * priv->stride;
     }
 
-    /* The latency of source nodes can change, so buffering is required. */
-    if (iscapture) {
-        priv->buffer_period_size = SPA_MAX(this->spec.samples, adjusted_samples) * priv->stride;
-
-        /* A packet size of 4 periods should be more than is ever needed (no more than 2 should be queued in practice). */
-        priv->buffer = SDL_NewDataQueue(priv->buffer_period_size * 4, priv->buffer_period_size * 2);
-        if (priv->buffer == NULL) {
-            return SDL_SetError("Pipewire: Failed to allocate source buffer");
-        }
-    }
-
     SDL_snprintf(thread_name, sizeof(thread_name), "SDLAudio%c%ld", (iscapture) ? 'C' : 'P', (long)this->handle);
     priv->loop = PIPEWIRE_pw_thread_loop_new(thread_name, NULL);
     if (priv->loop == NULL) {
@@ -1143,7 +1169,7 @@ PIPEWIRE_OpenDevice(_THIS, const char *devname)
     PIPEWIRE_pw_properties_set(props, PW_KEY_APP_NAME, app_name);
     PIPEWIRE_pw_properties_set(props, PW_KEY_NODE_NAME, stream_name);
     PIPEWIRE_pw_properties_set(props, PW_KEY_NODE_DESCRIPTION, stream_name);
-    PIPEWIRE_pw_properties_setf(props, PW_KEY_NODE_LATENCY, "%u/%i", adjusted_samples, this->spec.freq);
+    PIPEWIRE_pw_properties_setf(props, PW_KEY_NODE_LATENCY, "%u/%i", this->spec.samples, this->spec.freq);
     PIPEWIRE_pw_properties_setf(props, PW_KEY_NODE_RATE, "1/%u", this->spec.freq);
     PIPEWIRE_pw_properties_set(props, PW_KEY_NODE_ALWAYS_PROCESS, "true");
 
@@ -1165,19 +1191,23 @@ PIPEWIRE_OpenDevice(_THIS, const char *devname)
         return SDL_SetError("Pipewire: Failed to start stream loop");
     }
 
-    /* Wait until the stream is either running or failed */
+    /* Wait until all init flags are set or the stream has failed. */
     PIPEWIRE_pw_thread_loop_lock(priv->loop);
-    if (!priv->stream_initialized) {
+    while (priv->stream_init_status != PW_READY_FLAG_ALL_BITS &&
+           PIPEWIRE_pw_stream_get_state(priv->stream, NULL) != PW_STREAM_STATE_ERROR) {
         PIPEWIRE_pw_thread_loop_wait(priv->loop);
     }
     PIPEWIRE_pw_thread_loop_unlock(priv->loop);
 
-    state = PIPEWIRE_pw_stream_get_state(priv->stream, &error);
-
-    if (state == PW_STREAM_STATE_ERROR) {
+    if (PIPEWIRE_pw_stream_get_state(priv->stream, &error) == PW_STREAM_STATE_ERROR) {
         return SDL_SetError("Pipewire: Stream error: %s", error);
     }
 
+    /* If this is a capture stream, make sure the intermediate buffer was successfully allocated. */
+    if (iscapture && priv->buffer == NULL) {
+        return SDL_SetError("Pipewire: Failed to allocate source buffer");
+    }
+
     return 0;
 }
 
diff --git a/src/audio/pipewire/SDL_pipewire.h b/src/audio/pipewire/SDL_pipewire.h
index ba7d6fc..767b8f1 100644
--- a/src/audio/pipewire/SDL_pipewire.h
+++ b/src/audio/pipewire/SDL_pipewire.h
@@ -37,9 +37,9 @@ struct SDL_PrivateAudioData
     struct pw_context     *context;
     struct SDL_DataQueue  *buffer;
 
-    size_t buffer_period_size;
+    size_t input_buffer_packet_size;
     Sint32 stride; /* Bytes-per-frame */
-    int    stream_initialized;
+    int    stream_init_status;
 };
 
 #endif /* SDL_pipewire_h_ */