#include <sys/types.h>
#include <time.h>
#include <unistd.h>
+#ifdef CONFIG_AIRPLAY_2
+#include "ptp-utilities.h"
+#include <libavcodec/avcodec.h>
+#include <libavformat/avformat.h>
+#include <libavutil/opt.h>
+#include <libswresample/swresample.h>
+#include <sodium.h>
+#endif
+
struct Nvll {
char *name;
uint64_t local_to_remote_time_jitter;
uint64_t local_to_remote_time_jitter_count;
+typedef struct {
+ int closed;
+ int error_code;
+ int sock_fd;
+ char *buffer;
+ char *toq;
+ char *eoq;
+ size_t buffer_max_size;
+ size_t buffer_occupancy;
+ pthread_mutex_t mutex;
+ pthread_cond_t not_empty_cv;
+ pthread_cond_t not_full_cv;
+} buffered_tcp_desc;
+
void rtp_initialise(rtsp_conn_info *conn) {
conn->rtp_time_of_last_resend_request_error_ns = 0;
conn->rtp_running = 0;
}
void rtp_terminate(rtsp_conn_info *conn) {
- conn->reference_timestamp = 0;
+ conn->anchor_rtptime = 0;
// destroy the timer mutex
int rc = pthread_mutex_destroy(&conn->reference_time_mutex);
if (rc)
if (plen >= 16) {
if ((config.diagnostic_drop_packet_fraction == 0.0) ||
(drand48() > config.diagnostic_drop_packet_fraction))
- player_put_packet(seqno, actual_timestamp, pktp, plen, conn);
+ player_put_packet(seqno, actual_timestamp, pktp, plen,
+ conn); // the '1' means is original format
else
debug(3, "Dropping audio packet %u to simulate a bad connection.", seqno);
continue;
}
warn("Audio receiver -- Unknown RTP packet of type 0x%02X length %d.", type, nread);
} else {
- debug(1, "Error receiving an audio packet.");
+ char em[1024];
+ strerror_r(errno, em, sizeof(em));
+ debug(1, "Error %d receiving an audio packet: \"%s\".", errno, em);
}
}
}
void rtp_control_handler_cleanup_handler(__attribute__((unused)) void *arg) {
- debug(3, "Control Receiver Cleanup Done.");
+ debug(1, "Control Receiver Cleanup Done.");
}
void *rtp_control_receiver(void *arg) {
pthread_cleanup_push(rtp_control_handler_cleanup_handler, arg);
rtsp_conn_info *conn = (rtsp_conn_info *)arg;
- conn->reference_timestamp = 0; // nothing valid received yet
+ conn->anchor_rtptime = 0; // nothing valid received yet
uint8_t packet[2048], *pktp;
// struct timespec tn;
uint64_t remote_time_of_sync;
}
} else {
uint64_t remote_frame_time_interval =
- conn->remote_reference_timestamp_time -
+ conn->anchor_time -
conn->initial_reference_time; // here, this should never be zero
if (remote_frame_time_interval) {
conn->remote_frame_rate =
- (1.0E9 * (conn->reference_timestamp - conn->initial_reference_timestamp)) /
+ (1.0E9 * (conn->anchor_rtptime - conn->initial_reference_timestamp)) /
remote_frame_time_interval;
} else {
conn->remote_frame_rate = 0.0; // use as a flag.
}
// this is for debugging
- uint64_t old_remote_reference_time = conn->remote_reference_timestamp_time;
- uint32_t old_reference_timestamp = conn->reference_timestamp;
+ uint64_t old_remote_reference_time = conn->anchor_time;
+ uint32_t old_reference_timestamp = conn->anchor_rtptime;
// int64_t old_latency_delayed_timestamp = conn->latency_delayed_timestamp;
- conn->remote_reference_timestamp_time = remote_time_of_sync;
+ conn->anchor_time = remote_time_of_sync;
// conn->reference_timestamp_time =
// remote_time_of_sync - local_to_remote_time_difference_now(conn);
- conn->reference_timestamp = sync_rtp_timestamp;
+ conn->anchor_rtptime = sync_rtp_timestamp;
conn->latency_delayed_timestamp = rtp_timestamp_less_latency;
debug_mutex_unlock(&conn->reference_time_mutex, 0);
// check if packet contains enough content to be reasonable
if (plen >= 16) {
- player_put_packet(seqno, actual_timestamp, pktp, plen, conn);
+ player_put_packet(seqno, actual_timestamp, pktp, plen,
+ conn); // the '1' means is original format
continue;
} else {
debug(3, "Too-short retransmitted audio packet received in control port, ignored.");
debug(3, "Control Receiver -- dropping a packet to simulate a bad network.");
}
} else {
- debug(1, "Control Receiver -- error receiving a packet.");
+
+ char em[1024];
+ strerror_r(errno, em, sizeof(em));
+ debug(1, "Control Receiver -- error %d receiving a packet: \"%s\".", errno, em);
}
}
debug(1, "Control RTP thread \"normal\" exit -- this can't happen. Hah!");
pthread_exit(NULL);
}
-static uint16_t bind_port(int ip_family, const char *self_ip_address, uint32_t scope_id,
- int *sock) {
- // look for a port in the range, if any was specified.
- int ret = 0;
-
- int local_socket = socket(ip_family, SOCK_DGRAM, IPPROTO_UDP);
- if (local_socket == -1)
- die("Could not allocate a socket.");
-
- /*
- int val = 1;
- ret = setsockopt(local_socket, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
- if (ret < 0) {
- char errorstring[1024];
- strerror_r(errno, (char *)errorstring, sizeof(errorstring));
- debug(1, "Error %d: \"%s\". Couldn't set SO_REUSEADDR");
- }
- */
-
- SOCKADDR myaddr;
- int tryCount = 0;
- uint16_t desired_port;
- do {
- tryCount++;
- desired_port = nextFreeUDPPort();
- memset(&myaddr, 0, sizeof(myaddr));
- if (ip_family == AF_INET) {
- struct sockaddr_in *sa = (struct sockaddr_in *)&myaddr;
- sa->sin_family = AF_INET;
- sa->sin_port = ntohs(desired_port);
- inet_pton(AF_INET, self_ip_address, &(sa->sin_addr));
- ret = bind(local_socket, (struct sockaddr *)sa, sizeof(struct sockaddr_in));
- }
-#ifdef AF_INET6
- if (ip_family == AF_INET6) {
- struct sockaddr_in6 *sa6 = (struct sockaddr_in6 *)&myaddr;
- sa6->sin6_family = AF_INET6;
- sa6->sin6_port = ntohs(desired_port);
- inet_pton(AF_INET6, self_ip_address, &(sa6->sin6_addr));
- sa6->sin6_scope_id = scope_id;
- ret = bind(local_socket, (struct sockaddr *)sa6, sizeof(struct sockaddr_in6));
- }
-#endif
-
- } while ((ret < 0) && (errno == EADDRINUSE) && (desired_port != 0) &&
- (tryCount < config.udp_port_range));
-
- // debug(1,"UDP port chosen: %d.",desired_port);
-
- if (ret < 0) {
- close(local_socket);
- char errorstring[1024];
- strerror_r(errno, (char *)errorstring, sizeof(errorstring));
- die("error %d: \"%s\". Could not bind a UDP port! Check the udp_port_range is large enough -- "
- "it must be "
- "at least 3, and 10 or more is suggested -- or "
- "check for restrictive firewall settings or a bad router! UDP base is %u, range is %u and "
- "current suggestion is %u.",
- errno, errorstring, config.udp_port_base, config.udp_port_range, desired_port);
- }
-
- uint16_t sport;
- SOCKADDR local;
- socklen_t local_len = sizeof(local);
- getsockname(local_socket, (struct sockaddr *)&local, &local_len);
-#ifdef AF_INET6
- if (local.SAFAMILY == AF_INET6) {
- struct sockaddr_in6 *sa6 = (struct sockaddr_in6 *)&local;
- sport = ntohs(sa6->sin6_port);
- } else
-#endif
- {
- struct sockaddr_in *sa = (struct sockaddr_in *)&local;
- sport = ntohs(sa->sin_port);
- }
- *sock = local_socket;
- return sport;
-}
-
void rtp_setup(SOCKADDR *local, SOCKADDR *remote, uint16_t cport, uint16_t tport,
rtsp_conn_info *conn) {
conn->remote_control_port = cport;
conn->remote_timing_port = tport;
- conn->local_control_port = bind_port(conn->connection_ip_family, conn->self_ip_string,
+ conn->local_control_port = bind_UDP_port(conn->connection_ip_family, conn->self_ip_string,
conn->self_scope_id, &conn->control_socket);
- conn->local_timing_port = bind_port(conn->connection_ip_family, conn->self_ip_string,
+ conn->local_timing_port = bind_UDP_port(conn->connection_ip_family, conn->self_ip_string,
conn->self_scope_id, &conn->timing_socket);
- conn->local_audio_port = bind_port(conn->connection_ip_family, conn->self_ip_string,
+ conn->local_audio_port = bind_UDP_port(conn->connection_ip_family, conn->self_ip_string,
conn->self_scope_id, &conn->audio_socket);
debug(3, "listening for audio, control and timing on ports %d, %d, %d.", conn->local_audio_port,
conn->local_control_port, conn->local_timing_port);
- conn->reference_timestamp = 0;
+ conn->anchor_rtptime = 0;
conn->request_sent = 0;
conn->rtp_running = 1;
}
}
-void get_reference_timestamp_stuff(uint32_t *timestamp, uint64_t *timestamp_time,
- uint64_t *remote_timestamp_time, rtsp_conn_info *conn) {
- // types okay
- debug_mutex_lock(&conn->reference_time_mutex, 1000, 0);
- *timestamp = conn->reference_timestamp;
- *remote_timestamp_time = conn->remote_reference_timestamp_time;
- *timestamp_time =
- conn->remote_reference_timestamp_time - local_to_remote_time_difference_now(conn);
- debug_mutex_unlock(&conn->reference_time_mutex, 0);
-}
-
-void clear_reference_timestamp(rtsp_conn_info *conn) {
+void reset_ntp_anchor_info(rtsp_conn_info *conn) {
debug_mutex_lock(&conn->reference_time_mutex, 1000, 1);
- conn->reference_timestamp = 0;
- conn->remote_reference_timestamp_time = 0;
+ conn->anchor_rtptime = 0;
+ conn->anchor_time = 0;
debug_mutex_unlock(&conn->reference_time_mutex, 3);
}
-int have_timestamp_timing_information(rtsp_conn_info *conn) {
- if (conn->reference_timestamp == 0)
+void reset_anchor_info(rtsp_conn_info *conn) {
+ reset_ntp_anchor_info(conn);
+}
+
+int have_ntp_timestamp_timing_information(rtsp_conn_info *conn) {
+ if (conn->anchor_rtptime == 0)
return 0;
else
return 1;
}
+int have_timestamp_timing_information(rtsp_conn_info *conn) {
+ return have_ntp_timestamp_timing_information(conn);
+}
+
// set this to zero to use the rates supplied by the sources, which might not always be completely
// right...
const int use_nominal_rate = 0; // specify whether to use the nominal input rate, usually 44100 fps
if ((conn->initial_reference_time) && (conn->initial_reference_timestamp)) {
// uint32_t local_frames = conn->reference_timestamp - conn->initial_reference_timestamp;
uint32_t local_frames =
- modulo_32_offset(conn->initial_reference_timestamp, conn->reference_timestamp);
- uint64_t local_time = conn->remote_reference_timestamp_time - conn->initial_reference_time;
+ modulo_32_offset(conn->initial_reference_timestamp, conn->anchor_rtptime);
+ uint64_t local_time = conn->anchor_time - conn->initial_reference_time;
if ((local_frames == 0) || (local_time == 0) || (use_nominal_rate)) {
result = 1;
} else {
return result;
}
+void set_ntp_anchor_info(rtsp_conn_info *conn, uint32_t rtptime,
+ uint64_t networktime) {
+ conn->anchor_remote_info_is_valid = 1;
+ conn->anchor_rtptime = rtptime;
+ conn->anchor_time = networktime;
+}
+
// the timestamp is a timestamp calculated at the input rate
// the reference timestamps are denominated in terms of the input rate
-int frame_to_local_time(uint32_t timestamp, uint64_t *time, rtsp_conn_info *conn) {
+int frame_to_ntp_local_time(uint32_t timestamp, uint64_t *time, rtsp_conn_info *conn) {
debug_mutex_lock(&conn->reference_time_mutex, 1000, 0);
int result = 0;
uint64_t time_difference;
uint32_t frame_difference;
result = sanitised_source_rate_information(&frame_difference, &time_difference, conn);
- uint64_t timestamp_interval_time;
uint64_t remote_time_of_timestamp;
- uint32_t timestamp_interval = modulo_32_offset(conn->reference_timestamp, timestamp);
- if (timestamp_interval <=
- conn->input_rate * 3600) { // i.e. timestamp was really after the reference timestamp
- timestamp_interval_time = (timestamp_interval * time_difference) /
+ int32_t timestamp_interval = timestamp - conn->anchor_rtptime;
+ int64_t timestamp_interval_time = timestamp_interval;
+ timestamp_interval_time = timestamp_interval_time * time_difference;
+ timestamp_interval_time = timestamp_interval_time /
frame_difference; // this is the nominal time, based on the
// fps specified between current and
// previous sync frame.
- remote_time_of_timestamp = conn->remote_reference_timestamp_time +
+ remote_time_of_timestamp = conn->anchor_time +
timestamp_interval_time; // based on the reference timestamp time
// plus the time interval calculated based
// on the specified fps.
- } else { // i.e. timestamp was actually before the reference timestamp
- timestamp_interval =
- modulo_32_offset(timestamp, conn->reference_timestamp); // fix the calculation
- timestamp_interval_time = (timestamp_interval * time_difference) /
- frame_difference; // this is the nominal time, based on the
- // fps specified between current and
- // previous sync frame.
- remote_time_of_timestamp = conn->remote_reference_timestamp_time -
- timestamp_interval_time; // based on the reference timestamp time
- // plus the time interval calculated based
- // on the specified fps.
- }
*time = remote_time_of_timestamp - local_to_remote_time_difference_now(conn);
debug_mutex_unlock(&conn->reference_time_mutex, 0);
return result;
}
-int local_time_to_frame(uint64_t time, uint32_t *frame, rtsp_conn_info *conn) {
+int local_ntp_time_to_frame(uint64_t time, uint32_t *frame, rtsp_conn_info *conn) {
debug_mutex_lock(&conn->reference_time_mutex, 1000, 0);
int result = 0;
-
uint64_t time_difference;
uint32_t frame_difference;
result = sanitised_source_rate_information(&frame_difference, &time_difference, conn);
-
// first, get from [local] time to remote time.
uint64_t remote_time = time + local_to_remote_time_difference_now(conn);
// next, get the remote time interval from the remote_time to the reference time
- uint64_t time_interval;
-
// here, we calculate the time interval, in terms of remote time
- uint64_t offset = modulo_64_offset(conn->remote_reference_timestamp_time, remote_time);
- int reference_time_was_earlier = (offset <= (uint64_t)3600000000000);
- if (reference_time_was_earlier) // if we haven't had a reference within the last hour, it'll be
- // taken as afterwards
- time_interval = remote_time - conn->remote_reference_timestamp_time;
- else
- time_interval = conn->remote_reference_timestamp_time - remote_time;
-
+ int64_t offset = remote_time - conn->anchor_time;
// now, convert the remote time interval into frames using the frame rate we have observed or
// which has been nominated
- uint32_t frame_interval = 0;
- if (time_difference)
- frame_interval = (time_interval * frame_difference) / time_difference;
+ int64_t frame_interval = 0;
+ frame_interval = (offset * frame_difference) / time_difference;
+ int32_t frame_interval_32 = frame_interval;
+ uint32_t new_frame = conn->anchor_rtptime + frame_interval_32;
+ *frame = new_frame;
+ debug_mutex_unlock(&conn->reference_time_mutex, 0);
+ return result;
+}
+
+void set_ptp_anchor_info(rtsp_conn_info *conn, uint64_t clock_id, uint32_t rtptime,
+ uint64_t networktime) {
+ if (conn->anchor_clock != clock_id)
+ debug(1, "Connection %d: Set Anchor Clock: %" PRIx64 ".", conn->connection_number, clock_id);
+ conn->anchor_remote_info_is_valid = 1;
+ conn->anchor_rtptime = rtptime;
+ conn->anchor_time = networktime;
+ conn->anchor_clock = clock_id;
+}
+
+void reset_ptp_anchor_info(rtsp_conn_info *conn) {
+ debug(2, "Connection %d: Clear anchor information.", conn->connection_number);
+ conn->last_anchor_info_is_valid = 0;
+ conn->anchor_remote_info_is_valid = 0;
+}
+
+int get_ptp_anchor_local_time_info(rtsp_conn_info *conn, uint32_t *anchorRTP,
+ uint64_t *anchorLocalTime) {
+ int response = 0;
+ uint64_t actual_clock_id, actual_offset;
+
+ int ptp_status = ptp_get_clock_info(&actual_clock_id, &actual_offset);
+
+ if (ptp_status == 0) {
+ if (conn->anchor_remote_info_is_valid != 0) {
+ if (actual_clock_id == conn->anchor_clock) {
+ conn->last_anchor_clock_offset = actual_offset;
+ conn->last_anchor_time_of_update = get_absolute_time_in_ns();
+ conn->last_anchor_info_is_valid = 1;
+ if (anchorRTP != NULL)
+ *anchorRTP = conn->anchor_rtptime;
+ if (anchorLocalTime != NULL)
+ *anchorLocalTime = conn->anchor_time - conn->last_anchor_clock_offset;
+ } else {
+ if (conn->last_anchor_info_is_valid != 0) {
+ int64_t time_since_last_update =
+ get_absolute_time_in_ns() - conn->last_anchor_time_of_update;
+ if (time_since_last_update > 5000000000) {
+ debug(1, "change master clock to %" PRIx64 ".", actual_clock_id);
+ uint64_t new_anchor_time = conn->anchor_time;
+ new_anchor_time = new_anchor_time - conn->last_anchor_clock_offset; // to local
+ new_anchor_time = new_anchor_time + actual_offset; // to the next clock
+ conn->anchor_time = new_anchor_time;
+ conn->anchor_clock = actual_clock_id;
+ if (anchorRTP != NULL)
+ *anchorRTP = conn->anchor_rtptime;
+ if (anchorLocalTime != NULL)
+ *anchorLocalTime = conn->anchor_time - actual_offset;
+ } else {
+ if (anchorRTP != NULL)
+ *anchorRTP = conn->anchor_rtptime;
+ if (anchorLocalTime != NULL)
+ *anchorLocalTime = conn->anchor_time - conn->last_anchor_clock_offset;
+ }
+ } else {
+ debug(2, "no clock and no old anchor times");
+ response = -1;
+ }
+ }
+ } else {
+ debug(2, "don't have anchor_remote_time_info");
+ response = -1;
+ }
+ } else if (ptp_status == -1) {
+ debug(3, "don't have the ptp clock interface");
+ response = -1;
+ } else {
+ debug(3, "ptp clock not valid");
+ response = -1;
+ }
+ return response;
+}
+
+int get_anchor_local_time_info(rtsp_conn_info *conn, uint32_t *anchorRTP,
+ uint64_t *anchorLocalTime) {
+ return get_ptp_anchor_local_time_info(conn, anchorRTP,anchorLocalTime);
+}
+
+int have_ptp_timing_information(rtsp_conn_info *conn) {
+ if (get_anchor_local_time_info(conn, NULL, NULL) == 0)
+ return 1;
else
- debug(1, "local_time_to_frame: time_difference is zero");
- if (reference_time_was_earlier) {
- // debug(1,"Frame interval is %" PRId64 " frames.",frame_interval);
- *frame = (conn->reference_timestamp + frame_interval);
+ return 0;
+}
+
+int have_timing_information(rtsp_conn_info *conn) {
+ return have_ptp_timing_information(conn);
+}
+
+int frame_to_ptp_local_time(uint32_t timestamp, uint64_t *time, rtsp_conn_info *conn) {
+ int result = -1;
+ uint32_t anchor_rtptime;
+ uint64_t anchor_local_time;
+ if (get_anchor_local_time_info(conn, &anchor_rtptime, &anchor_local_time) == 0) {
+ int32_t frame_difference = timestamp - anchor_rtptime;
+ int64_t time_difference = frame_difference;
+ time_difference = time_difference * 1000000000;
+ time_difference = time_difference / conn->input_rate;
+ uint64_t ltime = anchor_local_time + time_difference;
+ *time = ltime;
+ result = 0;
} else {
- // debug(1,"Frame interval is %" PRId64 " frames.",-frame_interval);
- *frame = (conn->reference_timestamp - frame_interval);
+ debug(3, "frame_to_local_time can't get anchor local time information");
+ }
+ return result;
+}
+
+int frame_to_local_time(uint32_t timestamp, uint64_t *time, rtsp_conn_info *conn) {
+ return frame_to_ntp_local_time(timestamp, time, conn);
+}
+
+int local_ptp_time_to_frame(uint64_t time, uint32_t *frame, rtsp_conn_info *conn) {
+ int result = -1;
+ uint32_t anchor_rtptime;
+ uint64_t anchor_local_time;
+ if (get_anchor_local_time_info(conn, &anchor_rtptime, &anchor_local_time) == 0) {
+ int64_t time_difference = time - anchor_local_time;
+ int64_t frame_difference = time_difference;
+ frame_difference = frame_difference * conn->input_rate; // but this is by 10^9
+ frame_difference = frame_difference / 1000000000;
+ int32_t fd32 = frame_difference;
+ uint32_t lframe = anchor_rtptime + fd32;
+ *frame = lframe;
+ result = 0;
+ } else {
+ debug(3, "local_time_to_frame can't get anchor local time information");
}
- debug_mutex_unlock(&conn->reference_time_mutex, 0);
return result;
}
+int local_time_to_frame(uint64_t time, uint32_t *frame, rtsp_conn_info *conn) {
+ return local_ntp_time_to_frame(time, frame, conn);
+}
+
void rtp_request_resend(seq_t first, uint32_t count, rtsp_conn_info *conn) {
+ // debug(1, "rtp_request_resend of %u packets from sequence number %u.", count, first);
if (conn->rtp_running) {
// if (!request_sent) {
// debug(2, "requesting resend of %d packets starting at %u.", count, first);
char req[8]; // *not* a standard RTCP NACK
req[0] = 0x80;
- req[1] = (char)0x55 | (char)0x80; // Apple 'resend'
+#ifdef CONFIG_AIRPLAY_2
+ if (conn->ap2_remote_control_socket_addr_length == 0) {
+ debug(2, "No remote socket -- skipping the resend");
+ return; // hack
+ }
+ req[1] = 0xD5; // Airplay 2 'resend'
+#else
+ req[1] = (char)0x55 | (char)0x80; // Apple 'resend'
+#endif
*(unsigned short *)(req + 2) = htons(1); // our sequence number
*(unsigned short *)(req + 4) = htons(first); // missed seqnum
*(unsigned short *)(req + 6) = htons(count); // count
+#ifndef CONFIG_AIRPLAY_2
socklen_t msgsize = sizeof(struct sockaddr_in);
#ifdef AF_INET6
if (conn->rtp_client_control_socket.SAFAMILY == AF_INET6) {
msgsize = sizeof(struct sockaddr_in6);
}
+#endif
#endif
uint64_t time_of_sending_ns = get_absolute_time_in_ns();
uint64_t resend_error_backoff_time = 300000000; // 0.3 seconds
struct timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = 100000;
-
+#ifdef CONFIG_AIRPLAY_2
+ if (setsockopt(conn->ap2_control_socket, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout,
+ sizeof(timeout)) < 0)
+ debug(1, "Can't set timeout on resend request socket.");
+ if (sendto(conn->ap2_control_socket, req, sizeof(req), 0,
+ (struct sockaddr *)&conn->ap2_remote_control_socket_addr,
+ conn->ap2_remote_control_socket_addr_length) == -1) {
+#else
if (setsockopt(conn->control_socket, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout,
sizeof(timeout)) < 0)
debug(1, "Can't set timeout on resend request socket.");
if (sendto(conn->control_socket, req, sizeof(req), 0,
(struct sockaddr *)&conn->rtp_client_control_socket, msgsize) == -1) {
+#endif
char em[1024];
strerror_r(errno, em, sizeof(em));
debug(2, "Error %d using sendto to request a resend: \"%s\".", errno, em);
//}
}
}
+
+#if 0
+#ifdef CONFIG_AIRPLAY_2
+
+void rtp_event_receiver_cleanup_handler(void *arg) {
+ rtsp_conn_info *conn = (rtsp_conn_info *)arg;
+ debug(2, "Connection %d: Event Receiver Cleanup.", conn->connection_number);
+}
+
+void *rtp_event_receiver(void *arg) {
+ debug(2, "Event Receiver started");
+ rtsp_conn_info *conn = (rtsp_conn_info *)arg;
+ pthread_cleanup_push(rtp_event_receiver_cleanup_handler, arg);
+
+ listen(conn->event_socket, 5);
+
+ uint8_t packet[4096];
+ ssize_t nread;
+ SOCKADDR remote_addr;
+ memset(&remote_addr, 0, sizeof(remote_addr));
+ socklen_t addr_size = sizeof(remote_addr);
+
+ int fd = accept(conn->event_socket, (struct sockaddr *)&remote_addr, &addr_size);
+ pthread_cleanup_push(socket_cleanup, (void *)fd);
+ int finished = 0;
+ do {
+ nread = recv(fd, packet, sizeof(packet), 0);
+
+ if (nread < 0) {
+ char errorstring[1024];
+ strerror_r(errno, (char *)errorstring, sizeof(errorstring));
+ debug(1, "error in rtp_event_receiver %d: \"%s\". Could not recv a packet.", errno,
+ errorstring);
+ // if ((config.diagnostic_drop_packet_fraction == 0.0) ||
+ // (drand48() > config.diagnostic_drop_packet_fraction)) {
+ } else if (nread > 0) {
+
+ // ssize_t plen = nread;
+ debug(1, "Packet Received on Event Port.");
+ if (packet[1] == 0xD7) {
+ debug(1, "Event Receiver -- Time Announce RTP packet of type 0x%02X length %d received.",
+ packet[1], nread);
+ } else {
+ debug(1, "Event Receiver -- Unknown RTP packet of type 0x%02X length %d received.",
+ packet[1], nread);
+ }
+ // } else {
+ // debug(3, "Event Receiver Thread -- dropping incoming packet to simulate a bad network.");
+ // }
+ } else {
+ finished = 1;
+ }
+ } while (finished == 0);
+ debug(1, "Event Receiver RTP thread \"normal\" exit.");
+ pthread_cleanup_pop(1); // close the socket
+
+ pthread_cleanup_pop(1); // do the cleanup
+ debug(1, "Connection %d: Event Receiver RTP thread exit.", conn->connection_number);
+ pthread_exit(NULL);
+}
+
+void rtp_ap2_control_handler_cleanup_handler(void *arg) {
+ rtsp_conn_info *conn = (rtsp_conn_info *)arg;
+ debug(2, "Connection %d: AP2 Control Receiver Cleanup.", conn->connection_number);
+}
+
+int32_t decipher_player_put_packet(uint8_t *ciphered_audio_alt, ssize_t nread,
+ rtsp_conn_info *conn) {
+
+ // this deciphers the packet -- it doesn't decode it from ALAC
+ uint16_t sequence_number = 0;
+
+ // if the packet is too small, don't go ahead.
+ // it must contain an uint16_t sequence number and eight bytes of AAD followed by the
+ // ciphertext and then followed by an eight-byte nonce. Thus it must be greater than 18
+ if (nread > 18) {
+
+ memcpy(&sequence_number, ciphered_audio_alt, sizeof(uint16_t));
+ sequence_number = ntohs(sequence_number);
+
+ uint32_t timestamp;
+ memcpy(×tamp, ciphered_audio_alt + sizeof(uint16_t), sizeof(uint32_t));
+ timestamp = ntohl(timestamp);
+
+ /*
+ uint32_t ssrc;
+ memcpy(&ssrc, packet+8, sizeof(uint32_t));
+ ssrc = ntohl(ssrc);
+ */
+
+ // debug(1, "Realtime Audio Receiver Packet received. Version: %u, Padding: %u, Extension:
+ // %u, Csrc Count: %u, Marker: %u, Payload Type: %u, Sequence Number: %u, Timestamp: %u,
+ // SSRC: %u.", version, padding, extension, csrc_count, marker, payload_type,
+ // sequence_number, timestamp, ssrc);
+
+ unsigned char nonce[12];
+ memset(nonce, 0, sizeof(nonce));
+ memcpy(nonce + 4, ciphered_audio_alt + nread - 8,
+ 8); // front-pad the 8-byte nonce received to get the 12-byte nonce expected
+
+ // https://libsodium.gitbook.io/doc/secret-key_cryptography/aead/chacha20-poly1305/ietf_chacha20-poly1305_construction
+ // Note: the eight-byte nonce must be front-padded out to 12 bytes.
+
+ unsigned char m[4096];
+ unsigned long long new_payload_length = 0;
+ int response = crypto_aead_chacha20poly1305_ietf_decrypt(
+ m, // m
+ &new_payload_length, // mlen_p
+ NULL, // nsec,
+ ciphered_audio_alt +
+ 10, // the ciphertext starts 10 bytes in and is followed by the MAC tag,
+ nread - (8 + 10), // clen -- the last 8 bytes are the nonce
+ ciphered_audio_alt + 2, // authenticated additional data
+ 8, // authenticated additional data length
+ nonce,
+ conn->session_key); // *k
+ if (response != 0) {
+ debug(1, "Error decrypting an audio packet.");
+ }
+ // now pass it in to the regular processing chain
+
+ unsigned long long max_int = INT_MAX; // put in the right format
+ if (new_payload_length > max_int)
+ debug(1, "Madly long payload length!");
+ int plen = new_payload_length; //
+ player_put_packet(sequence_number, timestamp, m, plen,
+ conn); // the '1' means is original format
+ return sequence_number;
+ } else {
+ debug(1, "packet was too small -- ignored");
+ return -1;
+ }
+}
+
+void *rtp_ap2_control_receiver(void *arg) {
+ pthread_cleanup_push(rtp_ap2_control_handler_cleanup_handler, arg);
+ rtsp_conn_info *conn = (rtsp_conn_info *)arg;
+ uint8_t packet[4096];
+ ssize_t nread;
+ while (1) {
+ SOCKADDR from_sock_addr;
+ socklen_t from_sock_addr_length = sizeof(SOCKADDR);
+ memset(&from_sock_addr, 0, sizeof(SOCKADDR));
+
+ nread = recvfrom(conn->ap2_control_socket, packet, sizeof(packet), 0,
+ (struct sockaddr *)&from_sock_addr, &from_sock_addr_length);
+
+ if (nread >= 0) {
+ // debug(1,"rtp_ap2_control_receiver coded: %u, %u", packet[0], packet[1]);
+
+ if ((config.diagnostic_drop_packet_fraction == 0.0) ||
+ (drand48() > config.diagnostic_drop_packet_fraction)) {
+ // store the from_sock_addr if we haven't already done so
+ // v remember to zero this when you're finished!
+ if (conn->ap2_remote_control_socket_addr_length == 0) {
+ memcpy(&conn->ap2_remote_control_socket_addr, &from_sock_addr, from_sock_addr_length);
+ conn->ap2_remote_control_socket_addr_length = from_sock_addr_length;
+ }
+ switch (packet[1]) {
+ case 215: // code 215, effectively an anchoring announcement
+ {
+ // struct timespec tnr;
+ // clock_gettime(CLOCK_REALTIME, &tnr);
+ // uint64_t local_realtime_now = timespec_to_ns(&tnr);
+
+ /*
+ char obf[4096];
+ char *obfp = obf;
+ int obfc;
+ for (obfc=0;obfc<nread;obfc++) {
+ snprintf(obfp, 3, "%02X", packet[obfc]);
+ obfp+=2;
+ };
+ *obfp=0;
+ debug(1,"AP2 Timing Control Received: \"%s\"",obf);
+ */
+
+ uint64_t remote_packet_time_ns = nctoh64(packet + 8);
+ uint64_t clock_id = nctoh64(packet + 20);
+
+ // debug(1, "we have clock_id: %" PRIx64 ".", clock_id);
+ // debug(1,"remote_packet_time_ns: %" PRIx64 ", local_realtime_now_ns: %" PRIx64 ".",
+ // remote_packet_time_ns, local_realtime_now);
+ uint32_t frame_1 =
+ nctohl(packet + 4); // this seems to be the frame with latency of 77165 included
+ uint32_t frame_2 = nctohl(packet + 16); // this seems to be the frame the time refers to
+ // this just updates the anchor information contained in the packet
+ // the frame and its remote time
+ // add in the audio_backend_latency_offset;
+ int32_t notified_latency = frame_2 - frame_1;
+ int32_t added_latency = (int32_t)(config.audio_backend_latency_offset * conn->input_rate);
+ if (added_latency < (-(notified_latency + 11035)))
+ debug(1, "the audio_backend_latency_offset is causing a negative latency!");
+
+ /*
+ debug_mutex_lock(&conn->reference_time_mutex, 1000, 0);
+ conn->remote_reference_timestamp_time = remote_packet_time_ns;
+ conn->reference_timestamp =
+ frame_1 - 11035 - added_latency; // add the latency in to the anchortime
+ debug_mutex_unlock(&conn->reference_time_mutex, 0);
+ */
+
+ set_ptp_anchor_info(conn, clock_id, frame_1 - 11035 - added_latency, remote_packet_time_ns);
+
+ } break;
+ case 0xd6:
+ // six bytes in is the sequence number at the start of the encrypted audio packet
+ // returns the sequence number but we're not really interested
+ decipher_player_put_packet(packet + 6, nread - 6, conn);
+ break;
+ default: {
+ char *packet_in_hex_cstring =
+ debug_malloc_hex_cstring(packet, nread); // remember to free this afterwards
+ debug(1,
+ "AP2 Control Receiver Packet of first byte 0x%02X, type 0x%02X length %d received: "
+ "\"%s\".",
+ packet[0], packet[1], nread, packet_in_hex_cstring);
+ free(packet_in_hex_cstring);
+ } break;
+ }
+ } else {
+ debug(1, "AP2 Control Receiver -- dropping a packet.");
+ }
+ } else {
+ debug(1, "AP2 Control Receiver -- error %d receiving a packet.", errno);
+ }
+ }
+ debug(1, "AP2 Control RTP thread \"normal\" exit -- this can't happen. Hah!");
+ pthread_cleanup_pop(0); // don't execute anything here.
+ debug(1, "AP2 Control RTP thread exit.");
+ pthread_exit(NULL);
+}
+
+void rtp_realtime_audio_cleanup_handler(__attribute__((unused)) void *arg) {
+ debug(1, "Realtime Audio Receiver Cleanup Start.");
+ rtsp_conn_info *conn = (rtsp_conn_info *)arg;
+ close(conn->realtime_audio_socket);
+ conn->realtime_audio_socket = 0;
+ debug(1, "Realtime Audio Receiver Cleanup Done.");
+}
+
+void *rtp_realtime_audio_receiver(void *arg) {
+ pthread_cleanup_push(rtp_realtime_audio_cleanup_handler, arg);
+ rtsp_conn_info *conn = (rtsp_conn_info *)arg;
+ uint8_t packet[4096];
+ int32_t last_seqno = -1;
+ ssize_t nread;
+ while (1) {
+ nread = recv(conn->realtime_audio_socket, packet, sizeof(packet), 0);
+
+ if (nread > 36) { // 36 is the 12-byte header and and 24-byte footer
+ if ((config.diagnostic_drop_packet_fraction == 0.0) ||
+ (drand48() > config.diagnostic_drop_packet_fraction)) {
+
+ /*
+ char *packet_in_hex_cstring =
+ debug_malloc_hex_cstring(packet, nread); // remember to free this afterwards
+ debug(1, "Audio Receiver Packet of type 0x%02X length %d received: \"%s\".",
+ packet[1], nread, packet_in_hex_cstring);
+ free(packet_in_hex_cstring);
+ */
+
+ /*
+ // debug(1, "Realtime Audio Receiver Packet of type 0x%02X length %d received.", packet[1],
+ nread);
+ // now get hold of its various bits and pieces
+ uint8_t version = (packet[0] & 0b11000000) >> 6;
+ uint8_t padding = (packet[0] & 0b00100000) >> 5;
+ uint8_t extension = (packet[0] & 0b00010000) >> 4;
+ uint8_t csrc_count = packet[0] & 0b00001111;
+ uint8_t marker = (packet[1] & 0b1000000) >> 7;
+ uint8_t payload_type = packet[1] & 0b01111111;
+ */
+
+ int32_t seqno = decipher_player_put_packet(packet + 2, nread - 2, conn);
+ if (seqno >= 0) {
+ if (last_seqno == -1) {
+ last_seqno = seqno;
+ } else {
+ last_seqno = (last_seqno + 1) & 0xffff;
+ // if (seqno != last_seqno)
+ // debug(3, "RTP: Packets out of sequence: expected: %d, got %d.", last_seqno, seqno);
+ last_seqno = seqno; // reset warning...
+ }
+ } else {
+ debug(1, "Realtime Audio Receiver -- bad packet dropped.");
+ }
+ } else {
+ debug(3, "Realtime Audio Receiver -- dropping a packet.");
+ }
+ } else {
+ debug(1, "Realtime Audio Receiver -- error receiving a packet.");
+ }
+ }
+ pthread_cleanup_pop(0); // don't execute anything here.
+ pthread_exit(NULL);
+}
+
+void rtp_buffered_audio_receiver_cleanup_handler(__attribute__((unused)) void *arg) {
+ debug(1, "Buffered Audio Receiver Cleanup Start.");
+ rtsp_conn_info *conn = (rtsp_conn_info *)arg;
+ close(conn->buffered_audio_socket);
+ conn->buffered_audio_socket = 0;
+ debug(1, "Buffered Audio Receiver Cleanup Done.");
+}
+
+ssize_t buffered_read(buffered_tcp_desc *descriptor, void *buf, size_t count) {
+ ssize_t response;
+ // usleep(1500000);
+ if (pthread_mutex_lock(&descriptor->mutex) != 0)
+ debug(1, "problem with mutex");
+ pthread_cleanup_push(mutex_unlock, (void *)&descriptor->mutex);
+ if ((descriptor->closed == 0)) {
+ while ((descriptor->buffer_occupancy == 0) && (descriptor->error_code == 0)) {
+ if (pthread_cond_wait(&descriptor->not_empty_cv, &descriptor->mutex))
+ debug(1, "Error waiting for buffered read");
+ }
+ }
+ if (descriptor->buffer_occupancy != 0) {
+ ssize_t bytes_to_move = count;
+
+ if (descriptor->buffer_occupancy < count)
+ bytes_to_move = descriptor->buffer_occupancy;
+
+ ssize_t top_gap = descriptor->buffer + descriptor->buffer_max_size - descriptor->toq;
+ if (top_gap < bytes_to_move)
+ bytes_to_move = top_gap;
+
+ memcpy(buf, descriptor->toq, bytes_to_move);
+ descriptor->toq += bytes_to_move;
+ if (descriptor->toq == descriptor->buffer + descriptor->buffer_max_size)
+ descriptor->toq = descriptor->buffer;
+ descriptor->buffer_occupancy -= bytes_to_move;
+ response = bytes_to_move;
+ if (pthread_cond_signal(&descriptor->not_full_cv))
+ debug(1, "Error signalling");
+ } else if (descriptor->error_code) {
+ errno = descriptor->error_code;
+ response = -1;
+ } else if (descriptor->closed != 0) {
+ response = 0;
+ }
+
+ pthread_cleanup_pop(1); // release the mutex
+ return response;
+}
+
+#define STANDARD_PACKET_SIZE 65536
+
+void buffered_tcp_reader_cleanup_handler(__attribute__((unused)) void *arg) {
+ debug(2, "Buffered TCP Reader Thread Exit via Cleanup.");
+}
+
+void *buffered_tcp_reader(void *arg) {
+ pthread_cleanup_push(buffered_tcp_reader_cleanup_handler, NULL);
+ buffered_tcp_desc *descriptor = (buffered_tcp_desc *)arg;
+
+ listen(descriptor->sock_fd, 5);
+ ssize_t nread;
+ SOCKADDR remote_addr;
+ memset(&remote_addr, 0, sizeof(remote_addr));
+ socklen_t addr_size = sizeof(remote_addr);
+ int finished = 0;
+ int fd = accept(descriptor->sock_fd, (struct sockaddr *)&remote_addr, &addr_size);
+ pthread_cleanup_push(socket_cleanup, (void *)fd);
+
+ do {
+ if (pthread_mutex_lock(&descriptor->mutex) != 0)
+ debug(1, "problem with mutex");
+ pthread_cleanup_push(mutex_unlock, (void *)&descriptor->mutex);
+ while ((descriptor->buffer_occupancy == descriptor->buffer_max_size) ||
+ (descriptor->error_code != 0) || (descriptor->closed != 0)) {
+ if (pthread_cond_wait(&descriptor->not_full_cv, &descriptor->mutex))
+ debug(1, "Error waiting for buffered read");
+ }
+ pthread_cleanup_pop(1); // release the mutex
+
+ // now we know it is not full, so go ahead and try to read some more into it
+
+ // wrap
+ if ((size_t)(descriptor->eoq - descriptor->buffer) == descriptor->buffer_max_size)
+ descriptor->eoq = descriptor->buffer;
+
+ // figure out how much to ask for
+ size_t bytes_to_request = STANDARD_PACKET_SIZE;
+ size_t free_space = descriptor->buffer_max_size - descriptor->buffer_occupancy;
+ if (bytes_to_request > free_space)
+ bytes_to_request = free_space; // don't ask for more than will fit
+
+ size_t gap_to_end_of_buffer =
+ descriptor->buffer + descriptor->buffer_max_size - descriptor->eoq;
+ if (gap_to_end_of_buffer < bytes_to_request)
+ bytes_to_request =
+ gap_to_end_of_buffer; // only ask for what will fill to the top of the buffer
+
+ // do the read
+ // debug(1, "Request buffered read of up to %d bytes.", bytes_to_request);
+ nread = recv(fd, descriptor->eoq, bytes_to_request, 0);
+ // debug(1, "Received %d bytes for a buffer size of %d bytes.",nread,
+ // descriptor->buffer_occupancy + nread);
+ if (pthread_mutex_lock(&descriptor->mutex) != 0)
+ debug(1, "problem with not empty mutex");
+ pthread_cleanup_push(mutex_unlock, (void *)&descriptor->mutex);
+ if (nread < 0) {
+ char errorstring[1024];
+ strerror_r(errno, (char *)errorstring, sizeof(errorstring));
+ debug(1, "error in buffered_read %d: \"%s\". Could not recv a packet.", errno, errorstring);
+ descriptor->error_code = errno;
+ } else if (nread == 0) {
+ descriptor->closed = 1;
+ } else if (nread > 0) {
+ descriptor->eoq += nread;
+ descriptor->buffer_occupancy += nread;
+ } else {
+ debug(1, "buffered audio port closed!");
+ }
+ // signal if we got data or an error or the file closed
+ if (pthread_cond_signal(&descriptor->not_empty_cv))
+ debug(1, "Error signalling");
+ pthread_cleanup_pop(1); // release the mutex
+ } while (finished == 0);
+
+ debug(1, "Buffered TCP Reader Thread Exit \"Normal\" Exit Begin.");
+ pthread_cleanup_pop(1); // close the socket
+ pthread_cleanup_pop(1); // cleanup
+ debug(1, "Buffered TCP Reader Thread Exit \"Normal\" Exit -- Shouldn't happen!.");
+ pthread_exit(NULL);
+}
+
+void avcodec_alloc_context3_cleanup_handler(void *arg) {
+ debug(3, "avcodec_alloc_context3_cleanup_handler");
+ AVCodecContext *codec_context = arg;
+ av_free(codec_context);
+}
+
+void avcodec_open2_cleanup_handler(void *arg) {
+ debug(3, "avcodec_open2_cleanup_handler");
+ AVCodecContext *codec_context = arg;
+ avcodec_close(codec_context);
+}
+
+void av_parser_init_cleanup_handler(void *arg) {
+ debug(3, "av_parser_init_cleanup_handler");
+ AVCodecParserContext *codec_parser_context = arg;
+ av_parser_close(codec_parser_context);
+}
+
+void swr_alloc_cleanup_handler(void *arg) {
+ debug(3, "swr_alloc_cleanup_handler");
+ SwrContext **swr = arg;
+ swr_free(swr);
+}
+
+void av_packet_alloc_cleanup_handler(void *arg) {
+ debug(3, "av_packet_alloc_cleanup_handler");
+ AVPacket **pkt = arg;
+ av_packet_free(pkt);
+}
+
+// this will read a block of the size specified to the buffer
+// and will return either with the block or on error
+ssize_t lread_sized_block(buffered_tcp_desc *descriptor, void *buf, size_t count) {
+ ssize_t response, nread;
+ size_t inbuf = 0; // bytes already in the buffer
+ int keep_trying = 1;
+
+ do {
+ nread = buffered_read(descriptor, buf + inbuf, count - inbuf);
+ if (nread == 0) {
+ // a blocking read that returns zero means eof -- implies connection closed
+ debug(3, "read_sized_block connection closed.");
+ keep_trying = 0;
+ } else if (nread < 0) {
+ if (errno == EAGAIN) {
+ debug(1, "read_sized_block getting Error 11 -- EAGAIN from a blocking read!");
+ }
+ if ((errno != ECONNRESET) && (errno != EAGAIN) && (errno != EINTR)) {
+ char errorstring[1024];
+ strerror_r(errno, (char *)errorstring, sizeof(errorstring));
+ debug(1, "read_sized_block read error %d: \"%s\".", errno, (char *)errorstring);
+ keep_trying = 0;
+ }
+ } else {
+ inbuf += (size_t)nread;
+ }
+ } while ((keep_trying != 0) && (inbuf < count));
+ if (nread <= 0)
+ response = nread;
+ else
+ response = inbuf;
+ return response;
+}
+
+// From
+// https://stackoverflow.com/questions/18862715/how-to-generate-the-aac-adts-elementary-stream-with-android-mediacodec
+// with thanks!
+/**
+ * Add ADTS header at the beginning of each and every AAC packet.
+ * This is needed as MediaCodec encoder generates a packet of raw
+ * AAC data.
+ *
+ * Note the packetLen must count in the ADTS header itself.
+ **/
+void addADTStoPacket(uint8_t *packet, int packetLen) {
+ int profile = 2; // AAC LC
+ // 39=MediaCodecInfo.CodecProfileLevel.AACObjectELD;
+ int freqIdx = 4; // 44.1KHz
+ int chanCfg = 2; // CPE
+
+ // fill in ADTS data
+ packet[0] = 0xFF;
+ packet[1] = 0xF9;
+ packet[2] = ((profile - 1) << 6) + (freqIdx << 2) + (chanCfg >> 2);
+ packet[3] = ((chanCfg & 3) << 6) + (packetLen >> 11);
+ packet[4] = (packetLen & 0x7FF) >> 3;
+ packet[5] = ((packetLen & 7) << 5) + 0x1F;
+ packet[6] = 0xFC;
+}
+
+void rtp_buffered_audio_cleanup_handler(__attribute__((unused)) void *arg) {
+ debug(3, "Buffered Audio Receiver Cleanup Start.");
+ rtsp_conn_info *conn = (rtsp_conn_info *)arg;
+ close(conn->buffered_audio_socket);
+ conn->buffered_audio_socket = 0;
+ debug(2, "Buffered Audio Receiver Cleanup Done.");
+}
+
+void *rtp_buffered_audio_processor(void *arg) {
+ rtsp_conn_info *conn = (rtsp_conn_info *)arg;
+ pthread_cleanup_push(rtp_buffered_audio_cleanup_handler, arg);
+
+ pthread_t *buffered_reader_thread = malloc(sizeof(pthread_t));
+ if (buffered_reader_thread == NULL)
+ debug(1, "cannot allocate a buffered_reader_thread!");
+ memset(buffered_reader_thread, 0, sizeof(pthread_t));
+ pthread_cleanup_push(malloc_cleanup, buffered_reader_thread);
+
+ buffered_tcp_desc *buffered_audio = malloc(sizeof(buffered_tcp_desc));
+ if (buffered_audio == NULL)
+ debug(1, "cannot allocate a buffered_tcp_desc!");
+ // initialise the descriptor
+ memset(buffered_audio, 0, sizeof(buffered_tcp_desc));
+ pthread_cleanup_push(malloc_cleanup, buffered_audio);
+
+ if (pthread_mutex_init(&buffered_audio->mutex, NULL))
+ debug(1, "Connection %d: error %d initialising buffered_audio mutex.", conn->connection_number,
+ errno);
+ pthread_cleanup_push(mutex_cleanup, &buffered_audio->mutex);
+
+ if (pthread_cond_init(&buffered_audio->not_empty_cv, NULL))
+ die("Connection %d: error %d initialising not_empty cv.", conn->connection_number, errno);
+ pthread_cleanup_push(cv_cleanup, &buffered_audio->not_empty_cv);
+
+ if (pthread_cond_init(&buffered_audio->not_full_cv, NULL))
+ die("Connection %d: error %d initialising not_full cv.", conn->connection_number, errno);
+ pthread_cleanup_push(cv_cleanup, &buffered_audio->not_full_cv);
+
+ // initialise the buffer data structure
+ buffered_audio->buffer_max_size = conn->ap2_audio_buffer_size;
+ buffered_audio->buffer = malloc(conn->ap2_audio_buffer_size);
+ if (buffered_audio->buffer == NULL)
+ debug(1, "cannot allocate an audio buffer of %u bytes!", buffered_audio->buffer_max_size);
+ pthread_cleanup_push(malloc_cleanup, buffered_audio->buffer);
+
+ // pthread_mutex_lock(&conn->buffered_audio_mutex);
+ buffered_audio->toq = buffered_audio->buffer;
+ buffered_audio->eoq = buffered_audio->buffer;
+
+ buffered_audio->sock_fd = conn->buffered_audio_socket;
+
+ pthread_create(buffered_reader_thread, NULL, &buffered_tcp_reader, buffered_audio);
+ pthread_cleanup_push(thread_cleanup, buffered_reader_thread);
+
+ // ideas and some code from https://rodic.fr/blog/libavcodec-tutorial-decode-audio-file/
+ // with thanks
+
+ // initialize all muxers, demuxers and protocols for libavformat
+ // (does nothing if called twice during the course of one program execution)
+ // not needed in ffmpeg 4.0 and later...
+ // av_register_all();
+
+ AVCodec *codec = avcodec_find_decoder(AV_CODEC_ID_AAC);
+ if (codec == NULL) {
+ debug(1, "Can't find an AAC decoder!");
+ }
+
+ AVCodecContext *codec_context = avcodec_alloc_context3(codec);
+ if (codec_context == NULL) {
+ debug(1, "Could not allocate audio codec context!");
+ }
+ // push a deallocator -- av_free(codec_context)
+ pthread_cleanup_push(avcodec_alloc_context3_cleanup_handler, codec_context);
+
+ if (avcodec_open2(codec_context, codec, NULL) < 0) {
+ debug(1, "Could not open a codec into the audio codec context");
+ }
+ // push a closer -- avcodec_close(codec_context);
+ pthread_cleanup_push(avcodec_open2_cleanup_handler, codec_context);
+
+ AVCodecParserContext *codec_parser_context = av_parser_init(codec->id);
+ if (codec_parser_context == NULL) {
+ debug(1, "Can't initialise a parser context!");
+ }
+ // push a closer -- av_parser_close(codec_parser_context);
+ pthread_cleanup_push(av_parser_init_cleanup_handler, codec_parser_context);
+
+ AVPacket *pkt = av_packet_alloc();
+ if (pkt == NULL) {
+ debug(1, "Can't allocate an AV packet");
+ }
+ // push a deallocator -- av_packet_free(pkt);
+ pthread_cleanup_push(av_packet_alloc_cleanup_handler, &pkt);
+
+ AVFrame *decoded_frame = NULL;
+ int dst_linesize;
+ int dst_bufsize;
+
+ // Prepare software resampler to convert floating point (?) to S16
+ SwrContext *swr = swr_alloc();
+ if (swr == NULL) {
+ debug(1, "can not allocate a swr context");
+ }
+ // push a deallocator -- av_packet_free(pkt);
+ pthread_cleanup_push(swr_alloc_cleanup_handler, &swr);
+
+ av_opt_set_int(swr, "in_channel_layout", AV_CH_LAYOUT_STEREO, 0);
+ av_opt_set_int(swr, "out_channel_layout", AV_CH_LAYOUT_STEREO, 0);
+ av_opt_set_int(swr, "in_sample_rate", 44100, 0);
+ av_opt_set_int(swr, "out_sample_rate", 44100, 0);
+ av_opt_set_sample_fmt(swr, "in_sample_fmt", AV_SAMPLE_FMT_FLTP, 0);
+ av_opt_set_sample_fmt(swr, "out_sample_fmt", AV_SAMPLE_FMT_S16, 0);
+ swr_init(swr);
+
+ uint8_t packet[16 * 1024];
+ unsigned char m[16 * 1024]; // leave the first 7 bytes blank to make room for the ADTS
+ uint8_t *pcm_audio = NULL; // the S16 output
+ unsigned char *data_to_process;
+ ssize_t data_remaining;
+ uint32_t seq_no, last_block_flushed_seq_no; // audio packet number
+ uint32_t previous_seq_no = 0;
+ int new_buffer_needed;
+
+ uint32_t last_block_flushed_rtp_time;
+
+ ssize_t nread;
+
+ int finished = 0;
+ int pcm_buffer_size =
+ (1024 + 352) * 8; // This seems to be right. 8 is for 2 * 32-bit samples per frame
+ uint8_t pcm_buffer[pcm_buffer_size];
+
+ int pcm_buffer_occupancy = 0;
+ int pcm_buffer_read_point = 0; // offset to where the next buffer should come from
+ uint32_t pcm_buffer_read_point_rtptime = 0;
+
+ uint64_t blocks_read = 0;
+ int flush_requested = 0;
+ int streaming_has_started = 0;
+ do {
+ int flush_newly_requested = 0;
+ // are we in in flush mode, or just about to leave it?
+ debug_mutex_lock(&conn->flush_mutex, 10000, 1); // 10ms is a long time to wait!
+ uint32_t flushUntilSeq = conn->ap2_flush_sequence_number;
+ uint32_t flushUntilTS = conn->ap2_flush_rtp_timestamp;
+ // if we are in flush mode
+ if (conn->ap2_flush_requested) {
+ if (flush_requested == 0) {
+ // here, a flush has been newly requested
+ debug(1,
+ "flush requested. flushUntilSeq = %u, flushUntilTS = %u, seq_no: %u, "
+ "pcm_buffer_read_point_rtptime = "
+ "%u, buffer_size: %u.",
+ flushUntilSeq, flushUntilTS, seq_no, pcm_buffer_read_point_rtptime,
+ buffered_audio->buffer_occupancy);
+ last_block_flushed_rtp_time = pcm_buffer_read_point_rtptime;
+ last_block_flushed_seq_no = seq_no;
+ flush_newly_requested = 1;
+ }
+ // if we have arrived at the nominated flush block
+ // (it must be the case that the block is in the pcm buffer)
+ if ((blocks_read != 0) && (seq_no == flushUntilSeq)) {
+ // we may need to flush the pcm_buffer up to flushUntilTS
+ int32_t flush_difference = flushUntilTS - pcm_buffer_read_point_rtptime;
+ if (flush_difference < 0) {
+ debug(1, "flushUntilTS is before the flushUntilSeq's start TS");
+ } else if (flush_difference > pcm_buffer_occupancy) {
+ debug(1,
+ "flushUntilTS is after the flushUntilSeq's last TS. flushUntilTS = %u, "
+ "pcm_buffer_read_point_rtptime = %u, pcm_buffer_occupancy = %u.",
+ flushUntilTS, pcm_buffer_read_point_rtptime, pcm_buffer_occupancy);
+ } else {
+ // move the buffer contents from flush_difference to pcm_buffer_occupancy
+ // to the front of the pcm_buffer
+ memcpy(pcm_buffer, pcm_buffer + flush_difference * 4,
+ pcm_buffer_occupancy - flush_difference * 4);
+ pcm_buffer_occupancy = pcm_buffer_occupancy - flush_difference * 4;
+ pcm_buffer_read_point = 0;
+ debug(1, "set pcm_buffer_read_point_rtptime from %u to %u.",
+ pcm_buffer_read_point_rtptime, flushUntilTS);
+ pcm_buffer_read_point_rtptime = flushUntilTS;
+ }
+ conn->ap2_flush_requested = 0;
+ } else {
+ if (seq_no > flushUntilSeq) {
+ debug(1,
+ "we have overshot the flush ending block. flushUntilSeq is %u, this block is: %u.",
+ flushUntilSeq, seq_no);
+ conn->ap2_flush_requested = 0; // but conn->ap2_play_enabled will remain at zero
+ }
+ }
+ }
+ flush_requested = conn->ap2_flush_requested;
+
+ // this isn't always the case, i.e. that play is re-enabled after a flush
+ // if (flush_requested)
+ // conn->ap2_play_enabled = 0;
+ // play is disabled until an anchor time is sent and play is restarted
+
+ int play_enabled = conn->ap2_play_enabled;
+ debug_mutex_unlock(&conn->flush_mutex, 3);
+
+ // do this outside the flush mutex
+ if (flush_newly_requested) {
+ player_full_flush(conn);
+ streaming_has_started = 0;
+ pcm_buffer_occupancy = 0;
+ pcm_buffer_read_point = 0;
+ }
+
+ // now, if a flush is not requested, we can do the normal stuff
+ if (flush_requested == 0) {
+ // is there space in the player thread's buffer system?
+ unsigned int player_buffer_size, player_buffer_occupancy;
+ get_audio_buffer_size_and_occupancy(&player_buffer_size, &player_buffer_occupancy, conn);
+ // debug(1,"player buffer size and occupancy: %u and %u", player_buffer_size,
+ // player_buffer_occupancy);
+ if (player_buffer_occupancy >
+ ((0.5 + 0.1) * 44100.0 / 352)) { // must be greater than the lead time.
+ // if there is enough stuff in the player's buffer, sleep for a while and try again
+ // debug(1,"sleep for 20 ms");
+ usleep(20000); // wait for a while
+ } else {
+ if ((pcm_buffer_occupancy - pcm_buffer_read_point) >= (352 * 4)) {
+ new_buffer_needed = 0;
+ // send a frame to the player if allowed
+ // it it's way too late, it means that a new anchor time is needed
+
+ /*
+ uint32_t at_rtp = conn->reference_timestamp;
+ at_rtp =
+ at_rtp - (44100 * 10); // allow it to start a few seconds late, but not
+ madly late int rtp_diff = pcm_buffer_read_point_rtptime - at_rtp;
+ */
+
+ if (play_enabled) {
+ uint64_t buffer_should_be_time;
+ if (frame_to_local_time(pcm_buffer_read_point_rtptime, &buffer_should_be_time, conn) ==
+ 0) {
+ int64_t lead_time = buffer_should_be_time - get_absolute_time_in_ns();
+ // debug(1,"lead time in buffered_audio is %f milliseconds.", lead_time * 0.000001);
+ // ask for 0.5 sec of leadtime
+ if ((lead_time >= (int64_t)(0.5 * 1000000000)) || (streaming_has_started == 1)) {
+ if (streaming_has_started == 0)
+ debug(1, "rtp lead time is %f ms.", 0.000001 * lead_time);
+ streaming_has_started = 1;
+// player_put_packet(0, 0, pcm_buffer_read_point_rtptime,
+ player_put_packet(0, pcm_buffer_read_point_rtptime,
+ pcm_buffer + pcm_buffer_read_point, 352, conn);
+ }
+
+ pcm_buffer_read_point_rtptime += 352;
+ pcm_buffer_read_point += 352 * 4;
+ }
+ // usleep(2000); // let other stuff happens
+ } else {
+ usleep(20000); // wait before asking if play is enabled again
+ }
+ } else {
+ new_buffer_needed = 1;
+ if (pcm_buffer_read_point != 0) {
+ // debug(1,"pcm_buffer_read_point (frames): %u, pcm_buffer_occupancy (frames): %u",
+ // pcm_buffer_read_point/4, pcm_buffer_occupancy/4);
+ // if there is anything to move down to the front of the buffer, do it now;
+ if ((pcm_buffer_occupancy - pcm_buffer_read_point) > 0) {
+ // move the remaining frames down to the start of the buffer
+ // debug(1,"move the remaining frames down to the start of the pcm_buffer");
+ memcpy(pcm_buffer, pcm_buffer + pcm_buffer_read_point,
+ pcm_buffer_occupancy - pcm_buffer_read_point);
+ pcm_buffer_occupancy = pcm_buffer_occupancy - pcm_buffer_read_point;
+ } else {
+ // debug(1,"nothing to move to the front of the buffer");
+ pcm_buffer_occupancy = 0;
+ }
+ pcm_buffer_read_point = 0;
+ }
+ }
+ }
+ }
+ if ((flush_requested) || (new_buffer_needed)) {
+
+ // debug(1,"pcm_buffer_read_point (frames): %u, pcm_buffer_occupancy (frames): %u",
+ // pcm_buffer_read_point/4, pcm_buffer_occupancy/4);
+ // ok, so here we know we need material from the sender
+ // do we will get in a packet of audio
+ uint16_t data_len;
+ // here we read from the buffer that our thread has been reading
+ nread = lread_sized_block(buffered_audio, &data_len, sizeof(data_len));
+ if (nread < 0) {
+ char errorstring[1024];
+ strerror_r(errno, (char *)errorstring, sizeof(errorstring));
+ debug(1, "error in rtp_buffered_audio_processor %d: \"%s\". Could not recv a data_len .",
+ errno, errorstring);
+ // if ((config.diagnostic_drop_packet_fraction == 0.0) ||
+ // (drand48() > config.diagnostic_drop_packet_fraction)) {
+ }
+ data_len = ntohs(data_len);
+ // debug(1,"buffered audio packet of size %u detected.", data_len - 2);
+ nread = lread_sized_block(buffered_audio, packet, data_len - 2);
+ // debug(1, "buffered audio packet of size %u received.", nread);
+ if (nread < 0) {
+ char errorstring[1024];
+ strerror_r(errno, (char *)errorstring, sizeof(errorstring));
+ debug(1, "error in rtp_buffered_audio_processor %d: \"%s\". Could not recv a data packet.",
+ errno, errorstring);
+ // if ((config.diagnostic_drop_packet_fraction == 0.0) ||
+ // (drand48() > config.diagnostic_drop_packet_fraction)) {
+ } else if (nread > 0) {
+ blocks_read++; // note, this doesn't mean they are valid audio blocks
+ // debug(1, "Realtime Audio Receiver Packet of length %d received.", nread);
+ // now get hold of its various bits and pieces
+ /*
+ uint8_t version = (packet[0] & 0b11000000) >> 6;
+ uint8_t padding = (packet[0] & 0b00100000) >> 5;
+ uint8_t extension = (packet[0] & 0b00010000) >> 4;
+ uint8_t csrc_count = packet[0] & 0b00001111;
+ */
+ seq_no = packet[1] * (1 << 16) + packet[2] * (1 << 8) + packet[3];
+ uint32_t timestamp = nctohl(&packet[4]);
+ // uint32_t ssrc = nctohl(&packet[8]);
+ // uint8_t marker = 0;
+ // uint8_t payload_type = 0;
+
+ if ((blocks_read != 0) && (seq_no != previous_seq_no + 1)) {
+ if (previous_seq_no != 0)
+ debug(1, "block discontinuity: from sequence number %u to sequence number %u.",
+ previous_seq_no, seq_no);
+ if (pcm_buffer_occupancy != 0) {
+ debug(1,
+ "rtptime discontinuity! Existing pcm buffer contents with timestamp "
+ "%u, seq_no %u to new block with timestamp %u, seq_no %u",
+ pcm_buffer_read_point_rtptime, previous_seq_no, timestamp, seq_no);
+ pcm_buffer_occupancy = 0;
+ }
+ // if still playing, this is a problem -- need to reset the player
+ if (play_enabled)
+ player_full_flush(conn);
+ }
+
+ previous_seq_no = seq_no;
+
+ // at this point, we can check if we can to flush this packet -- we won't have
+ // to decipher it first
+ // debug(1,"seq_no %u, timestamp %u", seq_no, timestamp);
+
+ if ((flush_requested) && (seq_no < flushUntilSeq)) {
+ last_block_flushed_seq_no = seq_no;
+ last_block_flushed_rtp_time = timestamp;
+ // debug(1, "flushing block %u, rtptime %u up to block %u, rtptime %u.", seq_no,
+ // timestamp, flushUntilSeq, flushUntilTS);
+ }
+
+ if (((flush_requested) && (seq_no == flushUntilSeq)) || (new_buffer_needed)) {
+
+ if ((flush_requested) && (seq_no == flushUntilSeq)) {
+ int32_t timestamp_difference = timestamp - flushUntilTS;
+ debug(1,
+ "end of flush reached. flushUntilSeq = %u, flushUntilTS = %u, last block flushed "
+ "seq_no = %u, last block flushed rtptime = %u, next block seq_no = %u, next "
+ "block timestamp = "
+ "%u, timestamp difference: %d, buffer_size: %u.",
+ flushUntilSeq, flushUntilTS, last_block_flushed_seq_no,
+ last_block_flushed_rtp_time, seq_no, timestamp, timestamp_difference,
+ buffered_audio->buffer_occupancy);
+ debug(2,
+ "pcm_buffer_read_point_rtptime = %u, pcm_buffer_occupancy = %u, "
+ "pcm_buffer_read_point = %u",
+ pcm_buffer_read_point_rtptime, pcm_buffer_occupancy, pcm_buffer_read_point);
+ }
+
+ // if we are here because of a flush request, it must be the case that
+ // flushing the pcm buffer wasn't enough, as the request would have been turned off by now
+ // so we better indicate that the pcm buffer is empty and its contents invalid
+ if (flush_requested) {
+ pcm_buffer_occupancy = 0;
+ }
+
+ // decode the block and add it to or put it in the pcm buffer
+
+ if (pcm_buffer_occupancy == 0) {
+ // they should match and the read point should be zero
+ if ((blocks_read != 0) && (pcm_buffer_read_point_rtptime != timestamp)) {
+ debug(2, "set pcm_buffer_read_point_rtptime from %u to %u.",
+ pcm_buffer_read_point_rtptime, timestamp);
+ pcm_buffer_read_point_rtptime = timestamp;
+ pcm_buffer_read_point = 0;
+ }
+ }
+
+ unsigned char nonce[12];
+ memset(nonce, 0, sizeof(nonce));
+ memcpy(nonce + 4, packet + nread - 8,
+ 8); // front-pad the 8-byte nonce received to get the 12-byte nonce expected
+
+ // https://libsodium.gitbook.io/doc/secret-key_cryptography/aead/chacha20-poly1305/ietf_chacha20-poly1305_construction
+ // Note: the eight-byte nonce must be front-padded out to 12 bytes.
+ unsigned long long new_payload_length = 0;
+ int response = crypto_aead_chacha20poly1305_ietf_decrypt(
+ m + 7, // m
+ &new_payload_length, // mlen_p
+ NULL, // nsec,
+ packet + 12, // the ciphertext starts 12 bytes in and is followed by the MAC tag,
+ nread - (8 + 12), // clen -- the last 8 bytes are the nonce
+ packet + 4, // authenticated additional data
+ 8, // authenticated additional data length
+ nonce,
+ conn->session_key); // *k
+ if (response != 0) {
+ debug(1, "Error decrypting audio packet %u -- packet length %d.", seq_no, nread);
+ } else {
+ // now pass it in to the regular processing chain
+
+ unsigned long long max_int = INT_MAX; // put in the right format
+ if (new_payload_length > max_int)
+ debug(1, "Madly long payload length!");
+ int payload_length = new_payload_length; // change from long long to int
+ int aac_packet_length = payload_length + 7;
+
+ // now, fill in the 7-byte ADTS information, which seems to be needed by the decoder
+ // we made room for it in the front of the buffer
+
+ addADTStoPacket(m, aac_packet_length);
+
+ // now we are ready to send this to the decoder
+
+ data_to_process = m;
+ data_remaining = aac_packet_length;
+ int ret = 0;
+ // there can be more than one av packet (? terminology) in a block
+ while (data_remaining > 0) {
+ if (decoded_frame == NULL) {
+ decoded_frame = av_frame_alloc();
+ if (decoded_frame == NULL)
+ debug(1, "could not allocate av_frame");
+ } else {
+ ret = av_parser_parse2(codec_parser_context, codec_context, &pkt->data, &pkt->size,
+ data_to_process, data_remaining, AV_NOPTS_VALUE,
+ AV_NOPTS_VALUE, 0);
+ if (ret < 0) {
+ debug(1, "error while parsing deciphered audio packet.");
+ } else {
+ data_to_process += ret;
+ data_remaining -= ret;
+ // debug(1, "frame found");
+ // now pass each packet to be decoded
+ if (pkt->size) {
+ ret = avcodec_send_packet(codec_context, pkt);
+ if (ret < 0) {
+ debug(1, "error sending packet to decoder");
+ } else {
+ while (ret >= 0) {
+ ret = avcodec_receive_frame(codec_context, decoded_frame);
+ if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF)
+ break;
+ else if (ret < 0) {
+ debug(1, "error %d during decoding", ret);
+ } else {
+ av_samples_alloc(&pcm_audio, &dst_linesize, codec_context->channels,
+ decoded_frame->nb_samples, AV_SAMPLE_FMT_S16, 1);
+ // remember to free pcm_audio
+ ret = swr_convert(swr, &pcm_audio, decoded_frame->nb_samples,
+ (const uint8_t **)decoded_frame->extended_data,
+ decoded_frame->nb_samples);
+ dst_bufsize = av_samples_get_buffer_size(
+ &dst_linesize, codec_context->channels, ret, AV_SAMPLE_FMT_S16, 1);
+ // debug(1,"generated %d bytes of PCM", dst_bufsize);
+ // copy the PCM audio into the PCM buffer.
+ // make sure it's big enough first
+ if ((pcm_buffer_size - pcm_buffer_occupancy) < dst_bufsize) {
+ debug(1,
+ "pcm_buffer_read_point (frames): %u, pcm_buffer_occupancy "
+ "(frames): %u",
+ pcm_buffer_read_point / 4, pcm_buffer_occupancy / 4);
+ pcm_buffer_size = dst_bufsize + pcm_buffer_occupancy;
+ debug(1, "fatal error! pcm buffer too small at %d bytes.",
+ pcm_buffer_size);
+ } else {
+ memcpy(pcm_buffer + pcm_buffer_occupancy, pcm_audio, dst_bufsize);
+ pcm_buffer_occupancy += dst_bufsize;
+ }
+ // debug(1,"decoded %d samples", decoded_frame->nb_samples);
+ // memcpy(sampleBuffer,outputBuffer16,dst_bufsize);
+ av_freep(&pcm_audio);
+ }
+ }
+ }
+ }
+ }
+ if (decoded_frame == NULL)
+ debug(1, "decoded_frame is NULL");
+ if (decoded_frame != NULL)
+ av_frame_free(&decoded_frame);
+ }
+ }
+
+ // revert the state of cancellability
+ }
+ }
+ } else {
+ // nread is 0 -- the port has been closed
+ // finished = 1;
+ debug(1, "buffered audio port closed!");
+ finished = 1;
+ }
+ }
+
+ } while (finished == 0);
+ debug(1, "Buffered Audio Receiver RTP thread \"normal\" exit.");
+ pthread_cleanup_pop(1); // deallocate the swr
+ pthread_cleanup_pop(1); // deallocate the av_packet
+ pthread_cleanup_pop(1); // av_parser_init_cleanup_handler
+ pthread_cleanup_pop(1); // avcodec_open2_cleanup_handler
+ pthread_cleanup_pop(1); // avcodec_alloc_context3_cleanup_handler
+ pthread_cleanup_pop(1); // thread creation
+ pthread_cleanup_pop(1); // buffer malloc
+ pthread_cleanup_pop(1); // not_full_cv
+ pthread_cleanup_pop(1); // not_empty_cv
+ pthread_cleanup_pop(1); // mutex
+ pthread_cleanup_pop(1); // descriptor malloc
+ pthread_cleanup_pop(1); // pthread_t malloc
+ pthread_cleanup_pop(1); // do the cleanup.
+ pthread_exit(NULL);
+}
+
+#endif
+#endif
+