2 * Asynchronous PipeWire Backend. This file is part of Shairport Sync.
3 * Copyright (c) Mike Brady 2023
6 * Permission is hereby granted, free of charge, to any person
7 * obtaining a copy of this software and associated documentation
8 * files (the "Software"), to deal in the Software without
9 * restriction, including without limitation the rights to use,
10 * copy, modify, merge, publish, distribute, sublicense, and/or
11 * sell copies of the Software, and to permit persons to whom the
12 * Software is furnished to do so, subject to the following conditions:
14 * The above copyright notice and this permission notice shall be
15 * included in all copies or substantial portions of the Software.
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
19 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
21 * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
22 * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
23 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
24 * OTHER DEALINGS IN THE SOFTWARE.
27 // This uses ideas from the tone generator sample code at:
28 // https://github.com/PipeWire/pipewire/blob/master/src/examples/audio-src.c
29 // Thanks to Wim Taymans.
39 #include <pipewire/pipewire.h>
40 #include <spa/param/audio/format-utils.h>
42 // note -- these are hardwired into this code.
43 #define DEFAULT_FORMAT SPA_AUDIO_FORMAT_S16_LE
44 #define DEFAULT_BYTES_PER_SAMPLE 2
46 #define DEFAULT_RATE 44100
47 #define DEFAULT_CHANNELS 2
48 #define DEFAULT_BUFFER_SIZE_IN_SECONDS 4
50 // Four seconds buffer -- should be plenty
51 #define buffer_allocation DEFAULT_RATE * DEFAULT_BUFFER_SIZE_IN_SECONDS * DEFAULT_BYTES_PER_SAMPLE * DEFAULT_CHANNELS
53 static pthread_mutex_t buffer_mutex
= PTHREAD_MUTEX_INITIALIZER
;
55 static char *audio_lmb
, *audio_umb
, *audio_toq
, *audio_eoq
;
56 static size_t audio_size
= buffer_allocation
;
57 static size_t audio_occupancy
;
58 static int enable_fill
;
61 int pw_time_is_valid
; // set when the pw_time has been set
62 struct pw_time time_info
; // information about the last time a process callback occurred
63 size_t frames
; // the number of frames sent at that time
66 // to avoid using a mutex, write the same data twice and check they are the same
67 // to ensure they are consistent. Make sure the first is written strictly before the second
68 // using __sync_synchronize();
69 struct timing_data timing_data_1
, timing_data_2
;
72 struct pw_thread_loop
*loop
;
73 struct pw_stream
*stream
;
76 // the pipewire global data structure
77 struct data data
= {NULL
, NULL
};
80 static void on_state_changed(__attribute__((unused)) void *userdata, enum pw_stream_state old,
81 enum pw_stream_state state,
82 __attribute__((unused)) const char *error) {
83 // struct pw_data *pw = userdata;
84 debug(3, "pw: stream state changed %s -> %s", pw_stream_state_as_string(old),
85 pw_stream_state_as_string(state));
89 static void on_process(void *userdata
) {
91 struct data
*data
= userdata
;
94 pthread_mutex_lock(&buffer_mutex
);
96 if ((audio_occupancy
> 0) || (enable_fill
)) {
98 // get a buffer to see how big it can be
99 struct pw_buffer
*b
= pw_stream_dequeue_buffer(data
->stream
);
101 pw_log_warn("out of buffers: %m");
102 die("PipeWire failure -- out of buffers!");
104 struct spa_buffer
*buf
= b
->buffer
;
105 uint8_t *dest
= buf
->datas
[0].data
;
107 int stride
= DEFAULT_BYTES_PER_SAMPLE
* DEFAULT_CHANNELS
;
109 // note: the requested field is the number of frames, not bytes, requested
110 int max_possible_frames
= SPA_MIN(b
->requested
, buf
->datas
[0].maxsize
/ stride
);
112 size_t bytes_we_can_transfer
= max_possible_frames
* stride
;
114 if (audio_occupancy
> 0) {
115 // if (enable_fill == 1)) {
116 // debug(1, "got audio -- disable_fill");
120 if (bytes_we_can_transfer
> audio_occupancy
)
121 bytes_we_can_transfer
= audio_occupancy
;
123 n_frames
= bytes_we_can_transfer
/ stride
;
125 size_t bytes_to_end_of_buffer
= (size_t)(audio_umb
- audio_toq
); // must be zero or positive
126 if (bytes_we_can_transfer
<= bytes_to_end_of_buffer
) {
127 // the bytes are all in a row in the audio buffer
128 memcpy(dest
, audio_toq
, bytes_we_can_transfer
);
129 audio_toq
+= bytes_we_can_transfer
;
131 // the bytes are in two places in the audio buffer
132 size_t first_portion_to_write
= audio_umb
- audio_toq
;
133 if (first_portion_to_write
!= 0)
134 memcpy(dest
, audio_toq
, first_portion_to_write
);
135 uint8_t *new_dest
= dest
+ first_portion_to_write
;
136 memcpy(new_dest
, audio_lmb
, bytes_we_can_transfer
- first_portion_to_write
);
137 audio_toq
= audio_lmb
+ bytes_we_can_transfer
- first_portion_to_write
;
139 audio_occupancy
-= bytes_we_can_transfer
;
142 debug(3, "send silence");
143 // this should really be dithered silence
144 memset(dest
, 0, bytes_we_can_transfer
);
145 n_frames
= max_possible_frames
;
147 buf
->datas
[0].chunk
->offset
= 0;
148 buf
->datas
[0].chunk
->stride
= stride
;
149 buf
->datas
[0].chunk
->size
= n_frames
* stride
;
150 pw_stream_queue_buffer(data
->stream
, b
);
151 debug(3, "Queueing %d frames for output.", n_frames
);
152 } // (else the first data block does not contain a data pointer)
154 pthread_mutex_unlock(&buffer_mutex
);
156 timing_data_1
.frames
= n_frames
;
157 if (pw_stream_get_time_n(data
->stream
, &timing_data_1
.time_info
, sizeof(struct timing_data
)) == 0)
158 timing_data_1
.pw_time_is_valid
= 1;
160 timing_data_1
.pw_time_is_valid
= 0;
161 __sync_synchronize();
162 memcpy((char *)&timing_data_2
, (char *)&timing_data_1
, sizeof(struct timing_data
));
163 __sync_synchronize();
166 static const struct pw_stream_events stream_events
= {PW_VERSION_STREAM_EVENTS
,
167 .process
= on_process
};
168 // PW_VERSION_STREAM_EVENTS, .process = on_process, .state_changed = on_state_changed};
170 static void deinit(void) {
171 pw_thread_loop_stop(data
.loop
);
172 pw_stream_destroy(data
.stream
);
173 pw_thread_loop_destroy(data
.loop
);
175 free(audio_lmb
); // deallocate that buffer
178 static int init(__attribute__((unused
)) int argc
, __attribute__((unused
)) char **argv
) {
179 // set up default values first
180 memset(&timing_data_1
, 0, sizeof(struct timing_data
));
181 memset(&timing_data_2
, 0, sizeof(struct timing_data
));
182 config
.audio_backend_buffer_desired_length
= 0.35;
183 config
.audio_backend_buffer_interpolation_threshold_in_seconds
=
184 0.02; // below this, soxr interpolation will not occur -- it'll be basic interpolation
187 config
.audio_backend_latency_offset
= 0;
189 // get settings from settings file
190 // do the "general" audio options. Note, these options are in the "general" stanza!
191 parse_general_audio_options();
193 // now any PipeWire-specific options
194 if (config
.cfg
!= NULL
) {
197 // Get the optional Application Name, if provided.
198 if (config_lookup_string(config
.cfg
, "pw.application_name", &str
)) {
199 config
.pw_application_name
= (char *)str
;
202 // Get the optional PipeWire node name, if provided.
203 if (config_lookup_string(config
.cfg
, "pw.node_name", &str
)) {
204 config
.pw_node_name
= (char *)str
;
207 // Get the optional PipeWire sink target name, if provided.
208 if (config_lookup_string(config
.cfg
, "pw.sink_target", &str
)) {
209 config
.pw_sink_target
= (char *)str
;
213 // finished collecting settings
215 // allocate space for the audio buffer
216 audio_lmb
= malloc(audio_size
);
217 if (audio_lmb
== NULL
)
218 die("Can't allocate %d bytes for PipeWire buffer.", audio_size
);
219 audio_toq
= audio_eoq
= audio_lmb
;
220 audio_umb
= audio_lmb
+ audio_size
;
222 // debug(1, "init enable_fill");
225 const struct spa_pod
*params
[1];
226 uint8_t buffer
[1024];
227 struct pw_properties
*props
;
228 struct spa_pod_builder b
= SPA_POD_BUILDER_INIT(buffer
, sizeof(buffer
));
231 pw_init(&largc
, NULL
);
233 /* make a threaded loop. */
234 data
.loop
= pw_thread_loop_new("shairport-sync", NULL
);
236 pw_thread_loop_lock(data
.loop
);
238 pw_thread_loop_start(data
.loop
);
240 char* appname
= config
.pw_application_name
;
242 appname
= "Shairport Sync";
244 char* nodename
= config
.pw_node_name
;
245 if (nodename
== NULL
)
246 nodename
= "Shairport Sync";
248 props
= pw_properties_new(PW_KEY_MEDIA_TYPE
, "Audio", PW_KEY_MEDIA_CATEGORY
, "Playback",
249 PW_KEY_MEDIA_ROLE
, "Music", PW_KEY_APP_NAME
, appname
,
250 PW_KEY_NODE_NAME
, nodename
, NULL
);
252 if (config
.pw_sink_target
!= NULL
) {
253 debug(3, "setting sink target to \"%s\".", config
.pw_sink_target
);
254 pw_properties_set(props
, PW_KEY_TARGET_OBJECT
, config
.pw_sink_target
);
257 data
.stream
= pw_stream_new_simple(pw_thread_loop_get_loop(data
.loop
), config
.appName
, props
,
258 &stream_events
, &data
);
260 // Make one parameter with the supported formats. The SPA_PARAM_EnumFormat
261 // id means that this is a format enumeration (of 1 value).
262 params
[0] = spa_format_audio_raw_build(&b
, SPA_PARAM_EnumFormat
,
263 &SPA_AUDIO_INFO_RAW_INIT(.format
= DEFAULT_FORMAT
,
264 .channels
= DEFAULT_CHANNELS
,
265 .rate
= DEFAULT_RATE
));
267 // Now connect this stream. We ask that our process function is
268 // called in a realtime thread.
269 pw_stream_connect(data
.stream
, PW_DIRECTION_OUTPUT
, PW_ID_ANY
,
270 PW_STREAM_FLAG_AUTOCONNECT
| PW_STREAM_FLAG_MAP_BUFFERS
|
271 PW_STREAM_FLAG_RT_PROCESS
,
274 pw_thread_loop_unlock(data
.loop
);
278 static void start(__attribute__((unused
)) int sample_rate
,
279 __attribute__((unused
)) int sample_format
) {
282 static int play(__attribute__((unused
)) void *buf
, int samples
,
283 __attribute__((unused
)) int sample_type
, __attribute__((unused
)) uint32_t timestamp
,
284 __attribute__((unused
)) uint64_t playtime
) {
285 // copy the samples into the queue
286 debug(3, "play %u samples; %u bytes already in the buffer.", samples
, audio_occupancy
);
287 size_t bytes_to_transfer
= samples
* DEFAULT_CHANNELS
* DEFAULT_BYTES_PER_SAMPLE
;
288 pthread_mutex_lock(&buffer_mutex
);
289 size_t bytes_available
= audio_size
- audio_occupancy
;
290 if (bytes_available
< bytes_to_transfer
)
291 bytes_to_transfer
= bytes_available
;
292 if (bytes_to_transfer
> 0) {
293 size_t space_to_end_of_buffer
= audio_umb
- audio_eoq
;
294 if (space_to_end_of_buffer
>= bytes_to_transfer
) {
295 memcpy(audio_eoq
, buf
, bytes_to_transfer
);
296 audio_eoq
+= bytes_to_transfer
;
298 memcpy(audio_eoq
, buf
, space_to_end_of_buffer
);
299 buf
+= space_to_end_of_buffer
;
300 memcpy(audio_lmb
, buf
, bytes_to_transfer
- space_to_end_of_buffer
);
301 audio_eoq
= audio_lmb
+ bytes_to_transfer
- space_to_end_of_buffer
;
303 audio_occupancy
+= bytes_to_transfer
;
305 pthread_mutex_unlock(&buffer_mutex
);
309 int delay(long *the_delay
) {
312 // find out what's already in the PipeWire system and when
313 struct timing_data timing_data
;
316 memcpy(&timing_data
, (char *)&timing_data_1
, sizeof(struct timing_data
));
317 __sync_synchronize();
318 if (memcmp(&timing_data
, (char *)&timing_data_2
, sizeof(struct timing_data
)) != 0) {
319 usleep(2); // microseconds
321 __sync_synchronize();
323 } while ((memcmp(&timing_data
, (char *)&timing_data_2
, sizeof(struct timing_data
)) != 0) &&
325 long total_delay_now_frames_long
= 0;
326 if ((loop_count
< 10) && (timing_data
.pw_time_is_valid
!= 0)) {
327 struct timespec time_now
;
328 clock_gettime(CLOCK_MONOTONIC
, &time_now
);
329 int64_t interval_from_process_time_to_now
=
330 SPA_TIMESPEC_TO_NSEC(&time_now
) - timing_data
.time_info
.now
;
331 int64_t delay_in_ns
= timing_data
.time_info
.delay
+ timing_data
.time_info
.buffered
;
332 delay_in_ns
= delay_in_ns
* 1000000000;
333 delay_in_ns
= delay_in_ns
* timing_data
.time_info
.rate
.num
;
334 delay_in_ns
= delay_in_ns
/ timing_data
.time_info
.rate
.denom
;
336 int64_t total_delay_now_ns
= delay_in_ns
- interval_from_process_time_to_now
;
337 int64_t total_delay_now_frames
= (total_delay_now_ns
* DEFAULT_RATE
) / 1000000000 + timing_data
.frames
;
338 total_delay_now_frames_long
= total_delay_now_frames
;
339 debug(3, "total delay in frames: %ld.", total_delay_now_frames_long
);
341 if (timing_data
.time_info
.queued
!= 0) {
342 debug(1, "buffers queued: %d", timing_data
.time_info
.queued
);
346 "interval_from_process_time_to_now: %" PRId64 " ns, "
347 "delay_in_ns: %" PRId64 ", queued: %" PRId64 ", buffered: %" PRId64 ".",
348 // delay_timing_data.time_info.rate.num, delay_timing_data.time_info.rate.denom,
349 interval_from_process_time_to_now, delay_in_ns,
350 timing_data.time_info.queued, timing_data.time_info.buffered);
354 warn("Shairport Sync's PipeWire backend can not get timing information from the PipeWire "
355 "system. Is PipeWire running?");
358 pthread_mutex_lock(&buffer_mutex
);
359 result
= total_delay_now_frames_long
+ audio_occupancy
/ (DEFAULT_BYTES_PER_SAMPLE
* DEFAULT_CHANNELS
);
360 pthread_mutex_unlock(&buffer_mutex
);
365 static void flush(void) {
366 pthread_mutex_lock(&buffer_mutex
);
367 audio_toq
= audio_eoq
= audio_lmb
;
368 audio_umb
= audio_lmb
+ audio_size
;
370 // if (enable_fill == 0) {
371 // debug(1, "flush enable_fill");
374 pthread_mutex_unlock(&buffer_mutex
);
377 static void stop(void) {
378 pthread_mutex_lock(&buffer_mutex
);
379 audio_toq
= audio_eoq
= audio_lmb
;
380 audio_umb
= audio_lmb
+ audio_size
;
382 // if (enable_fill == 0) {
383 // debug(1, "stop enable_fill");
386 pthread_mutex_unlock(&buffer_mutex
);
389 audio_output audio_pw
= {.name
= "pw",