]> git.ipfire.org Git - thirdparty/shairport-sync.git/commitdiff
Complete replacement of the PipeWire backend with a new implementation with full...
authorMike Brady <4265913+mikebrady@users.noreply.github.com>
Sun, 1 Oct 2023 08:30:29 +0000 (09:30 +0100)
committerMike Brady <4265913+mikebrady@users.noreply.github.com>
Sun, 1 Oct 2023 08:30:29 +0000 (09:30 +0100)
audio_pw.c

index c4677dc9c40c328b984465a50cd84529eca5f8ef..663dba12eef7e37dd464697482d507d04941d5e5 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * Asynchronous PipeWire Backend. This file is part of Shairport Sync.
- * Copyright (c) Mike Brady 2017-2023
+ * Copyright (c) Mike Brady 2023
  * All rights reserved.
  *
  * Permission is hereby granted, free of charge, to any person
  * OTHER DEALINGS IN THE SOFTWARE.
  */
 
+// This uses ideas from the tone generator sample code at:
+// https://github.com/PipeWire/pipewire/blob/master/src/examples/audio-src.c
+// Thanks to the Wim Taymans.
+
 #include "audio.h"
 #include "common.h"
 #include <errno.h>
@@ -39,7 +43,6 @@
 #define DEFAULT_FORMAT SPA_AUDIO_FORMAT_S16_LE
 #define DEFAULT_RATE 44100
 #define DEFAULT_CHANNELS 2
-#define DEFAULT_VOLUME 0.7
 
 // Four seconds buffer -- should be plenty
 #define buffer_allocation 44100 * 4 * 2 * 2
@@ -53,9 +56,9 @@ static size_t audio_occupancy;
 uint64_t starting_time;
 
 struct timing_data {
-  int pw_time_is_valid; //set when the pw_time has been set
+  int pw_time_is_valid;     // set when the pw_time has been set
   struct pw_time time_info; // information about the last time a process callback occurred
-  size_t frames; // the number of frames sent at that time
+  size_t frames;            // the number of frames sent at that time
 };
 
 // to avoid using a mutex, write the same data twice and check they are the same
@@ -66,8 +69,6 @@ struct timing_data timing_data_1, timing_data_2;
 struct data {
   struct pw_thread_loop *loop;
   struct pw_stream *stream;
-
-  double accumulator;
 };
 
 // the pipewire global data structure
@@ -75,14 +76,46 @@ struct data data = {
     0,
 };
 
