]> git.ipfire.org Git - thirdparty/shairport-sync.git/commitdiff
Replace the pipewire backend completely. Play writes to a buffer. The on_process...
authorMike Brady <4265913+mikebrady@users.noreply.github.com>
Sat, 30 Sep 2023 14:45:41 +0000 (15:45 +0100)
committerMike Brady <4265913+mikebrady@users.noreply.github.com>
Sat, 30 Sep 2023 14:45:41 +0000 (15:45 +0100)
audio_pw.c

index 344371a4558835451a6b53e38ece31b5a7b26052..33b27c7e8fe906267c8dca764044277096a6a77c 100644 (file)
@@ -24,7 +24,6 @@
  * OTHER DEALINGS IN THE SOFTWARE.
  */
 
-
 #include "audio.h"
 #include "common.h"
 #include <errno.h>
 #include <string.h>
 #include <unistd.h>
 
-#include <math.h> // many not need this after development
-
-#include <spa/param/audio/format-utils.h>
 #include <pipewire/pipewire.h>
+#include <spa/param/audio/format-utils.h>
 
 // note -- these are hacked and hardwired into this code.
 #define DEFAULT_FORMAT SPA_AUDIO_FORMAT_S16_LE
 // Four seconds buffer -- should be plenty
 #define buffer_allocation 44100 * 4 * 2 * 2
 
-// static pthread_mutex_t buffer_mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t buffer_mutex = PTHREAD_MUTEX_INITIALIZER;
 
-char *audio_lmb, *audio_umb, *audio_toq, *audio_eoq;
-size_t audio_size = buffer_allocation;
-size_t audio_occupancy;
-
-#define M_PI_M2 (M_PI + M_PI)
+static char *audio_lmb, *audio_umb, *audio_toq, *audio_eoq;
+static size_t audio_size = buffer_allocation;
+static size_t audio_occupancy;
 
 uint64_t starting_time;
 
