#include "FFTConvolver/convolver.h"
#endif
-
struct Nvll {
char *name;
double value;
while ((gradients) && (strcasecmp((const char *)&conn->client_ip_string, gradients->name) != 0))
gradients = gradients->next;
- // if gradients comes out of this non-null, it is pointing to the DACP and it's last-known
+ // if gradients comes out of this non-null, it is pointing to the DACP and its last-known
// gradient
if (gradients) {
gradients->value = conn->local_to_remote_time_gradient;
while ((gradients) && (strcasecmp((const char *)&conn->client_ip_string, gradients->name) != 0))
gradients = gradients->next;
- // if gradients comes out of this non-null, it is pointing to the IP and it's last-known gradient
+ // if gradients comes out of this non-null, it is pointing to the IP and its last-known gradient
if (gradients) {
conn->local_to_remote_time_gradient = gradients->value;
// debug(1,"Using a stored drift of %.2f ppm for \"%s\".", (conn->local_to_remote_time_gradient
conn->anchor_remote_info_is_valid = 1;
// these can be modified if the master clock changes over time
+
conn->anchor_rtptime = rtptime;
conn->anchor_time = networktime;
conn->anchor_clock = clock_id;
+ debug(2, "set_ptp_anchor_info done.");
}
+
int long_time_notifcation_done = 0;
uint64_t previous_offset = 0;
*time = ltime;
result = 0;
} else {
- debug(3, "frame_to_ptp_local_time can't get anchor local time information");
+ debug(2, "frame_to_ptp_local_time can't get anchor local time information");
}
return result;
}
*frame = lframe;
result = 0;
} else {
- debug(3, "local_ptp_time_to_frame can't get anchor local time information");
+ debug(2, "local_ptp_time_to_frame can't get anchor local time information");
}
return result;
}
debug(3, "Connection %d: Packet Received on Event Port with contents: \"%s\".",
conn->connection_number, packet);
} else {
- debug(2, "Connection %d: Event Port connection closed by client",
+ debug(1, "Connection %d: Event Port connection closed by client",
conn->connection_number);
finished = 1;
}
if (debug_mutex_lock(&descriptor->mutex, 50000, 1) != 0)
debug(1, "problem with mutex");
pthread_cleanup_push(mutex_unlock, (void *)&descriptor->mutex);
- if (descriptor->closed == 0) {
- if ((descriptor->buffer_occupancy == 0) && (descriptor->error_code == 0)) {
- debug(2, "buffered_read: waiting for %u bytes.", count);
- }
- while ((descriptor->buffer_occupancy == 0) && (descriptor->error_code == 0)) {
- if (pthread_cond_wait(&descriptor->not_empty_cv, &descriptor->mutex))
- debug(1, "Error waiting for buffered read");
- else
- debug(2, "buffered_read: signalled with %u bytes after waiting.",
- descriptor->buffer_occupancy);
- }
+ // wipe the slate dlean before reading...
+ descriptor->error_code = 0;
+ descriptor->closed = 0;
+
+ if (descriptor->buffer_occupancy == 0) {
+ debug(2, "buffered_read: buffer empty -- waiting for %u bytes.", count);
}
- if (descriptor->buffer_occupancy != 0) {
+
+ while ((descriptor->buffer_occupancy == 0) && (descriptor->error_code == 0) &&
+ (descriptor->closed == 0)) {
+ if (pthread_cond_wait(&descriptor->not_empty_cv, &descriptor->mutex))
+ debug(1, "Error waiting for buffered read");
+ else
+ debug(2, "buffered_read: signalled with %u bytes after waiting.",
+ descriptor->buffer_occupancy);
+ }
+
+ if (descriptor->error_code) {
+ errno = descriptor->error_code;
+ debug(1, "buffered_read: error %d.", errno);
+ response = -1;
+ } else if (descriptor->closed != 0) {
+ debug(1, "buffered_read: connection closed.");
+ errno = 0; // no error -- just closed
+ response = 0;
+ } else if (descriptor->buffer_occupancy != 0) {
ssize_t bytes_to_move = count;
if (descriptor->buffer_occupancy < count) {
response = bytes_to_move;
if (pthread_cond_signal(&descriptor->not_full_cv))
debug(1, "Error signalling");
- } else if (descriptor->error_code) {
- errno = descriptor->error_code;
- response = -1;
- } else if (descriptor->closed != 0) {
- response = 0;
}
pthread_cleanup_pop(1); // release the mutex
if (debug_mutex_lock(&descriptor->mutex, 500000, 1) != 0)
debug(1, "problem with mutex");
pthread_cleanup_push(mutex_unlock, (void *)&descriptor->mutex);
- while ((descriptor->buffer_occupancy == descriptor->buffer_max_size) ||
- (descriptor->error_code != 0) || (descriptor->closed != 0)) {
+ while (descriptor->buffer_occupancy == descriptor->buffer_max_size) {
if (pthread_cond_wait(&descriptor->not_full_cv, &descriptor->mutex))
- debug(1, "Error waiting for buffered read");
+ debug(1, "Error waiting for not_full_cv");
}
pthread_cleanup_pop(1); // release the mutex
gap_to_end_of_buffer; // only ask for what will fill to the top of the buffer
// do the read
- // debug(1, "Request buffered read of up to %d bytes.", bytes_to_request);
+ if (descriptor->buffer_occupancy == 0)
+ debug(2, "recv of up to %d bytes with an buffer empty.", bytes_to_request);
nread = recv(fd, descriptor->eoq, bytes_to_request, 0);
// debug(1, "Received %d bytes for a buffer size of %d bytes.",nread,
// descriptor->buffer_occupancy + nread);
descriptor->error_code = errno;
} else if (nread == 0) {
descriptor->closed = 1;
+ debug(1, "buffered audio port closed. Terminating the buffered_tcp_reader thread.");
+ finished = 1;
} else if (nread > 0) {
descriptor->eoq += nread;
descriptor->buffer_occupancy += nread;
- } else {
- debug(1, "buffered audio port closed!");
}
+
// signal if we got data or an error or the file closed
if (pthread_cond_signal(&descriptor->not_empty_cv))
debug(1, "Error signalling");
usleep(10000); // give other threads a chance to run...
} while (finished == 0);
- debug(1, "Buffered TCP Reader Thread Exit \"Normal\" Exit Begin.");
+ debug(3, "Buffered TCP Reader Thread Exit \"Normal\" Exit Begin.");
pthread_cleanup_pop(1); // close the socket
pthread_cleanup_pop(1); // cleanup
- debug(1, "Buffered TCP Reader Thread Exit \"Normal\" Exit -- Shouldn't happen!.");
+ debug(2, "Buffered TCP Reader Thread Exit \"Normal\" Exit.");
pthread_exit(NULL);
}
nread = buffered_read(descriptor, buf + inbuf, count - inbuf, bytes_remaining);
if (nread == 0) {
// a blocking read that returns zero means eof -- implies connection closed
- debug(3, "read_sized_block connection closed.");
+ debug(1, "read_sized_block connection closed.");
keep_trying = 0;
} else if (nread < 0) {
if (errno == EAGAIN) {
return response;
}
-// not used right now, but potentially useful for understanding flush requests
-void display_flush_requests(int activeOnly, uint32_t currentSeq, uint32_t currentTS,
- rtsp_conn_info *conn) {
- if (conn->flush_requests == NULL) {
- if (activeOnly == 0)
- debug(1, "No flush requests.");
- } else {
- flush_request_t *t = conn->flush_requests;
- do {
- if (t->flushNow) {
- debug(1, "immediate flush to untilSeq: %u, untilTS: %u.", t->flushUntilSeq,
- t->flushUntilTS);
- } else {
- if (activeOnly == 0)
- debug(1, "fromSeq: %u, fromTS: %u, to untilSeq: %u, untilTS: %u.", t->flushFromSeq,
- t->flushFromTS, t->flushUntilSeq, t->flushUntilTS);
- else if ((activeOnly == 1) &&
- (currentSeq >=
- (t->flushFromSeq -
- 1))) // the -1 is because you might have to trim the end of the previous block
- debug(1,
- "fromSeq: %u, fromTS: %u, to untilSeq: %u, untilTS: %u, with currentSeq: %u, "
- "currentTS: %u.",
- t->flushFromSeq, t->flushFromTS, t->flushUntilSeq, t->flushUntilTS, currentSeq,
- currentTS);
- }
- t = t->next;
- } while (t != NULL);
- }
-}
-
-// From
// https://stackoverflow.com/questions/18862715/how-to-generate-the-aac-adts-elementary-stream-with-android-mediacodec
// with thanks!
debug(2, "Buffered Audio Receiver Cleanup Start.");
rtsp_conn_info *conn = (rtsp_conn_info *)arg;
close(conn->buffered_audio_socket);
- debug(2, "Connection %d: TCP Buffered Audio port closed: %u.", conn->connection_number,
+ debug(1, "Connection %d: closing TCP Buffered Audio port: %u.", conn->connection_number,
conn->local_buffered_audio_port);
conn->buffered_audio_socket = 0;
debug(2, "Connection %d: Buffered Audio Receiver Cleanup Done.", conn->connection_number);
}
+#define MOD_23BIT 0x7FFFFF // 2^23 - 1
+
+// Assumes 'a' and 'b' are within 2^22 of each other
+int32_t a_minus_b_mod23(uint32_t a, uint32_t b) {
+
+ // Mask to 23 bits
+ a &= MOD_23BIT;
+ b &= MOD_23BIT;
+
+ // Compute difference modulo 2^23
+ uint32_t diff = (a - b) & MOD_23BIT;
+
+ // Interpret as signed 23-bit value
+ // If the top bit (bit 22) is set, it's negative
+ int32_t signed_diff = (diff & 0x400000) ? (diff | 0xFF800000) : diff;
+
+ return signed_diff;
+}
+
void *rtp_buffered_audio_processor(void *arg) {
// #include <syscall.h>
// debug(1, "rtp_buffered_audio_processor PID %d", syscall(SYS_gettid));
unsigned long long payload_length = 0;
uint32_t payload_ssrc =
SSRC_NONE; // this is the SSRC of the payload, needed to decide if it should be muted
+ uint32_t previous_ssrc = SSRC_NONE;
uint32_t seq_no =
0; // audio packet number. Initialised to avoid a "possibly uninitialised" warning.
- int seqno_valid = 0;
uint32_t previous_seqno = 0;
- int previous_seqno_valid = 0;
+ uint16_t sequence_number_for_player = 0;
+
+ uint32_t timestamp = 0; // initialised to avoid a "possibly uninitialised" warning.
+ uint32_t previous_timestamp = 0;
+
+ uint32_t expected_timestamp = 0;
+ uint64_t previous_buffer_should_be_time = 0;
- int new_buffer_needed = 0;
ssize_t nread;
+ int new_audio_block_needed = 0; // goes true when a block is needed, false one is read in, but
+ // will be made true by flushing or by playing the block
int finished = 0;
+ uint64_t blocks_read_since_play_began = 0;
uint64_t blocks_read = 0;
- // not used...
- // uint64_t blocks_read_in_sequence =
- // 0; // since the start of this sequence -- reset by start or flush
- int flush_requested = 0;
- uint32_t timestamp = 0; // initialised to avoid a "possibly uninitialised" warning.
- int timestamp_valid = 0;
- uint32_t previous_timestamp = 0;
- int previous_timestamp_valid = 0;
-
- uint32_t expected_timestamp = 0;
- int packets_played_in_this_sequence = 0;
+ int ap2_immediate_flush_requested = 0; // for diagnostics, probably
- // uint32_t first_block_in_this_sequence = 0;
uint32_t first_timestamp_in_this_sequence = 0;
+ int packets_played_in_this_sequence = 0;
int play_enabled = 0;
- // uint32_t flush_from_timestamp = 0; // initialised to avoid a "possibly uninitialised" warning.
- double requested_lead_time = 0.0; // normal lead time minimum -- maybe it should be about 0.1
-
- uint32_t old_ssrc = 0; // diagnostic
+ // double requested_lead_time = 0.0; // normal lead time minimum -- maybe it should be about 0.1
// wait until our timing information is valid
-
- // debug(1,"rtp_buffered_audio_processor ready.");
-
while (have_ptp_timing_information(conn) == 0)
usleep(1000);
reset_buffer(conn); // in case there is any garbage in the player
do {
- uint16_t last_seqno_put = 0;
- int flush_is_delayed = 0;
- int flush_newly_requested = 0;
- int flush_newly_complete = 0;
- int play_newly_stopped = 0;
- // are we in in flush mode, or just about to leave it?
- pthread_cleanup_debug_mutex_lock(&conn->flush_mutex, 25000, 1); // 25 ms is a long time to wait!
+ if ((play_enabled == 0) && (conn->ap2_play_enabled != 0)) {
+ // play newly started
+ debug(2, "Play started.");
+ new_audio_block_needed = 1;
+ blocks_read_since_play_began = 0;
+ }
if ((play_enabled != 0) && (conn->ap2_play_enabled == 0)) {
- play_newly_stopped = 1;
debug(2, "Play stopped.");
- // blocks_read_in_sequence =
- // 0; // This may be set to 1 by a flush, so don't zero it during start.
- packets_played_in_this_sequence = 0;
- new_buffer_needed = 0;
+ packets_played_in_this_sequence = 0; // not all blocks read are played...
#ifdef CONFIG_CONVOLUTION
convolver_clear_state();
#endif
- }
-
- if ((play_enabled == 0) && (conn->ap2_play_enabled != 0)) {
- // play newly started
- debug(2, "Play started.");
- }
-
- play_enabled = conn->ap2_play_enabled;
-
- uint32_t flushUntilSeq = (conn->ap2_flush_until_sequence_number - 1) & 0x7fffff;
- uint32_t flushUntilTS = conn->ap2_flush_until_rtp_timestamp;
-
- int flush_request_active = 0;
- if (conn->ap2_flush_requested) {
- if (conn->ap2_flush_from_valid == 0) { // i.e. a flush from right now
- if (play_enabled)
- debug(2, "Connection %d: immediate flush activated while play_enabled is true.",
- conn->connection_number);
- flush_request_active = 1;
- flush_is_delayed = 0;
- } else {
- flush_is_delayed = 1;
- // flush_from_timestamp = conn->ap2_flush_from_rtp_timestamp;
- int32_t blocks_to_start_of_flush = conn->ap2_flush_from_sequence_number - seq_no;
- if (blocks_to_start_of_flush <= 0) {
- debug(3, "Connection %d: deferred flush activated.", conn->connection_number);
- if (play_enabled)
- debug(3, "Connection %d: deferred flush activated while play_enabled is true.",
- conn->connection_number);
- flush_request_active = 1;
- }
- }
- }
- // if we are in flush mode
- if (flush_request_active) {
- if (flush_requested == 0) {
- // here, a flush has been newly requested
-
- debug(3, "Connection %d: Flush requested.", conn->connection_number);
- if (conn->ap2_flush_from_valid) {
- debug(3, " fromTS: %u", conn->ap2_flush_from_rtp_timestamp);
- debug(3, " fromSeq: %u", conn->ap2_flush_from_sequence_number);
- debug(3, "--");
- }
- debug(3, " untilTS: %u", conn->ap2_flush_until_rtp_timestamp);
- debug(3, " untilSeq: %u", conn->ap2_flush_until_sequence_number);
- debug(3, "--");
- debug(3, " currentTS_Start: %u", timestamp);
- debug(3, " currentSeq: %u", seq_no);
-
- flush_newly_requested = 1;
- }
- // blocks_read to ensure seq_no is valid
- // (seq_no - flushUntilSeq) & 0x400000 -- if this is 0, seq_no is >= flushUntilSeq
- if ((blocks_read != 0) && (((seq_no - flushUntilSeq) & 0x400000) == 0)) {
- // we have reached or overshot the flushUntilSeq block
- if (seq_no == flushUntilSeq) { // if the flush ended as expected, just before
- // conn->ap2_flush_until_sequence_number
- debug(3,
- "Connection %d: flush request ended normally at %u/%u with "
- "ap2_flush_until_sequence_number: %u/%u, "
- "flushUntilTS: %u, incoming "
- "timestamp: %u",
- conn->connection_number, seq_no, seq_no & 0xffff,
- conn->ap2_flush_until_sequence_number,
- conn->ap2_flush_until_sequence_number & 0xffff, flushUntilTS, timestamp);
- } else {
- // sometimes, the block number jumps directly to the
- // conn->ap2_flush_until_sequence_number, skipping the preceding one (which is what is in
- // flushUntilSeq...)
- if (seq_no == conn->ap2_flush_until_sequence_number)
- debug(3,
- "Connection %d: flush request ended normally at %u/%u with "
- "ap2_flush_until_sequence_number: %u/%u, "
- "flushUntilTS: %u, incoming "
- "timestamp: %u",
- conn->connection_number, seq_no, seq_no & 0xffff,
- conn->ap2_flush_until_sequence_number,
- conn->ap2_flush_until_sequence_number & 0xffff, flushUntilTS, timestamp);
- else
- debug(1,
- "Connection %d: flush request ended with a discontinuity at %u/%u with "
- "ap2_flush_until_sequence_number: %u/%u, "
- "flushUntilTS: %u, incoming "
- "timestamp: %u",
- conn->connection_number, seq_no, seq_no & 0xffff,
- conn->ap2_flush_until_sequence_number,
- conn->ap2_flush_until_sequence_number & 0xffff, flushUntilTS, timestamp);
- new_buffer_needed = 0; // use this first block in the new sequence
- }
- conn->ap2_flush_requested = 0;
- flush_request_active = 0;
- flush_newly_requested = 0;
- }
- }
-
- if ((flush_requested) && (flush_request_active == 0)) {
- if (play_enabled)
- debug(3, "Connection %d: flush completed while play_enabled is true.",
- conn->connection_number);
- flush_newly_complete = 1;
- // blocks_read_in_sequence =
- // 1; // the last block always (?) becomes the first block after the flush
- }
- flush_requested = flush_request_active; // for next time...
-
- // debug_mutex_unlock(&conn->flush_mutex, 3);
- pthread_cleanup_pop(1); // the mutex
-
- // do this outside the flush mutex
- if (flush_newly_complete) {
- debug(3, "Connection %d: flush complete.", conn->connection_number);
- }
-
- if (play_newly_stopped != 0)
reset_buffer(conn); // stop play ASAP
-
- if (flush_newly_requested) {
- reset_buffer(
- conn); // stop play when an immediate flush starts or when a deferred flush is activated.
- if (flush_is_delayed == 0) {
- debug(3, "Connection %d: immediate buffered audio flush started.", conn->connection_number);
- packets_played_in_this_sequence = 0;
- } else {
- debug(1, "Connection %d: deferred buffered audio flush started.", conn->connection_number);
- packets_played_in_this_sequence = 0;
- }
- }
-
- // now, if a flush is not requested, see if we need to get a block
- if (flush_requested == 0) {
-
- // is there space in the player thread's buffer system?
- unsigned int player_buffer_size, player_buffer_occupancy;
- get_audio_buffer_size_and_occupancy(&player_buffer_size, &player_buffer_occupancy, conn);
- // debug(1,"player buffer size and occupancy: %u and %u", player_buffer_size,
- // player_buffer_occupancy);
-
- if ((play_enabled != 0) &&
- (player_buffer_occupancy <= 2 * ((config.audio_backend_buffer_desired_length) *
- conn->input_rate / conn->frames_per_packet)) &&
- (payload_pointer == NULL) &&
- (flush_requested == 0)) { // must be greater than the lead time
- new_buffer_needed = 1;
- } else {
- usleep(20000); // wait for a while
- }
}
- int64_t lead_time = 0;
-
- // so we need to read a block, as either a flush or a new buffer is needed...
- if ((flush_requested) || (new_buffer_needed)) {
+ play_enabled = conn->ap2_play_enabled;
- // start here to read (and later, decipher) a block.
+ // now, if get_next_block is non-zero, read a block. We may flush or use it
+ if (new_audio_block_needed != 0) {
// a block is preceded by its length in a uint16_t
uint16_t data_len;
// here we read from the buffer that our thread has been reading
nread = lread_sized_block(buffered_audio, &data_len, sizeof(data_len),
&bytes_remaining_in_buffer);
data_len = ntohs(data_len);
+
// diagnostic
if ((conn->ap2_audio_buffer_minimum_size < 0) ||
(bytes_remaining_in_buffer < (size_t)conn->ap2_audio_buffer_minimum_size))
conn->ap2_audio_buffer_minimum_size = bytes_remaining_in_buffer;
- // if (flush_requested)
- // debug(1, "read %u bytes for a flush of a block length of %u.", nread, data_len);
-
if (nread > 0) {
// get the block itself
// debug(1,"buffered audio packet of size %u detected.", data_len - 2);
(bytes_remaining_in_buffer < (size_t)conn->ap2_audio_buffer_minimum_size))
conn->ap2_audio_buffer_minimum_size = bytes_remaining_in_buffer;
// debug(1, "buffered audio packet of size %u received.", nread);
- if (nread > 0) {
+ if (nread > 0) {
// got the block
- blocks_read++; // note, this doesn't mean they are valid audio blocks
- // blocks_read_in_sequence++;
+ blocks_read++; // note, this doesn't mean they are valid audio blocks
+ blocks_read_since_play_began++; // 1 means previous seq_no and timestamps are invalid
// get the sequence number
// see https://en.wikipedia.org/wiki/Real-time_Transport_Protocol#Packet_header
// the Marker bit is always set, and it and the remaining 23 bits form the sequence number
- if (seqno_valid) {
- previous_seqno = seq_no;
- previous_seqno_valid = 1;
- }
-
+ previous_seqno = seq_no;
seq_no = nctohl(&packet[0]) & 0x7FFFFF;
- seqno_valid = 1;
- // if (flush_requested)
- // debug(1, "read %u bytes for a flush of block %u up to block %u.", nread, seq_no,
- // conn->ap2_flush_until_sequence_number);
+ previous_timestamp = timestamp;
+ timestamp = nctohl(&packet[4]);
- // int unexpected_seqno = 0;
+ previous_ssrc = payload_ssrc;
+ payload_ssrc = nctohl(&packet[8]);
+
+ if (blocks_read_since_play_began == 1) {
+ debug(2, "Preparing initial decoding chain for %s.", get_ssrc_name(payload_ssrc));
+ prepare_decoding_chain(conn, payload_ssrc);
+ sequence_number_for_player =
+ seq_no & 0xffff; // this is arbitrary -- the sequence_number_for_player numbers will
+ // be sequential irrespective of seq_no jumps...
+ }
+
+ if (blocks_read_since_play_began > 1) {
+
+ if (payload_ssrc != previous_ssrc) {
+ if (ssrc_is_recognised(payload_ssrc) == 0) {
+ debug(2, "Unrecognised SSRC: %u.", payload_ssrc);
+ } else {
+ debug(2,
+ "Reading a block: new encoding: %s, old encoding: %s. Preparing a new "
+ "decoding chain.",
+ get_ssrc_name(payload_ssrc), get_ssrc_name(previous_ssrc));
+ prepare_decoding_chain(conn, payload_ssrc);
+ }
+ }
- if (previous_seqno_valid) {
uint32_t t_expected_seqno = (previous_seqno + 1) & 0x7fffff;
if (t_expected_seqno != seq_no) {
- // unexpected_seqno = 1;
- if (flush_requested == 0)
- debug(1, "seq_no %u differs from expected_seq_no %u.", seq_no, t_expected_seqno);
+ debug(2,
+ "reading block %u, the sequence number differs from the expected sequence "
+ "number %u. The previous sequence number was %u",
+ seq_no, t_expected_seqno, previous_seqno);
+ }
+ uint32_t t_expected_timestamp =
+ previous_timestamp + get_ssrc_block_length(previous_ssrc);
+ int32_t diff = timestamp - t_expected_timestamp;
+ if (diff != 0) {
+ debug(2, "reading block %u, the timestamp %u differs from expected_timestamp %u.",
+ seq_no, timestamp, t_expected_timestamp);
}
}
+ new_audio_block_needed = 0; // block has been read.
+ }
+ }
- // timestamp
- if (timestamp_valid) {
- previous_timestamp_valid = 1;
- previous_timestamp = timestamp;
- }
-
- timestamp = nctohl(&packet[4]);
+ if (nread == 0) {
+ // nread is 0 -- the port has been closed
+ debug(1, "buffered audio port closed!");
+ finished = 1;
+ } else if (nread < 0) {
+ char errorstring[1024];
+ strerror_r(errno, (char *)errorstring, sizeof(errorstring));
+ debug(1, "error in rtp_buffered_audio_processor %d: \"%s\". Could not recv a data_len .",
+ errno, errorstring);
+ finished = 1;
+ }
+ }
- if (previous_timestamp_valid) {
- uint32_t t_expected_timestamp = previous_timestamp + conn->frames_per_packet;
- if (t_expected_timestamp != timestamp)
- debug(1, "timestamp %u differs from expected_timestamp %u.", timestamp,
- t_expected_timestamp);
+ if (finished == 0) {
+ pthread_cleanup_debug_mutex_lock(&conn->flush_mutex, 25000,
+ 1); // 25 ms is a long time to wait!
+ if (blocks_read != 0) {
+ if (conn->ap2_immediate_flush_requested != 0) {
+ if (ap2_immediate_flush_requested == 0) {
+ debug(2, "immediate flush started at sequence number %u until sequence number of %u.",
+ seq_no, conn->ap2_immediate_flush_until_sequence_number);
}
+ if ((blocks_read != 0) && (seq_no == conn->ap2_immediate_flush_until_sequence_number)) {
+ debug(2, "immediate flush complete at seq_no of %u.", seq_no);
- // debug(1,"seqno: %u, seqno16: %u, timestamp: %u.", seq_no, seq_no & 0xffff, timestamp);
-
- payload_ssrc = nctohl(&packet[8]);
+ conn->ap2_immediate_flush_requested = 0;
+ ap2_immediate_flush_requested = 0;
- if ((payload_ssrc != old_ssrc) && (payload_ssrc != SSRC_NONE) &&
- (old_ssrc != SSRC_NONE)) {
- if (ssrc_is_recognised(payload_ssrc) == 0)
- debug(1, "Unrecognised SSRC: %u.", payload_ssrc);
- else
- debug(3, "Reading a block: new encoding: %s, old encoding: %s.",
- get_ssrc_name(payload_ssrc), get_ssrc_name(old_ssrc));
- }
- old_ssrc = payload_ssrc;
-
- prepare_decoding_chain(conn, payload_ssrc);
-
- // change the (0) to (1) to process blocks with unrecognised SSRCs
- if ((1) && (ssrc_is_recognised(payload_ssrc) == 0)) {
- unsigned char nonce[12];
- memset(nonce, 0, sizeof(nonce));
- memcpy(nonce + 4, packet + nread - 8,
- 8); // front-pad the 8-byte nonce received to get the 12-byte nonce expected
- int response = crypto_aead_chacha20poly1305_ietf_decrypt(
- m, // m
- &payload_length, // mlen_p
- NULL, // nsec,
- packet + 12, // the ciphertext starts 12 bytes in and is followed by the MAC tag,
- nread - (8 + 12), // clen -- the last 8 bytes are the nonce
- packet + 4, // authenticated additional data
- 8, // authenticated additional data length
- nonce,
- conn->session_key); // *k
- if (response != 0) {
- debug(
- 1,
- "Can't decipher block %u with ssrc \"%s\". Byte length: %u bytes, timestamp: %u",
- seq_no, get_ssrc_name(payload_ssrc), payload_length, data_len);
- } else {
- if (payload_length == 0) {
- debug(2, "packet %u: unrecognised SSRC %u, and, when deciphered, has no content.",
- seq_no & 0xFFFF, payload_ssrc);
- } else {
- debug(1,
- "packet %u: unrecognised SSRC %u, packet length %d, deciphered length %lld. "
- "Raw contents and then deciphered contents follow:",
- seq_no & 0xFFFF, payload_ssrc, nread, payload_length);
- debug_print_buffer(1, packet, nread);
- debug_print_buffer(1, m, payload_length);
- }
+ /*
+ // turn off all deferred requests. Not sure if this is right...
+ unsigned int f = 0;
+ for (f = 0; f < MAX_DEFERRED_FLUSH_REQUESTS; f++) {
+ conn->ap2_deferred_flush_requests[f].inUse = 0;
+ conn->ap2_deferred_flush_requests[f].active = 0;
}
+ */
+
+ } else {
+ debug(3, "immediate flush of block %u until block %u", seq_no,
+ conn->ap2_immediate_flush_until_sequence_number);
+ ap2_immediate_flush_requested = 1;
+ new_audio_block_needed = 1; //
}
+ }
+ }
+ // now, even if an immediate flush has been requested and is active, we still need to process
+ // deferred flush requests as they may refer to sequences that are going to be purged anyway
+
+ unsigned int f = 0;
+ for (f = 0; f < MAX_DEFERRED_FLUSH_REQUESTS; f++) {
+ if (conn->ap2_deferred_flush_requests[f].inUse != 0) {
+ if ((conn->ap2_deferred_flush_requests[f].flushFromSeq == seq_no) &&
+ (conn->ap2_deferred_flush_requests[f].flushUntilSeq != seq_no)) {
+ debug(2,
+ "deferred flush activated: flushFromTS: %12u, flushFromSeq: %12u, "
+ "flushUntilTS: %12u, flushUntilSeq: %12u, timestamp: %12u.",
+ conn->ap2_deferred_flush_requests[f].flushFromTS,
+ conn->ap2_deferred_flush_requests[f].flushFromSeq,
+ conn->ap2_deferred_flush_requests[f].flushUntilTS,
+ conn->ap2_deferred_flush_requests[f].flushUntilSeq, timestamp);
+ conn->ap2_deferred_flush_requests[f].active = 1;
+ new_audio_block_needed = 1;
+ }
+ if (conn->ap2_deferred_flush_requests[f].flushUntilSeq == seq_no) {
+ debug(2,
+ "deferred flush terminated: flushFromTS: %12u, flushFromSeq: %12u, "
+ "flushUntilTS: %12u, flushUntilSeq: %12u, timestamp: %12u.",
+ conn->ap2_deferred_flush_requests[f].flushFromTS,
+ conn->ap2_deferred_flush_requests[f].flushFromSeq,
+ conn->ap2_deferred_flush_requests[f].flushUntilTS,
+ conn->ap2_deferred_flush_requests[f].flushUntilSeq, timestamp);
+ conn->ap2_deferred_flush_requests[f].active = 0;
+ conn->ap2_deferred_flush_requests[f].inUse = 0;
+ } else if (a_minus_b_mod23(seq_no, conn->ap2_deferred_flush_requests[f].flushUntilSeq) >
+ 0) {
+ // now, do a modulo 2^23 unsigned int calculation to see if we may have overshot the
+ // flushUntilSeq
+ debug(2,
+ "deferred flush terminated due to overshoot at block %u: flushFromTS: %12u, "
+ "flushFromSeq: %12u, "
+ "flushUntilTS: %12u, flushUntilSeq: %12u, timestamp: %12u.",
+ seq_no, conn->ap2_deferred_flush_requests[f].flushFromTS,
+ conn->ap2_deferred_flush_requests[f].flushFromSeq,
+ conn->ap2_deferred_flush_requests[f].flushUntilTS,
+ conn->ap2_deferred_flush_requests[f].flushUntilSeq, timestamp);
+ conn->ap2_deferred_flush_requests[f].active = 0;
+ conn->ap2_deferred_flush_requests[f].inUse = 0;
+ debug(2, "immediate flush was %s.", ap2_immediate_flush_requested == 0 ? "off" : "on");
+ } else if (conn->ap2_deferred_flush_requests[f].active != 0) {
+ new_audio_block_needed = 1;
+ debug(3,
+ "deferred flush of block: %u. flushFromTS: %12u, flushFromSeq: %12u, "
+ "flushUntilTS: %12u, flushUntilSeq: %12u, timestamp: %12u.",
+ seq_no, conn->ap2_deferred_flush_requests[f].flushFromTS,
+ conn->ap2_deferred_flush_requests[f].flushFromSeq,
+ conn->ap2_deferred_flush_requests[f].flushUntilTS,
+ conn->ap2_deferred_flush_requests[f].flushUntilSeq, timestamp);
+ }
+ }
+ }
+ pthread_cleanup_pop(1); // the mutex
+
+ // now, if the block is not invalidated by the flush code, see if we need
+ // to decode it and pass it to the player
+ if (new_audio_block_needed == 0) {
+ // is there space in the player thread's buffer system?
+ unsigned int player_buffer_size, player_buffer_occupancy;
+ get_audio_buffer_size_and_occupancy(&player_buffer_size, &player_buffer_occupancy, conn);
+ // debug(1,"player buffer size and occupancy: %u and %u", player_buffer_size,
+ // player_buffer_occupancy);
+
+ // If we are playing and there is room in the player buffer, go ahead and decode the block
+ // and send it to the player. Otherwise, keep the block and sleep for a while.
+ if ((play_enabled != 0) &&
+ (player_buffer_occupancy <= 2 * ((config.audio_backend_buffer_desired_length) *
+ conn->input_rate / conn->frames_per_packet))) {
uint64_t buffer_should_be_time;
frame_to_local_time(timestamp, &buffer_should_be_time, conn);
- lead_time = buffer_should_be_time - get_absolute_time_in_ns();
- payload_pointer = NULL;
- payload_length = 0;
-
- // decipher it only if it's needed, i.e. if it is not to be discarded
- // if a new buffer is needed, the block needs to be deciphered
+ // try to identify blocks that are timed to before the last buffer, and drop 'em
+ int64_t time_from_last_buffer_time =
+ buffer_should_be_time - previous_buffer_should_be_time;
- // if ((new_buffer_needed) || ((flush_requested != 0) && (unexpected_seqno != 0))) {
- if (new_buffer_needed) {
- // debug(1,"nbn seqno: %u, seqno16: %u, timestamp: %u.", seq_no, seq_no & 0xffff,
- // timestamp);
+ if ((packets_played_in_this_sequence == 0) || (time_from_last_buffer_time > 0)) {
+ int64_t lead_time = buffer_should_be_time - get_absolute_time_in_ns();
+ payload_length = 0;
if (ssrc_is_recognised(payload_ssrc) != 0) {
prepare_decoding_chain(conn, payload_ssrc);
unsigned long long new_payload_length = 0;
addADTStoPacket(payload_pointer, payload_length, conn->input_rate,
channelConfiguration);
}
- // debug(1, "creating seqno %u, seqno16 %u, with timestamp %u, leadtime %f.",
- // seq_no,
- // seq_no & 0xffff, timestamp, lead_time * 0.000000001);
+ int mute =
+ ((packets_played_in_this_sequence == 0) && (ssrc_is_aac(payload_ssrc)));
+ if (mute) {
+ debug(2, "Connection %d: muting first AAC block -- block %u -- timestamp %u.",
+ conn->connection_number, seq_no, timestamp);
+ }
+ int32_t timestamp_difference = 0;
+ if (packets_played_in_this_sequence == 0) {
+ // first_block_in_this_sequence = seq_no;
+ first_timestamp_in_this_sequence = timestamp;
+ debug(2,
+ "Connection %d: "
+ "first block %u, first timestamp %u.",
+ conn->connection_number, seq_no, timestamp);
+ } else {
+ timestamp_difference = timestamp - expected_timestamp;
+ if (timestamp_difference != 0) {
+ debug(2,
+ "Connection %d: "
+ "unexpected timestamp in block %u. Actual: %u, expected: %u "
+ "difference: %d, "
+ "%f ms. "
+ "Positive means later, i.e. a gap. First timestamp was %u, payload "
+ "type: \"%s\".",
+ conn->connection_number, seq_no, timestamp, expected_timestamp,
+ timestamp_difference, 1000.0 * timestamp_difference / conn->input_rate,
+ first_timestamp_in_this_sequence, get_ssrc_name(payload_ssrc));
+ // mute the first packet after a discontinuity
+ if (ssrc_is_aac(payload_ssrc)) {
+ debug(2,
+ "Connection %d: muting first AAC block -- block %u -- following a "
+ "timestamp discontinuity, timestamp %u.",
+ conn->connection_number, seq_no, timestamp);
+ mute = 1;
+ }
+ }
+ }
+ int skip_this_block = 0;
+ if (timestamp_difference < 0) {
+ int32_t abs_timestamp_difference = -timestamp_difference;
+ if ((size_t)abs_timestamp_difference > get_ssrc_block_length(payload_ssrc)) {
+ skip_this_block = 1;
+ debug(2,
+ "skipping block %u because it was too far in the past. Timestamp "
+ "difference: %d, length of block: %u.",
+ seq_no, timestamp_difference, get_ssrc_block_length(payload_ssrc));
+ }
+ }
+ if (skip_this_block == 0) {
+ uint32_t packet_size = player_put_packet(
+ payload_ssrc, sequence_number_for_player, timestamp, payload_pointer,
+ payload_length, mute, timestamp_difference, conn);
+ debug(3, "block %u, timestamp %u, length %u sent to the player.", seq_no,
+ timestamp, packet_size);
+ sequence_number_for_player++; // simply increment
+ expected_timestamp = timestamp + packet_size; // for the next time
+ packets_played_in_this_sequence++;
+ }
+ }
+ } else {
+ debug(3,
+ "skipped deciphering block %u with timestamp %u because its lead time is "
+ "out of range at %f "
+ "seconds.",
+ seq_no, timestamp, lead_time * 1.0E-9);
+ uint32_t currentAnchorRTP = 0;
+ uint64_t currentAnchorLocalTime = 0;
+ if (get_ptp_anchor_local_time_info(conn, ¤tAnchorRTP,
+ ¤tAnchorLocalTime) == clock_ok) {
+ debug(3, "anchorRTP: %u, anchorLocalTime: % " PRIu64 ".", currentAnchorRTP,
+ currentAnchorLocalTime);
+ } else {
+ debug(3, "Clock not okay");
}
}
} else {
- debug(3, "Unrecognised or invalid ssrc: %s, packet length %d.",
- get_ssrc_name(payload_ssrc), nread);
+ debug(2, "Unrecognised or invalid ssrc: %s.", get_ssrc_name(payload_ssrc));
}
} else {
- if (seq_no % 10 == 0)
- debug(3, "Dropping seqno %u, seqno16 %u, with timestamp %u, leadtime %f.", seq_no,
- seq_no & 0xffff, timestamp, lead_time * 0.000000001);
+ debug(1, "dropping buffer that should have played before the last actually played.");
}
+ new_audio_block_needed = 1; // the block has been used up and is no longer current
+ } else {
+ usleep(20000); // wait for a while
}
}
-
- if (nread == 0) {
- // nread is 0 -- the port has been closed
- debug(3, "buffered audio port closed!");
- finished = 1;
- } else if (nread < 0) {
- char errorstring[1024];
- strerror_r(errno, (char *)errorstring, sizeof(errorstring));
- debug(1, "error in rtp_buffered_audio_processor %d: \"%s\". Could not recv a data_len .",
- errno, errorstring);
- finished = 1;
- }
- }
-
- if ((play_enabled != 0) && (payload_pointer != NULL) && (finished == 0) &&
- (new_buffer_needed != 0)) {
-
- // it seems that some garbage blocks can be left after the flush, so
- // only accept them if they have sensible lead times
- if ((lead_time < (int64_t)30000000000L) && (lead_time >= 0)) {
- // if it's the very first block (thus no priming needed)
- // if ((blocks_read == 1) || (blocks_read_in_sequence > 3)) {
- if ((lead_time >= (int64_t)(requested_lead_time * 1000000000L)) ||
- (packets_played_in_this_sequence != 0)) {
- int mute = ((packets_played_in_this_sequence == 0) && (ssrc_is_aac(payload_ssrc)));
- // if (mute) {
- // debug(1, "Muting first AAC block, timestamp %u.", timestamp);
- // }
- int32_t timestamp_difference = 0;
- if (packets_played_in_this_sequence == 0) {
- // first_block_in_this_sequence = seq_no;
- first_timestamp_in_this_sequence = timestamp;
- } else {
- timestamp_difference = timestamp - expected_timestamp;
- if (timestamp_difference != 0) {
- debug(
- 1,
- "Connection %d: "
- "unexpected timestamp in packet %u. Actual: %u, expected: %u difference: %d, "
- "%f ms. "
- "Positive means later, i.e. a gap. First timestamp was %u, payload type: \"%s\".",
- conn->connection_number, seq_no & 0xffff, timestamp, expected_timestamp,
- timestamp_difference, 1000.0 * timestamp_difference / conn->input_rate,
- first_timestamp_in_this_sequence, get_ssrc_name(payload_ssrc));
- // mute the first packet after a discontinuity
- if (ssrc_is_aac(payload_ssrc)) {
- // debug(1, "Muting first AAC block after a timestamp discontinuity, timestamp %u.",
- // timestamp);
- mute = 1;
- }
- }
- }
-
- // frames_per_packet should be set during setup.
- // if ((timestamp_difference >= 0) || (conn->frames_per_packet + timestamp_difference >
- // 0)) {
- if (timestamp_difference < 0)
- debug(3,
- "The next %d frames are late, even though the sequence numbers are in line. The "
- "\"late\" frames should be decoded to prime the AAC decoder and then dropped.",
- -timestamp_difference);
- uint32_t packet_size =
- player_put_packet(payload_ssrc, seq_no & 0xFFFF, timestamp, payload_pointer,
- payload_length, mute, timestamp_difference, conn);
- // debug(1, "put packet: %u, packets_played_in_this_sequence is %d.", seq_no & 0xFFFF,
- // packets_played_in_this_sequence);
- if (last_seqno_put != 0) {
- uint16_t seqno_expected = last_seqno_put + 1;
- if (seqno_expected != (seq_no & 0xffff))
- debug(1, "Packet puts not in sequence. Expected: %u, actual: %u.", seqno_expected,
- (seq_no & 0xffff));
- }
- last_seqno_put = seq_no & 0xffff;
- expected_timestamp = timestamp + packet_size; // for the next time
- new_buffer_needed = 0;
- packets_played_in_this_sequence++;
- /*
- } else {
- // expected_seq_no = seq_no + 1;
- debug(1,
- "Connection %d: "
- "dropping packet %u, seq_no %u, that arrived %.3f ms later than "
- "expected due to a discontinuity in the timestamp sequence. First "
- "timestamp was %u.",
- conn->connection_number, seq_no, timestamp,
- -1000.0 * timestamp_difference / conn->input_rate,
- first_timestamp_in_this_sequence);
- }
- */
- }
- // }
- } else {
- debug(3,
- "Dropping packet %u from seqno %u, seqno16 %u, with out-of-range lead_time: %.3f "
- "seconds.",
- timestamp, seq_no, seq_no & 0xffff, 0.000000001 * lead_time);
- expected_timestamp = timestamp + conn->frames_per_packet; // for the next time
- }
- payload_pointer = NULL; // payload consumed
}
} while (finished == 0);
debug(2, "Buffered Audio Receiver RTP thread \"normal\" exit.");