From 8a73d5972c09a2f863dcfceca01ac0efc72876fd Mon Sep 17 00:00:00 2001 From: Mike Brady <4265913+mikebrady@users.noreply.github.com> Date: Thu, 29 Apr 2021 15:04:50 +0100 Subject: [PATCH] rtp merged with lots of AP2 stuff still commented out. But it still works. --- common.c | 145 ++++++ common.h | 6 + player.c | 9 +- player.h | 77 ++- rtp.c | 1439 ++++++++++++++++++++++++++++++++++++++++++++++++------ rtp.h | 8 +- 6 files changed, 1507 insertions(+), 177 deletions(-) diff --git a/common.c b/common.c index 6e0f12ab..0fe331da 100644 --- a/common.c +++ b/common.c @@ -43,6 +43,9 @@ #include #include +#include +#include + #ifdef COMPILE_FOR_OSX #include #include @@ -266,6 +269,148 @@ uint16_t nextFreeUDPPort() { return UDPPortIndex; } +// if port is zero, pick any port +// otherwise, try the given port only +int bind_socket_and_port(int type, int ip_family, const char *self_ip_address, uint32_t scope_id, + uint16_t *port, int *sock) { + int ret = 0; // no error + int local_socket = socket(ip_family, type, 0); + if (local_socket == -1) + ret = errno; + if (ret == 0) { + SOCKADDR myaddr; + memset(&myaddr, 0, sizeof(myaddr)); + if (ip_family == AF_INET) { + struct sockaddr_in *sa = (struct sockaddr_in *)&myaddr; + sa->sin_family = AF_INET; + sa->sin_port = ntohs(*port); + inet_pton(AF_INET, self_ip_address, &(sa->sin_addr)); + ret = bind(local_socket, (struct sockaddr *)sa, sizeof(struct sockaddr_in)); + } +#ifdef AF_INET6 + if (ip_family == AF_INET6) { + struct sockaddr_in6 *sa6 = (struct sockaddr_in6 *)&myaddr; + sa6->sin6_family = AF_INET6; + sa6->sin6_port = ntohs(*port); + inet_pton(AF_INET6, self_ip_address, &(sa6->sin6_addr)); + sa6->sin6_scope_id = scope_id; + ret = bind(local_socket, (struct sockaddr *)sa6, sizeof(struct sockaddr_in6)); + } +#endif + if (ret < 0) { + ret = errno; + close(local_socket); + char errorstring[1024]; + strerror_r(errno, (char *)errorstring, sizeof(errorstring)); + warn("error %d: \"%s\". 
Could not bind a port!", errno, errorstring); + } else { + uint16_t sport; + SOCKADDR local; + socklen_t local_len = sizeof(local); + ret = getsockname(local_socket, (struct sockaddr *)&local, &local_len); + if (ret < 0) { + ret = errno; + close(local_socket); + char errorstring[1024]; + strerror_r(errno, (char *)errorstring, sizeof(errorstring)); + warn("error %d: \"%s\". Could not retrieve socket's port!", errno, errorstring); + } else { +#ifdef AF_INET6 + if (local.SAFAMILY == AF_INET6) { + struct sockaddr_in6 *sa6 = (struct sockaddr_in6 *)&local; + sport = ntohs(sa6->sin6_port); + } else +#endif + { + struct sockaddr_in *sa = (struct sockaddr_in *)&local; + sport = ntohs(sa->sin_port); + } + *sock = local_socket; + *port = sport; + } + } + } + return ret; +} + +uint16_t bind_UDP_port(int ip_family, const char *self_ip_address, uint32_t scope_id, int *sock) { + // look for a port in the range, if any was specified. + int ret = 0; + + int local_socket = socket(ip_family, SOCK_DGRAM, IPPROTO_UDP); + if (local_socket == -1) + die("Could not allocate a socket."); + + /* + int val = 1; + ret = setsockopt(local_socket, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)); + if (ret < 0) { + char errorstring[1024]; + strerror_r(errno, (char *)errorstring, sizeof(errorstring)); + debug(1, "Error %d: \"%s\". 
Couldn't set SO_REUSEADDR"); + } + */ + + SOCKADDR myaddr; + int tryCount = 0; + uint16_t desired_port; + do { + tryCount++; + desired_port = nextFreeUDPPort(); + memset(&myaddr, 0, sizeof(myaddr)); + if (ip_family == AF_INET) { + struct sockaddr_in *sa = (struct sockaddr_in *)&myaddr; + sa->sin_family = AF_INET; + sa->sin_port = ntohs(desired_port); + inet_pton(AF_INET, self_ip_address, &(sa->sin_addr)); + ret = bind(local_socket, (struct sockaddr *)sa, sizeof(struct sockaddr_in)); + } +#ifdef AF_INET6 + if (ip_family == AF_INET6) { + struct sockaddr_in6 *sa6 = (struct sockaddr_in6 *)&myaddr; + sa6->sin6_family = AF_INET6; + sa6->sin6_port = ntohs(desired_port); + inet_pton(AF_INET6, self_ip_address, &(sa6->sin6_addr)); + sa6->sin6_scope_id = scope_id; + ret = bind(local_socket, (struct sockaddr *)sa6, sizeof(struct sockaddr_in6)); + } +#endif + + } while ((ret < 0) && (errno == EADDRINUSE) && (desired_port != 0) && + (tryCount < config.udp_port_range)); + + // debug(1,"UDP port chosen: %d.",desired_port); + + if (ret < 0) { + close(local_socket); + char errorstring[1024]; + strerror_r(errno, (char *)errorstring, sizeof(errorstring)); + die("error %d: \"%s\". Could not bind a UDP port! Check the udp_port_range is large enough -- " + "it must be " + "at least 3, and 10 or more is suggested -- or " + "check for restrictive firewall settings or a bad router! 
UDP base is %u, range is %u and " + "current suggestion is %u.", + errno, errorstring, config.udp_port_base, config.udp_port_range, desired_port); + } + + uint16_t sport; + SOCKADDR local; + socklen_t local_len = sizeof(local); + getsockname(local_socket, (struct sockaddr *)&local, &local_len); +#ifdef AF_INET6 + if (local.SAFAMILY == AF_INET6) { + struct sockaddr_in6 *sa6 = (struct sockaddr_in6 *)&local; + sport = ntohs(sa6->sin6_port); + } else +#endif + { + struct sockaddr_in *sa = (struct sockaddr_in *)&local; + sport = ntohs(sa->sin_port); + } + *sock = local_socket; + return sport; +} + int get_requested_connection_state_to_output() { return requested_connection_state_to_output; } void set_requested_connection_state_to_output(int v) { requested_connection_state_to_output = v; } diff --git a/common.h b/common.h index 68666ac9..6602985a 100644 --- a/common.h +++ b/common.h @@ -447,4 +447,10 @@ int string_update_with_size(char **str, int *flag, char *s, size_t len); // from https://stackoverflow.com/questions/13663617/memdup-function-in-c, with thanks void *memdup(const void *mem, size_t size); +int bind_socket_and_port(int type, int ip_family, const char *self_ip_address, uint32_t scope_id, + uint16_t *port, int *sock); + +uint16_t bind_UDP_port(int ip_family, const char *self_ip_address, uint32_t scope_id, int *sock); + + #endif // _COMMON_H diff --git a/player.c b/player.c index 7c0fd007..ff889408 100644 --- a/player.c +++ b/player.c @@ -117,13 +117,6 @@ uint32_t modulo_32_offset(uint32_t from, uint32_t to) { return UINT32_MAX - from + to + 1; } -uint64_t modulo_64_offset(uint64_t from, uint64_t to) { - if (from <= to) - return to - from; - else - return UINT64_MAX - from + to + 1; -} - void do_flush(uint32_t timestamp, rtsp_conn_info *conn); static void ab_resync(rtsp_conn_info *conn) { @@ -1589,7 +1582,7 @@ void player_thread_cleanup_handler(void *arg) { if (conn->stream.type == ast_apple_lossless) terminate_decoders(conn); - 
clear_reference_timestamp(conn); + reset_anchor_info(conn); conn->rtp_running = 0; pthread_setcancelstate(oldState, NULL); } diff --git a/player.h b/player.h index 105ce869..e8942710 100644 --- a/player.h +++ b/player.h @@ -21,6 +21,11 @@ #include #endif +#ifdef CONFIG_AIRPLAY_2 +#include "pair_ap/pair-internal.h" +#include "plist/plist.h" +#endif + #include "alac.h" #include "audio.h" @@ -76,6 +81,11 @@ typedef enum { ast_apple_lossless, } audio_stream_type; +typedef enum { + ts_ntp, + ts_ptp +} timing_source_type; + typedef struct { int encrypted; uint8_t aesiv[16], aeskey[16]; @@ -83,8 +93,20 @@ typedef struct { audio_stream_type type; } stream_cfg; +#ifdef CONFIG_AIRPLAY_2 +typedef struct file_cipher_context { + struct pair_cipher_context *cipher_context; + int active; // can be created during a pair setup but not activated until next read + int fd; + void *input_plaintext_buffer; + void *input_plaintext_buffer_toq; + size_t input_plaintext_buffer_bytes_occupied; +} file_cipher_context; +#endif + typedef struct { int connection_number; // for debug ID purposes, nothing else... + timing_source_type type_of_timing; // are we using NTP or PTP? int resend_interval; // this is really just for debugging char *UserAgent; // free this on teardown int AirPlayVersion; // zero if not an AirPlay session. 
Used to help calculate latency @@ -94,7 +116,7 @@ typedef struct { uint32_t maximum_latency; // set if an a=max-latency: line appears in the ANNOUNCE message; zero // otherwise int software_mute_enabled; // if we don't have a real mute that we can use - int unachievable_audio_backend_latency_offset_notified; // set when a latency warning is issued + int fd; int authorized; // set if a password is required and has been supplied @@ -214,8 +236,56 @@ typedef struct { // this is what connects an rtp timestamp to the remote time - uint32_t reference_timestamp; - uint64_t remote_reference_timestamp_time; + int anchor_remote_info_is_valid; + uint64_t anchor_clock; + uint64_t anchor_time; // this is the time according to the clock + uint32_t anchor_rtptime; + + +#ifdef CONFIG_AIRPLAY_2 + pthread_t rtp_event_thread; + pthread_t rtp_ap2_control_thread; + pthread_t rtp_realtime_audio_thread; + pthread_t rtp_buffered_audio_thread; + + int pairing_mode; + file_cipher_context control_cipher_context; + struct verifier_setup_context *server_setup_ctx; + + int last_anchor_info_is_valid; + uint64_t last_anchor_clock_offset; + uint64_t last_anchor_time_of_update; + uint64_t last_anchor_clock; + + + ssize_t ap2_audio_buffer_size; + int ap2_flush_requested; + uint32_t ap2_flush_rtp_timestamp; + uint32_t ap2_flush_sequence_number; + int ap2_rate; // protect with flush mutex, 0 means don't play, 1 means play + int ap2_play_enabled; // protect with flush mutex + + int event_socket; + SOCKADDR ap2_remote_control_socket_addr; // a socket pointing to the control port of the client + socklen_t ap2_remote_control_socket_addr_length; + int ap2_control_socket; + int realtime_audio_socket; + int buffered_audio_socket; + + uint16_t local_event_port; + uint16_t local_ap2_control_port; + uint16_t local_realtime_audio_port; + uint16_t local_buffered_audio_port; + + plist_t client_setup_plist; + uint64_t audio_format; + uint64_t compression; + unsigned char *session_key; // needs to be free'd at the 
end + uint64_t frames_packet; + uint64_t type; + uint64_t networkTimeTimelineID; // the clock ID used by the player + +#endif // used as the initials values for calculating the rate at which the source thinks it's sending // frames @@ -272,7 +342,6 @@ typedef struct { } rtsp_conn_info; uint32_t modulo_32_offset(uint32_t from, uint32_t to); -uint64_t modulo_64_offset(uint64_t from, uint64_t to); int player_play(rtsp_conn_info *conn); int player_stop(rtsp_conn_info *conn); diff --git a/rtp.c b/rtp.c index 87fd25dd..b96a344d 100644 --- a/rtp.c +++ b/rtp.c @@ -44,6 +44,15 @@ #include #include #include +#ifdef CONFIG_AIRPLAY_2 +#include "ptp-utilities.h" +#include +#include +#include +#include +#include +#endif + struct Nvll { char *name; @@ -56,6 +65,20 @@ typedef struct Nvll nvll; uint64_t local_to_remote_time_jitter; uint64_t local_to_remote_time_jitter_count; +typedef struct { + int closed; + int error_code; + int sock_fd; + char *buffer; + char *toq; + char *eoq; + size_t buffer_max_size; + size_t buffer_occupancy; + pthread_mutex_t mutex; + pthread_cond_t not_empty_cv; + pthread_cond_t not_full_cv; +} buffered_tcp_desc; + void rtp_initialise(rtsp_conn_info *conn) { conn->rtp_time_of_last_resend_request_error_ns = 0; conn->rtp_running = 0; @@ -66,7 +89,7 @@ void rtp_initialise(rtsp_conn_info *conn) { } void rtp_terminate(rtsp_conn_info *conn) { - conn->reference_timestamp = 0; + conn->anchor_rtptime = 0; // destroy the timer mutex int rc = pthread_mutex_destroy(&conn->reference_time_mutex); if (rc) @@ -199,7 +222,8 @@ void *rtp_audio_receiver(void *arg) { if (plen >= 16) { if ((config.diagnostic_drop_packet_fraction == 0.0) || (drand48() > config.diagnostic_drop_packet_fraction)) - player_put_packet(seqno, actual_timestamp, pktp, plen, conn); + player_put_packet(seqno, actual_timestamp, pktp, plen, + conn); // the '1' means is original format else debug(3, "Dropping audio packet %u to simulate a bad connection.", seqno); continue; @@ -213,7 +237,9 @@ void 
*rtp_audio_receiver(void *arg) { } warn("Audio receiver -- Unknown RTP packet of type 0x%02X length %d.", type, nread); } else { - debug(1, "Error receiving an audio packet."); + char em[1024]; + strerror_r(errno, em, sizeof(em)); + debug(1, "Error %d receiving an audio packet: \"%s\".", errno, em); } } @@ -229,14 +255,14 @@ void *rtp_audio_receiver(void *arg) { } void rtp_control_handler_cleanup_handler(__attribute__((unused)) void *arg) { - debug(3, "Control Receiver Cleanup Done."); + debug(1, "Control Receiver Cleanup Done."); } void *rtp_control_receiver(void *arg) { pthread_cleanup_push(rtp_control_handler_cleanup_handler, arg); rtsp_conn_info *conn = (rtsp_conn_info *)arg; - conn->reference_timestamp = 0; // nothing valid received yet + conn->anchor_rtptime = 0; // nothing valid received yet uint8_t packet[2048], *pktp; // struct timespec tn; uint64_t remote_time_of_sync; @@ -397,11 +423,11 @@ void *rtp_control_receiver(void *arg) { } } else { uint64_t remote_frame_time_interval = - conn->remote_reference_timestamp_time - + conn->anchor_time - conn->initial_reference_time; // here, this should never be zero if (remote_frame_time_interval) { conn->remote_frame_rate = - (1.0E9 * (conn->reference_timestamp - conn->initial_reference_timestamp)) / + (1.0E9 * (conn->anchor_rtptime - conn->initial_reference_timestamp)) / remote_frame_time_interval; } else { conn->remote_frame_rate = 0.0; // use as a flag. 
@@ -409,14 +435,14 @@ void *rtp_control_receiver(void *arg) { } // this is for debugging - uint64_t old_remote_reference_time = conn->remote_reference_timestamp_time; - uint32_t old_reference_timestamp = conn->reference_timestamp; + uint64_t old_remote_reference_time = conn->anchor_time; + uint32_t old_reference_timestamp = conn->anchor_rtptime; // int64_t old_latency_delayed_timestamp = conn->latency_delayed_timestamp; - conn->remote_reference_timestamp_time = remote_time_of_sync; + conn->anchor_time = remote_time_of_sync; // conn->reference_timestamp_time = // remote_time_of_sync - local_to_remote_time_difference_now(conn); - conn->reference_timestamp = sync_rtp_timestamp; + conn->anchor_rtptime = sync_rtp_timestamp; conn->latency_delayed_timestamp = rtp_timestamp_less_latency; debug_mutex_unlock(&conn->reference_time_mutex, 0); @@ -443,7 +469,8 @@ void *rtp_control_receiver(void *arg) { // check if packet contains enough content to be reasonable if (plen >= 16) { - player_put_packet(seqno, actual_timestamp, pktp, plen, conn); + player_put_packet(seqno, actual_timestamp, pktp, plen, + conn); // the '1' means is original format continue; } else { debug(3, "Too-short retransmitted audio packet received in control port, ignored."); @@ -455,7 +482,10 @@ void *rtp_control_receiver(void *arg) { debug(3, "Control Receiver -- dropping a packet to simulate a bad network."); } } else { - debug(1, "Control Receiver -- error receiving a packet."); + + char em[1024]; + strerror_r(errno, em, sizeof(em)); + debug(1, "Control Receiver -- error %d receiving a packet: \"%s\".", errno, em); } } debug(1, "Control RTP thread \"normal\" exit -- this can't happen. Hah!"); @@ -849,85 +879,6 @@ void *rtp_timing_receiver(void *arg) { pthread_exit(NULL); } -static uint16_t bind_port(int ip_family, const char *self_ip_address, uint32_t scope_id, - int *sock) { - // look for a port in the range, if any was specified. 
- int ret = 0; - - int local_socket = socket(ip_family, SOCK_DGRAM, IPPROTO_UDP); - if (local_socket == -1) - die("Could not allocate a socket."); - - /* - int val = 1; - ret = setsockopt(local_socket, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)); - if (ret < 0) { - char errorstring[1024]; - strerror_r(errno, (char *)errorstring, sizeof(errorstring)); - debug(1, "Error %d: \"%s\". Couldn't set SO_REUSEADDR"); - } - */ - - SOCKADDR myaddr; - int tryCount = 0; - uint16_t desired_port; - do { - tryCount++; - desired_port = nextFreeUDPPort(); - memset(&myaddr, 0, sizeof(myaddr)); - if (ip_family == AF_INET) { - struct sockaddr_in *sa = (struct sockaddr_in *)&myaddr; - sa->sin_family = AF_INET; - sa->sin_port = ntohs(desired_port); - inet_pton(AF_INET, self_ip_address, &(sa->sin_addr)); - ret = bind(local_socket, (struct sockaddr *)sa, sizeof(struct sockaddr_in)); - } -#ifdef AF_INET6 - if (ip_family == AF_INET6) { - struct sockaddr_in6 *sa6 = (struct sockaddr_in6 *)&myaddr; - sa6->sin6_family = AF_INET6; - sa6->sin6_port = ntohs(desired_port); - inet_pton(AF_INET6, self_ip_address, &(sa6->sin6_addr)); - sa6->sin6_scope_id = scope_id; - ret = bind(local_socket, (struct sockaddr *)sa6, sizeof(struct sockaddr_in6)); - } -#endif - - } while ((ret < 0) && (errno == EADDRINUSE) && (desired_port != 0) && - (tryCount < config.udp_port_range)); - - // debug(1,"UDP port chosen: %d.",desired_port); - - if (ret < 0) { - close(local_socket); - char errorstring[1024]; - strerror_r(errno, (char *)errorstring, sizeof(errorstring)); - die("error %d: \"%s\". Could not bind a UDP port! Check the udp_port_range is large enough -- " - "it must be " - "at least 3, and 10 or more is suggested -- or " - "check for restrictive firewall settings or a bad router! 
UDP base is %u, range is %u and " - "current suggestion is %u.", - errno, errorstring, config.udp_port_base, config.udp_port_range, desired_port); - } - - uint16_t sport; - SOCKADDR local; - socklen_t local_len = sizeof(local); - getsockname(local_socket, (struct sockaddr *)&local, &local_len); -#ifdef AF_INET6 - if (local.SAFAMILY == AF_INET6) { - struct sockaddr_in6 *sa6 = (struct sockaddr_in6 *)&local; - sport = ntohs(sa6->sin6_port); - } else -#endif - { - struct sockaddr_in *sa = (struct sockaddr_in *)&local; - sport = ntohs(sa->sin_port); - } - *sock = local_socket; - return sport; -} - void rtp_setup(SOCKADDR *local, SOCKADDR *remote, uint16_t cport, uint16_t tport, rtsp_conn_info *conn) { @@ -1027,17 +978,17 @@ void rtp_setup(SOCKADDR *local, SOCKADDR *remote, uint16_t cport, uint16_t tport conn->remote_control_port = cport; conn->remote_timing_port = tport; - conn->local_control_port = bind_port(conn->connection_ip_family, conn->self_ip_string, + conn->local_control_port = bind_UDP_port(conn->connection_ip_family, conn->self_ip_string, conn->self_scope_id, &conn->control_socket); - conn->local_timing_port = bind_port(conn->connection_ip_family, conn->self_ip_string, + conn->local_timing_port = bind_UDP_port(conn->connection_ip_family, conn->self_ip_string, conn->self_scope_id, &conn->timing_socket); - conn->local_audio_port = bind_port(conn->connection_ip_family, conn->self_ip_string, + conn->local_audio_port = bind_UDP_port(conn->connection_ip_family, conn->self_ip_string, conn->self_scope_id, &conn->audio_socket); debug(3, "listening for audio, control and timing on ports %d, %d, %d.", conn->local_audio_port, conn->local_control_port, conn->local_timing_port); - conn->reference_timestamp = 0; + conn->anchor_rtptime = 0; conn->request_sent = 0; conn->rtp_running = 1; @@ -1049,31 +1000,28 @@ void rtp_setup(SOCKADDR *local, SOCKADDR *remote, uint16_t cport, uint16_t tport } } -void get_reference_timestamp_stuff(uint32_t *timestamp, uint64_t *timestamp_time, 
- uint64_t *remote_timestamp_time, rtsp_conn_info *conn) { - // types okay - debug_mutex_lock(&conn->reference_time_mutex, 1000, 0); - *timestamp = conn->reference_timestamp; - *remote_timestamp_time = conn->remote_reference_timestamp_time; - *timestamp_time = - conn->remote_reference_timestamp_time - local_to_remote_time_difference_now(conn); - debug_mutex_unlock(&conn->reference_time_mutex, 0); -} - -void clear_reference_timestamp(rtsp_conn_info *conn) { +void reset_ntp_anchor_info(rtsp_conn_info *conn) { debug_mutex_lock(&conn->reference_time_mutex, 1000, 1); - conn->reference_timestamp = 0; - conn->remote_reference_timestamp_time = 0; + conn->anchor_rtptime = 0; + conn->anchor_time = 0; debug_mutex_unlock(&conn->reference_time_mutex, 3); } -int have_timestamp_timing_information(rtsp_conn_info *conn) { - if (conn->reference_timestamp == 0) +void reset_anchor_info(rtsp_conn_info *conn) { + reset_ntp_anchor_info(conn); +} + +int have_ntp_timestamp_timing_information(rtsp_conn_info *conn) { + if (conn->anchor_rtptime == 0) return 0; else return 1; } +int have_timestamp_timing_information(rtsp_conn_info *conn) { + return have_ntp_timestamp_timing_information(conn); +} + // set this to zero to use the rates supplied by the sources, which might not always be completely // right... 
const int use_nominal_rate = 0; // specify whether to use the nominal input rate, usually 44100 fps @@ -1086,8 +1034,8 @@ int sanitised_source_rate_information(uint32_t *frames, uint64_t *time, rtsp_con if ((conn->initial_reference_time) && (conn->initial_reference_timestamp)) { // uint32_t local_frames = conn->reference_timestamp - conn->initial_reference_timestamp; uint32_t local_frames = - modulo_32_offset(conn->initial_reference_timestamp, conn->reference_timestamp); - uint64_t local_time = conn->remote_reference_timestamp_time - conn->initial_reference_time; + modulo_32_offset(conn->initial_reference_timestamp, conn->anchor_rtptime); + uint64_t local_time = conn->anchor_time - conn->initial_reference_time; if ((local_frames == 0) || (local_time == 0) || (use_nominal_rate)) { result = 1; } else { @@ -1110,87 +1058,198 @@ int sanitised_source_rate_information(uint32_t *frames, uint64_t *time, rtsp_con return result; } +void set_ntp_anchor_info(rtsp_conn_info *conn, uint32_t rtptime, + uint64_t networktime) { + conn->anchor_remote_info_is_valid = 1; + conn->anchor_rtptime = rtptime; + conn->anchor_time = networktime; +} + // the timestamp is a timestamp calculated at the input rate // the reference timestamps are denominated in terms of the input rate -int frame_to_local_time(uint32_t timestamp, uint64_t *time, rtsp_conn_info *conn) { +int frame_to_ntp_local_time(uint32_t timestamp, uint64_t *time, rtsp_conn_info *conn) { debug_mutex_lock(&conn->reference_time_mutex, 1000, 0); int result = 0; uint64_t time_difference; uint32_t frame_difference; result = sanitised_source_rate_information(&frame_difference, &time_difference, conn); - uint64_t timestamp_interval_time; uint64_t remote_time_of_timestamp; - uint32_t timestamp_interval = modulo_32_offset(conn->reference_timestamp, timestamp); - if (timestamp_interval <= - conn->input_rate * 3600) { // i.e. 
timestamp was really after the reference timestamp - timestamp_interval_time = (timestamp_interval * time_difference) / + int32_t timestamp_interval = timestamp - conn->anchor_rtptime; + int64_t timestamp_interval_time = timestamp_interval; + timestamp_interval_time = timestamp_interval_time * time_difference; + timestamp_interval_time = timestamp_interval_time / frame_difference; // this is the nominal time, based on the // fps specified between current and // previous sync frame. - remote_time_of_timestamp = conn->remote_reference_timestamp_time + + remote_time_of_timestamp = conn->anchor_time + timestamp_interval_time; // based on the reference timestamp time // plus the time interval calculated based // on the specified fps. - } else { // i.e. timestamp was actually before the reference timestamp - timestamp_interval = - modulo_32_offset(timestamp, conn->reference_timestamp); // fix the calculation - timestamp_interval_time = (timestamp_interval * time_difference) / - frame_difference; // this is the nominal time, based on the - // fps specified between current and - // previous sync frame. - remote_time_of_timestamp = conn->remote_reference_timestamp_time - - timestamp_interval_time; // based on the reference timestamp time - // plus the time interval calculated based - // on the specified fps. - } *time = remote_time_of_timestamp - local_to_remote_time_difference_now(conn); debug_mutex_unlock(&conn->reference_time_mutex, 0); return result; } -int local_time_to_frame(uint64_t time, uint32_t *frame, rtsp_conn_info *conn) { +int local_ntp_time_to_frame(uint64_t time, uint32_t *frame, rtsp_conn_info *conn) { debug_mutex_lock(&conn->reference_time_mutex, 1000, 0); int result = 0; - uint64_t time_difference; uint32_t frame_difference; result = sanitised_source_rate_information(&frame_difference, &time_difference, conn); - // first, get from [local] time to remote time. 
uint64_t remote_time = time + local_to_remote_time_difference_now(conn); // next, get the remote time interval from the remote_time to the reference time - uint64_t time_interval; - // here, we calculate the time interval, in terms of remote time - uint64_t offset = modulo_64_offset(conn->remote_reference_timestamp_time, remote_time); - int reference_time_was_earlier = (offset <= (uint64_t)3600000000000); - if (reference_time_was_earlier) // if we haven't had a reference within the last hour, it'll be - // taken as afterwards - time_interval = remote_time - conn->remote_reference_timestamp_time; - else - time_interval = conn->remote_reference_timestamp_time - remote_time; - + int64_t offset = remote_time - conn->anchor_time; // now, convert the remote time interval into frames using the frame rate we have observed or // which has been nominated - uint32_t frame_interval = 0; - if (time_difference) - frame_interval = (time_interval * frame_difference) / time_difference; + int64_t frame_interval = 0; + frame_interval = (offset * frame_difference) / time_difference; + int32_t frame_interval_32 = frame_interval; + uint32_t new_frame = conn->anchor_rtptime + frame_interval_32; + *frame = new_frame; + debug_mutex_unlock(&conn->reference_time_mutex, 0); + return result; +} + +void set_ptp_anchor_info(rtsp_conn_info *conn, uint64_t clock_id, uint32_t rtptime, + uint64_t networktime) { + if (conn->anchor_clock != clock_id) + debug(1, "Connection %d: Set Anchor Clock: %" PRIx64 ".", conn->connection_number, clock_id); + conn->anchor_remote_info_is_valid = 1; + conn->anchor_rtptime = rtptime; + conn->anchor_time = networktime; + conn->anchor_clock = clock_id; +} + +void reset_ptp_anchor_info(rtsp_conn_info *conn) { + debug(2, "Connection %d: Clear anchor information.", conn->connection_number); + conn->last_anchor_info_is_valid = 0; + conn->anchor_remote_info_is_valid = 0; +} + +int get_ptp_anchor_local_time_info(rtsp_conn_info *conn, uint32_t *anchorRTP, + uint64_t 
*anchorLocalTime) { + int response = 0; + uint64_t actual_clock_id, actual_offset; + + int ptp_status = ptp_get_clock_info(&actual_clock_id, &actual_offset); + + if (ptp_status == 0) { + if (conn->anchor_remote_info_is_valid != 0) { + if (actual_clock_id == conn->anchor_clock) { + conn->last_anchor_clock_offset = actual_offset; + conn->last_anchor_time_of_update = get_absolute_time_in_ns(); + conn->last_anchor_info_is_valid = 1; + if (anchorRTP != NULL) + *anchorRTP = conn->anchor_rtptime; + if (anchorLocalTime != NULL) + *anchorLocalTime = conn->anchor_time - conn->last_anchor_clock_offset; + } else { + if (conn->last_anchor_info_is_valid != 0) { + int64_t time_since_last_update = + get_absolute_time_in_ns() - conn->last_anchor_time_of_update; + if (time_since_last_update > 5000000000) { + debug(1, "change master clock to %" PRIx64 ".", actual_clock_id); + uint64_t new_anchor_time = conn->anchor_time; + new_anchor_time = new_anchor_time - conn->last_anchor_clock_offset; // to local + new_anchor_time = new_anchor_time + actual_offset; // to the next clock + conn->anchor_time = new_anchor_time; + conn->anchor_clock = actual_clock_id; + if (anchorRTP != NULL) + *anchorRTP = conn->anchor_rtptime; + if (anchorLocalTime != NULL) + *anchorLocalTime = conn->anchor_time - actual_offset; + } else { + if (anchorRTP != NULL) + *anchorRTP = conn->anchor_rtptime; + if (anchorLocalTime != NULL) + *anchorLocalTime = conn->anchor_time - conn->last_anchor_clock_offset; + } + } else { + debug(2, "no clock and no old anchor times"); + response = -1; + } + } + } else { + debug(2, "don't have anchor_remote_time_info"); + response = -1; + } + } else if (ptp_status == -1) { + debug(3, "don't have the ptp clock interface"); + response = -1; + } else { + debug(3, "ptp clock not valid"); + response = -1; + } + return response; +} + +int get_anchor_local_time_info(rtsp_conn_info *conn, uint32_t *anchorRTP, + uint64_t *anchorLocalTime) { + return get_ptp_anchor_local_time_info(conn, 
anchorRTP,anchorLocalTime); +} + +int have_ptp_timing_information(rtsp_conn_info *conn) { + if (get_anchor_local_time_info(conn, NULL, NULL) == 0) + return 1; else - debug(1, "local_time_to_frame: time_difference is zero"); - if (reference_time_was_earlier) { - // debug(1,"Frame interval is %" PRId64 " frames.",frame_interval); - *frame = (conn->reference_timestamp + frame_interval); + return 0; +} + +int have_timing_information(rtsp_conn_info *conn) { + return have_ptp_timing_information(conn); +} + +int frame_to_ptp_local_time(uint32_t timestamp, uint64_t *time, rtsp_conn_info *conn) { + int result = -1; + uint32_t anchor_rtptime; + uint64_t anchor_local_time; + if (get_anchor_local_time_info(conn, &anchor_rtptime, &anchor_local_time) == 0) { + int32_t frame_difference = timestamp - anchor_rtptime; + int64_t time_difference = frame_difference; + time_difference = time_difference * 1000000000; + time_difference = time_difference / conn->input_rate; + uint64_t ltime = anchor_local_time + time_difference; + *time = ltime; + result = 0; } else { - // debug(1,"Frame interval is %" PRId64 " frames.",-frame_interval); - *frame = (conn->reference_timestamp - frame_interval); + debug(3, "frame_to_local_time can't get anchor local time information"); + } + return result; +} + +int frame_to_local_time(uint32_t timestamp, uint64_t *time, rtsp_conn_info *conn) { + return frame_to_ntp_local_time(timestamp, time, conn); +} + +int local_ptp_time_to_frame(uint64_t time, uint32_t *frame, rtsp_conn_info *conn) { + int result = -1; + uint32_t anchor_rtptime; + uint64_t anchor_local_time; + if (get_anchor_local_time_info(conn, &anchor_rtptime, &anchor_local_time) == 0) { + int64_t time_difference = time - anchor_local_time; + int64_t frame_difference = time_difference; + frame_difference = frame_difference * conn->input_rate; // but this is by 10^9 + frame_difference = frame_difference / 1000000000; + int32_t fd32 = frame_difference; + uint32_t lframe = anchor_rtptime + fd32; + 
*frame = lframe; + result = 0; + } else { + debug(3, "local_time_to_frame can't get anchor local time information"); } - debug_mutex_unlock(&conn->reference_time_mutex, 0); return result; } +int local_time_to_frame(uint64_t time, uint32_t *frame, rtsp_conn_info *conn) { + return local_ntp_time_to_frame(time, frame, conn); +} + void rtp_request_resend(seq_t first, uint32_t count, rtsp_conn_info *conn) { + // debug(1, "rtp_request_resend of %u packets from sequence number %u.", count, first); if (conn->rtp_running) { // if (!request_sent) { // debug(2, "requesting resend of %d packets starting at %u.", count, first); @@ -1199,15 +1258,25 @@ void rtp_request_resend(seq_t first, uint32_t count, rtsp_conn_info *conn) { char req[8]; // *not* a standard RTCP NACK req[0] = 0x80; - req[1] = (char)0x55 | (char)0x80; // Apple 'resend' +#ifdef CONFIG_AIRPLAY_2 + if (conn->ap2_remote_control_socket_addr_length == 0) { + debug(2, "No remote socket -- skipping the resend"); + return; // hack + } + req[1] = 0xD5; // Airplay 2 'resend' +#else + req[1] = (char)0x55 | (char)0x80; // Apple 'resend' +#endif *(unsigned short *)(req + 2) = htons(1); // our sequence number *(unsigned short *)(req + 4) = htons(first); // missed seqnum *(unsigned short *)(req + 6) = htons(count); // count +#ifndef CONFIG_AIRPLAY_2 socklen_t msgsize = sizeof(struct sockaddr_in); #ifdef AF_INET6 if (conn->rtp_client_control_socket.SAFAMILY == AF_INET6) { msgsize = sizeof(struct sockaddr_in6); } +#endif #endif uint64_t time_of_sending_ns = get_absolute_time_in_ns(); uint64_t resend_error_backoff_time = 300000000; // 0.3 seconds @@ -1221,13 +1290,21 @@ void rtp_request_resend(seq_t first, uint32_t count, rtsp_conn_info *conn) { struct timeval timeout; timeout.tv_sec = 0; timeout.tv_usec = 100000; - +#ifdef CONFIG_AIRPLAY_2 + if (setsockopt(conn->ap2_control_socket, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout, + sizeof(timeout)) < 0) + debug(1, "Can't set timeout on resend request socket."); + if 
(sendto(conn->ap2_control_socket, req, sizeof(req), 0, + (struct sockaddr *)&conn->ap2_remote_control_socket_addr, + conn->ap2_remote_control_socket_addr_length) == -1) { +#else if (setsockopt(conn->control_socket, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout, sizeof(timeout)) < 0) debug(1, "Can't set timeout on resend request socket."); if (sendto(conn->control_socket, req, sizeof(req), 0, (struct sockaddr *)&conn->rtp_client_control_socket, msgsize) == -1) { +#endif char em[1024]; strerror_r(errno, em, sizeof(em)); debug(2, "Error %d using sendto to request a resend: \"%s\".", errno, em); @@ -1252,3 +1329,1049 @@ void rtp_request_resend(seq_t first, uint32_t count, rtsp_conn_info *conn) { //} } } + +#if 0 +#ifdef CONFIG_AIRPLAY_2 + +void rtp_event_receiver_cleanup_handler(void *arg) { + rtsp_conn_info *conn = (rtsp_conn_info *)arg; + debug(2, "Connection %d: Event Receiver Cleanup.", conn->connection_number); +} + +void *rtp_event_receiver(void *arg) { + debug(2, "Event Receiver started"); + rtsp_conn_info *conn = (rtsp_conn_info *)arg; + pthread_cleanup_push(rtp_event_receiver_cleanup_handler, arg); + + listen(conn->event_socket, 5); + + uint8_t packet[4096]; + ssize_t nread; + SOCKADDR remote_addr; + memset(&remote_addr, 0, sizeof(remote_addr)); + socklen_t addr_size = sizeof(remote_addr); + + int fd = accept(conn->event_socket, (struct sockaddr *)&remote_addr, &addr_size); + pthread_cleanup_push(socket_cleanup, (void *)fd); + int finished = 0; + do { + nread = recv(fd, packet, sizeof(packet), 0); + + if (nread < 0) { + char errorstring[1024]; + strerror_r(errno, (char *)errorstring, sizeof(errorstring)); + debug(1, "error in rtp_event_receiver %d: \"%s\". 
Could not recv a packet.", errno, + errorstring); + // if ((config.diagnostic_drop_packet_fraction == 0.0) || + // (drand48() > config.diagnostic_drop_packet_fraction)) { + } else if (nread > 0) { + + // ssize_t plen = nread; + debug(1, "Packet Received on Event Port."); + if (packet[1] == 0xD7) { + debug(1, "Event Receiver -- Time Announce RTP packet of type 0x%02X length %d received.", + packet[1], nread); + } else { + debug(1, "Event Receiver -- Unknown RTP packet of type 0x%02X length %d received.", + packet[1], nread); + } + // } else { + // debug(3, "Event Receiver Thread -- dropping incoming packet to simulate a bad network."); + // } + } else { + finished = 1; + } + } while (finished == 0); + debug(1, "Event Receiver RTP thread \"normal\" exit."); + pthread_cleanup_pop(1); // close the socket + + pthread_cleanup_pop(1); // do the cleanup + debug(1, "Connection %d: Event Receiver RTP thread exit.", conn->connection_number); + pthread_exit(NULL); +} + +void rtp_ap2_control_handler_cleanup_handler(void *arg) { + rtsp_conn_info *conn = (rtsp_conn_info *)arg; + debug(2, "Connection %d: AP2 Control Receiver Cleanup.", conn->connection_number); +} + +int32_t decipher_player_put_packet(uint8_t *ciphered_audio_alt, ssize_t nread, + rtsp_conn_info *conn) { + + // this deciphers the packet -- it doesn't decode it from ALAC + uint16_t sequence_number = 0; + + // if the packet is too small, don't go ahead. + // it must contain an uint16_t sequence number and eight bytes of AAD followed by the + // ciphertext and then followed by an eight-byte nonce. 
Thus it must be greater than 18 + if (nread > 18) { + + memcpy(&sequence_number, ciphered_audio_alt, sizeof(uint16_t)); + sequence_number = ntohs(sequence_number); + + uint32_t timestamp; + memcpy(&timestamp, ciphered_audio_alt + sizeof(uint16_t), sizeof(uint32_t)); + timestamp = ntohl(timestamp); + + /* + uint32_t ssrc; + memcpy(&ssrc, packet+8, sizeof(uint32_t)); + ssrc = ntohl(ssrc); + */ + + // debug(1, "Realtime Audio Receiver Packet received. Version: %u, Padding: %u, Extension: + // %u, Csrc Count: %u, Marker: %u, Payload Type: %u, Sequence Number: %u, Timestamp: %u, + // SSRC: %u.", version, padding, extension, csrc_count, marker, payload_type, + // sequence_number, timestamp, ssrc); + + unsigned char nonce[12]; + memset(nonce, 0, sizeof(nonce)); + memcpy(nonce + 4, ciphered_audio_alt + nread - 8, + 8); // front-pad the 8-byte nonce received to get the 12-byte nonce expected + + // https://libsodium.gitbook.io/doc/secret-key_cryptography/aead/chacha20-poly1305/ietf_chacha20-poly1305_construction + // Note: the eight-byte nonce must be front-padded out to 12 bytes. 
+ + unsigned char m[4096]; + unsigned long long new_payload_length = 0; + int response = crypto_aead_chacha20poly1305_ietf_decrypt( + m, // m + &new_payload_length, // mlen_p + NULL, // nsec, + ciphered_audio_alt + + 10, // the ciphertext starts 10 bytes in and is followed by the MAC tag, + nread - (8 + 10), // clen -- the last 8 bytes are the nonce + ciphered_audio_alt + 2, // authenticated additional data + 8, // authenticated additional data length + nonce, + conn->session_key); // *k + if (response != 0) { + debug(1, "Error decrypting an audio packet."); + } + // now pass it in to the regular processing chain + + unsigned long long max_int = INT_MAX; // put in the right format + if (new_payload_length > max_int) + debug(1, "Madly long payload length!"); + int plen = new_payload_length; // + player_put_packet(sequence_number, timestamp, m, plen, + conn); // the '1' means is original format + return sequence_number; + } else { + debug(1, "packet was too small -- ignored"); + return -1; + } +} + +void *rtp_ap2_control_receiver(void *arg) { + pthread_cleanup_push(rtp_ap2_control_handler_cleanup_handler, arg); + rtsp_conn_info *conn = (rtsp_conn_info *)arg; + uint8_t packet[4096]; + ssize_t nread; + while (1) { + SOCKADDR from_sock_addr; + socklen_t from_sock_addr_length = sizeof(SOCKADDR); + memset(&from_sock_addr, 0, sizeof(SOCKADDR)); + + nread = recvfrom(conn->ap2_control_socket, packet, sizeof(packet), 0, + (struct sockaddr *)&from_sock_addr, &from_sock_addr_length); + + if (nread >= 0) { + // debug(1,"rtp_ap2_control_receiver coded: %u, %u", packet[0], packet[1]); + + if ((config.diagnostic_drop_packet_fraction == 0.0) || + (drand48() > config.diagnostic_drop_packet_fraction)) { + // store the from_sock_addr if we haven't already done so + // v remember to zero this when you're finished! 
+ if (conn->ap2_remote_control_socket_addr_length == 0) { + memcpy(&conn->ap2_remote_control_socket_addr, &from_sock_addr, from_sock_addr_length); + conn->ap2_remote_control_socket_addr_length = from_sock_addr_length; + } + switch (packet[1]) { + case 215: // code 215, effectively an anchoring announcement + { + // struct timespec tnr; + // clock_gettime(CLOCK_REALTIME, &tnr); + // uint64_t local_realtime_now = timespec_to_ns(&tnr); + + /* + char obf[4096]; + char *obfp = obf; + int obfc; + for (obfc=0;obfcinput_rate); + if (added_latency < (-(notified_latency + 11035))) + debug(1, "the audio_backend_latency_offset is causing a negative latency!"); + + /* + debug_mutex_lock(&conn->reference_time_mutex, 1000, 0); + conn->remote_reference_timestamp_time = remote_packet_time_ns; + conn->reference_timestamp = + frame_1 - 11035 - added_latency; // add the latency in to the anchortime + debug_mutex_unlock(&conn->reference_time_mutex, 0); + */ + + set_ptp_anchor_info(conn, clock_id, frame_1 - 11035 - added_latency, remote_packet_time_ns); + + } break; + case 0xd6: + // six bytes in is the sequence number at the start of the encrypted audio packet + // returns the sequence number but we're not really interested + decipher_player_put_packet(packet + 6, nread - 6, conn); + break; + default: { + char *packet_in_hex_cstring = + debug_malloc_hex_cstring(packet, nread); // remember to free this afterwards + debug(1, + "AP2 Control Receiver Packet of first byte 0x%02X, type 0x%02X length %d received: " + "\"%s\".", + packet[0], packet[1], nread, packet_in_hex_cstring); + free(packet_in_hex_cstring); + } break; + } + } else { + debug(1, "AP2 Control Receiver -- dropping a packet."); + } + } else { + debug(1, "AP2 Control Receiver -- error %d receiving a packet.", errno); + } + } + debug(1, "AP2 Control RTP thread \"normal\" exit -- this can't happen. Hah!"); + pthread_cleanup_pop(0); // don't execute anything here. 
+ debug(1, "AP2 Control RTP thread exit."); + pthread_exit(NULL); +} + +void rtp_realtime_audio_cleanup_handler(__attribute__((unused)) void *arg) { + debug(1, "Realtime Audio Receiver Cleanup Start."); + rtsp_conn_info *conn = (rtsp_conn_info *)arg; + close(conn->realtime_audio_socket); + conn->realtime_audio_socket = 0; + debug(1, "Realtime Audio Receiver Cleanup Done."); +} + +void *rtp_realtime_audio_receiver(void *arg) { + pthread_cleanup_push(rtp_realtime_audio_cleanup_handler, arg); + rtsp_conn_info *conn = (rtsp_conn_info *)arg; + uint8_t packet[4096]; + int32_t last_seqno = -1; + ssize_t nread; + while (1) { + nread = recv(conn->realtime_audio_socket, packet, sizeof(packet), 0); + + if (nread > 36) { // 36 is the 12-byte header and and 24-byte footer + if ((config.diagnostic_drop_packet_fraction == 0.0) || + (drand48() > config.diagnostic_drop_packet_fraction)) { + + /* + char *packet_in_hex_cstring = + debug_malloc_hex_cstring(packet, nread); // remember to free this afterwards + debug(1, "Audio Receiver Packet of type 0x%02X length %d received: \"%s\".", + packet[1], nread, packet_in_hex_cstring); + free(packet_in_hex_cstring); + */ + + /* + // debug(1, "Realtime Audio Receiver Packet of type 0x%02X length %d received.", packet[1], + nread); + // now get hold of its various bits and pieces + uint8_t version = (packet[0] & 0b11000000) >> 6; + uint8_t padding = (packet[0] & 0b00100000) >> 5; + uint8_t extension = (packet[0] & 0b00010000) >> 4; + uint8_t csrc_count = packet[0] & 0b00001111; + uint8_t marker = (packet[1] & 0b1000000) >> 7; + uint8_t payload_type = packet[1] & 0b01111111; + */ + + int32_t seqno = decipher_player_put_packet(packet + 2, nread - 2, conn); + if (seqno >= 0) { + if (last_seqno == -1) { + last_seqno = seqno; + } else { + last_seqno = (last_seqno + 1) & 0xffff; + // if (seqno != last_seqno) + // debug(3, "RTP: Packets out of sequence: expected: %d, got %d.", last_seqno, seqno); + last_seqno = seqno; // reset warning... 
+ } + } else { + debug(1, "Realtime Audio Receiver -- bad packet dropped."); + } + } else { + debug(3, "Realtime Audio Receiver -- dropping a packet."); + } + } else { + debug(1, "Realtime Audio Receiver -- error receiving a packet."); + } + } + pthread_cleanup_pop(0); // don't execute anything here. + pthread_exit(NULL); +} + +void rtp_buffered_audio_receiver_cleanup_handler(__attribute__((unused)) void *arg) { + debug(1, "Buffered Audio Receiver Cleanup Start."); + rtsp_conn_info *conn = (rtsp_conn_info *)arg; + close(conn->buffered_audio_socket); + conn->buffered_audio_socket = 0; + debug(1, "Buffered Audio Receiver Cleanup Done."); +} + +ssize_t buffered_read(buffered_tcp_desc *descriptor, void *buf, size_t count) { + ssize_t response; + // usleep(1500000); + if (pthread_mutex_lock(&descriptor->mutex) != 0) + debug(1, "problem with mutex"); + pthread_cleanup_push(mutex_unlock, (void *)&descriptor->mutex); + if ((descriptor->closed == 0)) { + while ((descriptor->buffer_occupancy == 0) && (descriptor->error_code == 0)) { + if (pthread_cond_wait(&descriptor->not_empty_cv, &descriptor->mutex)) + debug(1, "Error waiting for buffered read"); + } + } + if (descriptor->buffer_occupancy != 0) { + ssize_t bytes_to_move = count; + + if (descriptor->buffer_occupancy < count) + bytes_to_move = descriptor->buffer_occupancy; + + ssize_t top_gap = descriptor->buffer + descriptor->buffer_max_size - descriptor->toq; + if (top_gap < bytes_to_move) + bytes_to_move = top_gap; + + memcpy(buf, descriptor->toq, bytes_to_move); + descriptor->toq += bytes_to_move; + if (descriptor->toq == descriptor->buffer + descriptor->buffer_max_size) + descriptor->toq = descriptor->buffer; + descriptor->buffer_occupancy -= bytes_to_move; + response = bytes_to_move; + if (pthread_cond_signal(&descriptor->not_full_cv)) + debug(1, "Error signalling"); + } else if (descriptor->error_code) { + errno = descriptor->error_code; + response = -1; + } else if (descriptor->closed != 0) { + response = 0; + } + + 
pthread_cleanup_pop(1); // release the mutex + return response; +} + +#define STANDARD_PACKET_SIZE 65536 + +void buffered_tcp_reader_cleanup_handler(__attribute__((unused)) void *arg) { + debug(2, "Buffered TCP Reader Thread Exit via Cleanup."); +} + +void *buffered_tcp_reader(void *arg) { + pthread_cleanup_push(buffered_tcp_reader_cleanup_handler, NULL); + buffered_tcp_desc *descriptor = (buffered_tcp_desc *)arg; + + listen(descriptor->sock_fd, 5); + ssize_t nread; + SOCKADDR remote_addr; + memset(&remote_addr, 0, sizeof(remote_addr)); + socklen_t addr_size = sizeof(remote_addr); + int finished = 0; + int fd = accept(descriptor->sock_fd, (struct sockaddr *)&remote_addr, &addr_size); + pthread_cleanup_push(socket_cleanup, (void *)fd); + + do { + if (pthread_mutex_lock(&descriptor->mutex) != 0) + debug(1, "problem with mutex"); + pthread_cleanup_push(mutex_unlock, (void *)&descriptor->mutex); + while ((descriptor->buffer_occupancy == descriptor->buffer_max_size) || + (descriptor->error_code != 0) || (descriptor->closed != 0)) { + if (pthread_cond_wait(&descriptor->not_full_cv, &descriptor->mutex)) + debug(1, "Error waiting for buffered read"); + } + pthread_cleanup_pop(1); // release the mutex + + // now we know it is not full, so go ahead and try to read some more into it + + // wrap + if ((size_t)(descriptor->eoq - descriptor->buffer) == descriptor->buffer_max_size) + descriptor->eoq = descriptor->buffer; + + // figure out how much to ask for + size_t bytes_to_request = STANDARD_PACKET_SIZE; + size_t free_space = descriptor->buffer_max_size - descriptor->buffer_occupancy; + if (bytes_to_request > free_space) + bytes_to_request = free_space; // don't ask for more than will fit + + size_t gap_to_end_of_buffer = + descriptor->buffer + descriptor->buffer_max_size - descriptor->eoq; + if (gap_to_end_of_buffer < bytes_to_request) + bytes_to_request = + gap_to_end_of_buffer; // only ask for what will fill to the top of the buffer + + // do the read + // debug(1, "Request 
buffered read of up to %d bytes.", bytes_to_request); + nread = recv(fd, descriptor->eoq, bytes_to_request, 0); + // debug(1, "Received %d bytes for a buffer size of %d bytes.",nread, + // descriptor->buffer_occupancy + nread); + if (pthread_mutex_lock(&descriptor->mutex) != 0) + debug(1, "problem with not empty mutex"); + pthread_cleanup_push(mutex_unlock, (void *)&descriptor->mutex); + if (nread < 0) { + char errorstring[1024]; + strerror_r(errno, (char *)errorstring, sizeof(errorstring)); + debug(1, "error in buffered_read %d: \"%s\". Could not recv a packet.", errno, errorstring); + descriptor->error_code = errno; + } else if (nread == 0) { + descriptor->closed = 1; + } else if (nread > 0) { + descriptor->eoq += nread; + descriptor->buffer_occupancy += nread; + } else { + debug(1, "buffered audio port closed!"); + } + // signal if we got data or an error or the file closed + if (pthread_cond_signal(&descriptor->not_empty_cv)) + debug(1, "Error signalling"); + pthread_cleanup_pop(1); // release the mutex + } while (finished == 0); + + debug(1, "Buffered TCP Reader Thread Exit \"Normal\" Exit Begin."); + pthread_cleanup_pop(1); // close the socket + pthread_cleanup_pop(1); // cleanup + debug(1, "Buffered TCP Reader Thread Exit \"Normal\" Exit -- Shouldn't happen!."); + pthread_exit(NULL); +} + +void avcodec_alloc_context3_cleanup_handler(void *arg) { + debug(3, "avcodec_alloc_context3_cleanup_handler"); + AVCodecContext *codec_context = arg; + av_free(codec_context); +} + +void avcodec_open2_cleanup_handler(void *arg) { + debug(3, "avcodec_open2_cleanup_handler"); + AVCodecContext *codec_context = arg; + avcodec_close(codec_context); +} + +void av_parser_init_cleanup_handler(void *arg) { + debug(3, "av_parser_init_cleanup_handler"); + AVCodecParserContext *codec_parser_context = arg; + av_parser_close(codec_parser_context); +} + +void swr_alloc_cleanup_handler(void *arg) { + debug(3, "swr_alloc_cleanup_handler"); + SwrContext **swr = arg; + swr_free(swr); +} + 
+// Cleanup handler for an allocated AVPacket.
+// arg is treated as the address of an AVPacket pointer and is handed to
+// av_packet_free().
+void av_packet_alloc_cleanup_handler(void *arg) {
+  debug(3, "av_packet_alloc_cleanup_handler");
+  AVPacket **pkt = arg;
+  av_packet_free(pkt);
+}
+
+// Read a block of exactly `count` bytes into `buf` by calling buffered_read()
+// repeatedly until the block is complete.
+//
+// The loop stops early when buffered_read() returns 0 (treated as end of
+// stream / connection closed) or returns a negative value with an errno other
+// than ECONNRESET, EAGAIN or EINTR; those three are retried straight away.
+//
+// Returns the number of bytes gathered when the last read succeeded (this is
+// `count`, provided buffered_read() never returns more than it was asked for).
+// Otherwise returns the last buffered_read() result: 0 at end of stream, or a
+// negative value on a non-retryable error, in which case errno holds the error
+// of the failing read. Bytes gathered before a failure are not reported.
+ssize_t lread_sized_block(buffered_tcp_desc *descriptor, void *buf, size_t count) {
+  char *dest = buf; // byte pointer: arithmetic on void * is a GNU extension, not ISO C
+  ssize_t response, nread;
+  size_t inbuf = 0; // bytes already in the buffer
+  int keep_trying = 1;
+
+  do {
+    nread = buffered_read(descriptor, dest + inbuf, count - inbuf);
+    if (nread == 0) {
+      // a blocking read that returns zero means eof -- implies connection closed
+      debug(3, "read_sized_block connection closed.");
+      keep_trying = 0;
+    } else if (nread < 0) {
+      // capture errno once: the reporting calls below may overwrite it
+      int read_errno = errno;
+      if (read_errno == EAGAIN) {
+        debug(1, "read_sized_block getting Error 11 -- EAGAIN from a blocking read!");
+      }
+      if ((read_errno != ECONNRESET) && (read_errno != EAGAIN) && (read_errno != EINTR)) {
+        char errorstring[1024];
+        strerror_r(read_errno, (char *)errorstring, sizeof(errorstring));
+        debug(1, "read_sized_block read error %d: \"%s\".", read_errno, (char *)errorstring);
+        keep_trying = 0;
+        errno = read_errno; // leave the failing read's error visible to the caller
+      }
+    } else {
+      inbuf += (size_t)nread;
+    }
+  } while ((keep_trying != 0) && (inbuf < count));
+  if (nread <= 0)
+    response = nread;
+  else
+    response = inbuf;
+  return response;
+}
+
+// From
+// https://stackoverflow.com/questions/18862715/how-to-generate-the-aac-adts-elementary-stream-with-android-mediacodec
+// with thanks!
+/**
+ * Add ADTS header at the beginning of each and every AAC packet.
+ * This is needed as MediaCodec encoder generates a packet of raw
+ * AAC data.
+ *
+ * Note the packetLen must count in the ADTS header itself.
+ **/ +void addADTStoPacket(uint8_t *packet, int packetLen) { + int profile = 2; // AAC LC + // 39=MediaCodecInfo.CodecProfileLevel.AACObjectELD; + int freqIdx = 4; // 44.1KHz + int chanCfg = 2; // CPE + + // fill in ADTS data + packet[0] = 0xFF; + packet[1] = 0xF9; + packet[2] = ((profile - 1) << 6) + (freqIdx << 2) + (chanCfg >> 2); + packet[3] = ((chanCfg & 3) << 6) + (packetLen >> 11); + packet[4] = (packetLen & 0x7FF) >> 3; + packet[5] = ((packetLen & 7) << 5) + 0x1F; + packet[6] = 0xFC; +} + +void rtp_buffered_audio_cleanup_handler(__attribute__((unused)) void *arg) { + debug(3, "Buffered Audio Receiver Cleanup Start."); + rtsp_conn_info *conn = (rtsp_conn_info *)arg; + close(conn->buffered_audio_socket); + conn->buffered_audio_socket = 0; + debug(2, "Buffered Audio Receiver Cleanup Done."); +} + +void *rtp_buffered_audio_processor(void *arg) { + rtsp_conn_info *conn = (rtsp_conn_info *)arg; + pthread_cleanup_push(rtp_buffered_audio_cleanup_handler, arg); + + pthread_t *buffered_reader_thread = malloc(sizeof(pthread_t)); + if (buffered_reader_thread == NULL) + debug(1, "cannot allocate a buffered_reader_thread!"); + memset(buffered_reader_thread, 0, sizeof(pthread_t)); + pthread_cleanup_push(malloc_cleanup, buffered_reader_thread); + + buffered_tcp_desc *buffered_audio = malloc(sizeof(buffered_tcp_desc)); + if (buffered_audio == NULL) + debug(1, "cannot allocate a buffered_tcp_desc!"); + // initialise the descriptor + memset(buffered_audio, 0, sizeof(buffered_tcp_desc)); + pthread_cleanup_push(malloc_cleanup, buffered_audio); + + if (pthread_mutex_init(&buffered_audio->mutex, NULL)) + debug(1, "Connection %d: error %d initialising buffered_audio mutex.", conn->connection_number, + errno); + pthread_cleanup_push(mutex_cleanup, &buffered_audio->mutex); + + if (pthread_cond_init(&buffered_audio->not_empty_cv, NULL)) + die("Connection %d: error %d initialising not_empty cv.", conn->connection_number, errno); + pthread_cleanup_push(cv_cleanup, 
&buffered_audio->not_empty_cv); + + if (pthread_cond_init(&buffered_audio->not_full_cv, NULL)) + die("Connection %d: error %d initialising not_full cv.", conn->connection_number, errno); + pthread_cleanup_push(cv_cleanup, &buffered_audio->not_full_cv); + + // initialise the buffer data structure + buffered_audio->buffer_max_size = conn->ap2_audio_buffer_size; + buffered_audio->buffer = malloc(conn->ap2_audio_buffer_size); + if (buffered_audio->buffer == NULL) + debug(1, "cannot allocate an audio buffer of %u bytes!", buffered_audio->buffer_max_size); + pthread_cleanup_push(malloc_cleanup, buffered_audio->buffer); + + // pthread_mutex_lock(&conn->buffered_audio_mutex); + buffered_audio->toq = buffered_audio->buffer; + buffered_audio->eoq = buffered_audio->buffer; + + buffered_audio->sock_fd = conn->buffered_audio_socket; + + pthread_create(buffered_reader_thread, NULL, &buffered_tcp_reader, buffered_audio); + pthread_cleanup_push(thread_cleanup, buffered_reader_thread); + + // ideas and some code from https://rodic.fr/blog/libavcodec-tutorial-decode-audio-file/ + // with thanks + + // initialize all muxers, demuxers and protocols for libavformat + // (does nothing if called twice during the course of one program execution) + // not needed in ffmpeg 4.0 and later... 
+ // av_register_all(); + + AVCodec *codec = avcodec_find_decoder(AV_CODEC_ID_AAC); + if (codec == NULL) { + debug(1, "Can't find an AAC decoder!"); + } + + AVCodecContext *codec_context = avcodec_alloc_context3(codec); + if (codec_context == NULL) { + debug(1, "Could not allocate audio codec context!"); + } + // push a deallocator -- av_free(codec_context) + pthread_cleanup_push(avcodec_alloc_context3_cleanup_handler, codec_context); + + if (avcodec_open2(codec_context, codec, NULL) < 0) { + debug(1, "Could not open a codec into the audio codec context"); + } + // push a closer -- avcodec_close(codec_context); + pthread_cleanup_push(avcodec_open2_cleanup_handler, codec_context); + + AVCodecParserContext *codec_parser_context = av_parser_init(codec->id); + if (codec_parser_context == NULL) { + debug(1, "Can't initialise a parser context!"); + } + // push a closer -- av_parser_close(codec_parser_context); + pthread_cleanup_push(av_parser_init_cleanup_handler, codec_parser_context); + + AVPacket *pkt = av_packet_alloc(); + if (pkt == NULL) { + debug(1, "Can't allocate an AV packet"); + } + // push a deallocator -- av_packet_free(pkt); + pthread_cleanup_push(av_packet_alloc_cleanup_handler, &pkt); + + AVFrame *decoded_frame = NULL; + int dst_linesize; + int dst_bufsize; + + // Prepare software resampler to convert floating point (?) 
to S16 + SwrContext *swr = swr_alloc(); + if (swr == NULL) { + debug(1, "can not allocate a swr context"); + } + // push a deallocator -- av_packet_free(pkt); + pthread_cleanup_push(swr_alloc_cleanup_handler, &swr); + + av_opt_set_int(swr, "in_channel_layout", AV_CH_LAYOUT_STEREO, 0); + av_opt_set_int(swr, "out_channel_layout", AV_CH_LAYOUT_STEREO, 0); + av_opt_set_int(swr, "in_sample_rate", 44100, 0); + av_opt_set_int(swr, "out_sample_rate", 44100, 0); + av_opt_set_sample_fmt(swr, "in_sample_fmt", AV_SAMPLE_FMT_FLTP, 0); + av_opt_set_sample_fmt(swr, "out_sample_fmt", AV_SAMPLE_FMT_S16, 0); + swr_init(swr); + + uint8_t packet[16 * 1024]; + unsigned char m[16 * 1024]; // leave the first 7 bytes blank to make room for the ADTS + uint8_t *pcm_audio = NULL; // the S16 output + unsigned char *data_to_process; + ssize_t data_remaining; + uint32_t seq_no, last_block_flushed_seq_no; // audio packet number + uint32_t previous_seq_no = 0; + int new_buffer_needed; + + uint32_t last_block_flushed_rtp_time; + + ssize_t nread; + + int finished = 0; + int pcm_buffer_size = + (1024 + 352) * 8; // This seems to be right. 8 is for 2 * 32-bit samples per frame + uint8_t pcm_buffer[pcm_buffer_size]; + + int pcm_buffer_occupancy = 0; + int pcm_buffer_read_point = 0; // offset to where the next buffer should come from + uint32_t pcm_buffer_read_point_rtptime = 0; + + uint64_t blocks_read = 0; + int flush_requested = 0; + int streaming_has_started = 0; + do { + int flush_newly_requested = 0; + // are we in in flush mode, or just about to leave it? + debug_mutex_lock(&conn->flush_mutex, 10000, 1); // 10ms is a long time to wait! + uint32_t flushUntilSeq = conn->ap2_flush_sequence_number; + uint32_t flushUntilTS = conn->ap2_flush_rtp_timestamp; + // if we are in flush mode + if (conn->ap2_flush_requested) { + if (flush_requested == 0) { + // here, a flush has been newly requested + debug(1, + "flush requested. 
flushUntilSeq = %u, flushUntilTS = %u, seq_no: %u, " + "pcm_buffer_read_point_rtptime = " + "%u, buffer_size: %u.", + flushUntilSeq, flushUntilTS, seq_no, pcm_buffer_read_point_rtptime, + buffered_audio->buffer_occupancy); + last_block_flushed_rtp_time = pcm_buffer_read_point_rtptime; + last_block_flushed_seq_no = seq_no; + flush_newly_requested = 1; + } + // if we have arrived at the nominated flush block + // (it must be the case that the block is in the pcm buffer) + if ((blocks_read != 0) && (seq_no == flushUntilSeq)) { + // we may need to flush the pcm_buffer up to flushUntilTS + int32_t flush_difference = flushUntilTS - pcm_buffer_read_point_rtptime; + if (flush_difference < 0) { + debug(1, "flushUntilTS is before the flushUntilSeq's start TS"); + } else if (flush_difference > pcm_buffer_occupancy) { + debug(1, + "flushUntilTS is after the flushUntilSeq's last TS. flushUntilTS = %u, " + "pcm_buffer_read_point_rtptime = %u, pcm_buffer_occupancy = %u.", + flushUntilTS, pcm_buffer_read_point_rtptime, pcm_buffer_occupancy); + } else { + // move the buffer contents from flush_difference to pcm_buffer_occupancy + // to the front of the pcm_buffer + memcpy(pcm_buffer, pcm_buffer + flush_difference * 4, + pcm_buffer_occupancy - flush_difference * 4); + pcm_buffer_occupancy = pcm_buffer_occupancy - flush_difference * 4; + pcm_buffer_read_point = 0; + debug(1, "set pcm_buffer_read_point_rtptime from %u to %u.", + pcm_buffer_read_point_rtptime, flushUntilTS); + pcm_buffer_read_point_rtptime = flushUntilTS; + } + conn->ap2_flush_requested = 0; + } else { + if (seq_no > flushUntilSeq) { + debug(1, + "we have overshot the flush ending block. flushUntilSeq is %u, this block is: %u.", + flushUntilSeq, seq_no); + conn->ap2_flush_requested = 0; // but conn->ap2_play_enabled will remain at zero + } + } + } + flush_requested = conn->ap2_flush_requested; + + // this isn't always the case, i.e. 
that play is re-enabled after a flush + // if (flush_requested) + // conn->ap2_play_enabled = 0; + // play is disabled until an anchor time is sent and play is restarted + + int play_enabled = conn->ap2_play_enabled; + debug_mutex_unlock(&conn->flush_mutex, 3); + + // do this outside the flush mutex + if (flush_newly_requested) { + player_full_flush(conn); + streaming_has_started = 0; + pcm_buffer_occupancy = 0; + pcm_buffer_read_point = 0; + } + + // now, if a flush is not requested, we can do the normal stuff + if (flush_requested == 0) { + // is there space in the player thread's buffer system? + unsigned int player_buffer_size, player_buffer_occupancy; + get_audio_buffer_size_and_occupancy(&player_buffer_size, &player_buffer_occupancy, conn); + // debug(1,"player buffer size and occupancy: %u and %u", player_buffer_size, + // player_buffer_occupancy); + if (player_buffer_occupancy > + ((0.5 + 0.1) * 44100.0 / 352)) { // must be greater than the lead time. + // if there is enough stuff in the player's buffer, sleep for a while and try again + // debug(1,"sleep for 20 ms"); + usleep(20000); // wait for a while + } else { + if ((pcm_buffer_occupancy - pcm_buffer_read_point) >= (352 * 4)) { + new_buffer_needed = 0; + // send a frame to the player if allowed + // it it's way too late, it means that a new anchor time is needed + + /* + uint32_t at_rtp = conn->reference_timestamp; + at_rtp = + at_rtp - (44100 * 10); // allow it to start a few seconds late, but not + madly late int rtp_diff = pcm_buffer_read_point_rtptime - at_rtp; + */ + + if (play_enabled) { + uint64_t buffer_should_be_time; + if (frame_to_local_time(pcm_buffer_read_point_rtptime, &buffer_should_be_time, conn) == + 0) { + int64_t lead_time = buffer_should_be_time - get_absolute_time_in_ns(); + // debug(1,"lead time in buffered_audio is %f milliseconds.", lead_time * 0.000001); + // ask for 0.5 sec of leadtime + if ((lead_time >= (int64_t)(0.5 * 1000000000)) || (streaming_has_started == 1)) { + if 
(streaming_has_started == 0) + debug(1, "rtp lead time is %f ms.", 0.000001 * lead_time); + streaming_has_started = 1; +// player_put_packet(0, 0, pcm_buffer_read_point_rtptime, + player_put_packet(0, pcm_buffer_read_point_rtptime, + pcm_buffer + pcm_buffer_read_point, 352, conn); + } + + pcm_buffer_read_point_rtptime += 352; + pcm_buffer_read_point += 352 * 4; + } + // usleep(2000); // let other stuff happens + } else { + usleep(20000); // wait before asking if play is enabled again + } + } else { + new_buffer_needed = 1; + if (pcm_buffer_read_point != 0) { + // debug(1,"pcm_buffer_read_point (frames): %u, pcm_buffer_occupancy (frames): %u", + // pcm_buffer_read_point/4, pcm_buffer_occupancy/4); + // if there is anything to move down to the front of the buffer, do it now; + if ((pcm_buffer_occupancy - pcm_buffer_read_point) > 0) { + // move the remaining frames down to the start of the buffer + // debug(1,"move the remaining frames down to the start of the pcm_buffer"); + memcpy(pcm_buffer, pcm_buffer + pcm_buffer_read_point, + pcm_buffer_occupancy - pcm_buffer_read_point); + pcm_buffer_occupancy = pcm_buffer_occupancy - pcm_buffer_read_point; + } else { + // debug(1,"nothing to move to the front of the buffer"); + pcm_buffer_occupancy = 0; + } + pcm_buffer_read_point = 0; + } + } + } + } + if ((flush_requested) || (new_buffer_needed)) { + + // debug(1,"pcm_buffer_read_point (frames): %u, pcm_buffer_occupancy (frames): %u", + // pcm_buffer_read_point/4, pcm_buffer_occupancy/4); + // ok, so here we know we need material from the sender + // do we will get in a packet of audio + uint16_t data_len; + // here we read from the buffer that our thread has been reading + nread = lread_sized_block(buffered_audio, &data_len, sizeof(data_len)); + if (nread < 0) { + char errorstring[1024]; + strerror_r(errno, (char *)errorstring, sizeof(errorstring)); + debug(1, "error in rtp_buffered_audio_processor %d: \"%s\". 
Could not recv a data_len .", + errno, errorstring); + // if ((config.diagnostic_drop_packet_fraction == 0.0) || + // (drand48() > config.diagnostic_drop_packet_fraction)) { + } + data_len = ntohs(data_len); + // debug(1,"buffered audio packet of size %u detected.", data_len - 2); + nread = lread_sized_block(buffered_audio, packet, data_len - 2); + // debug(1, "buffered audio packet of size %u received.", nread); + if (nread < 0) { + char errorstring[1024]; + strerror_r(errno, (char *)errorstring, sizeof(errorstring)); + debug(1, "error in rtp_buffered_audio_processor %d: \"%s\". Could not recv a data packet.", + errno, errorstring); + // if ((config.diagnostic_drop_packet_fraction == 0.0) || + // (drand48() > config.diagnostic_drop_packet_fraction)) { + } else if (nread > 0) { + blocks_read++; // note, this doesn't mean they are valid audio blocks + // debug(1, "Realtime Audio Receiver Packet of length %d received.", nread); + // now get hold of its various bits and pieces + /* + uint8_t version = (packet[0] & 0b11000000) >> 6; + uint8_t padding = (packet[0] & 0b00100000) >> 5; + uint8_t extension = (packet[0] & 0b00010000) >> 4; + uint8_t csrc_count = packet[0] & 0b00001111; + */ + seq_no = packet[1] * (1 << 16) + packet[2] * (1 << 8) + packet[3]; + uint32_t timestamp = nctohl(&packet[4]); + // uint32_t ssrc = nctohl(&packet[8]); + // uint8_t marker = 0; + // uint8_t payload_type = 0; + + if ((blocks_read != 0) && (seq_no != previous_seq_no + 1)) { + if (previous_seq_no != 0) + debug(1, "block discontinuity: from sequence number %u to sequence number %u.", + previous_seq_no, seq_no); + if (pcm_buffer_occupancy != 0) { + debug(1, + "rtptime discontinuity! 
Existing pcm buffer contents with timestamp " + "%u, seq_no %u to new block with timestamp %u, seq_no %u", + pcm_buffer_read_point_rtptime, previous_seq_no, timestamp, seq_no); + pcm_buffer_occupancy = 0; + } + // if still playing, this is a problem -- need to reset the player + if (play_enabled) + player_full_flush(conn); + } + + previous_seq_no = seq_no; + + // at this point, we can check if we can to flush this packet -- we won't have + // to decipher it first + // debug(1,"seq_no %u, timestamp %u", seq_no, timestamp); + + if ((flush_requested) && (seq_no < flushUntilSeq)) { + last_block_flushed_seq_no = seq_no; + last_block_flushed_rtp_time = timestamp; + // debug(1, "flushing block %u, rtptime %u up to block %u, rtptime %u.", seq_no, + // timestamp, flushUntilSeq, flushUntilTS); + } + + if (((flush_requested) && (seq_no == flushUntilSeq)) || (new_buffer_needed)) { + + if ((flush_requested) && (seq_no == flushUntilSeq)) { + int32_t timestamp_difference = timestamp - flushUntilTS; + debug(1, + "end of flush reached. 
flushUntilSeq = %u, flushUntilTS = %u, last block flushed " + "seq_no = %u, last block flushed rtptime = %u, next block seq_no = %u, next " + "block timestamp = " + "%u, timestamp difference: %d, buffer_size: %u.", + flushUntilSeq, flushUntilTS, last_block_flushed_seq_no, + last_block_flushed_rtp_time, seq_no, timestamp, timestamp_difference, + buffered_audio->buffer_occupancy); + debug(2, + "pcm_buffer_read_point_rtptime = %u, pcm_buffer_occupancy = %u, " + "pcm_buffer_read_point = %u", + pcm_buffer_read_point_rtptime, pcm_buffer_occupancy, pcm_buffer_read_point); + } + + // if we are here because of a flush request, it must be the case that + // flushing the pcm buffer wasn't enough, as the request would have been turned off by now + // so we better indicate that the pcm buffer is empty and its contents invalid + if (flush_requested) { + pcm_buffer_occupancy = 0; + } + + // decode the block and add it to or put it in the pcm buffer + + if (pcm_buffer_occupancy == 0) { + // they should match and the read point should be zero + if ((blocks_read != 0) && (pcm_buffer_read_point_rtptime != timestamp)) { + debug(2, "set pcm_buffer_read_point_rtptime from %u to %u.", + pcm_buffer_read_point_rtptime, timestamp); + pcm_buffer_read_point_rtptime = timestamp; + pcm_buffer_read_point = 0; + } + } + + unsigned char nonce[12]; + memset(nonce, 0, sizeof(nonce)); + memcpy(nonce + 4, packet + nread - 8, + 8); // front-pad the 8-byte nonce received to get the 12-byte nonce expected + + // https://libsodium.gitbook.io/doc/secret-key_cryptography/aead/chacha20-poly1305/ietf_chacha20-poly1305_construction + // Note: the eight-byte nonce must be front-padded out to 12 bytes. 
+ unsigned long long new_payload_length = 0; + int response = crypto_aead_chacha20poly1305_ietf_decrypt( + m + 7, // m + &new_payload_length, // mlen_p + NULL, // nsec, + packet + 12, // the ciphertext starts 12 bytes in and is followed by the MAC tag, + nread - (8 + 12), // clen -- the last 8 bytes are the nonce + packet + 4, // authenticated additional data + 8, // authenticated additional data length + nonce, + conn->session_key); // *k + if (response != 0) { + debug(1, "Error decrypting audio packet %u -- packet length %d.", seq_no, nread); + } else { + // now pass it in to the regular processing chain + + unsigned long long max_int = INT_MAX; // put in the right format + if (new_payload_length > max_int) + debug(1, "Madly long payload length!"); + int payload_length = new_payload_length; // change from long long to int + int aac_packet_length = payload_length + 7; + + // now, fill in the 7-byte ADTS information, which seems to be needed by the decoder + // we made room for it in the front of the buffer + + addADTStoPacket(m, aac_packet_length); + + // now we are ready to send this to the decoder + + data_to_process = m; + data_remaining = aac_packet_length; + int ret = 0; + // there can be more than one av packet (? 
terminology) in a block + while (data_remaining > 0) { + if (decoded_frame == NULL) { + decoded_frame = av_frame_alloc(); + if (decoded_frame == NULL) + debug(1, "could not allocate av_frame"); + } else { + ret = av_parser_parse2(codec_parser_context, codec_context, &pkt->data, &pkt->size, + data_to_process, data_remaining, AV_NOPTS_VALUE, + AV_NOPTS_VALUE, 0); + if (ret < 0) { + debug(1, "error while parsing deciphered audio packet."); + } else { + data_to_process += ret; + data_remaining -= ret; + // debug(1, "frame found"); + // now pass each packet to be decoded + if (pkt->size) { + ret = avcodec_send_packet(codec_context, pkt); + if (ret < 0) { + debug(1, "error sending packet to decoder"); + } else { + while (ret >= 0) { + ret = avcodec_receive_frame(codec_context, decoded_frame); + if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) + break; + else if (ret < 0) { + debug(1, "error %d during decoding", ret); + } else { + av_samples_alloc(&pcm_audio, &dst_linesize, codec_context->channels, + decoded_frame->nb_samples, AV_SAMPLE_FMT_S16, 1); + // remember to free pcm_audio + ret = swr_convert(swr, &pcm_audio, decoded_frame->nb_samples, + (const uint8_t **)decoded_frame->extended_data, + decoded_frame->nb_samples); + dst_bufsize = av_samples_get_buffer_size( + &dst_linesize, codec_context->channels, ret, AV_SAMPLE_FMT_S16, 1); + // debug(1,"generated %d bytes of PCM", dst_bufsize); + // copy the PCM audio into the PCM buffer. + // make sure it's big enough first + if ((pcm_buffer_size - pcm_buffer_occupancy) < dst_bufsize) { + debug(1, + "pcm_buffer_read_point (frames): %u, pcm_buffer_occupancy " + "(frames): %u", + pcm_buffer_read_point / 4, pcm_buffer_occupancy / 4); + pcm_buffer_size = dst_bufsize + pcm_buffer_occupancy; + debug(1, "fatal error! 
pcm buffer too small at %d bytes.", + pcm_buffer_size); + } else { + memcpy(pcm_buffer + pcm_buffer_occupancy, pcm_audio, dst_bufsize); + pcm_buffer_occupancy += dst_bufsize; + } + // debug(1,"decoded %d samples", decoded_frame->nb_samples); + // memcpy(sampleBuffer,outputBuffer16,dst_bufsize); + av_freep(&pcm_audio); + } + } + } + } + } + if (decoded_frame == NULL) + debug(1, "decoded_frame is NULL"); + if (decoded_frame != NULL) + av_frame_free(&decoded_frame); + } + } + + // revert the state of cancellability + } + } + } else { + // nread is 0 -- the port has been closed + // finished = 1; + debug(1, "buffered audio port closed!"); + finished = 1; + } + } + + } while (finished == 0); + debug(1, "Buffered Audio Receiver RTP thread \"normal\" exit."); + pthread_cleanup_pop(1); // deallocate the swr + pthread_cleanup_pop(1); // deallocate the av_packet + pthread_cleanup_pop(1); // av_parser_init_cleanup_handler + pthread_cleanup_pop(1); // avcodec_open2_cleanup_handler + pthread_cleanup_pop(1); // avcodec_alloc_context3_cleanup_handler + pthread_cleanup_pop(1); // thread creation + pthread_cleanup_pop(1); // buffer malloc + pthread_cleanup_pop(1); // not_full_cv + pthread_cleanup_pop(1); // not_empty_cv + pthread_cleanup_pop(1); // mutex + pthread_cleanup_pop(1); // descriptor malloc + pthread_cleanup_pop(1); // pthread_t malloc + pthread_cleanup_pop(1); // do the cleanup. 
+ pthread_exit(NULL); +} + +#endif +#endif + diff --git a/rtp.h b/rtp.h index 171bd651..4d5f778f 100644 --- a/rtp.h +++ b/rtp.h @@ -17,17 +17,11 @@ void rtp_setup(SOCKADDR *local, SOCKADDR *remote, uint16_t controlport, uint16_t void rtp_request_resend(seq_t first, uint32_t count, rtsp_conn_info *conn); void rtp_request_client_pause(rtsp_conn_info *conn); // ask the client to pause -void get_reference_timestamp_stuff(uint32_t *timestamp, uint64_t *timestamp_time, - uint64_t *remote_timestamp_time, rtsp_conn_info *conn); -void clear_reference_timestamp(rtsp_conn_info *conn); +void reset_anchor_info(rtsp_conn_info *conn); int have_timestamp_timing_information(rtsp_conn_info *conn); -int get_frame_play_time(int64_t timestamp, int sample_ratio, uint64_t *time_to_play); - int frame_to_local_time(uint32_t timestamp, uint64_t *time, rtsp_conn_info *conn); int local_time_to_frame(uint64_t time, uint32_t *frame, rtsp_conn_info *conn); -int sanitised_source_rate_information(uint32_t *frames, uint64_t *time, rtsp_conn_info *conn); - #endif // _RTP_H -- 2.47.3