From: Mike Brady <4265913+mikebrady@users.noreply.github.com> Date: Sun, 1 Oct 2023 08:30:29 +0000 (+0100) Subject: Complete replacement of the PipeWire backend with a new implementation with full... X-Git-Tag: 4.3.2^2~21 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=bde2bcc8b4322f612474ac7a4bc252101dfbf663;p=thirdparty%2Fshairport-sync.git Complete replacement of the PipeWire backend with a new implementation with full synchrnoisation. Works well, but still preliminary. --- diff --git a/audio_pw.c b/audio_pw.c index c4677dc9..663dba12 100644 --- a/audio_pw.c +++ b/audio_pw.c @@ -1,6 +1,6 @@ /* * Asynchronous PipeWire Backend. This file is part of Shairport Sync. - * Copyright (c) Mike Brady 2017-2023 + * Copyright (c) Mike Brady 2023 * All rights reserved. * * Permission is hereby granted, free of charge, to any person @@ -24,6 +24,10 @@ * OTHER DEALINGS IN THE SOFTWARE. */ +// This uses ideas from the tone generator sample code at: +// https://github.com/PipeWire/pipewire/blob/master/src/examples/audio-src.c +// Thanks to the Wim Taymans. + #include "audio.h" #include "common.h" #include @@ -39,7 +43,6 @@ #define DEFAULT_FORMAT SPA_AUDIO_FORMAT_S16_LE #define DEFAULT_RATE 44100 #define DEFAULT_CHANNELS 2 -#define DEFAULT_VOLUME 0.7 // Four seconds buffer -- should be plenty #define buffer_allocation 44100 * 4 * 2 * 2 @@ -53,9 +56,9 @@ static size_t audio_occupancy; uint64_t starting_time; struct timing_data { - int pw_time_is_valid; //set when the pw_time has been set + int pw_time_is_valid; // set when the pw_time has been set struct pw_time time_info; // information about the last time a process callback occurred - size_t frames; // the number of frames sent at that time + size_t frames; // the number of frames sent at that time }; // to avoid using a mutex, write the same data twice and check they are the same @@ -66,8 +69,6 @@ struct timing_data timing_data_1, timing_data_2; struct data { struct pw_thread_loop *loop; struct pw_stream *stream; - - double accumulator; }; // the pipewire global data structure @@ -75,14 +76,46 @@ struct data data = { 0, }; -static int fill(void *dest, int max_frames, int stride) { - size_t bytes_we_can_transfer = max_frames * stride; +static void on_state_changed(__attribute__((unused)) void *userdata, enum pw_stream_state old, + enum pw_stream_state state, + __attribute__((unused)) const char *error) { + // struct pw_data *pw = userdata; + debug(3, "pw: stream state changed %s -> %s", pw_stream_state_as_string(old), + pw_stream_state_as_string(state)); +} + +static void on_process(void *userdata) { + + struct data *data = userdata; + int n_frames = 0; + pthread_mutex_lock(&buffer_mutex); - if (bytes_we_can_transfer > audio_occupancy) - bytes_we_can_transfer = audio_occupancy; - pthread_mutex_unlock(&buffer_mutex); - if (bytes_we_can_transfer > 0) { + + if (audio_occupancy > 0) { + + // get a buffer to see how big it can be + struct pw_buffer *b = pw_stream_dequeue_buffer(data->stream); + if (b == NULL) { + pw_log_warn("out of buffers: %m"); + return; + } + struct spa_buffer *buf = b->buffer; + uint8_t *dest = buf->datas[0].data; + if (dest == NULL) // the first data block does not contain a data pointer + return; + + int stride = sizeof(int16_t) * DEFAULT_CHANNELS; + int max_possible_frames = SPA_MIN(b->requested, buf->datas[0].maxsize / stride); + + size_t bytes_we_can_transfer = max_possible_frames * stride; + + if (bytes_we_can_transfer > audio_occupancy) + bytes_we_can_transfer = audio_occupancy; + + n_frames = bytes_we_can_transfer / stride; + size_t bytes_to_end_of_buffer = (size_t)(audio_umb - audio_toq); // must be zero or positive + if (bytes_we_can_transfer <= bytes_to_end_of_buffer) { // the bytes are all in a row in the audio buffer memcpy(dest, audio_toq, bytes_we_can_transfer); @@ -96,81 +129,42 @@ static int fill(void *dest, int max_frames, int stride) { memcpy(new_dest, audio_lmb, bytes_we_can_transfer - first_portion_to_write); audio_toq = audio_lmb + bytes_we_can_transfer - first_portion_to_write; } - // lock - pthread_mutex_lock(&buffer_mutex); - audio_occupancy -= bytes_we_can_transfer; - pthread_mutex_unlock(&buffer_mutex); - // unlock - } - return bytes_we_can_transfer / stride; // back to numbers of frames -} -/* our data processing function is in general: - * - * struct pw_buffer *b; - * b = pw_stream_dequeue_buffer(stream); - * - * .. generate stuff in the buffer ... - * - * pw_stream_queue_buffer(stream, b); - */ -static void on_process(void *userdata) { - struct data *data = userdata; - - struct pw_time time_info; - memset(&time_info, 0, sizeof(time_info)); - - struct pw_buffer *b; - struct spa_buffer *buf; - int max_possible_frames, n_frames, stride; - uint8_t *p; - - if ((b = pw_stream_dequeue_buffer(data->stream)) == NULL) { - pw_log_warn("out of buffers: %m"); - return; - } + buf->datas[0].chunk->offset = 0; + buf->datas[0].chunk->stride = stride; + buf->datas[0].chunk->size = n_frames * stride; + pw_stream_queue_buffer(data->stream, b); + debug(3, "Queueing %d frames for output.", n_frames); - buf = b->buffer; - if ((p = buf->datas[0].data) == NULL) // the first data block does not contain a data pointer - return; + audio_occupancy -= bytes_we_can_transfer; + } + pthread_mutex_unlock(&buffer_mutex); - stride = sizeof(int16_t) * DEFAULT_CHANNELS; - max_possible_frames = SPA_MIN(b->requested, buf->datas[0].maxsize / stride); - - do { - n_frames = fill(p, max_possible_frames, stride); - if (n_frames == 0) { - usleep(1000); - } - } while(n_frames == 0); - - buf->datas[0].chunk->offset = 0; - buf->datas[0].chunk->stride = stride; - buf->datas[0].chunk->size = n_frames * stride; - debug(3, "Queueing %d frames for output.", n_frames); - - if (pw_stream_get_time_n(data->stream, &timing_data_1.time_info, sizeof(time_info)) == 0) + timing_data_1.frames = n_frames; + if (pw_stream_get_time_n(data->stream, &timing_data_1.time_info, sizeof(struct timing_data)) == 0) timing_data_1.pw_time_is_valid = 1; else timing_data_1.pw_time_is_valid = 0; __sync_synchronize(); memcpy((char *)&timing_data_2, (char *)&timing_data_1, sizeof(struct timing_data)); __sync_synchronize(); - pw_stream_queue_buffer(data->stream, b); - } - static const struct pw_stream_events stream_events = { - PW_VERSION_STREAM_EVENTS, - .process = on_process, -}; + PW_VERSION_STREAM_EVENTS, .process = on_process, .state_changed = on_state_changed}; + +static void deinit(void) { + pw_thread_loop_stop(data.loop); + pw_stream_destroy(data.stream); + pw_thread_loop_destroy(data.loop); + pw_deinit(); + free(audio_lmb); // deallocate that buffer +} static int init(__attribute__((unused)) int argc, __attribute__((unused)) char **argv) { - debug(1, "pw_init"); // set up default values first - memset(&timing_data_1,0,sizeof(struct timing_data)); - memset(&timing_data_2,0,sizeof(struct timing_data)); + memset(&timing_data_1, 0, sizeof(struct timing_data)); + memset(&timing_data_2, 0, sizeof(struct timing_data)); config.audio_backend_buffer_desired_length = 0.35; config.audio_backend_buffer_interpolation_threshold_in_seconds = 0.02; // below this, soxr interpolation will not occur -- it'll be basic interpolation @@ -207,30 +201,17 @@ static int init(__attribute__((unused)) int argc, __attribute__((unused)) char * int largc = 0; pw_init(&largc, NULL); - /* make a main loop. If you already have another main loop, you can add - * the fd of this pipewire mainloop to it. */ - data.loop = pw_thread_loop_new("tone-generator", NULL); + /* make a threaded loop. */ + data.loop = pw_thread_loop_new("shairport-sync", NULL); pw_thread_loop_lock(data.loop); pw_thread_loop_start(data.loop); - /* Create a simple stream, the simple stream manages the core and remote - * objects for you if you don't need to deal with them. - * - * If you plan to autoconnect your stream, you need to provide at least - * media, category and role properties. - * - * Pass your events and a user_data pointer as the last arguments. This - * will inform you about the stream state. The most important event - * you need to listen to is the process event where you need to produce - * the data. - */ - props = pw_properties_new(PW_KEY_MEDIA_TYPE, "Audio", PW_KEY_MEDIA_CATEGORY, "Playback", PW_KEY_MEDIA_ROLE, "Music", PW_KEY_APP_NAME, "Shairport Sync", NULL); - data.stream = pw_stream_new_simple(pw_thread_loop_get_loop(data.loop), "audio-src-tg", props, + data.stream = pw_stream_new_simple(pw_thread_loop_get_loop(data.loop), "shairport-sync", props, &stream_events, &data); /* Make one parameter with the supported formats. The SPA_PARAM_EnumFormat @@ -248,58 +229,42 @@ static int init(__attribute__((unused)) int argc, __attribute__((unused)) char * params, 1); pw_thread_loop_unlock(data.loop); - debug(1, "pa_init done"); return 0; } -static void deinit(void) { - pw_thread_loop_unlock(data.loop); - debug(1, "pipewire: deinit deactivated the stream"); - pw_thread_loop_signal(data.loop, false); - pw_thread_loop_wait(data.loop); - debug(1, "pipewire: deinit loop wait done"); - pw_thread_loop_stop(data.loop); - debug(1, "pipewire: deinit loop stopped"); - pw_stream_destroy(data.stream); - debug(1, "pipewire: deinit stream destroyed"); - pw_thread_loop_destroy(data.loop); - debug(1, "pipewire: deinit loop destroyed"); - pw_deinit(); - debug(1, "pipewire: pw_deinit terminated"); - free(audio_lmb); // deallocate that buffer - debug(1, "pipewire: deinit done"); -} - static void start(__attribute__((unused)) int sample_rate, - __attribute__((unused)) int sample_format) { -} + __attribute__((unused)) int sample_format) {} static int play(__attribute__((unused)) void *buf, int samples, __attribute__((unused)) int sample_type, __attribute__((unused)) uint32_t timestamp, - __attribute__((unused)) uint64_t playtime) { + __attribute__((unused)) uint64_t playtime) { // copy the samples into the queue + debug(3, "play %u samples; %u bytes already in the buffer.", samples, audio_occupancy); size_t bytes_to_transfer = samples * 2 * 2; - size_t space_to_end_of_buffer = audio_umb - audio_eoq; - if (space_to_end_of_buffer >= bytes_to_transfer) { - memcpy(audio_eoq, buf, bytes_to_transfer); - pthread_mutex_lock(&buffer_mutex); - audio_occupancy += bytes_to_transfer; - pthread_mutex_unlock(&buffer_mutex); - audio_eoq += bytes_to_transfer; - } else { - memcpy(audio_eoq, buf, space_to_end_of_buffer); - buf += space_to_end_of_buffer; - memcpy(audio_lmb, buf, bytes_to_transfer - space_to_end_of_buffer); - pthread_mutex_lock(&buffer_mutex); + pthread_mutex_lock(&buffer_mutex); + size_t bytes_available = audio_size - audio_occupancy; + if (bytes_available < bytes_to_transfer) + bytes_to_transfer = bytes_available; + if (bytes_to_transfer > 0) { + size_t space_to_end_of_buffer = audio_umb - audio_eoq; + if (space_to_end_of_buffer >= bytes_to_transfer) { + memcpy(audio_eoq, buf, bytes_to_transfer); + audio_eoq += bytes_to_transfer; + } else { + memcpy(audio_eoq, buf, space_to_end_of_buffer); + buf += space_to_end_of_buffer; + memcpy(audio_lmb, buf, bytes_to_transfer - space_to_end_of_buffer); + audio_eoq = audio_lmb + bytes_to_transfer - space_to_end_of_buffer; + } audio_occupancy += bytes_to_transfer; - pthread_mutex_unlock(&buffer_mutex); - audio_eoq = audio_lmb + bytes_to_transfer - space_to_end_of_buffer; } - debug(3, "play %d samples; %d bytes in buffer.", samples, audio_occupancy); + pthread_mutex_unlock(&buffer_mutex); return 0; } int delay(long *the_delay) { + long result = 0; + int reply = 0; // find out what's already in the PipeWire system and when struct timing_data timing_data; int loop_count = 1; @@ -307,40 +272,44 @@ int delay(long *the_delay) { memcpy(&timing_data, (char *)&timing_data_1, sizeof(struct timing_data)); __sync_synchronize(); if (memcmp(&timing_data, (char *)&timing_data_2, sizeof(struct timing_data)) != 0) { - usleep(2); // microseconds - loop_count++; - __sync_synchronize(); + usleep(2); // microseconds + loop_count++; + __sync_synchronize(); } - } while ((memcmp(&timing_data, (char *)&timing_data_2, sizeof(struct timing_data)) != 0) && (loop_count < 10)); + } while ((memcmp(&timing_data, (char *)&timing_data_2, sizeof(struct timing_data)) != 0) && + (loop_count < 10)); long total_delay_now_frames_long = 0; if ((loop_count < 10) && (timing_data.pw_time_is_valid != 0)) { - struct timespec time_now; + struct timespec time_now; clock_gettime(CLOCK_MONOTONIC, &time_now); - int64_t interval_from_process_time_to_now = SPA_TIMESPEC_TO_NSEC(&time_now) - timing_data.time_info.now; + int64_t interval_from_process_time_to_now = + SPA_TIMESPEC_TO_NSEC(&time_now) - timing_data.time_info.now; int64_t delay_in_ns = timing_data.time_info.delay + timing_data.time_info.buffered; delay_in_ns = delay_in_ns * 1000000000; delay_in_ns = delay_in_ns * timing_data.time_info.rate.num; delay_in_ns = delay_in_ns / timing_data.time_info.rate.denom; - + int64_t total_delay_now_ns = delay_in_ns - interval_from_process_time_to_now; - int64_t total_delay_now_frames = (total_delay_now_ns * 44100)/1000000000 + timing_data.frames; + int64_t total_delay_now_frames = (total_delay_now_ns * 44100) / 1000000000 + timing_data.frames; total_delay_now_frames_long = total_delay_now_frames; - debug(3, "total delay in frames: % " PRId64 ", %ld.", total_delay_now_frames, total_delay_now_frames_long); - - debug(3, - "interval_from_process_time_to_now: %" PRId64 " ns, " - "delay_in_ns: %" PRId64 ", queued: %" PRId64 ", buffered: %" PRId64 ".", - // delay_timing_data.time_info.rate.num, delay_timing_data.time_info.rate.denom, - interval_from_process_time_to_now, delay_in_ns, - timing_data.time_info.queued, timing_data.time_info.buffered); + debug(3, "total delay in frames: %ld.", total_delay_now_frames_long); + if (timing_data.time_info.queued != 0) { + debug(1, "buffers queued: %d", timing_data.time_info.queued); + } + /* + debug(3, + "interval_from_process_time_to_now: %" PRId64 " ns, " + "delay_in_ns: %" PRId64 ", queued: %" PRId64 ", buffered: %" PRId64 ".", + // delay_timing_data.time_info.rate.num, delay_timing_data.time_info.rate.denom, + interval_from_process_time_to_now, delay_in_ns, + timing_data.time_info.queued, timing_data.time_info.buffered); + */ } else { debug(1, "can't get time info."); } - long result = 0; - int reply = 0; pthread_mutex_lock(&buffer_mutex); result = total_delay_now_frames_long + audio_occupancy / (2 * 2); pthread_mutex_unlock(&buffer_mutex); @@ -348,19 +317,18 @@ int delay(long *the_delay) { return reply; } - static void flush(void) { + pthread_mutex_lock(&buffer_mutex); audio_toq = audio_eoq = audio_lmb; audio_umb = audio_lmb + audio_size; - pthread_mutex_lock(&buffer_mutex); audio_occupancy = 0; pthread_mutex_unlock(&buffer_mutex); } static void stop(void) { + pthread_mutex_lock(&buffer_mutex); audio_toq = audio_eoq = audio_lmb; audio_umb = audio_lmb + audio_size; - pthread_mutex_lock(&buffer_mutex); audio_occupancy = 0; pthread_mutex_unlock(&buffer_mutex); }