-static int fill(void *dest, int max_frames, int stride) {
-  size_t bytes_we_can_transfer = max_frames * stride;
+static void on_state_changed(__attribute__((unused)) void *userdata, enum pw_stream_state old,
+                             enum pw_stream_state state,
+                             __attribute__((unused)) const char *error) {
+  // struct pw_data *pw = userdata;
+  debug(3, "pw: stream state changed %s -> %s", pw_stream_state_as_string(old),
+        pw_stream_state_as_string(state));
+}
+
+static void on_process(void *userdata) {
+
+  struct data *data = userdata;
+  int n_frames = 0;
+
   pthread_mutex_lock(&buffer_mutex);
-  if (bytes_we_can_transfer > audio_occupancy)
-    bytes_we_can_transfer = audio_occupancy;
-  pthread_mutex_unlock(&buffer_mutex);
-  if (bytes_we_can_transfer > 0) {
+
+  if (audio_occupancy > 0) {
+
+    // get a buffer to see how big it can be
+    struct pw_buffer *b = pw_stream_dequeue_buffer(data->stream);
+    if (b == NULL) {
+      pw_log_warn("out of buffers: %m");
+      return;
+    }
+    struct spa_buffer *buf = b->buffer;
+    uint8_t *dest = buf->datas[0].data;
+    if (dest == NULL) // the first data block does not contain a data pointer
+      return;
+
+    int stride = sizeof(int16_t) * DEFAULT_CHANNELS;
+    int max_possible_frames = SPA_MIN(b->requested, buf->datas[0].maxsize / stride);
+
+    size_t bytes_we_can_transfer = max_possible_frames * stride;
+
+    if (bytes_we_can_transfer > audio_occupancy)
+      bytes_we_can_transfer = audio_occupancy;
+
+    n_frames = bytes_we_can_transfer / stride;
+
     size_t bytes_to_end_of_buffer = (size_t)(audio_umb - audio_toq); // must be zero or positive
+
     if (bytes_we_can_transfer <= bytes_to_end_of_buffer) {
       // the bytes are all in a row in the audio buffer
       memcpy(dest, audio_toq, bytes_we_can_transfer);
@@ -96,81 +129,42 @@ static int fill(void *dest, int max_frames, int stride) {
       memcpy(new_dest, audio_lmb, bytes_we_can_transfer - first_portion_to_write);
       audio_toq = audio_lmb + bytes_we_can_transfer - first_portion_to_write;
     }
-    // lock
-    pthread_mutex_lock(&buffer_mutex);
-    audio_occupancy -= bytes_we_can_transfer;
-    pthread_mutex_unlock(&buffer_mutex);
-    // unlock
-  }
-  return bytes_we_can_transfer / stride; // back to numbers of frames
-}
 
-/* our data processing function is in general:
- *
- *  struct pw_buffer *b;
- *  b = pw_stream_dequeue_buffer(stream);
- *
- *  .. generate stuff in the buffer ...
- *
- *  pw_stream_queue_buffer(stream, b);
- */
-static void on_process(void *userdata) {
-  struct data *data = userdata;
-  
-  struct pw_time time_info;
-  memset(&time_info, 0, sizeof(time_info));
-  
-  struct pw_buffer *b;
-  struct spa_buffer *buf;
-  int max_possible_frames, n_frames, stride;
-  uint8_t *p;
-
-  if ((b = pw_stream_dequeue_buffer(data->stream)) == NULL) {
-    pw_log_warn("out of buffers: %m");
-    return;
-  }
+    buf->datas[0].chunk->offset = 0;
+    buf->datas[0].chunk->stride = stride;
+    buf->datas[0].chunk->size = n_frames * stride;
+    pw_stream_queue_buffer(data->stream, b);
+    debug(3, "Queueing %d frames for output.", n_frames);
 
-  buf = b->buffer;
-  if ((p = buf->datas[0].data) == NULL) // the first data block does not contain a data pointer
-    return;
+    audio_occupancy -= bytes_we_can_transfer;
+  }
+  pthread_mutex_unlock(&buffer_mutex);
 
-  stride = sizeof(int16_t) * DEFAULT_CHANNELS;
-  max_possible_frames = SPA_MIN(b->requested, buf->datas[0].maxsize / stride);
-  
-  do {
-    n_frames = fill(p, max_possible_frames, stride);
-    if (n_frames == 0) {
-      usleep(1000);
-    }
-  } while(n_frames == 0);
-
-  buf->datas[0].chunk->offset = 0;
-  buf->datas[0].chunk->stride = stride;
-  buf->datas[0].chunk->size = n_frames * stride;
-  debug(3, "Queueing %d frames for output.", n_frames);
-  
-  if (pw_stream_get_time_n(data->stream, &timing_data_1.time_info, sizeof(time_info)) == 0)
+  timing_data_1.frames = n_frames;
+  if (pw_stream_get_time_n(data->stream, &timing_data_1.time_info, sizeof(struct timing_data)) == 0)
     timing_data_1.pw_time_is_valid = 1;
   else
     timing_data_1.pw_time_is_valid = 0;
   __sync_synchronize();
   memcpy((char *)&timing_data_2, (char *)&timing_data_1, sizeof(struct timing_data));
   __sync_synchronize();
-  pw_stream_queue_buffer(data->stream, b);
-
 }
 
-
 static const struct pw_stream_events stream_events = {
-    PW_VERSION_STREAM_EVENTS,
-    .process = on_process,
-};
+    PW_VERSION_STREAM_EVENTS, .process = on_process, .state_changed = on_state_changed};
+
+static void deinit(void) {
+  pw_thread_loop_stop(data.loop);
+  pw_stream_destroy(data.stream);
+  pw_thread_loop_destroy(data.loop);
+  pw_deinit();
+  free(audio_lmb); // deallocate that buffer
+}
 
 static int init(__attribute__((unused)) int argc, __attribute__((unused)) char **argv) {
-  debug(1, "pw_init");
   // set up default values first
-  memset(&timing_data_1,0,sizeof(struct timing_data));
-  memset(&timing_data_2,0,sizeof(struct timing_data));
+  memset(&timing_data_1, 0, sizeof(struct timing_data));
+  memset(&timing_data_2, 0, sizeof(struct timing_data));
   config.audio_backend_buffer_desired_length = 0.35;
   config.audio_backend_buffer_interpolation_threshold_in_seconds =
       0.02; // below this, soxr interpolation will not occur -- it'll be basic interpolation
@@ -207,30 +201,17 @@ static int init(__attribute__((unused)) int argc, __attribute__((unused)) char *
   int largc = 0;
   pw_init(&largc, NULL);
 
-  /* make a main loop. If you already have another main loop, you can add
-   * the fd of this pipewire mainloop to it. */
-  data.loop = pw_thread_loop_new("tone-generator", NULL);
+  /* make a threaded loop. */
+  data.loop = pw_thread_loop_new("shairport-sync", NULL);
 
   pw_thread_loop_lock(data.loop);
 
   pw_thread_loop_start(data.loop);
 
-  /* Create a simple stream, the simple stream manages the core and remote
-   * objects for you if you don't need to deal with them.
-   *
-   * If you plan to autoconnect your stream, you need to provide at least
-   * media, category and role properties.
-   *
-   * Pass your events and a user_data pointer as the last arguments. This
-   * will inform you about the stream state. The most important event
-   * you need to listen to is the process event where you need to produce
-   * the data.
-   */
-
   props = pw_properties_new(PW_KEY_MEDIA_TYPE, "Audio", PW_KEY_MEDIA_CATEGORY, "Playback",
                             PW_KEY_MEDIA_ROLE, "Music", PW_KEY_APP_NAME, "Shairport Sync", NULL);
 
-  data.stream = pw_stream_new_simple(pw_thread_loop_get_loop(data.loop), "audio-src-tg", props,
+  data.stream = pw_stream_new_simple(pw_thread_loop_get_loop(data.loop), "shairport-sync", props,
                                      &stream_events, &data);
 
   /* Make one parameter with the supported formats. The SPA_PARAM_EnumFormat
@@ -248,58 +229,42 @@ static int init(__attribute__((unused)) int argc, __attribute__((unused)) char *
                     params, 1);
 
   pw_thread_loop_unlock(data.loop);
-  debug(1, "pa_init done");
   return 0;
 }
 
-static void deinit(void) {
-  pw_thread_loop_unlock(data.loop);
-  debug(1, "pipewire: deinit deactivated the stream");
-  pw_thread_loop_signal(data.loop, false);
-  pw_thread_loop_wait(data.loop);
-  debug(1, "pipewire: deinit loop wait done");
-  pw_thread_loop_stop(data.loop);
-  debug(1, "pipewire: deinit loop stopped");
-  pw_stream_destroy(data.stream);
-  debug(1, "pipewire: deinit stream destroyed");    
-  pw_thread_loop_destroy(data.loop);
-  debug(1, "pipewire: deinit loop destroyed");  
-  pw_deinit();
-  debug(1, "pipewire: pw_deinit terminated");  
-  free(audio_lmb); // deallocate that buffer
-  debug(1, "pipewire: deinit done");
-}
-
 static void start(__attribute__((unused)) int sample_rate,
-                  __attribute__((unused)) int sample_format) {
-}
+                  __attribute__((unused)) int sample_format) {}
 
 static int play(__attribute__((unused)) void *buf, int samples,
                 __attribute__((unused)) int sample_type, __attribute__((unused)) uint32_t timestamp,
-                __attribute__((unused)) uint64_t playtime) {  
+                __attribute__((unused)) uint64_t playtime) {
   // copy the samples into the queue
+  debug(3, "play %u samples; %u bytes already in the buffer.", samples, audio_occupancy);
   size_t bytes_to_transfer = samples * 2 * 2;
-  size_t space_to_end_of_buffer = audio_umb - audio_eoq;
-  if (space_to_end_of_buffer >= bytes_to_transfer) {
-    memcpy(audio_eoq, buf, bytes_to_transfer);
-    pthread_mutex_lock(&buffer_mutex);
-    audio_occupancy += bytes_to_transfer;
-    pthread_mutex_unlock(&buffer_mutex);
-    audio_eoq += bytes_to_transfer;
-  } else {
-    memcpy(audio_eoq, buf, space_to_end_of_buffer);
-    buf += space_to_end_of_buffer;
-    memcpy(audio_lmb, buf, bytes_to_transfer - space_to_end_of_buffer);
-    pthread_mutex_lock(&buffer_mutex);
+  pthread_mutex_lock(&buffer_mutex);
+  size_t bytes_available = audio_size - audio_occupancy;
+  if (bytes_available < bytes_to_transfer)
+    bytes_to_transfer = bytes_available;
+  if (bytes_to_transfer > 0) {
+    size_t space_to_end_of_buffer = audio_umb - audio_eoq;
+    if (space_to_end_of_buffer >= bytes_to_transfer) {
+      memcpy(audio_eoq, buf, bytes_to_transfer);
+      audio_eoq += bytes_to_transfer;
+    } else {
+      memcpy(audio_eoq, buf, space_to_end_of_buffer);
+      buf += space_to_end_of_buffer;
+      memcpy(audio_lmb, buf, bytes_to_transfer - space_to_end_of_buffer);
+      audio_eoq = audio_lmb + bytes_to_transfer - space_to_end_of_buffer;
+    }
     audio_occupancy += bytes_to_transfer;
-    pthread_mutex_unlock(&buffer_mutex);
-    audio_eoq = audio_lmb + bytes_to_transfer - space_to_end_of_buffer;
   }
-  debug(3, "play %d samples; %d bytes in buffer.", samples, audio_occupancy);
+  pthread_mutex_unlock(&buffer_mutex);
   return 0;
 }
 
 int delay(long *the_delay) {
+  long result = 0;
+  int reply = 0;
   // find out what's already in the PipeWire system and when
   struct timing_data timing_data;
   int loop_count = 1;
@@ -307,40 +272,44 @@ int delay(long *the_delay) {
     memcpy(&timing_data, (char *)&timing_data_1, sizeof(struct timing_data));
     __sync_synchronize();
     if (memcmp(&timing_data, (char *)&timing_data_2, sizeof(struct timing_data)) != 0) {
-        usleep(2); // microseconds
-        loop_count++;
-        __sync_synchronize();
+      usleep(2); // microseconds
+      loop_count++;
+      __sync_synchronize();
     }
-  } while ((memcmp(&timing_data, (char *)&timing_data_2, sizeof(struct timing_data)) != 0) && (loop_count < 10));
+  } while ((memcmp(&timing_data, (char *)&timing_data_2, sizeof(struct timing_data)) != 0) &&
+           (loop_count < 10));
   long total_delay_now_frames_long = 0;
   if ((loop_count < 10) && (timing_data.pw_time_is_valid != 0)) {
-      struct timespec time_now;
+    struct timespec time_now;
     clock_gettime(CLOCK_MONOTONIC, &time_now);
-    int64_t interval_from_process_time_to_now = SPA_TIMESPEC_TO_NSEC(&time_now) - timing_data.time_info.now;
+    int64_t interval_from_process_time_to_now =
+        SPA_TIMESPEC_TO_NSEC(&time_now) - timing_data.time_info.now;
     int64_t delay_in_ns = timing_data.time_info.delay + timing_data.time_info.buffered;
     delay_in_ns = delay_in_ns * 1000000000;
     delay_in_ns = delay_in_ns * timing_data.time_info.rate.num;
     delay_in_ns = delay_in_ns / timing_data.time_info.rate.denom;
-    
+
     int64_t total_delay_now_ns = delay_in_ns - interval_from_process_time_to_now;
-    int64_t total_delay_now_frames = (total_delay_now_ns * 44100)/1000000000 + timing_data.frames;
+    int64_t total_delay_now_frames = (total_delay_now_ns * 44100) / 1000000000 + timing_data.frames;
     total_delay_now_frames_long = total_delay_now_frames;
-    debug(3, "total delay in frames: % " PRId64 ", %ld.", total_delay_now_frames, total_delay_now_frames_long);
-
-    debug(3,
-          "interval_from_process_time_to_now: %" PRId64 " ns, "
-          "delay_in_ns: %" PRId64 ", queued: %" PRId64 ", buffered: %" PRId64 ".",
-          // delay_timing_data.time_info.rate.num, delay_timing_data.time_info.rate.denom, 
-          interval_from_process_time_to_now, delay_in_ns,
-          timing_data.time_info.queued, timing_data.time_info.buffered);
+    debug(3, "total delay in frames: %ld.", total_delay_now_frames_long);
 
+    if (timing_data.time_info.queued != 0) {
+      debug(1, "buffers queued: %d", timing_data.time_info.queued);
+    }
+    /*
+        debug(3,
+              "interval_from_process_time_to_now: %" PRId64 " ns, "
+              "delay_in_ns: %" PRId64 ", queued: %" PRId64 ", buffered: %" PRId64 ".",
+              // delay_timing_data.time_info.rate.num, delay_timing_data.time_info.rate.denom,
+              interval_from_process_time_to_now, delay_in_ns,
+              timing_data.time_info.queued, timing_data.time_info.buffered);
+    */
 
   } else {
     debug(1, "can't get time info.");
   }
 
-  long result = 0;
-  int reply = 0;
   pthread_mutex_lock(&buffer_mutex);
   result = total_delay_now_frames_long + audio_occupancy / (2 * 2);
   pthread_mutex_unlock(&buffer_mutex);
@@ -348,19 +317,18 @@ int delay(long *the_delay) {
   return reply;
 }
 
-
 static void flush(void) {
+  pthread_mutex_lock(&buffer_mutex);
   audio_toq = audio_eoq = audio_lmb;
   audio_umb = audio_lmb + audio_size;
-  pthread_mutex_lock(&buffer_mutex);
   audio_occupancy = 0;
   pthread_mutex_unlock(&buffer_mutex);
 }
 
 static void stop(void) {
+  pthread_mutex_lock(&buffer_mutex);
   audio_toq = audio_eoq = audio_lmb;
   audio_umb = audio_lmb + audio_size;
-  pthread_mutex_lock(&buffer_mutex);
   audio_occupancy = 0;
   pthread_mutex_unlock(&buffer_mutex);
 }