+struct timing_data {
+  int pw_time_is_valid; //set when the pw_time has been set
+  struct pw_time time_info; // information about the last time a process callback occurred
+  size_t frames; // the number of frames sent at that time
+};
+
+// to avoid using a mutex, write the same data twice and check they are the same
+// to ensure they are consistent. Make sure the first is written strictly before the second
+// using __sync_synchronize();
+struct timing_data timing_data_1, timing_data_2;
+
 struct data {
   struct pw_thread_loop *loop;
   struct pw_stream *stream;
@@ -69,22 +75,34 @@ struct data data = {
     0,
 };
 
-static void fill_le16(struct data *d, void *dest, int n_frames) {
-  //float *dst = dest, val;
-  float val;
-  int16_t *dst = dest, le16val;
-  int i, c;
-
-  for (i = 0; i < n_frames; i++) {
-    d->accumulator += M_PI_M2 * 440 / DEFAULT_RATE;
-    if (d->accumulator >= M_PI_M2)
-      d->accumulator -= M_PI_M2;
-
-    val = sin(d->accumulator) * DEFAULT_VOLUME;
-    le16val = INT16_MAX * val;
-    for (c = 0; c < DEFAULT_CHANNELS; c++)
-      *dst++ = le16val;
+static int fill(void *dest, int max_frames, int stride) {
+  size_t bytes_we_can_transfer = max_frames * stride;
+  pthread_mutex_lock(&buffer_mutex);
+  if (bytes_we_can_transfer > audio_occupancy)
+    bytes_we_can_transfer = audio_occupancy;
+  pthread_mutex_unlock(&buffer_mutex);
+  if (bytes_we_can_transfer > 0) {
+    size_t bytes_to_end_of_buffer = (size_t)(audio_umb - audio_toq); // must be zero or positive
+    if (bytes_we_can_transfer <= bytes_to_end_of_buffer) {
+      // the bytes are all in a row in the audio buffer
+      memcpy(dest, audio_toq, bytes_we_can_transfer);
+      audio_toq += bytes_we_can_transfer;
+    } else {
+      // the bytes are in two places in the audio buffer
+      size_t first_portion_to_write = audio_umb - audio_toq;
+      if (first_portion_to_write != 0)
+        memcpy(dest, audio_toq, first_portion_to_write);
+      uint8_t *new_dest = dest + first_portion_to_write;
+      memcpy(new_dest, audio_lmb, bytes_we_can_transfer - first_portion_to_write);
+      audio_toq = audio_lmb + bytes_we_can_transfer - first_portion_to_write;
+    }
+    // lock
+    pthread_mutex_lock(&buffer_mutex);
+    audio_occupancy -= bytes_we_can_transfer;
+    pthread_mutex_unlock(&buffer_mutex);
+    // unlock
   }
+  return bytes_we_can_transfer / stride; // back to numbers of frames
 }
 
 /* our data processing function is in general:
@@ -97,57 +115,52 @@ static void fill_le16(struct data *d, void *dest, int n_frames) {
  *  pw_stream_queue_buffer(stream, b);
  */
 static void on_process(void *userdata) {
-  int wait;
-  do {
-    uint64_t time_now = get_absolute_time_in_ns();
-    int64_t elapsed_time = time_now - starting_time;
-    double elapsed_time_seconds = fmod(elapsed_time * 0.000000001, 10.0);
-    wait = (elapsed_time_seconds > 4.0) && (elapsed_time_seconds < 6.0);
-    if (wait != 0) {
-    //  debug(1, "wait...");
-      usleep(1000);
-    }
-  } while (wait != 0);
   struct data *data = userdata;
+  
+  struct pw_time time_info;
+  memset(&time_info, 0, sizeof(time_info));
+  
   struct pw_buffer *b;
   struct spa_buffer *buf;
-  int n_frames, stride;
+  int max_possible_frames, n_frames, stride;
   uint8_t *p;
 
   if ((b = pw_stream_dequeue_buffer(data->stream)) == NULL) {
     pw_log_warn("out of buffers: %m");
     return;
   }
-  
-  struct pw_time time_info;
-  memset(&time_info, 0, sizeof(time_info));
-
-  int response = pw_stream_get_time_n(data->stream, &time_info, sizeof(time_info));    
-  if (response == 0) {
-    struct timespec ts;
-    clock_gettime(CLOCK_MONOTONIC, &ts);
-    int64_t diff = SPA_TIMESPEC_TO_NSEC(&ts) - time_info.now;
-    int64_t elapsed = (time_info.rate.denom * diff) / (time_info.rate.num * SPA_NSEC_PER_SEC);
-    debug(1, "rate.num: %" PRId64 ", rate.denom: %" PRId64 ", diff: %" PRId64 "ns, %" PRId64 " frames, delay: %" PRId64 ", queued: %" PRId64 ", buffered: %" PRId64 ".", time_info.rate.num, time_info.rate.denom, diff, elapsed, time_info.delay, time_info.queued, time_info.buffered);  
-  } else {
-    debug(1, "can't get time info: %d.", response);
-  }
 
   buf = b->buffer;
-  if ((p = buf->datas[0].data) == NULL)
+  if ((p = buf->datas[0].data) == NULL) // the first data block does not contain a data pointer
     return;
 
   stride = sizeof(int16_t) * DEFAULT_CHANNELS;
-  n_frames = SPA_MIN(b->requested, buf->datas[0].maxsize / stride);
-
-  fill_le16(data, p, n_frames);
+  max_possible_frames = SPA_MIN(b->requested, buf->datas[0].maxsize / stride);
+  
+  do {
+    n_frames = fill(p, max_possible_frames, stride);
+    if (n_frames == 0) {
+      usleep(1000);
+    }
+  } while(n_frames == 0);
 
   buf->datas[0].chunk->offset = 0;
   buf->datas[0].chunk->stride = stride;
   buf->datas[0].chunk->size = n_frames * stride;
+  debug(3, "Queueing %d frames for output.", n_frames);
+  
+  if (pw_stream_get_time_n(data->stream, &timing_data_1.time_info, sizeof(time_info)) == 0)
+    timing_data_1.pw_time_is_valid = 1;
+  else
+    timing_data_1.pw_time_is_valid = 0;
+  __sync_synchronize();
+  memcpy((char *)&timing_data_2, (char *)&timing_data_1, sizeof(struct timing_data));
+  __sync_synchronize();
   pw_stream_queue_buffer(data->stream, b);
+
 }
 
+
 static const struct pw_stream_events stream_events = {
     PW_VERSION_STREAM_EVENTS,
     .process = on_process,
@@ -156,6 +169,8 @@ static const struct pw_stream_events stream_events = {
 static int init(__attribute__((unused)) int argc, __attribute__((unused)) char **argv) {
   debug(1, "pw_init");
   // set up default values first
+  memset(&timing_data_1,0,sizeof(struct timing_data));
+  memset(&timing_data_2,0,sizeof(struct timing_data));
   config.audio_backend_buffer_desired_length = 0.35;
   config.audio_backend_buffer_interpolation_threshold_in_seconds =
       0.02; // below this, soxr interpolation will not occur -- it'll be basic interpolation
@@ -168,12 +183,12 @@ static int init(__attribute__((unused)) int argc, __attribute__((unused)) char *
   // do the "general" audio  options. Note, these options are in the "general" stanza!
   parse_general_audio_options();
 
-/*
-  // now any PipeWire-specific options
-  if (config.cfg != NULL) {
-    const char *str;
-  }
-*/
+  /*
+    // now any PipeWire-specific options
+    if (config.cfg != NULL) {
+      const char *str;
+    }
+  */
   // finished collecting settings
 
   // allocate space for the audio buffer
@@ -194,10 +209,10 @@ static int init(__attribute__((unused)) int argc, __attribute__((unused)) char *
 
   /* make a main loop. If you already have another main loop, you can add
    * the fd of this pipewire mainloop to it. */
-  data.loop =  pw_thread_loop_new("tone-generator", NULL);
+  data.loop = pw_thread_loop_new("tone-generator", NULL);
 
   pw_thread_loop_lock(data.loop);
-    
+
   pw_thread_loop_start(data.loop);
 
   /* Create a simple stream, the simple stream manages the core and remote
@@ -211,7 +226,7 @@ static int init(__attribute__((unused)) int argc, __attribute__((unused)) char *
    * you need to listen to is the process event where you need to produce
    * the data.
    */
-  
+
   props = pw_properties_new(PW_KEY_MEDIA_TYPE, "Audio", PW_KEY_MEDIA_CATEGORY, "Playback",
                             PW_KEY_MEDIA_ROLE, "Music", NULL);
   if (argc > 1)
@@ -240,29 +255,31 @@ static int init(__attribute__((unused)) int argc, __attribute__((unused)) char *
 }
 
 static void deinit(void) {
-  debug (1, "pw_deinit");
+  debug(1, "pw_deinit");
   pw_thread_loop_stop(data.loop);
   pw_stream_destroy(data.stream);
   pw_thread_loop_destroy(data.loop);
   pw_deinit();
+  free(audio_lmb); // deallocate that buffer
   debug(1, "pa_deinit done");
 }
 
-static int play(__attribute__((unused)) void *buf, int samples, __attribute__((unused)) int sample_type,
-                __attribute__((unused)) uint32_t timestamp,
-                __attribute__((unused)) uint64_t playtime) {
-  debug(1,"pw_play of %d samples.",samples);
-  /*
+static void start(__attribute__((unused)) int sample_rate,
+                  __attribute__((unused)) int sample_format) {
+}
+
+static int play(__attribute__((unused)) void *buf, int samples,
+                __attribute__((unused)) int sample_type, __attribute__((unused)) uint32_t timestamp,
+                __attribute__((unused)) uint64_t playtime) {  
   // copy the samples into the queue
-  check_pa_stream_status(stream, "audio_pw play.");
   size_t bytes_to_transfer = samples * 2 * 2;
   size_t space_to_end_of_buffer = audio_umb - audio_eoq;
   if (space_to_end_of_buffer >= bytes_to_transfer) {
     memcpy(audio_eoq, buf, bytes_to_transfer);
-    audio_occupancy += bytes_to_transfer;
     pthread_mutex_lock(&buffer_mutex);
-    audio_eoq += bytes_to_transfer;
+    audio_occupancy += bytes_to_transfer;
     pthread_mutex_unlock(&buffer_mutex);
+    audio_eoq += bytes_to_transfer;
   } else {
     memcpy(audio_eoq, buf, space_to_end_of_buffer);
     buf += space_to_end_of_buffer;
@@ -272,47 +289,74 @@ static int play(__attribute__((unused)) void *buf, int samples, __attribute__((u
     pthread_mutex_unlock(&buffer_mutex);
     audio_eoq = audio_lmb + bytes_to_transfer - space_to_end_of_buffer;
   }
-  // maybe goose it if it's stopped?
-  */
+  debug(3, "play %d samples; %d bytes in buffer.", samples, audio_occupancy);
   return 0;
 }
 
-/*
-int pa_delay(long *the_delay) {
-  check_pa_stream_status(stream, "audio_pa delay.");
-  // debug(1,"pa_delay");
-  long result = 0;
-  int reply = 0;
-  pa_usec_t latency;
-  int negative;
-  pa_threaded_mainloop_lock(mainloop);
-  int gl = pa_stream_get_latency(stream, &latency, &negative);
-  pa_threaded_mainloop_unlock(mainloop);
-  if (gl == PA_ERR_NODATA) {
-    // debug(1, "No latency data yet.");
-    reply = -ENODEV;
-  } else if (gl != 0) {
-    // debug(1,"Error %d getting latency.",gl);
-    reply = -EIO;
+int delay(long *the_delay) {
+  // find out what's already in the PipeWire system and when
+  struct timing_data timing_data;
+  int loop_count = 1;
+  do {
+    memcpy(&timing_data, (char *)&timing_data_1, sizeof(struct timing_data));
+    __sync_synchronize();
+    if (memcmp(&timing_data, (char *)&timing_data_2, sizeof(struct timing_data)) != 0) {
+        usleep(2); // microseconds
+        loop_count++;
+        __sync_synchronize();
+    }
+  } while ((memcmp(&timing_data, (char *)&timing_data_2, sizeof(struct timing_data)) != 0) && (loop_count < 10));
+  long total_delay_now_frames_long = 0;
+  if ((loop_count < 10) && (timing_data.pw_time_is_valid != 0)) {
+      struct timespec time_now;
+    clock_gettime(CLOCK_MONOTONIC, &time_now);
+    int64_t interval_from_process_time_to_now = SPA_TIMESPEC_TO_NSEC(&time_now) - timing_data.time_info.now;
+    int64_t delay_in_ns = timing_data.time_info.delay + timing_data.time_info.buffered;
+    delay_in_ns = delay_in_ns * 1000000000;
+    delay_in_ns = delay_in_ns * timing_data.time_info.rate.num;
+    delay_in_ns = delay_in_ns / timing_data.time_info.rate.denom;
+    
+    int64_t total_delay_now_ns = delay_in_ns - interval_from_process_time_to_now;
+    int64_t total_delay_now_frames = (total_delay_now_ns * 44100)/1000000000 + timing_data.frames;
+    total_delay_now_frames_long = total_delay_now_frames;
+    debug(3, "total delay in frames: % " PRId64 ", %ld.", total_delay_now_frames, total_delay_now_frames_long);
+
+    debug(3,
+          "interval_from_process_time_to_now: %" PRId64 " ns, "
+          "delay_in_ns: %" PRId64 ", queued: %" PRId64 ", buffered: %" PRId64 ".",
+          // delay_timing_data.time_info.rate.num, delay_timing_data.time_info.rate.denom, 
+          interval_from_process_time_to_now, delay_in_ns,
+          timing_data.time_info.queued, timing_data.time_info.buffered);
+
+
   } else {
-    result = (audio_occupancy / (2 * 2)) + (latency * 44100) / 1000000;
-    reply = 0;
+    debug(1, "can't get time info.");
   }
+
+  long result = 0;
+  int reply = 0;
+  pthread_mutex_lock(&buffer_mutex);
+  result = total_delay_now_frames_long + audio_occupancy / (2 * 2);
+  pthread_mutex_unlock(&buffer_mutex);
   *the_delay = result;
   return reply;
 }
-*/
 
-void flush(void) {
+
+static void flush(void) {
   audio_toq = audio_eoq = audio_lmb;
   audio_umb = audio_lmb + audio_size;
+  pthread_mutex_lock(&buffer_mutex);
   audio_occupancy = 0;
+  pthread_mutex_unlock(&buffer_mutex);
 }
 
 static void stop(void) {
   audio_toq = audio_eoq = audio_lmb;
   audio_umb = audio_lmb + audio_size;
+  pthread_mutex_lock(&buffer_mutex);
   audio_occupancy = 0;
+  pthread_mutex_unlock(&buffer_mutex);
 }
 
 audio_output audio_pw = {.name = "pw",
@@ -320,11 +364,11 @@ audio_output audio_pw = {.name = "pw",
                          .init = &init,
                          .deinit = &deinit,
                          .prepare = NULL,
-                         .start = NULL,
+                         .start = &start,
                          .stop = &stop,
                          .is_running = NULL,
                          .flush = &flush,
-                         .delay = NULL,
+                         .delay = &delay,
                          .stats = NULL,
                          .play = &play,
                          .volume = NULL,