From: Mike Brady <4265913+mikebrady@users.noreply.github.com> Date: Sat, 30 Sep 2023 14:45:41 +0000 (+0100) Subject: Replace the pipewire backend completely. Play writes to a buffer. The on_process... X-Git-Tag: 4.3.2^2~24 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=27e4ba09a1bb07defabb9e6d2e68e76312e0ac33;p=thirdparty%2Fshairport-sync.git Replace the pipewire backend completely. Play writes to a buffer. The on_process() function reads from the buffer and updates timing information for the delay() function. Barebones -- can't set the app name or the volume. Assumes no further delays and no buffers when on_process is called. But it works! --- diff --git a/audio_pw.c b/audio_pw.c index 344371a4..33b27c7e 100644 --- a/audio_pw.c +++ b/audio_pw.c @@ -24,7 +24,6 @@ * OTHER DEALINGS IN THE SOFTWARE. */ - #include "audio.h" #include "common.h" #include @@ -33,10 +32,8 @@ #include #include -#include // many not need this after development - -#include #include +#include // note -- these are hacked and hardwired into this code. #define DEFAULT_FORMAT SPA_AUDIO_FORMAT_S16_LE @@ -47,16 +44,25 @@ // Four seconds buffer -- should be plenty #define buffer_allocation 44100 * 4 * 2 * 2 -// static pthread_mutex_t buffer_mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_mutex_t buffer_mutex = PTHREAD_MUTEX_INITIALIZER; -char *audio_lmb, *audio_umb, *audio_toq, *audio_eoq; -size_t audio_size = buffer_allocation; -size_t audio_occupancy; - -#define M_PI_M2 (M_PI + M_PI) +static char *audio_lmb, *audio_umb, *audio_toq, *audio_eoq; +static size_t audio_size = buffer_allocation; +static size_t audio_occupancy; uint64_t starting_time; +struct timing_data { + int pw_time_is_valid; //set when the pw_time has been set + struct pw_time time_info; // information about the last time a process callback occurred + size_t frames; // the number of frames sent at that time +}; + +// to avoid using a mutex, write the same data twice and check they are the same +// to ensure they are consistent. Make sure the first is written strictly before the second +// using __sync_synchronize(); +struct timing_data timing_data_1, timing_data_2; + struct data { struct pw_thread_loop *loop; struct pw_stream *stream; @@ -69,22 +75,34 @@ struct data data = { 0, }; -static void fill_le16(struct data *d, void *dest, int n_frames) { - //float *dst = dest, val; - float val; - int16_t *dst = dest, le16val; - int i, c; - - for (i = 0; i < n_frames; i++) { - d->accumulator += M_PI_M2 * 440 / DEFAULT_RATE; - if (d->accumulator >= M_PI_M2) - d->accumulator -= M_PI_M2; - - val = sin(d->accumulator) * DEFAULT_VOLUME; - le16val = INT16_MAX * val; - for (c = 0; c < DEFAULT_CHANNELS; c++) - *dst++ = le16val; +static int fill(void *dest, int max_frames, int stride) { + size_t bytes_we_can_transfer = max_frames * stride; + pthread_mutex_lock(&buffer_mutex); + if (bytes_we_can_transfer > audio_occupancy) + bytes_we_can_transfer = audio_occupancy; + pthread_mutex_unlock(&buffer_mutex); + if (bytes_we_can_transfer > 0) { + size_t bytes_to_end_of_buffer = (size_t)(audio_umb - audio_toq); // must be zero or positive + if (bytes_we_can_transfer <= bytes_to_end_of_buffer) { + // the bytes are all in a row in the audio buffer + memcpy(dest, audio_toq, bytes_we_can_transfer); + audio_toq += bytes_we_can_transfer; + } else { + // the bytes are in two places in the audio buffer + size_t first_portion_to_write = audio_umb - audio_toq; + if (first_portion_to_write != 0) + memcpy(dest, audio_toq, first_portion_to_write); + uint8_t *new_dest = dest + first_portion_to_write; + memcpy(new_dest, audio_lmb, bytes_we_can_transfer - first_portion_to_write); + audio_toq = audio_lmb + bytes_we_can_transfer - first_portion_to_write; + } + // lock + pthread_mutex_lock(&buffer_mutex); + audio_occupancy -= bytes_we_can_transfer; + pthread_mutex_unlock(&buffer_mutex); + // unlock } + return bytes_we_can_transfer / stride; // back to numbers of frames } /* our data processing function is in general: @@ -97,57 +115,52 @@ static void fill_le16(struct data *d, void *dest, int n_frames) { * pw_stream_queue_buffer(stream, b); */ static void on_process(void *userdata) { - int wait; - do { - uint64_t time_now = get_absolute_time_in_ns(); - int64_t elapsed_time = time_now - starting_time; - double elapsed_time_seconds = fmod(elapsed_time * 0.000000001, 10.0); - wait = (elapsed_time_seconds > 4.0) && (elapsed_time_seconds < 6.0); - if (wait != 0) { - // debug(1, "wait..."); - usleep(1000); - } - } while (wait != 0); struct data *data = userdata; + + struct pw_time time_info; + memset(&time_info, 0, sizeof(time_info)); + struct pw_buffer *b; struct spa_buffer *buf; - int n_frames, stride; + int max_possible_frames, n_frames, stride; uint8_t *p; if ((b = pw_stream_dequeue_buffer(data->stream)) == NULL) { pw_log_warn("out of buffers: %m"); return; } - - struct pw_time time_info; - memset(&time_info, 0, sizeof(time_info)); - - int response = pw_stream_get_time_n(data->stream, &time_info, sizeof(time_info)); - if (response == 0) { - struct timespec ts; - clock_gettime(CLOCK_MONOTONIC, &ts); - int64_t diff = SPA_TIMESPEC_TO_NSEC(&ts) - time_info.now; - int64_t elapsed = (time_info.rate.denom * diff) / (time_info.rate.num * SPA_NSEC_PER_SEC); - debug(1, "rate.num: %" PRId64 ", rate.denom: %" PRId64 ", diff: %" PRId64 "ns, %" PRId64 " frames, delay: %" PRId64 ", queued: %" PRId64 ", buffered: %" PRId64 ".", time_info.rate.num, time_info.rate.denom, diff, elapsed, time_info.delay, time_info.queued, time_info.buffered); - } else { - debug(1, "can't get time info: %d.", response); - } buf = b->buffer; - if ((p = buf->datas[0].data) == NULL) + if ((p = buf->datas[0].data) == NULL) // the first data block does not contain a data pointer return; stride = sizeof(int16_t) * DEFAULT_CHANNELS; - n_frames = SPA_MIN(b->requested, buf->datas[0].maxsize / stride); - - fill_le16(data, p, n_frames); + max_possible_frames = SPA_MIN(b->requested, buf->datas[0].maxsize / stride); + + do { + n_frames = fill(p, max_possible_frames, stride); + if (n_frames == 0) { + usleep(1000); + } + } while(n_frames == 0); buf->datas[0].chunk->offset = 0; buf->datas[0].chunk->stride = stride; buf->datas[0].chunk->size = n_frames * stride; + debug(3, "Queueing %d frames for output.", n_frames); + + if (pw_stream_get_time_n(data->stream, &timing_data_1.time_info, sizeof(time_info)) == 0) + timing_data_1.pw_time_is_valid = 1; + else + timing_data_1.pw_time_is_valid = 0; + __sync_synchronize(); + memcpy((char *)&timing_data_2, (char *)&timing_data_1, sizeof(struct timing_data)); + __sync_synchronize(); pw_stream_queue_buffer(data->stream, b); + } + static const struct pw_stream_events stream_events = { PW_VERSION_STREAM_EVENTS, .process = on_process, @@ -156,6 +169,8 @@ static const struct pw_stream_events stream_events = { static int init(__attribute__((unused)) int argc, __attribute__((unused)) char **argv) { debug(1, "pw_init"); // set up default values first + memset(&timing_data_1,0,sizeof(struct timing_data)); + memset(&timing_data_2,0,sizeof(struct timing_data)); config.audio_backend_buffer_desired_length = 0.35; config.audio_backend_buffer_interpolation_threshold_in_seconds = 0.02; // below this, soxr interpolation will not occur -- it'll be basic interpolation @@ -168,12 +183,12 @@ static int init(__attribute__((unused)) int argc, __attribute__((unused)) char * // do the "general" audio options. Note, these options are in the "general" stanza! parse_general_audio_options(); -/* - // now any PipeWire-specific options - if (config.cfg != NULL) { - const char *str; - } -*/ + /* + // now any PipeWire-specific options + if (config.cfg != NULL) { + const char *str; + } + */ // finished collecting settings // allocate space for the audio buffer @@ -194,10 +209,10 @@ static int init(__attribute__((unused)) int argc, __attribute__((unused)) char * /* make a main loop. If you already have another main loop, you can add * the fd of this pipewire mainloop to it. */ - data.loop = pw_thread_loop_new("tone-generator", NULL); + data.loop = pw_thread_loop_new("tone-generator", NULL); pw_thread_loop_lock(data.loop); - + pw_thread_loop_start(data.loop); /* Create a simple stream, the simple stream manages the core and remote @@ -211,7 +226,7 @@ static int init(__attribute__((unused)) int argc, __attribute__((unused)) char * * you need to listen to is the process event where you need to produce * the data. */ - + props = pw_properties_new(PW_KEY_MEDIA_TYPE, "Audio", PW_KEY_MEDIA_CATEGORY, "Playback", PW_KEY_MEDIA_ROLE, "Music", NULL); if (argc > 1) @@ -240,29 +255,31 @@ static int init(__attribute__((unused)) int argc, __attribute__((unused)) char * } static void deinit(void) { - debug (1, "pw_deinit"); + debug(1, "pw_deinit"); pw_thread_loop_stop(data.loop); pw_stream_destroy(data.stream); pw_thread_loop_destroy(data.loop); pw_deinit(); + free(audio_lmb); // deallocate that buffer debug(1, "pa_deinit done"); } -static int play(__attribute__((unused)) void *buf, int samples, __attribute__((unused)) int sample_type, - __attribute__((unused)) uint32_t timestamp, - __attribute__((unused)) uint64_t playtime) { - debug(1,"pw_play of %d samples.",samples); - /* +static void start(__attribute__((unused)) int sample_rate, + __attribute__((unused)) int sample_format) { +} + +static int play(__attribute__((unused)) void *buf, int samples, + __attribute__((unused)) int sample_type, __attribute__((unused)) uint32_t timestamp, + __attribute__((unused)) uint64_t playtime) { // copy the samples into the queue - check_pa_stream_status(stream, "audio_pw play."); size_t bytes_to_transfer = samples * 2 * 2; size_t space_to_end_of_buffer = audio_umb - audio_eoq; if (space_to_end_of_buffer >= bytes_to_transfer) { memcpy(audio_eoq, buf, bytes_to_transfer); - audio_occupancy += bytes_to_transfer; pthread_mutex_lock(&buffer_mutex); - audio_eoq += bytes_to_transfer; + audio_occupancy += bytes_to_transfer; pthread_mutex_unlock(&buffer_mutex); + audio_eoq += bytes_to_transfer; } else { memcpy(audio_eoq, buf, space_to_end_of_buffer); buf += space_to_end_of_buffer; @@ -272,47 +289,74 @@ static int play(__attribute__((unused)) void *buf, int samples, __attribute__((u pthread_mutex_unlock(&buffer_mutex); audio_eoq = audio_lmb + bytes_to_transfer - space_to_end_of_buffer; } - // maybe goose it if it's stopped? - */ + debug(3, "play %d samples; %d bytes in buffer.", samples, audio_occupancy); return 0; } -/* -int pa_delay(long *the_delay) { - check_pa_stream_status(stream, "audio_pa delay."); - // debug(1,"pa_delay"); - long result = 0; - int reply = 0; - pa_usec_t latency; - int negative; - pa_threaded_mainloop_lock(mainloop); - int gl = pa_stream_get_latency(stream, &latency, &negative); - pa_threaded_mainloop_unlock(mainloop); - if (gl == PA_ERR_NODATA) { - // debug(1, "No latency data yet."); - reply = -ENODEV; - } else if (gl != 0) { - // debug(1,"Error %d getting latency.",gl); - reply = -EIO; +int delay(long *the_delay) { + // find out what's already in the PipeWire system and when + struct timing_data timing_data; + int loop_count = 1; + do { + memcpy(&timing_data, (char *)&timing_data_1, sizeof(struct timing_data)); + __sync_synchronize(); + if (memcmp(&timing_data, (char *)&timing_data_2, sizeof(struct timing_data)) != 0) { + usleep(2); // microseconds + loop_count++; + __sync_synchronize(); + } + } while ((memcmp(&timing_data, (char *)&timing_data_2, sizeof(struct timing_data)) != 0) && (loop_count < 10)); + long total_delay_now_frames_long = 0; + if ((loop_count < 10) && (timing_data.pw_time_is_valid != 0)) { + struct timespec time_now; + clock_gettime(CLOCK_MONOTONIC, &time_now); + int64_t interval_from_process_time_to_now = SPA_TIMESPEC_TO_NSEC(&time_now) - timing_data.time_info.now; + int64_t delay_in_ns = timing_data.time_info.delay + timing_data.time_info.buffered; + delay_in_ns = delay_in_ns * 1000000000; + delay_in_ns = delay_in_ns * timing_data.time_info.rate.num; + delay_in_ns = delay_in_ns / timing_data.time_info.rate.denom; + + int64_t total_delay_now_ns = delay_in_ns - interval_from_process_time_to_now; + int64_t total_delay_now_frames = (total_delay_now_ns * 44100)/1000000000 + timing_data.frames; + total_delay_now_frames_long = total_delay_now_frames; + debug(3, "total delay in frames: % " PRId64 ", %ld.", total_delay_now_frames, total_delay_now_frames_long); + + debug(3, + "interval_from_process_time_to_now: %" PRId64 " ns, " + "delay_in_ns: %" PRId64 ", queued: %" PRId64 ", buffered: %" PRId64 ".", + // delay_timing_data.time_info.rate.num, delay_timing_data.time_info.rate.denom, + interval_from_process_time_to_now, delay_in_ns, + timing_data.time_info.queued, timing_data.time_info.buffered); + + } else { - result = (audio_occupancy / (2 * 2)) + (latency * 44100) / 1000000; - reply = 0; + debug(1, "can't get time info."); } + + long result = 0; + int reply = 0; + pthread_mutex_lock(&buffer_mutex); + result = total_delay_now_frames_long + audio_occupancy / (2 * 2); + pthread_mutex_unlock(&buffer_mutex); *the_delay = result; return reply; } -*/ -void flush(void) { + +static void flush(void) { audio_toq = audio_eoq = audio_lmb; audio_umb = audio_lmb + audio_size; + pthread_mutex_lock(&buffer_mutex); audio_occupancy = 0; + pthread_mutex_unlock(&buffer_mutex); } static void stop(void) { audio_toq = audio_eoq = audio_lmb; audio_umb = audio_lmb + audio_size; + pthread_mutex_lock(&buffer_mutex); audio_occupancy = 0; + pthread_mutex_unlock(&buffer_mutex); } audio_output audio_pw = {.name = "pw", @@ -320,11 +364,11 @@ audio_output audio_pw = {.name = "pw", .init = &init, .deinit = &deinit, .prepare = NULL, - .start = NULL, + .start = &start, .stop = &stop, .is_running = NULL, .flush = &flush, - .delay = NULL, + .delay = &delay, .stats = NULL, .play = &play, .volume = NULL,