endif
if USE_AIRPLAY_2
-shairport_sync_SOURCES += ap2_event_receiver.c ap2_rc_event_receiver.c ptp-utilities.c utilities/structured_buffer.c plists/get_info_response.c
+shairport_sync_SOURCES += ap2_buffered_audio_processor.c ap2_event_receiver.c ap2_rc_event_receiver.c ptp-utilities.c utilities/buffered_read.c utilities/structured_buffer.c utilities/mod23.c plists/get_info_response.c
shairport_sync_LDADD += lib_pair_ap.a
lib_pair_ap_a_SOURCES = pair_ap/pair.c pair_ap/pair_fruit.c pair_ap/pair_homekit.c pair_ap/pair-tlv.c
noinst_LIBRARIES += lib_pair_ap.a
--- /dev/null
+/*
+ * AirPlay 2 Buffered Audio Processor. This file is part of Shairport Sync
+ * Copyright (c) Mike Brady 2025
+ * All rights reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+#include "ap2_buffered_audio_processor.h"
+#include "common.h"
+#include "player.h"
+#include "rtp.h"
+#include "utilities/buffered_read.h"
+#include "utilities/mod23.h"
+#include <sodium.h>
+#include <stdint.h>
+
+#ifdef CONFIG_CONVOLUTION
+#include "FFTConvolver/convolver.h"
+#endif
+
+void addADTStoPacket(uint8_t *packet, int packetLen, int rate, int channel_configuration) {
+ // https://stackoverflow.com/questions/18862715/how-to-generate-the-aac-adts-elementary-stream-with-android-mediacodec
+ // with thanks!
+
+ // See https://wiki.multimedia.cx/index.php/Understanding_AAC
+ // see also https://wiki.multimedia.cx/index.php/ADTS for the ADTS layout
+ // see https://wiki.multimedia.cx/index.php/MPEG-4_Audio#Sampling_Frequencies for sampling
+ // frequencies
+
+ /**
+ * Add ADTS header at the beginning of each and every AAC packet.
+ * This is needed as the packet is raw AAC data.
+ *
+ * Note the packetLen must count in the ADTS header itself.
+ **/
+
+ int profile = 2;
+ int freqIdx = 4;
+ if (rate == 44100)
+ freqIdx = 4;
+ else if (rate == 48000)
+ freqIdx = 3;
+ else
+ debug(1, "Unsupported AAC sample rate %d.", rate);
+
+ // Channel Configuration
+ // https://wiki.multimedia.cx/index.php/MPEG-4_Audio#Channel_Configurations
+ // clang-format off
+ // 0: Defined in AOT Specifc Config
+ // 1: 1 channel: front-center
+ // 2: 2 channels: front-left, front-right
+ // 3: 3 channels: front-center, front-left, front-right
+ // 4: 4 channels: front-center, front-left, front-right, back-center
+ // 5: 5 channels: front-center, front-left, front-right, back-left, back-right
+ // 6: 6 channels: front-center, front-left, front-right, back-left, back-right, LFE-channel
+ // 7: 8 channels: front-center, front-left, front-right, side-left, side-right, back-left, back-right, LFE-channel
+ // 8-15: Reserved
+ // clang-format on
+
+ int chanCfg = channel_configuration; // CPE
+
+ // fill in ADTS data
+ packet[0] = 0xFF;
+ packet[1] = 0xF9;
+ packet[2] = ((profile - 1) << 6) + (freqIdx << 2) + (chanCfg >> 2);
+ packet[3] = ((chanCfg & 3) << 6) + (packetLen >> 11);
+ packet[4] = (packetLen & 0x7FF) >> 3;
+ packet[5] = ((packetLen & 7) << 5) + 0x1F;
+ packet[6] = 0xFC;
+}
+
+void rtp_buffered_audio_cleanup_handler(__attribute__((unused)) void *arg) {
+ debug(2, "Buffered Audio Receiver Cleanup Start.");
+ rtsp_conn_info *conn = (rtsp_conn_info *)arg;
+ close(conn->buffered_audio_socket);
+ debug(1, "Connection %d: closing TCP Buffered Audio port: %u.", conn->connection_number,
+ conn->local_buffered_audio_port);
+ conn->buffered_audio_socket = 0;
+ debug(1, "Connection %d: rtp_buffered_audio_processor exit.", conn->connection_number);
+}
+
+void *rtp_buffered_audio_processor(void *arg) {
+ // #include <syscall.h>
+ // debug(1, "rtp_buffered_audio_processor PID %d", syscall(SYS_gettid));
+ rtsp_conn_info *conn = (rtsp_conn_info *)arg;
+
+ conn->incoming_ssrc = 0; // reset
+ conn->resampler_ssrc = 0;
+
+ // turn off all flush requests that might have been pending in the connection. Not sure if this is
+ // right...
+ unsigned int fr = 0;
+ for (fr = 0; fr < MAX_DEFERRED_FLUSH_REQUESTS; fr++) {
+ conn->ap2_deferred_flush_requests[fr].inUse = 0;
+ conn->ap2_deferred_flush_requests[fr].active = 0;
+ }
+ conn->ap2_immediate_flush_requested = 0;
+
+ pthread_cleanup_push(rtp_buffered_audio_cleanup_handler, arg);
+
+ pthread_t *buffered_reader_thread = malloc(sizeof(pthread_t));
+ if (buffered_reader_thread == NULL)
+ debug(1, "cannot allocate a buffered_reader_thread!");
+ memset(buffered_reader_thread, 0, sizeof(pthread_t));
+ pthread_cleanup_push(malloc_cleanup, &buffered_reader_thread);
+
+ buffered_tcp_desc *buffered_audio = malloc(sizeof(buffered_tcp_desc));
+ if (buffered_audio == NULL)
+ debug(1, "cannot allocate a buffered_tcp_desc!");
+ // initialise the descriptor
+ memset(buffered_audio, 0, sizeof(buffered_tcp_desc));
+ pthread_cleanup_push(malloc_cleanup, &buffered_audio);
+
+ if (pthread_mutex_init(&buffered_audio->mutex, NULL))
+ debug(1, "Connection %d: error %d initialising buffered_audio mutex.", conn->connection_number,
+ errno);
+ pthread_cleanup_push(mutex_cleanup, &buffered_audio->mutex);
+
+ if (pthread_cond_init(&buffered_audio->not_empty_cv, NULL))
+ die("Connection %d: error %d initialising not_empty cv.", conn->connection_number, errno);
+ pthread_cleanup_push(cv_cleanup, &buffered_audio->not_empty_cv);
+
+ if (pthread_cond_init(&buffered_audio->not_full_cv, NULL))
+ die("Connection %d: error %d initialising not_full cv.", conn->connection_number, errno);
+ pthread_cleanup_push(cv_cleanup, &buffered_audio->not_full_cv);
+
+ // initialise the buffer data structure
+ buffered_audio->buffer_max_size = conn->ap2_audio_buffer_size;
+ buffered_audio->buffer = malloc(conn->ap2_audio_buffer_size);
+ if (buffered_audio->buffer == NULL)
+ debug(1, "cannot allocate an audio buffer of %u bytes!", buffered_audio->buffer_max_size);
+ pthread_cleanup_push(malloc_cleanup, &buffered_audio->buffer);
+
+ // pthread_mutex_lock(&conn->buffered_audio_mutex);
+ buffered_audio->toq = buffered_audio->buffer;
+ buffered_audio->eoq = buffered_audio->buffer;
+
+ buffered_audio->sock_fd = conn->buffered_audio_socket;
+
+ named_pthread_create(buffered_reader_thread, NULL, &buffered_tcp_reader, buffered_audio,
+ "ap2_buf_rdr_%d", conn->connection_number);
+ pthread_cleanup_push(thread_cleanup, buffered_reader_thread);
+
+ const size_t leading_free_space_length =
+ 256; // leave this many bytes free to make room for prefixes that might be added later
+ uint8_t packet[32 * 1024];
+ unsigned char m[32 * 1024 + leading_free_space_length];
+
+ unsigned char *payload_pointer = NULL;
+ unsigned long long payload_length = 0;
+ uint32_t payload_ssrc =
+ SSRC_NONE; // this is the SSRC of the payload, needed to decide if it should be muted
+ uint32_t previous_ssrc = SSRC_NONE;
+
+ uint32_t seq_no =
+ 0; // audio packet number. Initialised to avoid a "possibly uninitialised" warning.
+ uint32_t previous_seqno = 0;
+ uint16_t sequence_number_for_player = 0;
+
+ uint32_t timestamp = 0; // initialised to avoid a "possibly uninitialised" warning.
+ uint32_t previous_timestamp = 0;
+
+ uint32_t expected_timestamp = 0;
+ uint64_t previous_buffer_should_be_time = 0;
+
+ ssize_t nread;
+
+ int new_audio_block_needed = 0; // goes true when a block is needed, false one is read in, but
+ // will be made true by flushing or by playing the block
+ int finished = 0;
+
+ uint64_t blocks_read_since_play_began = 0;
+ uint64_t blocks_read = 0;
+
+ int ap2_immediate_flush_requested = 0; // for diagnostics, probably
+
+ uint32_t first_timestamp_in_this_sequence = 0;
+ int packets_played_in_this_sequence = 0;
+
+ int play_enabled = 0;
+ // double requested_lead_time = 0.0; // normal lead time minimum -- maybe it should be about 0.1
+
+ // wait until our timing information is valid
+ while (have_ptp_timing_information(conn) == 0)
+ usleep(1000);
+
+ reset_buffer(conn); // in case there is any garbage in the player
+
+ do {
+
+ if ((play_enabled == 0) && (conn->ap2_play_enabled != 0)) {
+ // play newly started
+ debug(2, "Play started.");
+ new_audio_block_needed = 1;
+ blocks_read_since_play_began = 0;
+ }
+
+ if ((play_enabled != 0) && (conn->ap2_play_enabled == 0)) {
+ debug(2, "Play stopped.");
+ packets_played_in_this_sequence = 0; // not all blocks read are played...
+#ifdef CONFIG_CONVOLUTION
+ convolver_clear_state();
+#endif
+ reset_buffer(conn); // stop play ASAP
+ }
+
+ play_enabled = conn->ap2_play_enabled;
+
+ // now, if get_next_block is non-zero, read a block. We may flush or use it
+
+ if (new_audio_block_needed != 0) {
+ // a block is preceded by its length in a uint16_t
+ uint16_t data_len;
+ // here we read from the buffer that our thread has been reading
+
+ size_t bytes_remaining_in_buffer;
+ nread = lread_sized_block(buffered_audio, &data_len, sizeof(data_len),
+ &bytes_remaining_in_buffer);
+ data_len = ntohs(data_len);
+
+ // diagnostic
+ if ((conn->ap2_audio_buffer_minimum_size < 0) ||
+ (bytes_remaining_in_buffer < (size_t)conn->ap2_audio_buffer_minimum_size))
+ conn->ap2_audio_buffer_minimum_size = bytes_remaining_in_buffer;
+
+ if (nread > 0) {
+ // get the block itself
+ // debug(1,"buffered audio packet of size %u detected.", data_len - 2);
+ nread = lread_sized_block(buffered_audio, packet, data_len - 2, &bytes_remaining_in_buffer);
+
+ // diagnostic
+ if ((conn->ap2_audio_buffer_minimum_size < 0) ||
+ (bytes_remaining_in_buffer < (size_t)conn->ap2_audio_buffer_minimum_size))
+ conn->ap2_audio_buffer_minimum_size = bytes_remaining_in_buffer;
+ // debug(1, "buffered audio packet of size %u received.", nread);
+
+ if (nread > 0) {
+ // got the block
+ blocks_read++; // note, this doesn't mean they are valid audio blocks
+ blocks_read_since_play_began++; // 1 means previous seq_no and timestamps are invalid
+
+ // get the sequence number
+ // see https://en.wikipedia.org/wiki/Real-time_Transport_Protocol#Packet_header
+ // the Marker bit is always set, and it and the remaining 23 bits form the sequence number
+
+ previous_seqno = seq_no;
+ seq_no = nctohl(&packet[0]) & 0x7FFFFF;
+
+ previous_timestamp = timestamp;
+ timestamp = nctohl(&packet[4]);
+
+ previous_ssrc = payload_ssrc;
+ payload_ssrc = nctohl(&packet[8]);
+
+ if (blocks_read_since_play_began == 1) {
+ debug(2, "Preparing initial decoding chain for %s.", get_ssrc_name(payload_ssrc));
+ prepare_decoding_chain(conn, payload_ssrc);
+ sequence_number_for_player =
+ seq_no & 0xffff; // this is arbitrary -- the sequence_number_for_player numbers will
+ // be sequential irrespective of seq_no jumps...
+ }
+
+ if (blocks_read_since_play_began > 1) {
+
+ if (payload_ssrc != previous_ssrc) {
+ if (ssrc_is_recognised(payload_ssrc) == 0) {
+ debug(2, "Unrecognised SSRC: %u.", payload_ssrc);
+ } else {
+ debug(2,
+ "Reading a block: new encoding: %s, old encoding: %s. Preparing a new "
+ "decoding chain.",
+ get_ssrc_name(payload_ssrc), get_ssrc_name(previous_ssrc));
+ prepare_decoding_chain(conn, payload_ssrc);
+ }
+ }
+
+ uint32_t t_expected_seqno = (previous_seqno + 1) & 0x7fffff;
+ if (t_expected_seqno != seq_no) {
+ debug(2,
+ "reading block %u, the sequence number differs from the expected sequence "
+ "number %u. The previous sequence number was %u",
+ seq_no, t_expected_seqno, previous_seqno);
+ }
+ uint32_t t_expected_timestamp =
+ previous_timestamp + get_ssrc_block_length(previous_ssrc);
+ int32_t diff = timestamp - t_expected_timestamp;
+ if (diff != 0) {
+ debug(2, "reading block %u, the timestamp %u differs from expected_timestamp %u.",
+ seq_no, timestamp, t_expected_timestamp);
+ }
+ }
+ new_audio_block_needed = 0; // block has been read.
+ }
+ }
+
+ if (nread == 0) {
+ // nread is 0 -- the port has been closed
+ debug(1, "buffered audio port closed!");
+ finished = 1;
+ } else if (nread < 0) {
+ char errorstring[1024];
+ strerror_r(errno, (char *)errorstring, sizeof(errorstring));
+ debug(1, "error in rtp_buffered_audio_processor %d: \"%s\". Could not recv a data_len .",
+ errno, errorstring);
+ finished = 1;
+ }
+ }
+
+ if (finished == 0) {
+ pthread_cleanup_debug_mutex_lock(&conn->flush_mutex, 25000,
+ 1); // 25 ms is a long time to wait!
+ if (blocks_read != 0) {
+ if (conn->ap2_immediate_flush_requested != 0) {
+ if (ap2_immediate_flush_requested == 0) {
+ debug(2, "immediate flush started at sequence number %u until sequence number of %u.",
+ seq_no, conn->ap2_immediate_flush_until_sequence_number);
+ }
+ if ((blocks_read != 0) && (seq_no == conn->ap2_immediate_flush_until_sequence_number)) {
+ debug(2, "immediate flush complete at seq_no of %u.", seq_no);
+
+ conn->ap2_immediate_flush_requested = 0;
+ ap2_immediate_flush_requested = 0;
+
+ /*
+ // turn off all deferred requests. Not sure if this is right...
+ unsigned int f = 0;
+ for (f = 0; f < MAX_DEFERRED_FLUSH_REQUESTS; f++) {
+ conn->ap2_deferred_flush_requests[f].inUse = 0;
+ conn->ap2_deferred_flush_requests[f].active = 0;
+ }
+ */
+
+ } else {
+ debug(3, "immediate flush of block %u until block %u", seq_no,
+ conn->ap2_immediate_flush_until_sequence_number);
+ ap2_immediate_flush_requested = 1;
+ new_audio_block_needed = 1; //
+ }
+ }
+ }
+
+ // now, even if an immediate flush has been requested and is active, we still need to process
+ // deferred flush requests as they may refer to sequences that are going to be purged anyway
+
+ unsigned int f = 0;
+ for (f = 0; f < MAX_DEFERRED_FLUSH_REQUESTS; f++) {
+ if (conn->ap2_deferred_flush_requests[f].inUse != 0) {
+ if ((conn->ap2_deferred_flush_requests[f].flushFromSeq == seq_no) &&
+ (conn->ap2_deferred_flush_requests[f].flushUntilSeq != seq_no)) {
+ debug(2,
+ "deferred flush activated: flushFromTS: %12u, flushFromSeq: %12u, "
+ "flushUntilTS: %12u, flushUntilSeq: %12u, timestamp: %12u.",
+ conn->ap2_deferred_flush_requests[f].flushFromTS,
+ conn->ap2_deferred_flush_requests[f].flushFromSeq,
+ conn->ap2_deferred_flush_requests[f].flushUntilTS,
+ conn->ap2_deferred_flush_requests[f].flushUntilSeq, timestamp);
+ conn->ap2_deferred_flush_requests[f].active = 1;
+ new_audio_block_needed = 1;
+ }
+ if (conn->ap2_deferred_flush_requests[f].flushUntilSeq == seq_no) {
+ debug(2,
+ "deferred flush terminated: flushFromTS: %12u, flushFromSeq: %12u, "
+ "flushUntilTS: %12u, flushUntilSeq: %12u, timestamp: %12u.",
+ conn->ap2_deferred_flush_requests[f].flushFromTS,
+ conn->ap2_deferred_flush_requests[f].flushFromSeq,
+ conn->ap2_deferred_flush_requests[f].flushUntilTS,
+ conn->ap2_deferred_flush_requests[f].flushUntilSeq, timestamp);
+ conn->ap2_deferred_flush_requests[f].active = 0;
+ conn->ap2_deferred_flush_requests[f].inUse = 0;
+ } else if (a_minus_b_mod23(seq_no, conn->ap2_deferred_flush_requests[f].flushUntilSeq) >
+ 0) {
+ // now, do a modulo 2^23 unsigned int calculation to see if we may have overshot the
+ // flushUntilSeq
+ debug(2,
+ "deferred flush terminated due to overshoot at block %u: flushFromTS: %12u, "
+ "flushFromSeq: %12u, "
+ "flushUntilTS: %12u, flushUntilSeq: %12u, timestamp: %12u.",
+ seq_no, conn->ap2_deferred_flush_requests[f].flushFromTS,
+ conn->ap2_deferred_flush_requests[f].flushFromSeq,
+ conn->ap2_deferred_flush_requests[f].flushUntilTS,
+ conn->ap2_deferred_flush_requests[f].flushUntilSeq, timestamp);
+ conn->ap2_deferred_flush_requests[f].active = 0;
+ conn->ap2_deferred_flush_requests[f].inUse = 0;
+ debug(2, "immediate flush was %s.", ap2_immediate_flush_requested == 0 ? "off" : "on");
+ } else if (conn->ap2_deferred_flush_requests[f].active != 0) {
+ new_audio_block_needed = 1;
+ debug(3,
+ "deferred flush of block: %u. flushFromTS: %12u, flushFromSeq: %12u, "
+ "flushUntilTS: %12u, flushUntilSeq: %12u, timestamp: %12u.",
+ seq_no, conn->ap2_deferred_flush_requests[f].flushFromTS,
+ conn->ap2_deferred_flush_requests[f].flushFromSeq,
+ conn->ap2_deferred_flush_requests[f].flushUntilTS,
+ conn->ap2_deferred_flush_requests[f].flushUntilSeq, timestamp);
+ }
+ }
+ }
+ pthread_cleanup_pop(1); // the mutex
+
+ // now, if the block is not invalidated by the flush code, see if we need
+ // to decode it and pass it to the player
+ if (new_audio_block_needed == 0) {
+ // is there space in the player thread's buffer system?
+ unsigned int player_buffer_size, player_buffer_occupancy;
+ get_audio_buffer_size_and_occupancy(&player_buffer_size, &player_buffer_occupancy, conn);
+ // debug(1,"player buffer size and occupancy: %u and %u", player_buffer_size,
+ // player_buffer_occupancy);
+
+ // If we are playing and there is room in the player buffer, go ahead and decode the block
+ // and send it to the player. Otherwise, keep the block and sleep for a while.
+ if ((play_enabled != 0) &&
+ (player_buffer_occupancy <= 2 * ((config.audio_backend_buffer_desired_length) *
+ conn->input_rate / conn->frames_per_packet))) {
+ uint64_t buffer_should_be_time;
+ frame_to_local_time(timestamp, &buffer_should_be_time, conn);
+
+ // try to identify blocks that are timed to before the last buffer, and drop 'em
+ int64_t time_from_last_buffer_time =
+ buffer_should_be_time - previous_buffer_should_be_time;
+
+ if ((packets_played_in_this_sequence == 0) || (time_from_last_buffer_time > 0)) {
+ int64_t lead_time = buffer_should_be_time - get_absolute_time_in_ns();
+ payload_length = 0;
+ if (ssrc_is_recognised(payload_ssrc) != 0) {
+ prepare_decoding_chain(conn, payload_ssrc);
+ unsigned long long new_payload_length = 0;
+ payload_pointer = m + leading_free_space_length;
+ if ((lead_time < (int64_t)30000000000L) &&
+ (lead_time >= 0)) { // only decipher the packet if it's not too late or too early
+ int response = -1; // guess that there is a problem
+ if (conn->session_key != NULL) {
+ unsigned char nonce[12];
+ memset(nonce, 0, sizeof(nonce));
+ memcpy(
+ nonce + 4, packet + nread - 8,
+ 8); // front-pad the 8-byte nonce received to get the 12-byte nonce expected
+
+ // https://libsodium.gitbook.io/doc/secret-key_cryptography/aead/chacha20-poly1305/ietf_chacha20-poly1305_construction
+ // Note: the eight-byte nonce must be front-padded out to 12 bytes.
+
+ // Leave leading_free_space_length bytes at the start for possible headers like an
+ // ADTS header (7 bytes)
+ memset(m, 0, leading_free_space_length);
+ response = crypto_aead_chacha20poly1305_ietf_decrypt(
+ payload_pointer, // where the decrypted payload will start
+ &new_payload_length, // mlen_p
+ NULL, // nsec,
+ packet +
+ 12, // the ciphertext starts 12 bytes in and is followed by the MAC tag,
+ nread - (8 + 12), // clen -- the last 8 bytes are the nonce
+ packet + 4, // authenticated additional data
+ 8, // authenticated additional data length
+ nonce,
+ conn->session_key); // *k
+ if (response != 0)
+ debug(1, "Error decrypting audio packet %u -- packet length %d.", seq_no,
+ nread);
+ } else {
+ debug(2, "No session key, so the audio packet can not be deciphered -- skipped.");
+ }
+
+ if ((response == 0) && (new_payload_length > 0)) {
+ // now we have the deciphered block, so send it to the player if we can
+ payload_length = new_payload_length;
+
+ if (ssrc_is_aac(payload_ssrc)) {
+ payload_pointer =
+ payload_pointer - 7; // including the 7-byte leader for the ADTS
+ payload_length = payload_length + 7;
+
+ // now, fill in the 7-byte ADTS information, which seems to be needed by the
+ // decoder we made room for it in the front of the buffer by filling from m + 7.
+ int channelConfiguration = 2; // 2: 2 channels: front-left, front-right
+ if (payload_ssrc == AAC_48000_F24_5P1)
+ channelConfiguration = 6; // 6: 6 channels: front-center, front-left,
+ // front-right, back-left, back-right, LFE-channel
+ else if (payload_ssrc == AAC_48000_F24_7P1)
+ channelConfiguration =
+ 7; // 7: 8 channels: front-center, front-left, front-right,
+ // side-left, side-right, back-left, back-right, LFE-channel
+ addADTStoPacket(payload_pointer, payload_length, conn->input_rate,
+ channelConfiguration);
+ }
+ int mute =
+ ((packets_played_in_this_sequence == 0) && (ssrc_is_aac(payload_ssrc)));
+ if (mute) {
+ debug(2, "Connection %d: muting first AAC block -- block %u -- timestamp %u.",
+ conn->connection_number, seq_no, timestamp);
+ }
+ int32_t timestamp_difference = 0;
+ if (packets_played_in_this_sequence == 0) {
+ // first_block_in_this_sequence = seq_no;
+ first_timestamp_in_this_sequence = timestamp;
+ debug(2,
+ "Connection %d: "
+ "first block %u, first timestamp %u.",
+ conn->connection_number, seq_no, timestamp);
+ } else {
+ timestamp_difference = timestamp - expected_timestamp;
+ if (timestamp_difference != 0) {
+ debug(2,
+ "Connection %d: "
+ "unexpected timestamp in block %u. Actual: %u, expected: %u "
+ "difference: %d, "
+ "%f ms. "
+ "Positive means later, i.e. a gap. First timestamp was %u, payload "
+ "type: \"%s\".",
+ conn->connection_number, seq_no, timestamp, expected_timestamp,
+ timestamp_difference, 1000.0 * timestamp_difference / conn->input_rate,
+ first_timestamp_in_this_sequence, get_ssrc_name(payload_ssrc));
+ // mute the first packet after a discontinuity
+ if (ssrc_is_aac(payload_ssrc)) {
+ debug(2,
+ "Connection %d: muting first AAC block -- block %u -- following a "
+ "timestamp discontinuity, timestamp %u.",
+ conn->connection_number, seq_no, timestamp);
+ mute = 1;
+ }
+ }
+ }
+ int skip_this_block = 0;
+ if (timestamp_difference < 0) {
+ int32_t abs_timestamp_difference = -timestamp_difference;
+ if ((size_t)abs_timestamp_difference > get_ssrc_block_length(payload_ssrc)) {
+ skip_this_block = 1;
+ debug(2,
+ "skipping block %u because it was too far in the past. Timestamp "
+ "difference: %d, length of block: %u.",
+ seq_no, timestamp_difference, get_ssrc_block_length(payload_ssrc));
+ }
+ }
+ if (skip_this_block == 0) {
+ uint32_t packet_size = player_put_packet(
+ payload_ssrc, sequence_number_for_player, timestamp, payload_pointer,
+ payload_length, mute, timestamp_difference, conn);
+ debug(3, "block %u, timestamp %u, length %u sent to the player.", seq_no,
+ timestamp, packet_size);
+ sequence_number_for_player++; // simply increment
+ expected_timestamp = timestamp + packet_size; // for the next time
+ packets_played_in_this_sequence++;
+ }
+ }
+ } else {
+ debug(3,
+ "skipped deciphering block %u with timestamp %u because its lead time is "
+ "out of range at %f "
+ "seconds.",
+ seq_no, timestamp, lead_time * 1.0E-9);
+ uint32_t currentAnchorRTP = 0;
+ uint64_t currentAnchorLocalTime = 0;
+ if (get_ptp_anchor_local_time_info(conn, ¤tAnchorRTP,
+ ¤tAnchorLocalTime) == clock_ok) {
+ debug(3, "anchorRTP: %u, anchorLocalTime: % " PRIu64 ".", currentAnchorRTP,
+ currentAnchorLocalTime);
+ } else {
+ debug(3, "Clock not okay");
+ }
+ }
+ } else {
+ debug(2, "Unrecognised or invalid ssrc: %s.", get_ssrc_name(payload_ssrc));
+ }
+ } else {
+ debug(1, "dropping buffer that should have played before the last actually played.");
+ }
+ new_audio_block_needed = 1; // the block has been used up and is no longer current
+ } else {
+ usleep(20000); // wait for a while
+ }
+ }
+ }
+ } while (finished == 0);
+ debug(2, "Buffered Audio Receiver RTP thread \"normal\" exit.");
+ pthread_cleanup_pop(1); // buffered_tcp_reader thread creation
+ pthread_cleanup_pop(1); // buffer malloc
+ pthread_cleanup_pop(1); // not_full_cv
+ pthread_cleanup_pop(1); // not_empty_cv
+ pthread_cleanup_pop(1); // mutex
+ pthread_cleanup_pop(1); // descriptor malloc
+ pthread_cleanup_pop(1); // pthread_t malloc
+ pthread_cleanup_pop(1); // do the cleanup.
+ pthread_exit(NULL);
+}
--- /dev/null
+#ifndef _AP2_BUFFERED_AUDIO_PROCESSOR_H
+#define _AP2_BUFFERED_AUDIO_PROCESSOR_H
+
+void *rtp_buffered_audio_processor(void *arg);
+
+#endif // _AP2_BUFFERED_AUDIO_PROCESSOR_H
uint64_t local_to_remote_time_jitter;
uint64_t local_to_remote_time_jitter_count;
-typedef struct {
- int closed;
- int error_code;
- int sock_fd;
- char *buffer;
- char *toq;
- char *eoq;
- size_t buffer_max_size;
- size_t buffer_occupancy;
- pthread_mutex_t mutex;
- pthread_cond_t not_empty_cv;
- pthread_cond_t not_full_cv;
-} buffered_tcp_desc;
-
/*
char obf[4096];
char *obfp = obf;
pthread_exit(NULL);
}
-ssize_t buffered_read(buffered_tcp_desc *descriptor, void *buf, size_t count,
- size_t *bytes_remaining) {
- ssize_t response = -1;
- if (debug_mutex_lock(&descriptor->mutex, 50000, 1) != 0)
- debug(1, "problem with mutex");
- pthread_cleanup_push(mutex_unlock, (void *)&descriptor->mutex);
- // wipe the slate dlean before reading...
- descriptor->error_code = 0;
- descriptor->closed = 0;
-
- if (descriptor->buffer_occupancy == 0) {
- debug(2, "buffered_read: buffer empty -- waiting for %u bytes.", count);
- }
-
- while ((descriptor->buffer_occupancy == 0) && (descriptor->error_code == 0) &&
- (descriptor->closed == 0)) {
- if (pthread_cond_wait(&descriptor->not_empty_cv, &descriptor->mutex))
- debug(1, "Error waiting for buffered read");
- else
- debug(2, "buffered_read: signalled with %u bytes after waiting.",
- descriptor->buffer_occupancy);
- }
-
- if (descriptor->error_code) {
- errno = descriptor->error_code;
- debug(1, "buffered_read: error %d.", errno);
- response = -1;
- } else if (descriptor->closed != 0) {
- debug(1, "buffered_read: connection closed.");
- errno = 0; // no error -- just closed
- response = 0;
- } else if (descriptor->buffer_occupancy != 0) {
- ssize_t bytes_to_move = count;
-
- if (descriptor->buffer_occupancy < count) {
- bytes_to_move = descriptor->buffer_occupancy;
- }
-
- ssize_t top_gap = descriptor->buffer + descriptor->buffer_max_size - descriptor->toq;
- if (top_gap < bytes_to_move)
- bytes_to_move = top_gap;
-
- memcpy(buf, descriptor->toq, bytes_to_move);
- descriptor->toq += bytes_to_move;
- if (descriptor->toq == descriptor->buffer + descriptor->buffer_max_size)
- descriptor->toq = descriptor->buffer;
- descriptor->buffer_occupancy -= bytes_to_move;
- if (bytes_remaining != NULL)
- *bytes_remaining = descriptor->buffer_occupancy;
- response = bytes_to_move;
- if (pthread_cond_signal(&descriptor->not_full_cv))
- debug(1, "Error signalling");
- }
-
- pthread_cleanup_pop(1); // release the mutex
- return response;
-}
-
-#define STANDARD_PACKET_SIZE 4096
-
-void buffered_tcp_reader_cleanup_handler(__attribute__((unused)) void *arg) {
- debug(2, "Buffered TCP Reader Thread Exit via Cleanup.");
-}
-
-void *buffered_tcp_reader(void *arg) {
- // #include <syscall.h>
- // debug(1, "buffered_tcp_reader PID %d", syscall(SYS_gettid));
- pthread_cleanup_push(buffered_tcp_reader_cleanup_handler, NULL);
- buffered_tcp_desc *descriptor = (buffered_tcp_desc *)arg;
-
- // listen(descriptor->sock_fd, 5); // this is done in the handle_setup_2 code to ensure it's open
- // when the client hears about it...
- ssize_t nread;
- SOCKADDR remote_addr;
- memset(&remote_addr, 0, sizeof(remote_addr));
- socklen_t addr_size = sizeof(remote_addr);
- int finished = 0;
- int fd = accept(descriptor->sock_fd, (struct sockaddr *)&remote_addr, &addr_size);
- // debug(1, "buffered_tcp_reader: the client has opened a buffered audio link.");
- intptr_t pfd = fd;
- pthread_cleanup_push(socket_cleanup, (void *)pfd);
-
- do {
- int have_time_to_sleep = 0;
- if (debug_mutex_lock(&descriptor->mutex, 500000, 1) != 0)
- debug(1, "problem with mutex");
- pthread_cleanup_push(mutex_unlock, (void *)&descriptor->mutex);
- while (descriptor->buffer_occupancy == descriptor->buffer_max_size) {
- if (pthread_cond_wait(&descriptor->not_full_cv, &descriptor->mutex))
- debug(1, "Error waiting for not_full_cv");
- }
- pthread_cleanup_pop(1); // release the mutex
-
- // now we know it is not full, so go ahead and try to read some more into it
-
- // wrap
- if ((size_t)(descriptor->eoq - descriptor->buffer) == descriptor->buffer_max_size)
- descriptor->eoq = descriptor->buffer;
-
- // figure out how much to ask for
- size_t bytes_to_request = STANDARD_PACKET_SIZE;
- size_t free_space = descriptor->buffer_max_size - descriptor->buffer_occupancy;
- if (bytes_to_request > free_space)
- bytes_to_request = free_space; // don't ask for more than will fit
-
- size_t gap_to_end_of_buffer =
- descriptor->buffer + descriptor->buffer_max_size - descriptor->eoq;
- if (gap_to_end_of_buffer < bytes_to_request)
- bytes_to_request =
- gap_to_end_of_buffer; // only ask for what will fill to the top of the buffer
-
- // do the read
- if (descriptor->buffer_occupancy == 0)
- debug(2, "recv of up to %d bytes with an buffer empty.", bytes_to_request);
- nread = recv(fd, descriptor->eoq, bytes_to_request, 0);
- // debug(1, "Received %d bytes for a buffer size of %d bytes.",nread,
- // descriptor->buffer_occupancy + nread);
- if (debug_mutex_lock(&descriptor->mutex, 50000, 1) != 0)
- debug(1, "problem with not empty mutex");
- pthread_cleanup_push(mutex_unlock, (void *)&descriptor->mutex);
- if (nread < 0) {
- char errorstring[1024];
- strerror_r(errno, (char *)errorstring, sizeof(errorstring));
- debug(1, "error in buffered_tcp_reader %d: \"%s\". Could not recv a packet.", errno,
- errorstring);
- descriptor->error_code = errno;
- } else if (nread == 0) {
- descriptor->closed = 1;
- debug(
- 1,
- "buffered audio port closed by remote end. Terminating the buffered_tcp_reader thread.");
- finished = 1;
- } else if (nread > 0) {
- descriptor->eoq += nread;
- descriptor->buffer_occupancy += nread;
- }
-
- // signal if we got data or an error or the file closed
- if (pthread_cond_signal(&descriptor->not_empty_cv))
- debug(1, "Error signalling");
- if (descriptor->buffer_occupancy > 16384)
- have_time_to_sleep = 1;
- pthread_cleanup_pop(1); // release the mutex
- if (have_time_to_sleep)
- usleep(10000); // give other threads a chance to run...
- } while (finished == 0);
-
- debug(1, "Buffered TCP Reader Thread Exit \"Normal\" Exit Begin.");
- pthread_cleanup_pop(1); // close the socket
- pthread_cleanup_pop(1); // cleanup
- debug(1, "Buffered TCP Reader Thread Exit \"Normal\" Exit.");
- pthread_exit(NULL);
-}
-
-// this will read a block of the size specified to the buffer
-// and will return either with the block or on error
-ssize_t lread_sized_block(buffered_tcp_desc *descriptor, void *buf, size_t count,
- size_t *bytes_remaining) {
- ssize_t response, nread;
- size_t inbuf = 0; // bytes already in the buffer
- int keep_trying = 1;
-
- do {
- nread = buffered_read(descriptor, buf + inbuf, count - inbuf, bytes_remaining);
- if (nread == 0) {
- // a blocking read that returns zero means eof -- implies connection closed
- debug(1, "read_sized_block connection closed.");
- keep_trying = 0;
- } else if (nread < 0) {
- if (errno == EAGAIN) {
- debug(1, "read_sized_block getting Error 11 -- EAGAIN from a blocking read!");
- }
- if ((errno != EAGAIN) && (errno != EINTR)) {
- char errorstring[1024];
- strerror_r(errno, (char *)errorstring, sizeof(errorstring));
- debug(1, "read_sized_block read error %d: \"%s\".", errno, (char *)errorstring);
- keep_trying = 0;
- }
- } else {
- inbuf += (size_t)nread;
- }
- } while ((keep_trying != 0) && (inbuf < count));
- if (nread <= 0)
- response = nread;
- else
- response = inbuf;
- return response;
-}
-
-// https://stackoverflow.com/questions/18862715/how-to-generate-the-aac-adts-elementary-stream-with-android-mediacodec
-// with thanks!
-
-// See https://wiki.multimedia.cx/index.php/Understanding_AAC
-// see also https://wiki.multimedia.cx/index.php/ADTS for the ADTS layout
-// see https://wiki.multimedia.cx/index.php/MPEG-4_Audio#Sampling_Frequencies for sampling
-// frequencies
-
-/**
- * Add ADTS header at the beginning of each and every AAC packet.
- * This is needed as the packet is raw AAC data.
- *
- * Note the packetLen must count in the ADTS header itself.
- **/
-
-void addADTStoPacket(uint8_t *packet, int packetLen, int rate, int channel_configuration) {
- int profile = 2;
- int freqIdx = 4;
- if (rate == 44100)
- freqIdx = 4;
- else if (rate == 48000)
- freqIdx = 3;
- else
- debug(1, "Unsupported AAC sample rate %d.", rate);
-
- // Channel Configuration
- // https://wiki.multimedia.cx/index.php/MPEG-4_Audio#Channel_Configurations
- // clang-format off
- // 0: Defined in AOT Specifc Config
- // 1: 1 channel: front-center
- // 2: 2 channels: front-left, front-right
- // 3: 3 channels: front-center, front-left, front-right
- // 4: 4 channels: front-center, front-left, front-right, back-center
- // 5: 5 channels: front-center, front-left, front-right, back-left, back-right
- // 6: 6 channels: front-center, front-left, front-right, back-left, back-right, LFE-channel
- // 7: 8 channels: front-center, front-left, front-right, side-left, side-right, back-left, back-right, LFE-channel
- // 8-15: Reserved
- // clang-format on
-
- int chanCfg = channel_configuration; // CPE
-
- // fill in ADTS data
- packet[0] = 0xFF;
- packet[1] = 0xF9;
- packet[2] = ((profile - 1) << 6) + (freqIdx << 2) + (chanCfg >> 2);
- packet[3] = ((chanCfg & 3) << 6) + (packetLen >> 11);
- packet[4] = (packetLen & 0x7FF) >> 3;
- packet[5] = ((packetLen & 7) << 5) + 0x1F;
- packet[6] = 0xFC;
-}
-
-void rtp_buffered_audio_cleanup_handler(__attribute__((unused)) void *arg) {
- debug(2, "Buffered Audio Receiver Cleanup Start.");
- rtsp_conn_info *conn = (rtsp_conn_info *)arg;
- close(conn->buffered_audio_socket);
- debug(1, "Connection %d: closing TCP Buffered Audio port: %u.", conn->connection_number,
- conn->local_buffered_audio_port);
- conn->buffered_audio_socket = 0;
- debug(1, "Connection %d: rtp_buffered_audio_processor exit.", conn->connection_number);
-}
-
-#define MOD_23BIT 0x7FFFFF // 2^23 - 1
-
-// Assumes 'a' and 'b' are within 2^22 of each other
-int32_t a_minus_b_mod23(uint32_t a, uint32_t b) {
-
- // Mask to 23 bits
- a &= MOD_23BIT;
- b &= MOD_23BIT;
-
- // Compute difference modulo 2^23
- uint32_t diff = (a - b) & MOD_23BIT;
-
- // Interpret as signed 23-bit value
- // If the top bit (bit 22) is set, it's negative
- int32_t signed_diff = (diff & 0x400000) ? (diff | 0xFF800000) : diff;
-
- return signed_diff;
-}
-
-void *rtp_buffered_audio_processor(void *arg) {
- // #include <syscall.h>
- // debug(1, "rtp_buffered_audio_processor PID %d", syscall(SYS_gettid));
- rtsp_conn_info *conn = (rtsp_conn_info *)arg;
-
- conn->incoming_ssrc = 0; // reset
- conn->resampler_ssrc = 0;
-
- // turn off all flush requests that might have been pending in the connection. Not sure if this is
- // right...
- unsigned int fr = 0;
- for (fr = 0; fr < MAX_DEFERRED_FLUSH_REQUESTS; fr++) {
- conn->ap2_deferred_flush_requests[fr].inUse = 0;
- conn->ap2_deferred_flush_requests[fr].active = 0;
- }
- conn->ap2_immediate_flush_requested = 0;
-
- pthread_cleanup_push(rtp_buffered_audio_cleanup_handler, arg);
-
- pthread_t *buffered_reader_thread = malloc(sizeof(pthread_t));
- if (buffered_reader_thread == NULL)
- debug(1, "cannot allocate a buffered_reader_thread!");
- memset(buffered_reader_thread, 0, sizeof(pthread_t));
- pthread_cleanup_push(malloc_cleanup, &buffered_reader_thread);
-
- buffered_tcp_desc *buffered_audio = malloc(sizeof(buffered_tcp_desc));
- if (buffered_audio == NULL)
- debug(1, "cannot allocate a buffered_tcp_desc!");
- // initialise the descriptor
- memset(buffered_audio, 0, sizeof(buffered_tcp_desc));
- pthread_cleanup_push(malloc_cleanup, &buffered_audio);
-
- if (pthread_mutex_init(&buffered_audio->mutex, NULL))
- debug(1, "Connection %d: error %d initialising buffered_audio mutex.", conn->connection_number,
- errno);
- pthread_cleanup_push(mutex_cleanup, &buffered_audio->mutex);
-
- if (pthread_cond_init(&buffered_audio->not_empty_cv, NULL))
- die("Connection %d: error %d initialising not_empty cv.", conn->connection_number, errno);
- pthread_cleanup_push(cv_cleanup, &buffered_audio->not_empty_cv);
-
- if (pthread_cond_init(&buffered_audio->not_full_cv, NULL))
- die("Connection %d: error %d initialising not_full cv.", conn->connection_number, errno);
- pthread_cleanup_push(cv_cleanup, &buffered_audio->not_full_cv);
-
- // initialise the buffer data structure
- buffered_audio->buffer_max_size = conn->ap2_audio_buffer_size;
- buffered_audio->buffer = malloc(conn->ap2_audio_buffer_size);
- if (buffered_audio->buffer == NULL)
- debug(1, "cannot allocate an audio buffer of %u bytes!", buffered_audio->buffer_max_size);
- pthread_cleanup_push(malloc_cleanup, &buffered_audio->buffer);
-
- // pthread_mutex_lock(&conn->buffered_audio_mutex);
- buffered_audio->toq = buffered_audio->buffer;
- buffered_audio->eoq = buffered_audio->buffer;
-
- buffered_audio->sock_fd = conn->buffered_audio_socket;
-
- named_pthread_create(buffered_reader_thread, NULL, &buffered_tcp_reader, buffered_audio,
- "ap2_buf_rdr_%d", conn->connection_number);
- pthread_cleanup_push(thread_cleanup, buffered_reader_thread);
-
- const size_t leading_free_space_length =
- 256; // leave this many bytes free to make room for prefixes that might be added later
- uint8_t packet[32 * 1024];
- unsigned char m[32 * 1024 + leading_free_space_length];
-
- unsigned char *payload_pointer = NULL;
- unsigned long long payload_length = 0;
- uint32_t payload_ssrc =
- SSRC_NONE; // this is the SSRC of the payload, needed to decide if it should be muted
- uint32_t previous_ssrc = SSRC_NONE;
-
- uint32_t seq_no =
- 0; // audio packet number. Initialised to avoid a "possibly uninitialised" warning.
- uint32_t previous_seqno = 0;
- uint16_t sequence_number_for_player = 0;
-
- uint32_t timestamp = 0; // initialised to avoid a "possibly uninitialised" warning.
- uint32_t previous_timestamp = 0;
-
- uint32_t expected_timestamp = 0;
- uint64_t previous_buffer_should_be_time = 0;
-
- ssize_t nread;
-
- int new_audio_block_needed = 0; // goes true when a block is needed, false one is read in, but
- // will be made true by flushing or by playing the block
- int finished = 0;
-
- uint64_t blocks_read_since_play_began = 0;
- uint64_t blocks_read = 0;
-
- int ap2_immediate_flush_requested = 0; // for diagnostics, probably
-
- uint32_t first_timestamp_in_this_sequence = 0;
- int packets_played_in_this_sequence = 0;
-
- int play_enabled = 0;
- // double requested_lead_time = 0.0; // normal lead time minimum -- maybe it should be about 0.1
-
- // wait until our timing information is valid
- while (have_ptp_timing_information(conn) == 0)
- usleep(1000);
-
- reset_buffer(conn); // in case there is any garbage in the player
-
- do {
-
- if ((play_enabled == 0) && (conn->ap2_play_enabled != 0)) {
- // play newly started
- debug(2, "Play started.");
- new_audio_block_needed = 1;
- blocks_read_since_play_began = 0;
- }
-
- if ((play_enabled != 0) && (conn->ap2_play_enabled == 0)) {
- debug(2, "Play stopped.");
- packets_played_in_this_sequence = 0; // not all blocks read are played...
-#ifdef CONFIG_CONVOLUTION
- convolver_clear_state();
-#endif
- reset_buffer(conn); // stop play ASAP
- }
-
- play_enabled = conn->ap2_play_enabled;
-
- // now, if get_next_block is non-zero, read a block. We may flush or use it
-
- if (new_audio_block_needed != 0) {
- // a block is preceded by its length in a uint16_t
- uint16_t data_len;
- // here we read from the buffer that our thread has been reading
-
- size_t bytes_remaining_in_buffer;
- nread = lread_sized_block(buffered_audio, &data_len, sizeof(data_len),
- &bytes_remaining_in_buffer);
- data_len = ntohs(data_len);
-
- // diagnostic
- if ((conn->ap2_audio_buffer_minimum_size < 0) ||
- (bytes_remaining_in_buffer < (size_t)conn->ap2_audio_buffer_minimum_size))
- conn->ap2_audio_buffer_minimum_size = bytes_remaining_in_buffer;
-
- if (nread > 0) {
- // get the block itself
- // debug(1,"buffered audio packet of size %u detected.", data_len - 2);
- nread = lread_sized_block(buffered_audio, packet, data_len - 2, &bytes_remaining_in_buffer);
-
- // diagnostic
- if ((conn->ap2_audio_buffer_minimum_size < 0) ||
- (bytes_remaining_in_buffer < (size_t)conn->ap2_audio_buffer_minimum_size))
- conn->ap2_audio_buffer_minimum_size = bytes_remaining_in_buffer;
- // debug(1, "buffered audio packet of size %u received.", nread);
-
- if (nread > 0) {
- // got the block
- blocks_read++; // note, this doesn't mean they are valid audio blocks
- blocks_read_since_play_began++; // 1 means previous seq_no and timestamps are invalid
-
- // get the sequence number
- // see https://en.wikipedia.org/wiki/Real-time_Transport_Protocol#Packet_header
- // the Marker bit is always set, and it and the remaining 23 bits form the sequence number
-
- previous_seqno = seq_no;
- seq_no = nctohl(&packet[0]) & 0x7FFFFF;
-
- previous_timestamp = timestamp;
- timestamp = nctohl(&packet[4]);
-
- previous_ssrc = payload_ssrc;
- payload_ssrc = nctohl(&packet[8]);
-
- if (blocks_read_since_play_began == 1) {
- debug(2, "Preparing initial decoding chain for %s.", get_ssrc_name(payload_ssrc));
- prepare_decoding_chain(conn, payload_ssrc);
- sequence_number_for_player =
- seq_no & 0xffff; // this is arbitrary -- the sequence_number_for_player numbers will
- // be sequential irrespective of seq_no jumps...
- }
-
- if (blocks_read_since_play_began > 1) {
-
- if (payload_ssrc != previous_ssrc) {
- if (ssrc_is_recognised(payload_ssrc) == 0) {
- debug(2, "Unrecognised SSRC: %u.", payload_ssrc);
- } else {
- debug(2,
- "Reading a block: new encoding: %s, old encoding: %s. Preparing a new "
- "decoding chain.",
- get_ssrc_name(payload_ssrc), get_ssrc_name(previous_ssrc));
- prepare_decoding_chain(conn, payload_ssrc);
- }
- }
-
- uint32_t t_expected_seqno = (previous_seqno + 1) & 0x7fffff;
- if (t_expected_seqno != seq_no) {
- debug(2,
- "reading block %u, the sequence number differs from the expected sequence "
- "number %u. The previous sequence number was %u",
- seq_no, t_expected_seqno, previous_seqno);
- }
- uint32_t t_expected_timestamp =
- previous_timestamp + get_ssrc_block_length(previous_ssrc);
- int32_t diff = timestamp - t_expected_timestamp;
- if (diff != 0) {
- debug(2, "reading block %u, the timestamp %u differs from expected_timestamp %u.",
- seq_no, timestamp, t_expected_timestamp);
- }
- }
- new_audio_block_needed = 0; // block has been read.
- }
- }
-
- if (nread == 0) {
- // nread is 0 -- the port has been closed
- debug(1, "buffered audio port closed!");
- finished = 1;
- } else if (nread < 0) {
- char errorstring[1024];
- strerror_r(errno, (char *)errorstring, sizeof(errorstring));
- debug(1, "error in rtp_buffered_audio_processor %d: \"%s\". Could not recv a data_len .",
- errno, errorstring);
- finished = 1;
- }
- }
-
- if (finished == 0) {
- pthread_cleanup_debug_mutex_lock(&conn->flush_mutex, 25000,
- 1); // 25 ms is a long time to wait!
- if (blocks_read != 0) {
- if (conn->ap2_immediate_flush_requested != 0) {
- if (ap2_immediate_flush_requested == 0) {
- debug(2, "immediate flush started at sequence number %u until sequence number of %u.",
- seq_no, conn->ap2_immediate_flush_until_sequence_number);
- }
- if ((blocks_read != 0) && (seq_no == conn->ap2_immediate_flush_until_sequence_number)) {
- debug(2, "immediate flush complete at seq_no of %u.", seq_no);
-
- conn->ap2_immediate_flush_requested = 0;
- ap2_immediate_flush_requested = 0;
-
- /*
- // turn off all deferred requests. Not sure if this is right...
- unsigned int f = 0;
- for (f = 0; f < MAX_DEFERRED_FLUSH_REQUESTS; f++) {
- conn->ap2_deferred_flush_requests[f].inUse = 0;
- conn->ap2_deferred_flush_requests[f].active = 0;
- }
- */
-
- } else {
- debug(3, "immediate flush of block %u until block %u", seq_no,
- conn->ap2_immediate_flush_until_sequence_number);
- ap2_immediate_flush_requested = 1;
- new_audio_block_needed = 1; //
- }
- }
- }
-
- // now, even if an immediate flush has been requested and is active, we still need to process
- // deferred flush requests as they may refer to sequences that are going to be purged anyway
-
- unsigned int f = 0;
- for (f = 0; f < MAX_DEFERRED_FLUSH_REQUESTS; f++) {
- if (conn->ap2_deferred_flush_requests[f].inUse != 0) {
- if ((conn->ap2_deferred_flush_requests[f].flushFromSeq == seq_no) &&
- (conn->ap2_deferred_flush_requests[f].flushUntilSeq != seq_no)) {
- debug(2,
- "deferred flush activated: flushFromTS: %12u, flushFromSeq: %12u, "
- "flushUntilTS: %12u, flushUntilSeq: %12u, timestamp: %12u.",
- conn->ap2_deferred_flush_requests[f].flushFromTS,
- conn->ap2_deferred_flush_requests[f].flushFromSeq,
- conn->ap2_deferred_flush_requests[f].flushUntilTS,
- conn->ap2_deferred_flush_requests[f].flushUntilSeq, timestamp);
- conn->ap2_deferred_flush_requests[f].active = 1;
- new_audio_block_needed = 1;
- }
- if (conn->ap2_deferred_flush_requests[f].flushUntilSeq == seq_no) {
- debug(2,
- "deferred flush terminated: flushFromTS: %12u, flushFromSeq: %12u, "
- "flushUntilTS: %12u, flushUntilSeq: %12u, timestamp: %12u.",
- conn->ap2_deferred_flush_requests[f].flushFromTS,
- conn->ap2_deferred_flush_requests[f].flushFromSeq,
- conn->ap2_deferred_flush_requests[f].flushUntilTS,
- conn->ap2_deferred_flush_requests[f].flushUntilSeq, timestamp);
- conn->ap2_deferred_flush_requests[f].active = 0;
- conn->ap2_deferred_flush_requests[f].inUse = 0;
- } else if (a_minus_b_mod23(seq_no, conn->ap2_deferred_flush_requests[f].flushUntilSeq) >
- 0) {
- // now, do a modulo 2^23 unsigned int calculation to see if we may have overshot the
- // flushUntilSeq
- debug(2,
- "deferred flush terminated due to overshoot at block %u: flushFromTS: %12u, "
- "flushFromSeq: %12u, "
- "flushUntilTS: %12u, flushUntilSeq: %12u, timestamp: %12u.",
- seq_no, conn->ap2_deferred_flush_requests[f].flushFromTS,
- conn->ap2_deferred_flush_requests[f].flushFromSeq,
- conn->ap2_deferred_flush_requests[f].flushUntilTS,
- conn->ap2_deferred_flush_requests[f].flushUntilSeq, timestamp);
- conn->ap2_deferred_flush_requests[f].active = 0;
- conn->ap2_deferred_flush_requests[f].inUse = 0;
- debug(2, "immediate flush was %s.", ap2_immediate_flush_requested == 0 ? "off" : "on");
- } else if (conn->ap2_deferred_flush_requests[f].active != 0) {
- new_audio_block_needed = 1;
- debug(3,
- "deferred flush of block: %u. flushFromTS: %12u, flushFromSeq: %12u, "
- "flushUntilTS: %12u, flushUntilSeq: %12u, timestamp: %12u.",
- seq_no, conn->ap2_deferred_flush_requests[f].flushFromTS,
- conn->ap2_deferred_flush_requests[f].flushFromSeq,
- conn->ap2_deferred_flush_requests[f].flushUntilTS,
- conn->ap2_deferred_flush_requests[f].flushUntilSeq, timestamp);
- }
- }
- }
- pthread_cleanup_pop(1); // the mutex
-
- // now, if the block is not invalidated by the flush code, see if we need
- // to decode it and pass it to the player
- if (new_audio_block_needed == 0) {
- // is there space in the player thread's buffer system?
- unsigned int player_buffer_size, player_buffer_occupancy;
- get_audio_buffer_size_and_occupancy(&player_buffer_size, &player_buffer_occupancy, conn);
- // debug(1,"player buffer size and occupancy: %u and %u", player_buffer_size,
- // player_buffer_occupancy);
-
- // If we are playing and there is room in the player buffer, go ahead and decode the block
- // and send it to the player. Otherwise, keep the block and sleep for a while.
- if ((play_enabled != 0) &&
- (player_buffer_occupancy <= 2 * ((config.audio_backend_buffer_desired_length) *
- conn->input_rate / conn->frames_per_packet))) {
- uint64_t buffer_should_be_time;
- frame_to_local_time(timestamp, &buffer_should_be_time, conn);
-
- // try to identify blocks that are timed to before the last buffer, and drop 'em
- int64_t time_from_last_buffer_time =
- buffer_should_be_time - previous_buffer_should_be_time;
-
- if ((packets_played_in_this_sequence == 0) || (time_from_last_buffer_time > 0)) {
- int64_t lead_time = buffer_should_be_time - get_absolute_time_in_ns();
- payload_length = 0;
- if (ssrc_is_recognised(payload_ssrc) != 0) {
- prepare_decoding_chain(conn, payload_ssrc);
- unsigned long long new_payload_length = 0;
- payload_pointer = m + leading_free_space_length;
- if ((lead_time < (int64_t)30000000000L) &&
- (lead_time >= 0)) { // only decipher the packet if it's not too late or too early
- int response = -1; // guess that there is a problem
- if (conn->session_key != NULL) {
- unsigned char nonce[12];
- memset(nonce, 0, sizeof(nonce));
- memcpy(
- nonce + 4, packet + nread - 8,
- 8); // front-pad the 8-byte nonce received to get the 12-byte nonce expected
-
- // https://libsodium.gitbook.io/doc/secret-key_cryptography/aead/chacha20-poly1305/ietf_chacha20-poly1305_construction
- // Note: the eight-byte nonce must be front-padded out to 12 bytes.
-
- // Leave leading_free_space_length bytes at the start for possible headers like an
- // ADTS header (7 bytes)
- memset(m, 0, leading_free_space_length);
- response = crypto_aead_chacha20poly1305_ietf_decrypt(
- payload_pointer, // where the decrypted payload will start
- &new_payload_length, // mlen_p
- NULL, // nsec,
- packet +
- 12, // the ciphertext starts 12 bytes in and is followed by the MAC tag,
- nread - (8 + 12), // clen -- the last 8 bytes are the nonce
- packet + 4, // authenticated additional data
- 8, // authenticated additional data length
- nonce,
- conn->session_key); // *k
- if (response != 0)
- debug(1, "Error decrypting audio packet %u -- packet length %d.", seq_no,
- nread);
- } else {
- debug(2, "No session key, so the audio packet can not be deciphered -- skipped.");
- }
-
- if ((response == 0) && (new_payload_length > 0)) {
- // now we have the deciphered block, so send it to the player if we can
- payload_length = new_payload_length;
-
- if (ssrc_is_aac(payload_ssrc)) {
- payload_pointer =
- payload_pointer - 7; // including the 7-byte leader for the ADTS
- payload_length = payload_length + 7;
-
- // now, fill in the 7-byte ADTS information, which seems to be needed by the
- // decoder we made room for it in the front of the buffer by filling from m + 7.
- int channelConfiguration = 2; // 2: 2 channels: front-left, front-right
- if (payload_ssrc == AAC_48000_F24_5P1)
- channelConfiguration = 6; // 6: 6 channels: front-center, front-left,
- // front-right, back-left, back-right, LFE-channel
- else if (payload_ssrc == AAC_48000_F24_7P1)
- channelConfiguration =
- 7; // 7: 8 channels: front-center, front-left, front-right,
- // side-left, side-right, back-left, back-right, LFE-channel
- addADTStoPacket(payload_pointer, payload_length, conn->input_rate,
- channelConfiguration);
- }
- int mute =
- ((packets_played_in_this_sequence == 0) && (ssrc_is_aac(payload_ssrc)));
- if (mute) {
- debug(2, "Connection %d: muting first AAC block -- block %u -- timestamp %u.",
- conn->connection_number, seq_no, timestamp);
- }
- int32_t timestamp_difference = 0;
- if (packets_played_in_this_sequence == 0) {
- // first_block_in_this_sequence = seq_no;
- first_timestamp_in_this_sequence = timestamp;
- debug(2,
- "Connection %d: "
- "first block %u, first timestamp %u.",
- conn->connection_number, seq_no, timestamp);
- } else {
- timestamp_difference = timestamp - expected_timestamp;
- if (timestamp_difference != 0) {
- debug(2,
- "Connection %d: "
- "unexpected timestamp in block %u. Actual: %u, expected: %u "
- "difference: %d, "
- "%f ms. "
- "Positive means later, i.e. a gap. First timestamp was %u, payload "
- "type: \"%s\".",
- conn->connection_number, seq_no, timestamp, expected_timestamp,
- timestamp_difference, 1000.0 * timestamp_difference / conn->input_rate,
- first_timestamp_in_this_sequence, get_ssrc_name(payload_ssrc));
- // mute the first packet after a discontinuity
- if (ssrc_is_aac(payload_ssrc)) {
- debug(2,
- "Connection %d: muting first AAC block -- block %u -- following a "
- "timestamp discontinuity, timestamp %u.",
- conn->connection_number, seq_no, timestamp);
- mute = 1;
- }
- }
- }
- int skip_this_block = 0;
- if (timestamp_difference < 0) {
- int32_t abs_timestamp_difference = -timestamp_difference;
- if ((size_t)abs_timestamp_difference > get_ssrc_block_length(payload_ssrc)) {
- skip_this_block = 1;
- debug(2,
- "skipping block %u because it was too far in the past. Timestamp "
- "difference: %d, length of block: %u.",
- seq_no, timestamp_difference, get_ssrc_block_length(payload_ssrc));
- }
- }
- if (skip_this_block == 0) {
- uint32_t packet_size = player_put_packet(
- payload_ssrc, sequence_number_for_player, timestamp, payload_pointer,
- payload_length, mute, timestamp_difference, conn);
- debug(3, "block %u, timestamp %u, length %u sent to the player.", seq_no,
- timestamp, packet_size);
- sequence_number_for_player++; // simply increment
- expected_timestamp = timestamp + packet_size; // for the next time
- packets_played_in_this_sequence++;
- }
- }
- } else {
- debug(3,
- "skipped deciphering block %u with timestamp %u because its lead time is "
- "out of range at %f "
- "seconds.",
- seq_no, timestamp, lead_time * 1.0E-9);
- uint32_t currentAnchorRTP = 0;
- uint64_t currentAnchorLocalTime = 0;
- if (get_ptp_anchor_local_time_info(conn, ¤tAnchorRTP,
- ¤tAnchorLocalTime) == clock_ok) {
- debug(3, "anchorRTP: %u, anchorLocalTime: % " PRIu64 ".", currentAnchorRTP,
- currentAnchorLocalTime);
- } else {
- debug(3, "Clock not okay");
- }
- }
- } else {
- debug(2, "Unrecognised or invalid ssrc: %s.", get_ssrc_name(payload_ssrc));
- }
- } else {
- debug(1, "dropping buffer that should have played before the last actually played.");
- }
- new_audio_block_needed = 1; // the block has been used up and is no longer current
- } else {
- usleep(20000); // wait for a while
- }
- }
- }
- } while (finished == 0);
- debug(2, "Buffered Audio Receiver RTP thread \"normal\" exit.");
- pthread_cleanup_pop(1); // buffered_tcp_reader thread creation
- pthread_cleanup_pop(1); // buffer malloc
- pthread_cleanup_pop(1); // not_full_cv
- pthread_cleanup_pop(1); // not_empty_cv
- pthread_cleanup_pop(1); // mutex
- pthread_cleanup_pop(1); // descriptor malloc
- pthread_cleanup_pop(1); // pthread_t malloc
- pthread_cleanup_pop(1); // do the cleanup.
- pthread_exit(NULL);
-}
-
int frame_to_local_time(uint32_t timestamp, uint64_t *time, rtsp_conn_info *conn) {
if (conn->timing_type == ts_ptp)
return frame_to_ptp_local_time(timestamp, time, conn);
int local_time_to_frame(uint64_t time, uint32_t *frame, rtsp_conn_info *conn);
#ifdef CONFIG_AIRPLAY_2
+int have_ptp_timing_information(rtsp_conn_info *conn);
+int get_ptp_anchor_local_time_info(rtsp_conn_info *conn, uint32_t *anchorRTP,
+ uint64_t *anchorLocalTime);
void *rtp_ap2_control_receiver(void *arg);
void *rtp_realtime_audio_receiver(void *arg);
-void *rtp_buffered_audio_processor(void *arg);
void *rtp_ap2_timing_receiver(void *arg);
void *rtp_ap2_general_message_timing_receiver(void *arg);
void set_ptp_anchor_info(rtsp_conn_info *conn, uint64_t clock_id, uint32_t rtptime,
#endif
#ifdef CONFIG_AIRPLAY_2
+#include "ap2_buffered_audio_processor.h"
#include "ap2_event_receiver.h"
#include "ap2_rc_event_receiver.h"
#include "pair_ap/pair.h"
--- /dev/null
+/*
+ * Buffered Read. This file is part of Shairport Sync
+ * Copyright (c) Mike Brady 2025
+
+ * Modifications, including those associated with audio synchronization, multithreading and
+ * metadata handling copyright (c) Mike Brady 2014--2025
+ * All rights reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+ #include <sys/types.h>
+ #include "buffered_read.h"
+ #include "common.h"
+
+ssize_t buffered_read(buffered_tcp_desc *descriptor, void *buf, size_t count,
+ size_t *bytes_remaining) {
+ ssize_t response = -1;
+ if (debug_mutex_lock(&descriptor->mutex, 50000, 1) != 0)
+ debug(1, "problem with mutex");
+ pthread_cleanup_push(mutex_unlock, (void *)&descriptor->mutex);
+ // wipe the slate dlean before reading...
+ descriptor->error_code = 0;
+ descriptor->closed = 0;
+
+ if (descriptor->buffer_occupancy == 0) {
+ debug(2, "buffered_read: buffer empty -- waiting for %u bytes.", count);
+ }
+
+ while ((descriptor->buffer_occupancy == 0) && (descriptor->error_code == 0) &&
+ (descriptor->closed == 0)) {
+ if (pthread_cond_wait(&descriptor->not_empty_cv, &descriptor->mutex))
+ debug(1, "Error waiting for buffered read");
+ else
+ debug(2, "buffered_read: signalled with %u bytes after waiting.",
+ descriptor->buffer_occupancy);
+ }
+
+ if (descriptor->error_code) {
+ errno = descriptor->error_code;
+ debug(1, "buffered_read: error %d.", errno);
+ response = -1;
+ } else if (descriptor->closed != 0) {
+ debug(1, "buffered_read: connection closed.");
+ errno = 0; // no error -- just closed
+ response = 0;
+ } else if (descriptor->buffer_occupancy != 0) {
+ ssize_t bytes_to_move = count;
+
+ if (descriptor->buffer_occupancy < count) {
+ bytes_to_move = descriptor->buffer_occupancy;
+ }
+
+ ssize_t top_gap = descriptor->buffer + descriptor->buffer_max_size - descriptor->toq;
+ if (top_gap < bytes_to_move)
+ bytes_to_move = top_gap;
+
+ memcpy(buf, descriptor->toq, bytes_to_move);
+ descriptor->toq += bytes_to_move;
+ if (descriptor->toq == descriptor->buffer + descriptor->buffer_max_size)
+ descriptor->toq = descriptor->buffer;
+ descriptor->buffer_occupancy -= bytes_to_move;
+ if (bytes_remaining != NULL)
+ *bytes_remaining = descriptor->buffer_occupancy;
+ response = bytes_to_move;
+ if (pthread_cond_signal(&descriptor->not_full_cv))
+ debug(1, "Error signalling");
+ }
+
+ pthread_cleanup_pop(1); // release the mutex
+ return response;
+}
+
+#define STANDARD_PACKET_SIZE 4096
+
+void buffered_tcp_reader_cleanup_handler(__attribute__((unused)) void *arg) {
+ debug(2, "Buffered TCP Reader Thread Exit via Cleanup.");
+}
+
+void *buffered_tcp_reader(void *arg) {
+ // #include <syscall.h>
+ // debug(1, "buffered_tcp_reader PID %d", syscall(SYS_gettid));
+ pthread_cleanup_push(buffered_tcp_reader_cleanup_handler, NULL);
+ buffered_tcp_desc *descriptor = (buffered_tcp_desc *)arg;
+
+ // listen(descriptor->sock_fd, 5); // this is done in the handle_setup_2 code to ensure it's open
+ // when the client hears about it...
+ ssize_t nread;
+ SOCKADDR remote_addr;
+ memset(&remote_addr, 0, sizeof(remote_addr));
+ socklen_t addr_size = sizeof(remote_addr);
+ int finished = 0;
+ int fd = accept(descriptor->sock_fd, (struct sockaddr *)&remote_addr, &addr_size);
+ // debug(1, "buffered_tcp_reader: the client has opened a buffered audio link.");
+ intptr_t pfd = fd;
+ pthread_cleanup_push(socket_cleanup, (void *)pfd);
+
+ do {
+ int have_time_to_sleep = 0;
+ if (debug_mutex_lock(&descriptor->mutex, 500000, 1) != 0)
+ debug(1, "problem with mutex");
+ pthread_cleanup_push(mutex_unlock, (void *)&descriptor->mutex);
+ while (descriptor->buffer_occupancy == descriptor->buffer_max_size) {
+ if (pthread_cond_wait(&descriptor->not_full_cv, &descriptor->mutex))
+ debug(1, "Error waiting for not_full_cv");
+ }
+ pthread_cleanup_pop(1); // release the mutex
+
+ // now we know it is not full, so go ahead and try to read some more into it
+
+ // wrap
+ if ((size_t)(descriptor->eoq - descriptor->buffer) == descriptor->buffer_max_size)
+ descriptor->eoq = descriptor->buffer;
+
+ // figure out how much to ask for
+ size_t bytes_to_request = STANDARD_PACKET_SIZE;
+ size_t free_space = descriptor->buffer_max_size - descriptor->buffer_occupancy;
+ if (bytes_to_request > free_space)
+ bytes_to_request = free_space; // don't ask for more than will fit
+
+ size_t gap_to_end_of_buffer =
+ descriptor->buffer + descriptor->buffer_max_size - descriptor->eoq;
+ if (gap_to_end_of_buffer < bytes_to_request)
+ bytes_to_request =
+ gap_to_end_of_buffer; // only ask for what will fill to the top of the buffer
+
+ // do the read
+ if (descriptor->buffer_occupancy == 0)
+ debug(2, "recv of up to %d bytes with an buffer empty.", bytes_to_request);
+ nread = recv(fd, descriptor->eoq, bytes_to_request, 0);
+ // debug(1, "Received %d bytes for a buffer size of %d bytes.",nread,
+ // descriptor->buffer_occupancy + nread);
+ if (debug_mutex_lock(&descriptor->mutex, 50000, 1) != 0)
+ debug(1, "problem with not empty mutex");
+ pthread_cleanup_push(mutex_unlock, (void *)&descriptor->mutex);
+ if (nread < 0) {
+ char errorstring[1024];
+ strerror_r(errno, (char *)errorstring, sizeof(errorstring));
+ debug(1, "error in buffered_tcp_reader %d: \"%s\". Could not recv a packet.", errno,
+ errorstring);
+ descriptor->error_code = errno;
+ } else if (nread == 0) {
+ descriptor->closed = 1;
+ debug(
+ 1,
+ "buffered audio port closed by remote end. Terminating the buffered_tcp_reader thread.");
+ finished = 1;
+ } else if (nread > 0) {
+ descriptor->eoq += nread;
+ descriptor->buffer_occupancy += nread;
+ }
+
+ // signal if we got data or an error or the file closed
+ if (pthread_cond_signal(&descriptor->not_empty_cv))
+ debug(1, "Error signalling");
+ if (descriptor->buffer_occupancy > 16384)
+ have_time_to_sleep = 1;
+ pthread_cleanup_pop(1); // release the mutex
+ if (have_time_to_sleep)
+ usleep(10000); // give other threads a chance to run...
+ } while (finished == 0);
+
+ debug(1, "Buffered TCP Reader Thread Exit \"Normal\" Exit Begin.");
+ pthread_cleanup_pop(1); // close the socket
+ pthread_cleanup_pop(1); // cleanup
+ debug(1, "Buffered TCP Reader Thread Exit \"Normal\" Exit.");
+ pthread_exit(NULL);
+}
+
+// this will read a block of the size specified to the buffer
+// and will return either with the block or on error
+ssize_t lread_sized_block(buffered_tcp_desc *descriptor, void *buf, size_t count,
+ size_t *bytes_remaining) {
+ ssize_t response, nread;
+ size_t inbuf = 0; // bytes already in the buffer
+ int keep_trying = 1;
+
+ do {
+ nread = buffered_read(descriptor, buf + inbuf, count - inbuf, bytes_remaining);
+ if (nread == 0) {
+ // a blocking read that returns zero means eof -- implies connection closed
+ debug(1, "read_sized_block connection closed.");
+ keep_trying = 0;
+ } else if (nread < 0) {
+ if (errno == EAGAIN) {
+ debug(1, "read_sized_block getting Error 11 -- EAGAIN from a blocking read!");
+ }
+ if ((errno != EAGAIN) && (errno != EINTR)) {
+ char errorstring[1024];
+ strerror_r(errno, (char *)errorstring, sizeof(errorstring));
+ debug(1, "read_sized_block read error %d: \"%s\".", errno, (char *)errorstring);
+ keep_trying = 0;
+ }
+ } else {
+ inbuf += (size_t)nread;
+ }
+ } while ((keep_trying != 0) && (inbuf < count));
+ if (nread <= 0)
+ response = nread;
+ else
+ response = inbuf;
+ return response;
+}
--- /dev/null
+#ifndef _BUFFERED_READ_H
+#define _BUFFERED_READ_H
+
+typedef struct {
+ int closed;
+ int error_code;
+ int sock_fd;
+ char *buffer;
+ char *toq;
+ char *eoq;
+ size_t buffer_max_size;
+ size_t buffer_occupancy;
+ pthread_mutex_t mutex;
+ pthread_cond_t not_empty_cv;
+ pthread_cond_t not_full_cv;
+} buffered_tcp_desc;
+
+void *buffered_tcp_reader(void *arg);
+ssize_t lread_sized_block(buffered_tcp_desc *descriptor, void *buf, size_t count,
+ size_t *bytes_remaining);
+
+#endif // _BUFFERED_READ_H
--- /dev/null
+/*
+ * 23-bit modular arithmetic. This file is part of Shairport Sync
+ * Copyright (c) Mike Brady 2025
+
+ * Modifications, including those associated with audio synchronization, multithreading and
+ * metadata handling copyright (c) Mike Brady 2014--2025
+ * All rights reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+ #include <stdint.h>
+ #include "mod23.h"
+
+#define MOD_23BIT 0x7FFFFF // 2^23 - 1
+
+// Assumes 'a' and 'b' are within 2^22 of each other
+int32_t a_minus_b_mod23(uint32_t a, uint32_t b) {
+
+ // Mask to 23 bits
+ a &= MOD_23BIT;
+ b &= MOD_23BIT;
+
+ // Compute difference modulo 2^23
+ uint32_t diff = (a - b) & MOD_23BIT;
+
+ // Interpret as signed 23-bit value
+ // If the top bit (bit 22) is set, it's negative
+ int32_t signed_diff = (diff & 0x400000) ? (diff | 0xFF800000) : diff;
+
+ return signed_diff;
+}
--- /dev/null
+#ifndef _MOD23_H
+#define _MOD23_H
+
+int32_t a_minus_b_mod23(uint32_t a, uint32_t b);
+
+#endif // _MOD23_H