From: Mike Brady <4265913+mikebrady@users.noreply.github.com> Date: Sun, 16 Nov 2025 16:58:34 +0000 (+0000) Subject: Do a long-overdue rebuild of the buffered audio processor and, with everything we... X-Git-Tag: 5.0-post-dev~63 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=e12a3ed7b21eaaf2adb280f193aed91fdb50e439;p=thirdparty%2Fshairport-sync.git Do a long-overdue rebuild of the buffered audio processor and, with everything we have figured out, implement a cleaner method to handle both immediate and deferred flush requests. Remove lots of old flags and redundant code. Leave in a few debug messages. --- diff --git a/alac.c b/alac.c index daca81c9..0b5587f3 100644 --- a/alac.c +++ b/alac.c @@ -44,13 +44,13 @@ static const int host_bigendian = 0; #define _Swap32(v) \ do { \ - v = (((v) & 0x000000FF) << 0x18) | (((v) & 0x0000FF00) << 0x08) | \ - (((v) & 0x00FF0000) >> 0x08) | (((v) & 0xFF000000) >> 0x18); \ + v = (((v)&0x000000FF) << 0x18) | (((v)&0x0000FF00) << 0x08) | (((v)&0x00FF0000) >> 0x08) | \ + (((v)&0xFF000000) >> 0x18); \ } while (0) #define _Swap16(v) \ do { \ - v = (((v) & 0x00FF) << 0x08) | (((v) & 0xFF00) >> 0x08); \ + v = (((v)&0x00FF) << 0x08) | (((v)&0xFF00) >> 0x08); \ } while (0) struct { @@ -603,11 +603,11 @@ static void deinterlace_24(int32_t *buffer_a, int32_t *buffer_b, int uncompresse right |= uncompressed_bytes_buffer_b[i] & mask; } - ((uint8_t *)buffer_out)[i * numchannels * 3] = (left) & 0xFF; + ((uint8_t *)buffer_out)[i * numchannels * 3] = (left)&0xFF; ((uint8_t *)buffer_out)[i * numchannels * 3 + 1] = (left >> 8) & 0xFF; ((uint8_t *)buffer_out)[i * numchannels * 3 + 2] = (left >> 16) & 0xFF; - ((uint8_t *)buffer_out)[i * numchannels * 3 + 3] = (right) & 0xFF; + ((uint8_t *)buffer_out)[i * numchannels * 3 + 3] = (right)&0xFF; ((uint8_t *)buffer_out)[i * numchannels * 3 + 4] = (right >> 8) & 0xFF; ((uint8_t *)buffer_out)[i * numchannels * 3 + 5] = (right >> 16) & 0xFF; } @@ -631,11 +631,11 @@ static void deinterlace_24(int32_t *buffer_a, int32_t *buffer_b, int uncompresse right |= uncompressed_bytes_buffer_b[i] & mask; } - ((uint8_t *)buffer_out)[i * numchannels * 3] = (left) & 0xFF; + ((uint8_t *)buffer_out)[i * numchannels * 3] = (left)&0xFF; ((uint8_t *)buffer_out)[i * numchannels * 3 + 1] = (left >> 8) & 0xFF; ((uint8_t *)buffer_out)[i * numchannels * 3 + 2] = (left >> 16) & 0xFF; - ((uint8_t *)buffer_out)[i * numchannels * 3 + 3] = (right) & 0xFF; + ((uint8_t *)buffer_out)[i * numchannels * 3 + 3] = (right)&0xFF; ((uint8_t *)buffer_out)[i * numchannels * 3 + 4] = (right >> 8) & 0xFF; ((uint8_t *)buffer_out)[i * numchannels * 3 + 5] = (right >> 16) & 0xFF; } @@ -798,7 +798,7 @@ void alac_decode_frame(alac_file *alac, unsigned char *inbuffer, void *outbuffer sample |= alac->uncompressed_bytes_buffer_a[i] & mask; } - ((uint8_t *)outbuffer)[i * alac->numchannels * 3] = (sample) & 0xFF; + ((uint8_t *)outbuffer)[i * alac->numchannels * 3] = (sample)&0xFF; ((uint8_t *)outbuffer)[i * alac->numchannels * 3 + 1] = (sample >> 8) & 0xFF; ((uint8_t *)outbuffer)[i * alac->numchannels * 3 + 2] = (sample >> 16) & 0xFF; } diff --git a/audio.c b/audio.c index fadda368..b188ac73 100644 --- a/audio.c +++ b/audio.c @@ -334,7 +334,8 @@ uint32_t get_rate_settings(const char *stanza_name, const char *setting_name) { #endif } else { warn("In the \"%s\" setting in the \"%s\" section of the configuration file, an invalid " - "character string -- \"%s\" -- has been detected. (Note that numbers must not be enclosed in quotes.)", + "character string -- \"%s\" -- has been detected. (Note that numbers must not be " + "enclosed in quotes.)", setting_name, stanza_name, config_setting_get_string(rate_setting)); } } else { @@ -373,10 +374,10 @@ uint32_t get_rate_settings(const char *stanza_name, const char *setting_name) { } free(rates); } else { - warn( - "in the \"%s\" setting in the \"%s\" section of the configuration file, an error " - "has been detected at argument %d. (Note that numbers must not be enclosed in quotes.)", - setting_name, stanza_name, -rate_settings_count); + warn("in the \"%s\" setting in the \"%s\" section of the configuration file, an error " + "has been detected at argument %d. (Note that numbers must not be enclosed in " + "quotes.)", + setting_name, stanza_name, -rate_settings_count); } } } diff --git a/audio_alsa.c b/audio_alsa.c index 5081ad96..de9361da 100644 --- a/audio_alsa.c +++ b/audio_alsa.c @@ -255,7 +255,7 @@ static int get_permissible_configuration_settings() { int card_number = snd_pcm_info_get_card(local_alsa_info); if (card_number >= 0) { - debug(1, "output device is card %d.", card_number); + debug(2, "output device is card %d.", card_number); char device_name[64] = ""; snprintf(device_name, sizeof(device_name) - 1, "hw:%d", card_number); snd_ctl_t *handle; @@ -266,7 +266,7 @@ static int get_permissible_configuration_settings() { snd_ctl_card_info_alloca(&info); err = snd_ctl_card_info(handle, info); if (err == 0) { - debug(1, "card name: \"%s\", long name: \"%s\".", snd_ctl_card_info_get_name(info), + debug(2, "card name: \"%s\", long name: \"%s\".", snd_ctl_card_info_get_name(info), snd_ctl_card_info_get_longname(info)); } snd_ctl_close(handle); @@ -274,7 +274,7 @@ static int get_permissible_configuration_settings() { } debug( - 1, "device: \"%s\", name: \"%s\", type: \"%s\", id: \"%s\", CARD=%d,DEV=%u,SUBDEV=%u.", + 2, "device: \"%s\", name: \"%s\", type: \"%s\", id: \"%s\", CARD=%d,DEV=%u,SUBDEV=%u.", alsa_out_dev, snd_pcm_info_get_name(local_alsa_info), device_types[device_type], snd_pcm_info_get_id(local_alsa_info), snd_pcm_info_get_card(local_alsa_info), snd_pcm_info_get_device(local_alsa_info), snd_pcm_info_get_subdevice(local_alsa_info)); diff --git a/common.c b/common.c index 371ac15d..0e5c75dd 100644 --- a/common.c +++ b/common.c @@ -2833,21 +2833,17 @@ void sanity_check_ir_files(const int option_print_level, ir_file_info_t *files, SF_INFO sfinfo = {}; // sfinfo.format = 0; - + SNDFILE *file = sf_open(files[i].filename, SFM_READ, &sfinfo); if (file) { // files[i].evaluation = ev_okay; files[i].samplerate = sfinfo.samplerate; files[i].channels = sfinfo.channels; - debug( - option_print_level, - "convolution impulse response file \"%s\": %" PRId64 " frames (%.1f seconds), %d channel%s at %d frames per second.", - files[i].filename, - sfinfo.frames, - (float) sfinfo.frames / sfinfo.samplerate, - sfinfo.channels, - sfinfo.channels == 1 ? "" : "s", - sfinfo.samplerate); + debug(option_print_level, + "convolution impulse response file \"%s\": %" PRId64 + " frames (%.1f seconds), %d channel%s at %d frames per second.", + files[i].filename, sfinfo.frames, (float)sfinfo.frames / sfinfo.samplerate, + sfinfo.channels, sfinfo.channels == 1 ? "" : "s", sfinfo.samplerate); sf_close(file); } else { // files[i].evaluation = ev_invalid; diff --git a/dacp.c b/dacp.c index 61ce22bc..a7574f9b 100644 --- a/dacp.c +++ b/dacp.c @@ -117,7 +117,8 @@ void response_body(void *opaque, const char *data, int size) { static void response_header(__attribute__((unused)) void *opaque, __attribute__((unused)) const char *ckey, __attribute__((unused)) int nkey, __attribute__((unused)) const char *cvalue, - __attribute__((unused)) int nvalue) { /* example doesn't care about headers */ } + __attribute__((unused)) int nvalue) { /* example doesn't care about headers */ +} static void response_code(void *opaque, int code) { struct HttpResponse *response = (struct HttpResponse *)opaque; @@ -505,7 +506,7 @@ void dacp_monitor_port_update_callback(char *dacp_id, uint16_t port) { void *dacp_monitor_thread_code(__attribute__((unused)) void *na) { // #include // debug(1, "dacp_monitor_thread_code PID %d", syscall(SYS_gettid)); - int scan_index = 0; + // int scan_index = 0; int always_use_revision_number_1 = 0; // char server_reply[10000]; // debug(1, "DACP monitor thread started."); @@ -582,7 +583,7 @@ void *dacp_monitor_thread_code(__attribute__((unused)) void *na) { mdns_dacp_monitor_set_id(dacp_server.dacp_id); } } else { - scan_index++; + // scan_index++; // debug(1,"DACP Scan Result: %d.", result); if ((result == 200) || (result == 400)) { diff --git a/dbus-service.c b/dbus-service.c index 71bd8a05..f3b4066b 100644 --- a/dbus-service.c +++ b/dbus-service.c @@ -633,7 +633,7 @@ gboolean notify_convolution_impulse_response_files_callback(__attribute__((unuse __attribute__((unused)) gpointer user_data) { __attribute__((unused)) char *th = - (char *)shairport_sync_get_convolution_impulse_response_file(skeleton); + (char *)shairport_sync_get_convolution_impulse_response_files(skeleton); return TRUE; } #endif @@ -940,7 +940,6 @@ static gboolean on_handle_set_frame_position_update_interval(ShairportSync *skel static void on_dbus_name_acquired(GDBusConnection *connection, const gchar *name, __attribute__((unused)) gpointer user_data) { - const char *str = NULL; debug(2, "Shairport Sync native D-Bus interface \"%s\" acquired on the %s bus.", name, (dbus_bus_type == G_BUS_TYPE_SESSION) ? "session" : "system"); @@ -1136,6 +1135,7 @@ static void on_dbus_name_acquired(GDBusConnection *connection, const gchar *name shairport_sync_set_convolution_enabled(SHAIRPORT_SYNC(shairportSyncSkeleton), TRUE); } + const char *str = NULL; if ((config.cfg != NULL) && (config_lookup_non_empty_string(config.cfg, "dsp.convolution_ir_files", &str))) { shairport_sync_set_convolution_impulse_response_files(SHAIRPORT_SYNC(shairportSyncSkeleton), diff --git a/loudness.c b/loudness.c index 5bb37242..67c1ff09 100644 --- a/loudness.c +++ b/loudness.c @@ -92,9 +92,9 @@ void loudness_process_blocks(float *fbufs, unsigned int channel_length, void loudness_reset() { unsigned int i; for (i = 0; i < MAXCHANNELS; i++) { - loudness_filters[i].i1 = 0.0; - loudness_filters[i].i2 = 0.0; - loudness_filters[i].o1 = 0.0; - loudness_filters[i].o2 = 0.0; + loudness_filters[i].i1 = 0.0; + loudness_filters[i].i2 = 0.0; + loudness_filters[i].o1 = 0.0; + loudness_filters[i].o2 = 0.0; } } \ No newline at end of file diff --git a/mqtt.c b/mqtt.c index fcfb8666..4f42390e 100644 --- a/mqtt.c +++ b/mqtt.c @@ -54,10 +54,10 @@ void on_message(__attribute__((unused)) struct mosquitto *mosq, debug(2, "[MQTT]: received Message on topic %s: %s\n", msg->topic, payload); // All recognized commands - char *commands[] = {"command", "beginff", "beginrew", "mutetoggle", "nextitem", - "previtem", "pause", "playpause", "play", "stop", - "playresume", "shuffle_songs", "volumedown", "volumeup", "disconnect", - NULL}; + char *commands[] = {"command", "beginff", "beginrew", "mutetoggle", + "nextitem", "previtem", "pause", "playpause", + "play", "stop", "playresume", "shuffle_songs", + "volumedown", "volumeup", "disconnect", NULL}; int it = 0; diff --git a/player.c b/player.c index ecb6a5c9..6462e5e1 100644 --- a/player.c +++ b/player.c @@ -501,6 +501,25 @@ uint32_t get_ssrc_rate(ssrc_t ssrc) { return response; } +size_t get_ssrc_block_length(ssrc_t ssrc) { + size_t response = 0; + switch (ssrc) { + case ALAC_44100_S16_2: + case ALAC_48000_S24_2: + response = 352; + break; + case AAC_44100_F24_2: + case AAC_48000_F24_2: + case AAC_48000_F24_5P1: + case AAC_48000_F24_7P1: + response = 1024; + break; + default: + break; + } + return response; +} + int setup_software_resampler(rtsp_conn_info *conn, ssrc_t ssrc) { int response = 0; @@ -1073,6 +1092,12 @@ void prepare_decoding_chain(rtsp_conn_info *conn, ssrc_t ssrc) { pthread_cleanup_push(avcodec_open2_cleanup_handler, conn->codec_context); conn->input_rate = get_ssrc_rate(ssrc); + if ((ssrc == ALAC_48000_S24_2) || (ssrc == ALAC_44100_S16_2)) { + conn->frames_per_packet = 352; + } else { + conn->frames_per_packet = 1024; + } + conn->codec_context->sample_rate = conn->input_rate; conn->ffmpeg_decoding_chain_initialised = 1; @@ -2158,7 +2183,7 @@ static abuf_t *buffer_get_frame(rtsp_conn_info *conn, int resync_requested) { if (thePacket == NULL) debug(2, "Connection %d: packet %u is empty.", conn->connection_number, conn->ab_read); else - debug(2, "Connection %d: packet %u not ready.", conn->connection_number, conn->ab_read); + debug(3, "Connection %d: packet %u not ready.", conn->connection_number, conn->ab_read); conn->ab_read++; conn->last_seqno_read++; // don' let it trigger the missing packet warning... } @@ -3353,7 +3378,11 @@ void *player_thread_func(void *arg) { uint64_t time_of_last_metadata_progress_update = 0; // the assignment is to stop a compiler warning... #endif + +#ifdef CONFIG_CONVOLUTION double highest_convolver_output_db = 0.0; +#endif + uint64_t previous_frames_played = 0; // initialised to avoid a "possibly uninitialised" warning uint64_t previous_raw_measurement_time = 0; // initialised to avoid a "possibly uninitialised" warning @@ -3379,10 +3408,14 @@ void *player_thread_func(void *arg) { conn->frames_per_packet = 352; // for ALAC -- will be changed if necessary #ifdef CONFIG_AIRPLAY_2 - conn->ap2_flush_requested = 0; - conn->ap2_flush_from_valid = 0; conn->ap2_rate = 0; conn->ap2_play_enabled = 0; + + unsigned int f = 0; + for (f = 0; f < MAX_DEFERRED_FLUSH_REQUESTS; f++) { + conn->ap2_deferred_flush_requests[f].inUse = 0; + conn->ap2_deferred_flush_requests[f].active = 0; + } #endif const unsigned int sync_history_length = 40; @@ -4444,9 +4477,9 @@ void *player_thread_func(void *arg) { inframe->timestamp, centered_sync_error_time * 1000, occ); unsigned int s; for (s = 0; s < conn->sync_samples_count; s++) { - debug(2, "sample: %u, value: %.3f ms", s, sync_samples[s] * 0.000001); + debug(3, "sample: %u, value: %.3f ms", s, sync_samples[s] * 0.000001); } - debug(2, "sync_history_length: %u, samples_count: %u, sample_index: %u", + debug(3, "sync_history_length: %u, samples_count: %u, sample_index: %u", sync_history_length, conn->sync_samples_count, last_sample_index); } sync_error_out_of_bounds = 0; @@ -4551,8 +4584,8 @@ void *player_thread_func(void *arg) { config.convolution_max_length_in_seconds, inframe->length); convolver_wait_for_all(); // if (convolver_is_valid) - // debug(1, "convolver_init for %u channels was successful.", conn->input_num_channels); - // convolver_is_valid = convolver_init( + // debug(1, "convolver_init for %u channels was successful.", + // conn->input_num_channels); convolver_is_valid = convolver_init( // convolver_file_found, conn->input_num_channels, // config.convolution_max_length_in_seconds, inframe->length); } @@ -4564,14 +4597,14 @@ void *player_thread_func(void *arg) { debug(1, "convolver: \"%s\".", convolver_file_found); } if (convolver_is_valid != 0) { - for (j = 0; j < conn->input_num_channels; j++) { + for (j = 0; j < conn->input_num_channels; j++) { // convolver_process(j, fbufs[j], inframe->length); convolver_process(j, fbufs[j], inframe->length); } convolver_wait_for_all(); } - - // apply convolution gain even if no convolution is done... + + // apply convolution gain even if no convolution is done... float gain = pow(10.0, config.convolution_gain / 20.0); for (i = 0; i < inframe->length; ++i) { for (j = 0; j < conn->input_num_channels; j++) { @@ -4583,13 +4616,15 @@ void *player_thread_func(void *arg) { if (output_level_db > highest_convolver_output_db) { highest_convolver_output_db = output_level_db; if ((highest_convolver_output_db + config.convolution_gain) > 0.0) - warn("clipping %.1f dB with convolution gain set to %.1f dB!", highest_convolver_output_db + config.convolution_gain, config.convolution_gain); - } + warn("clipping %.1f dB with convolution gain set to %.1f dB!", + highest_convolver_output_db + config.convolution_gain, + config.convolution_gain); + } fbufs[j][i] *= gain; } } } - + #endif if (conn->do_loudness) { loudness_process_blocks((float *)fbufs, inframe->length, diff --git a/player.h b/player.h index 01342620..e7b0a13f 100644 --- a/player.h +++ b/player.h @@ -17,6 +17,7 @@ #endif #ifdef CONFIG_AIRPLAY_2 +#define MAX_DEFERRED_FLUSH_REQUESTS 10 #include "pair_ap/pair.h" #include #endif @@ -242,16 +243,14 @@ typedef struct { char *data_cipher_salt; } ap2_pairing; -// flush requests are stored in order of flushFromSeq -// on the basis that block numbers are monotonic modulo 2^24 -typedef struct flush_request_t { - int flushNow; // if true, the flushFrom stuff is invalid - uint32_t flushFromSeq; +typedef struct { + uint32_t inUse; // record free or contains a current flush record + uint32_t active; // set if blocks within the given range are being flushed. uint32_t flushFromTS; - uint32_t flushUntilSeq; + uint32_t flushFromSeq; uint32_t flushUntilTS; - struct flush_request_t *next; -} flush_request_t; + uint32_t flushUntilSeq; +} ap2_flush_request_t; #endif @@ -345,7 +344,7 @@ typedef struct { uint32_t flush_rtp_timestamp; uint64_t time_of_last_audio_packet; seq_t ab_read, ab_write; - + int do_loudness; // if loudness is requested and there is no external mixer #ifdef CONFIG_MBEDTLS @@ -431,15 +430,15 @@ typedef struct { uint64_t last_anchor_time_of_update; uint64_t last_anchor_validity_start_time; + int ap2_immediate_flush_requested; + uint32_t ap2_immediate_flush_until_rtp_timestamp; + uint32_t ap2_immediate_flush_until_sequence_number; + + ap2_flush_request_t ap2_deferred_flush_requests[MAX_DEFERRED_FLUSH_REQUESTS]; + ssize_t ap2_audio_buffer_size; ssize_t ap2_audio_buffer_minimum_size; - flush_request_t *flush_requests; // if non-null, there are flush requests, mutex protected - int ap2_flush_requested; - int ap2_flush_from_valid; - uint32_t ap2_flush_from_rtp_timestamp; - uint32_t ap2_flush_from_sequence_number; - uint32_t ap2_flush_until_rtp_timestamp; - uint32_t ap2_flush_until_sequence_number; + int ap2_rate; // protect with flush mutex, 0 means don't play, 1 means play int ap2_play_enabled; // protect with flush mutex @@ -612,6 +611,8 @@ int64_t monotonic_timestamp(uint32_t timestamp, double suggested_volume(rtsp_conn_info *conn); // volume suggested for the connection const char *get_ssrc_name(ssrc_t ssrc); +size_t get_ssrc_block_length(ssrc_t ssrc); + int ssrc_is_recognised(ssrc_t ssrc); int ssrc_is_aac(ssrc_t ssrc); // used to decide if a mute might be needed (AAC only) void prepare_decoding_chain(rtsp_conn_info *conn, ssrc_t ssrc); // also sets up timing stuff diff --git a/rtp.c b/rtp.c index 080dadf4..e763f2d9 100644 --- a/rtp.c +++ b/rtp.c @@ -61,7 +61,6 @@ #include "FFTConvolver/convolver.h" #endif - struct Nvll { char *name; double value; @@ -778,7 +777,7 @@ void rtp_timing_receiver_cleanup_handler(void *arg) { while ((gradients) && (strcasecmp((const char *)&conn->client_ip_string, gradients->name) != 0)) gradients = gradients->next; - // if gradients comes out of this non-null, it is pointing to the DACP and it's last-known + // if gradients comes out of this non-null, it is pointing to the DACP and its last-known // gradient if (gradients) { gradients->value = conn->local_to_remote_time_gradient; @@ -829,7 +828,7 @@ void *rtp_timing_receiver(void *arg) { while ((gradients) && (strcasecmp((const char *)&conn->client_ip_string, gradients->name) != 0)) gradients = gradients->next; - // if gradients comes out of this non-null, it is pointing to the IP and it's last-known gradient + // if gradients comes out of this non-null, it is pointing to the IP and its last-known gradient if (gradients) { conn->local_to_remote_time_gradient = gradients->value; // debug(1,"Using a stored drift of %.2f ppm for \"%s\".", (conn->local_to_remote_time_gradient @@ -1437,10 +1436,13 @@ void set_ptp_anchor_info(rtsp_conn_info *conn, uint64_t clock_id, uint32_t rtpti conn->anchor_remote_info_is_valid = 1; // these can be modified if the master clock changes over time + conn->anchor_rtptime = rtptime; conn->anchor_time = networktime; conn->anchor_clock = clock_id; + debug(2, "set_ptp_anchor_info done."); } + int long_time_notifcation_done = 0; uint64_t previous_offset = 0; @@ -1620,7 +1622,7 @@ int frame_to_ptp_local_time(uint32_t timestamp, uint64_t *time, rtsp_conn_info * *time = ltime; result = 0; } else { - debug(3, "frame_to_ptp_local_time can't get anchor local time information"); + debug(2, "frame_to_ptp_local_time can't get anchor local time information"); } return result; } @@ -1639,7 +1641,7 @@ int local_ptp_time_to_frame(uint64_t time, uint32_t *frame, rtsp_conn_info *conn *frame = lframe; result = 0; } else { - debug(3, "local_ptp_time_to_frame can't get anchor local time information"); + debug(2, "local_ptp_time_to_frame can't get anchor local time information"); } return result; } @@ -1752,7 +1754,7 @@ void *rtp_event_receiver(void *arg) { debug(3, "Connection %d: Packet Received on Event Port with contents: \"%s\".", conn->connection_number, packet); } else { - debug(2, "Connection %d: Event Port connection closed by client", + debug(1, "Connection %d: Event Port connection closed by client", conn->connection_number); finished = 1; } @@ -2103,19 +2105,32 @@ ssize_t buffered_read(buffered_tcp_desc *descriptor, void *buf, size_t count, if (debug_mutex_lock(&descriptor->mutex, 50000, 1) != 0) debug(1, "problem with mutex"); pthread_cleanup_push(mutex_unlock, (void *)&descriptor->mutex); - if (descriptor->closed == 0) { - if ((descriptor->buffer_occupancy == 0) && (descriptor->error_code == 0)) { - debug(2, "buffered_read: waiting for %u bytes.", count); - } - while ((descriptor->buffer_occupancy == 0) && (descriptor->error_code == 0)) { - if (pthread_cond_wait(&descriptor->not_empty_cv, &descriptor->mutex)) - debug(1, "Error waiting for buffered read"); - else - debug(2, "buffered_read: signalled with %u bytes after waiting.", - descriptor->buffer_occupancy); - } + // wipe the slate dlean before reading... + descriptor->error_code = 0; + descriptor->closed = 0; + + if (descriptor->buffer_occupancy == 0) { + debug(2, "buffered_read: buffer empty -- waiting for %u bytes.", count); } - if (descriptor->buffer_occupancy != 0) { + + while ((descriptor->buffer_occupancy == 0) && (descriptor->error_code == 0) && + (descriptor->closed == 0)) { + if (pthread_cond_wait(&descriptor->not_empty_cv, &descriptor->mutex)) + debug(1, "Error waiting for buffered read"); + else + debug(2, "buffered_read: signalled with %u bytes after waiting.", + descriptor->buffer_occupancy); + } + + if (descriptor->error_code) { + errno = descriptor->error_code; + debug(1, "buffered_read: error %d.", errno); + response = -1; + } else if (descriptor->closed != 0) { + debug(1, "buffered_read: connection closed."); + errno = 0; // no error -- just closed + response = 0; + } else if (descriptor->buffer_occupancy != 0) { ssize_t bytes_to_move = count; if (descriptor->buffer_occupancy < count) { @@ -2136,11 +2151,6 @@ ssize_t buffered_read(buffered_tcp_desc *descriptor, void *buf, size_t count, response = bytes_to_move; if (pthread_cond_signal(&descriptor->not_full_cv)) debug(1, "Error signalling"); - } else if (descriptor->error_code) { - errno = descriptor->error_code; - response = -1; - } else if (descriptor->closed != 0) { - response = 0; } pthread_cleanup_pop(1); // release the mutex @@ -2176,10 +2186,9 @@ void *buffered_tcp_reader(void *arg) { if (debug_mutex_lock(&descriptor->mutex, 500000, 1) != 0) debug(1, "problem with mutex"); pthread_cleanup_push(mutex_unlock, (void *)&descriptor->mutex); - while ((descriptor->buffer_occupancy == descriptor->buffer_max_size) || - (descriptor->error_code != 0) || (descriptor->closed != 0)) { + while (descriptor->buffer_occupancy == descriptor->buffer_max_size) { if (pthread_cond_wait(&descriptor->not_full_cv, &descriptor->mutex)) - debug(1, "Error waiting for buffered read"); + debug(1, "Error waiting for not_full_cv"); } pthread_cleanup_pop(1); // release the mutex @@ -2202,7 +2211,8 @@ void *buffered_tcp_reader(void *arg) { gap_to_end_of_buffer; // only ask for what will fill to the top of the buffer // do the read - // debug(1, "Request buffered read of up to %d bytes.", bytes_to_request); + if (descriptor->buffer_occupancy == 0) + debug(2, "recv of up to %d bytes with an buffer empty.", bytes_to_request); nread = recv(fd, descriptor->eoq, bytes_to_request, 0); // debug(1, "Received %d bytes for a buffer size of %d bytes.",nread, // descriptor->buffer_occupancy + nread); @@ -2217,12 +2227,13 @@ void *buffered_tcp_reader(void *arg) { descriptor->error_code = errno; } else if (nread == 0) { descriptor->closed = 1; + debug(1, "buffered audio port closed. Terminating the buffered_tcp_reader thread."); + finished = 1; } else if (nread > 0) { descriptor->eoq += nread; descriptor->buffer_occupancy += nread; - } else { - debug(1, "buffered audio port closed!"); } + // signal if we got data or an error or the file closed if (pthread_cond_signal(&descriptor->not_empty_cv)) debug(1, "Error signalling"); @@ -2233,10 +2244,10 @@ void *buffered_tcp_reader(void *arg) { usleep(10000); // give other threads a chance to run... } while (finished == 0); - debug(1, "Buffered TCP Reader Thread Exit \"Normal\" Exit Begin."); + debug(3, "Buffered TCP Reader Thread Exit \"Normal\" Exit Begin."); pthread_cleanup_pop(1); // close the socket pthread_cleanup_pop(1); // cleanup - debug(1, "Buffered TCP Reader Thread Exit \"Normal\" Exit -- Shouldn't happen!."); + debug(2, "Buffered TCP Reader Thread Exit \"Normal\" Exit."); pthread_exit(NULL); } @@ -2252,7 +2263,7 @@ ssize_t lread_sized_block(buffered_tcp_desc *descriptor, void *buf, size_t count nread = buffered_read(descriptor, buf + inbuf, count - inbuf, bytes_remaining); if (nread == 0) { // a blocking read that returns zero means eof -- implies connection closed - debug(3, "read_sized_block connection closed."); + debug(1, "read_sized_block connection closed."); keep_trying = 0; } else if (nread < 0) { if (errno == EAGAIN) { @@ -2275,38 +2286,6 @@ ssize_t lread_sized_block(buffered_tcp_desc *descriptor, void *buf, size_t count return response; } -// not used right now, but potentially useful for understanding flush requests -void display_flush_requests(int activeOnly, uint32_t currentSeq, uint32_t currentTS, - rtsp_conn_info *conn) { - if (conn->flush_requests == NULL) { - if (activeOnly == 0) - debug(1, "No flush requests."); - } else { - flush_request_t *t = conn->flush_requests; - do { - if (t->flushNow) { - debug(1, "immediate flush to untilSeq: %u, untilTS: %u.", t->flushUntilSeq, - t->flushUntilTS); - } else { - if (activeOnly == 0) - debug(1, "fromSeq: %u, fromTS: %u, to untilSeq: %u, untilTS: %u.", t->flushFromSeq, - t->flushFromTS, t->flushUntilSeq, t->flushUntilTS); - else if ((activeOnly == 1) && - (currentSeq >= - (t->flushFromSeq - - 1))) // the -1 is because you might have to trim the end of the previous block - debug(1, - "fromSeq: %u, fromTS: %u, to untilSeq: %u, untilTS: %u, with currentSeq: %u, " - "currentTS: %u.", - t->flushFromSeq, t->flushFromTS, t->flushUntilSeq, t->flushUntilTS, currentSeq, - currentTS); - } - t = t->next; - } while (t != NULL); - } -} - -// From // https://stackoverflow.com/questions/18862715/how-to-generate-the-aac-adts-elementary-stream-with-android-mediacodec // with thanks! @@ -2362,12 +2341,31 @@ void rtp_buffered_audio_cleanup_handler(__attribute__((unused)) void *arg) { debug(2, "Buffered Audio Receiver Cleanup Start."); rtsp_conn_info *conn = (rtsp_conn_info *)arg; close(conn->buffered_audio_socket); - debug(2, "Connection %d: TCP Buffered Audio port closed: %u.", conn->connection_number, + debug(1, "Connection %d: closing TCP Buffered Audio port: %u.", conn->connection_number, conn->local_buffered_audio_port); conn->buffered_audio_socket = 0; debug(2, "Connection %d: Buffered Audio Receiver Cleanup Done.", conn->connection_number); } +#define MOD_23BIT 0x7FFFFF // 2^23 - 1 + +// Assumes 'a' and 'b' are within 2^22 of each other +int32_t a_minus_b_mod23(uint32_t a, uint32_t b) { + + // Mask to 23 bits + a &= MOD_23BIT; + b &= MOD_23BIT; + + // Compute difference modulo 2^23 + uint32_t diff = (a - b) & MOD_23BIT; + + // Interpret as signed 23-bit value + // If the top bit (bit 22) is set, it's negative + int32_t signed_diff = (diff & 0x400000) ? (diff | 0xFF800000) : diff; + + return signed_diff; +} + void *rtp_buffered_audio_processor(void *arg) { // #include // debug(1, "rtp_buffered_audio_processor PID %d", syscall(SYS_gettid)); @@ -2429,226 +2427,65 @@ void *rtp_buffered_audio_processor(void *arg) { unsigned long long payload_length = 0; uint32_t payload_ssrc = SSRC_NONE; // this is the SSRC of the payload, needed to decide if it should be muted + uint32_t previous_ssrc = SSRC_NONE; uint32_t seq_no = 0; // audio packet number. Initialised to avoid a "possibly uninitialised" warning. - int seqno_valid = 0; uint32_t previous_seqno = 0; - int previous_seqno_valid = 0; + uint16_t sequence_number_for_player = 0; + + uint32_t timestamp = 0; // initialised to avoid a "possibly uninitialised" warning. + uint32_t previous_timestamp = 0; + + uint32_t expected_timestamp = 0; + uint64_t previous_buffer_should_be_time = 0; - int new_buffer_needed = 0; ssize_t nread; + int new_audio_block_needed = 0; // goes true when a block is needed, false one is read in, but + // will be made true by flushing or by playing the block int finished = 0; + uint64_t blocks_read_since_play_began = 0; uint64_t blocks_read = 0; - // not used... - // uint64_t blocks_read_in_sequence = - // 0; // since the start of this sequence -- reset by start or flush - int flush_requested = 0; - uint32_t timestamp = 0; // initialised to avoid a "possibly uninitialised" warning. - int timestamp_valid = 0; - uint32_t previous_timestamp = 0; - int previous_timestamp_valid = 0; - - uint32_t expected_timestamp = 0; - int packets_played_in_this_sequence = 0; + int ap2_immediate_flush_requested = 0; // for diagnostics, probably - // uint32_t first_block_in_this_sequence = 0; uint32_t first_timestamp_in_this_sequence = 0; + int packets_played_in_this_sequence = 0; int play_enabled = 0; - // uint32_t flush_from_timestamp = 0; // initialised to avoid a "possibly uninitialised" warning. - double requested_lead_time = 0.0; // normal lead time minimum -- maybe it should be about 0.1 - - uint32_t old_ssrc = 0; // diagnostic + // double requested_lead_time = 0.0; // normal lead time minimum -- maybe it should be about 0.1 // wait until our timing information is valid - - // debug(1,"rtp_buffered_audio_processor ready."); - while (have_ptp_timing_information(conn) == 0) usleep(1000); reset_buffer(conn); // in case there is any garbage in the player do { - uint16_t last_seqno_put = 0; - int flush_is_delayed = 0; - int flush_newly_requested = 0; - int flush_newly_complete = 0; - int play_newly_stopped = 0; - // are we in in flush mode, or just about to leave it? - pthread_cleanup_debug_mutex_lock(&conn->flush_mutex, 25000, 1); // 25 ms is a long time to wait! + if ((play_enabled == 0) && (conn->ap2_play_enabled != 0)) { + // play newly started + debug(2, "Play started."); + new_audio_block_needed = 1; + blocks_read_since_play_began = 0; + } if ((play_enabled != 0) && (conn->ap2_play_enabled == 0)) { - play_newly_stopped = 1; debug(2, "Play stopped."); - // blocks_read_in_sequence = - // 0; // This may be set to 1 by a flush, so don't zero it during start. - packets_played_in_this_sequence = 0; - new_buffer_needed = 0; + packets_played_in_this_sequence = 0; // not all blocks read are played... #ifdef CONFIG_CONVOLUTION convolver_clear_state(); #endif - } - - if ((play_enabled == 0) && (conn->ap2_play_enabled != 0)) { - // play newly started - debug(2, "Play started."); - } - - play_enabled = conn->ap2_play_enabled; - - uint32_t flushUntilSeq = (conn->ap2_flush_until_sequence_number - 1) & 0x7fffff; - uint32_t flushUntilTS = conn->ap2_flush_until_rtp_timestamp; - - int flush_request_active = 0; - if (conn->ap2_flush_requested) { - if (conn->ap2_flush_from_valid == 0) { // i.e. a flush from right now - if (play_enabled) - debug(2, "Connection %d: immediate flush activated while play_enabled is true.", - conn->connection_number); - flush_request_active = 1; - flush_is_delayed = 0; - } else { - flush_is_delayed = 1; - // flush_from_timestamp = conn->ap2_flush_from_rtp_timestamp; - int32_t blocks_to_start_of_flush = conn->ap2_flush_from_sequence_number - seq_no; - if (blocks_to_start_of_flush <= 0) { - debug(3, "Connection %d: deferred flush activated.", conn->connection_number); - if (play_enabled) - debug(3, "Connection %d: deferred flush activated while play_enabled is true.", - conn->connection_number); - flush_request_active = 1; - } - } - } - // if we are in flush mode - if (flush_request_active) { - if (flush_requested == 0) { - // here, a flush has been newly requested - - debug(3, "Connection %d: Flush requested.", conn->connection_number); - if (conn->ap2_flush_from_valid) { - debug(3, " fromTS: %u", conn->ap2_flush_from_rtp_timestamp); - debug(3, " fromSeq: %u", conn->ap2_flush_from_sequence_number); - debug(3, "--"); - } - debug(3, " untilTS: %u", conn->ap2_flush_until_rtp_timestamp); - debug(3, " untilSeq: %u", conn->ap2_flush_until_sequence_number); - debug(3, "--"); - debug(3, " currentTS_Start: %u", timestamp); - debug(3, " currentSeq: %u", seq_no); - - flush_newly_requested = 1; - } - // blocks_read to ensure seq_no is valid - // (seq_no - flushUntilSeq) & 0x400000 -- if this is 0, seq_no is >= flushUntilSeq - if ((blocks_read != 0) && (((seq_no - flushUntilSeq) & 0x400000) == 0)) { - // we have reached or overshot the flushUntilSeq block - if (seq_no == flushUntilSeq) { // if the flush ended as expected, just before - // conn->ap2_flush_until_sequence_number - debug(3, - "Connection %d: flush request ended normally at %u/%u with " - "ap2_flush_until_sequence_number: %u/%u, " - "flushUntilTS: %u, incoming " - "timestamp: %u", - conn->connection_number, seq_no, seq_no & 0xffff, - conn->ap2_flush_until_sequence_number, - conn->ap2_flush_until_sequence_number & 0xffff, flushUntilTS, timestamp); - } else { - // sometimes, the block number jumps directly to the - // conn->ap2_flush_until_sequence_number, skipping the preceding one (which is what is in - // flushUntilSeq...) - if (seq_no == conn->ap2_flush_until_sequence_number) - debug(3, - "Connection %d: flush request ended normally at %u/%u with " - "ap2_flush_until_sequence_number: %u/%u, " - "flushUntilTS: %u, incoming " - "timestamp: %u", - conn->connection_number, seq_no, seq_no & 0xffff, - conn->ap2_flush_until_sequence_number, - conn->ap2_flush_until_sequence_number & 0xffff, flushUntilTS, timestamp); - else - debug(1, - "Connection %d: flush request ended with a discontinuity at %u/%u with " - "ap2_flush_until_sequence_number: %u/%u, " - "flushUntilTS: %u, incoming " - "timestamp: %u", - conn->connection_number, seq_no, seq_no & 0xffff, - conn->ap2_flush_until_sequence_number, - conn->ap2_flush_until_sequence_number & 0xffff, flushUntilTS, timestamp); - new_buffer_needed = 0; // use this first block in the new sequence - } - conn->ap2_flush_requested = 0; - flush_request_active = 0; - flush_newly_requested = 0; - } - } - - if ((flush_requested) && (flush_request_active == 0)) { - if (play_enabled) - debug(3, "Connection %d: flush completed while play_enabled is true.", - conn->connection_number); - flush_newly_complete = 1; - // blocks_read_in_sequence = - // 1; // the last block always (?) becomes the first block after the flush - } - flush_requested = flush_request_active; // for next time... - - // debug_mutex_unlock(&conn->flush_mutex, 3); - pthread_cleanup_pop(1); // the mutex - - // do this outside the flush mutex - if (flush_newly_complete) { - debug(3, "Connection %d: flush complete.", conn->connection_number); - } - - if (play_newly_stopped != 0) reset_buffer(conn); // stop play ASAP - - if (flush_newly_requested) { - reset_buffer( - conn); // stop play when an immediate flush starts or when a deferred flush is activated. - if (flush_is_delayed == 0) { - debug(3, "Connection %d: immediate buffered audio flush started.", conn->connection_number); - packets_played_in_this_sequence = 0; - } else { - debug(1, "Connection %d: deferred buffered audio flush started.", conn->connection_number); - packets_played_in_this_sequence = 0; - } - } - - // now, if a flush is not requested, see if we need to get a block - if (flush_requested == 0) { - - // is there space in the player thread's buffer system? - unsigned int player_buffer_size, player_buffer_occupancy; - get_audio_buffer_size_and_occupancy(&player_buffer_size, &player_buffer_occupancy, conn); - // debug(1,"player buffer size and occupancy: %u and %u", player_buffer_size, - // player_buffer_occupancy); - - if ((play_enabled != 0) && - (player_buffer_occupancy <= 2 * ((config.audio_backend_buffer_desired_length) * - conn->input_rate / conn->frames_per_packet)) && - (payload_pointer == NULL) && - (flush_requested == 0)) { // must be greater than the lead time - new_buffer_needed = 1; - } else { - usleep(20000); // wait for a while - } } - int64_t lead_time = 0; - - // so we need to read a block, as either a flush or a new buffer is needed... - if ((flush_requested) || (new_buffer_needed)) { + play_enabled = conn->ap2_play_enabled; - // start here to read (and later, decipher) a block. + // now, if get_next_block is non-zero, read a block. We may flush or use it + if (new_audio_block_needed != 0) { // a block is preceded by its length in a uint16_t uint16_t data_len; // here we read from the buffer that our thread has been reading @@ -2657,14 +2494,12 @@ void *rtp_buffered_audio_processor(void *arg) { nread = lread_sized_block(buffered_audio, &data_len, sizeof(data_len), &bytes_remaining_in_buffer); data_len = ntohs(data_len); + // diagnostic if ((conn->ap2_audio_buffer_minimum_size < 0) || (bytes_remaining_in_buffer < (size_t)conn->ap2_audio_buffer_minimum_size)) conn->ap2_audio_buffer_minimum_size = bytes_remaining_in_buffer; - // if (flush_requested) - // debug(1, "read %u bytes for a flush of a block length of %u.", nread, data_len); - if (nread > 0) { // get the block itself // debug(1,"buffered audio packet of size %u detected.", data_len - 2); @@ -2675,120 +2510,193 @@ void *rtp_buffered_audio_processor(void *arg) { (bytes_remaining_in_buffer < (size_t)conn->ap2_audio_buffer_minimum_size)) conn->ap2_audio_buffer_minimum_size = bytes_remaining_in_buffer; // debug(1, "buffered audio packet of size %u received.", nread); - if (nread > 0) { + if (nread > 0) { // got the block - blocks_read++; // note, this doesn't mean they are valid audio blocks - // blocks_read_in_sequence++; + blocks_read++; // note, this doesn't mean they are valid audio blocks + blocks_read_since_play_began++; // 1 means previous seq_no and timestamps are invalid // get the sequence number // see https://en.wikipedia.org/wiki/Real-time_Transport_Protocol#Packet_header // the Marker bit is always set, and it and the remaining 23 bits form the sequence number - if (seqno_valid) { - previous_seqno = seq_no; - previous_seqno_valid = 1; - } - + previous_seqno = seq_no; seq_no = nctohl(&packet[0]) & 0x7FFFFF; - seqno_valid = 1; - // if (flush_requested) - // debug(1, "read %u bytes for a flush of block %u up to block %u.", nread, seq_no, - // conn->ap2_flush_until_sequence_number); + previous_timestamp = timestamp; + timestamp = nctohl(&packet[4]); - // int unexpected_seqno = 0; + previous_ssrc = payload_ssrc; + payload_ssrc = nctohl(&packet[8]); + + if (blocks_read_since_play_began == 1) { + debug(2, "Preparing initial decoding chain for %s.", get_ssrc_name(payload_ssrc)); + prepare_decoding_chain(conn, payload_ssrc); + sequence_number_for_player = + seq_no & 0xffff; // this is arbitrary -- the sequence_number_for_player numbers will + // be sequential irrespective of seq_no jumps... + } + + if (blocks_read_since_play_began > 1) { + + if (payload_ssrc != previous_ssrc) { + if (ssrc_is_recognised(payload_ssrc) == 0) { + debug(2, "Unrecognised SSRC: %u.", payload_ssrc); + } else { + debug(2, + "Reading a block: new encoding: %s, old encoding: %s. Preparing a new " + "decoding chain.", + get_ssrc_name(payload_ssrc), get_ssrc_name(previous_ssrc)); + prepare_decoding_chain(conn, payload_ssrc); + } + } - if (previous_seqno_valid) { uint32_t t_expected_seqno = (previous_seqno + 1) & 0x7fffff; if (t_expected_seqno != seq_no) { - // unexpected_seqno = 1; - if (flush_requested == 0) - debug(1, "seq_no %u differs from expected_seq_no %u.", seq_no, t_expected_seqno); + debug(2, + "reading block %u, the sequence number differs from the expected sequence " + "number %u. The previous sequence number was %u", + seq_no, t_expected_seqno, previous_seqno); + } + uint32_t t_expected_timestamp = + previous_timestamp + get_ssrc_block_length(previous_ssrc); + int32_t diff = timestamp - t_expected_timestamp; + if (diff != 0) { + debug(2, "reading block %u, the timestamp %u differs from expected_timestamp %u.", + seq_no, timestamp, t_expected_timestamp); } } + new_audio_block_needed = 0; // block has been read. + } + } - // timestamp - if (timestamp_valid) { - previous_timestamp_valid = 1; - previous_timestamp = timestamp; - } - - timestamp = nctohl(&packet[4]); + if (nread == 0) { + // nread is 0 -- the port has been closed + debug(1, "buffered audio port closed!"); + finished = 1; + } else if (nread < 0) { + char errorstring[1024]; + strerror_r(errno, (char *)errorstring, sizeof(errorstring)); + debug(1, "error in rtp_buffered_audio_processor %d: \"%s\". Could not recv a data_len .", + errno, errorstring); + finished = 1; + } + } - if (previous_timestamp_valid) { - uint32_t t_expected_timestamp = previous_timestamp + conn->frames_per_packet; - if (t_expected_timestamp != timestamp) - debug(1, "timestamp %u differs from expected_timestamp %u.", timestamp, - t_expected_timestamp); + if (finished == 0) { + pthread_cleanup_debug_mutex_lock(&conn->flush_mutex, 25000, + 1); // 25 ms is a long time to wait! + if (blocks_read != 0) { + if (conn->ap2_immediate_flush_requested != 0) { + if (ap2_immediate_flush_requested == 0) { + debug(2, "immediate flush started at sequence number %u until sequence number of %u.", + seq_no, conn->ap2_immediate_flush_until_sequence_number); } + if ((blocks_read != 0) && (seq_no == conn->ap2_immediate_flush_until_sequence_number)) { + debug(2, "immediate flush complete at seq_no of %u.", seq_no); - // debug(1,"seqno: %u, seqno16: %u, timestamp: %u.", seq_no, seq_no & 0xffff, timestamp); - - payload_ssrc = nctohl(&packet[8]); + conn->ap2_immediate_flush_requested = 0; + ap2_immediate_flush_requested = 0; - if ((payload_ssrc != old_ssrc) && (payload_ssrc != SSRC_NONE) && - (old_ssrc != SSRC_NONE)) { - if (ssrc_is_recognised(payload_ssrc) == 0) - debug(1, "Unrecognised SSRC: %u.", payload_ssrc); - else - debug(3, "Reading a block: new encoding: %s, old encoding: %s.", - get_ssrc_name(payload_ssrc), get_ssrc_name(old_ssrc)); - } - old_ssrc = payload_ssrc; - - prepare_decoding_chain(conn, payload_ssrc); - - // change the (0) to (1) to process blocks with unrecognised SSRCs - if ((1) && (ssrc_is_recognised(payload_ssrc) == 0)) { - unsigned char nonce[12]; - memset(nonce, 0, sizeof(nonce)); - memcpy(nonce + 4, packet + nread - 8, - 8); // front-pad the 8-byte nonce received to get the 12-byte nonce expected - int response = crypto_aead_chacha20poly1305_ietf_decrypt( - m, // m - &payload_length, // mlen_p - NULL, // nsec, - packet + 12, // the ciphertext starts 12 bytes in and is followed by the MAC tag, - nread - (8 + 12), // clen -- the last 8 bytes are the nonce - packet + 4, // authenticated additional data - 8, // authenticated additional data length - nonce, - conn->session_key); // *k - if (response != 0) { - debug( - 1, - "Can't decipher block %u with ssrc \"%s\". Byte length: %u bytes, timestamp: %u", - seq_no, get_ssrc_name(payload_ssrc), payload_length, data_len); - } else { - if (payload_length == 0) { - debug(2, "packet %u: unrecognised SSRC %u, and, when deciphered, has no content.", - seq_no & 0xFFFF, payload_ssrc); - } else { - debug(1, - "packet %u: unrecognised SSRC %u, packet length %d, deciphered length %lld. " - "Raw contents and then deciphered contents follow:", - seq_no & 0xFFFF, payload_ssrc, nread, payload_length); - debug_print_buffer(1, packet, nread); - debug_print_buffer(1, m, payload_length); - } + /* + // turn off all deferred requests. Not sure if this is right... + unsigned int f = 0; + for (f = 0; f < MAX_DEFERRED_FLUSH_REQUESTS; f++) { + conn->ap2_deferred_flush_requests[f].inUse = 0; + conn->ap2_deferred_flush_requests[f].active = 0; } + */ + + } else { + debug(3, "immediate flush of block %u until block %u", seq_no, + conn->ap2_immediate_flush_until_sequence_number); + ap2_immediate_flush_requested = 1; + new_audio_block_needed = 1; // } + } + } + // now, even if an immediate flush has been requested and is active, we still need to process + // deferred flush requests as they may refer to sequences that are going to be purged anyway + + unsigned int f = 0; + for (f = 0; f < MAX_DEFERRED_FLUSH_REQUESTS; f++) { + if (conn->ap2_deferred_flush_requests[f].inUse != 0) { + if ((conn->ap2_deferred_flush_requests[f].flushFromSeq == seq_no) && + (conn->ap2_deferred_flush_requests[f].flushUntilSeq != seq_no)) { + debug(2, + "deferred flush activated: flushFromTS: %12u, flushFromSeq: %12u, " + "flushUntilTS: %12u, flushUntilSeq: %12u, timestamp: %12u.", + conn->ap2_deferred_flush_requests[f].flushFromTS, + conn->ap2_deferred_flush_requests[f].flushFromSeq, + conn->ap2_deferred_flush_requests[f].flushUntilTS, + conn->ap2_deferred_flush_requests[f].flushUntilSeq, timestamp); + conn->ap2_deferred_flush_requests[f].active = 1; + new_audio_block_needed = 1; + } + if (conn->ap2_deferred_flush_requests[f].flushUntilSeq == seq_no) { + debug(2, + "deferred flush terminated: flushFromTS: %12u, flushFromSeq: %12u, " + "flushUntilTS: %12u, flushUntilSeq: %12u, timestamp: %12u.", + conn->ap2_deferred_flush_requests[f].flushFromTS, + conn->ap2_deferred_flush_requests[f].flushFromSeq, + conn->ap2_deferred_flush_requests[f].flushUntilTS, + conn->ap2_deferred_flush_requests[f].flushUntilSeq, timestamp); + conn->ap2_deferred_flush_requests[f].active = 0; + conn->ap2_deferred_flush_requests[f].inUse = 0; + } else if (a_minus_b_mod23(seq_no, conn->ap2_deferred_flush_requests[f].flushUntilSeq) > + 0) { + // now, do a modulo 2^23 unsigned int calculation to see if we may have overshot the + // flushUntilSeq + debug(2, + "deferred flush terminated due to overshoot at block %u: flushFromTS: %12u, " + "flushFromSeq: %12u, " + "flushUntilTS: %12u, flushUntilSeq: %12u, timestamp: %12u.", + seq_no, conn->ap2_deferred_flush_requests[f].flushFromTS, + conn->ap2_deferred_flush_requests[f].flushFromSeq, + conn->ap2_deferred_flush_requests[f].flushUntilTS, + conn->ap2_deferred_flush_requests[f].flushUntilSeq, timestamp); + conn->ap2_deferred_flush_requests[f].active = 0; + conn->ap2_deferred_flush_requests[f].inUse = 0; + debug(2, "immediate flush was %s.", ap2_immediate_flush_requested == 0 ? "off" : "on"); + } else if (conn->ap2_deferred_flush_requests[f].active != 0) { + new_audio_block_needed = 1; + debug(3, + "deferred flush of block: %u. flushFromTS: %12u, flushFromSeq: %12u, " + "flushUntilTS: %12u, flushUntilSeq: %12u, timestamp: %12u.", + seq_no, conn->ap2_deferred_flush_requests[f].flushFromTS, + conn->ap2_deferred_flush_requests[f].flushFromSeq, + conn->ap2_deferred_flush_requests[f].flushUntilTS, + conn->ap2_deferred_flush_requests[f].flushUntilSeq, timestamp); + } + } + } + pthread_cleanup_pop(1); // the mutex + + // now, if the block is not invalidated by the flush code, see if we need + // to decode it and pass it to the player + if (new_audio_block_needed == 0) { + // is there space in the player thread's buffer system? + unsigned int player_buffer_size, player_buffer_occupancy; + get_audio_buffer_size_and_occupancy(&player_buffer_size, &player_buffer_occupancy, conn); + // debug(1,"player buffer size and occupancy: %u and %u", player_buffer_size, + // player_buffer_occupancy); + + // If we are playing and there is room in the player buffer, go ahead and decode the block + // and send it to the player. Otherwise, keep the block and sleep for a while. + if ((play_enabled != 0) && + (player_buffer_occupancy <= 2 * ((config.audio_backend_buffer_desired_length) * + conn->input_rate / conn->frames_per_packet))) { uint64_t buffer_should_be_time; frame_to_local_time(timestamp, &buffer_should_be_time, conn); - lead_time = buffer_should_be_time - get_absolute_time_in_ns(); - payload_pointer = NULL; - payload_length = 0; - - // decipher it only if it's needed, i.e. if it is not to be discarded - // if a new buffer is needed, the block needs to be deciphered + // try to identify blocks that are timed to before the last buffer, and drop 'em + int64_t time_from_last_buffer_time = + buffer_should_be_time - previous_buffer_should_be_time; - // if ((new_buffer_needed) || ((flush_requested != 0) && (unexpected_seqno != 0))) { - if (new_buffer_needed) { - // debug(1,"nbn seqno: %u, seqno16: %u, timestamp: %u.", seq_no, seq_no & 0xffff, - // timestamp); + if ((packets_played_in_this_sequence == 0) || (time_from_last_buffer_time > 0)) { + int64_t lead_time = buffer_should_be_time - get_absolute_time_in_ns(); + payload_length = 0; if (ssrc_is_recognised(payload_ssrc) != 0) { prepare_decoding_chain(conn, payload_ssrc); unsigned long long new_payload_length = 0; @@ -2849,121 +2757,92 @@ void *rtp_buffered_audio_processor(void *arg) { addADTStoPacket(payload_pointer, payload_length, conn->input_rate, channelConfiguration); } - // debug(1, "creating seqno %u, seqno16 %u, with timestamp %u, leadtime %f.", - // seq_no, - // seq_no & 0xffff, timestamp, lead_time * 0.000000001); + int mute = + ((packets_played_in_this_sequence == 0) && (ssrc_is_aac(payload_ssrc))); + if (mute) { + debug(2, "Connection %d: muting first AAC block -- block %u -- timestamp %u.", + conn->connection_number, seq_no, timestamp); + } + int32_t timestamp_difference = 0; + if (packets_played_in_this_sequence == 0) { + // first_block_in_this_sequence = seq_no; + first_timestamp_in_this_sequence = timestamp; + debug(2, + "Connection %d: " + "first block %u, first timestamp %u.", + conn->connection_number, seq_no, timestamp); + } else { + timestamp_difference = timestamp - expected_timestamp; + if (timestamp_difference != 0) { + debug(2, + "Connection %d: " + "unexpected timestamp in block %u. Actual: %u, expected: %u " + "difference: %d, " + "%f ms. " + "Positive means later, i.e. a gap. First timestamp was %u, payload " + "type: \"%s\".", + conn->connection_number, seq_no, timestamp, expected_timestamp, + timestamp_difference, 1000.0 * timestamp_difference / conn->input_rate, + first_timestamp_in_this_sequence, get_ssrc_name(payload_ssrc)); + // mute the first packet after a discontinuity + if (ssrc_is_aac(payload_ssrc)) { + debug(2, + "Connection %d: muting first AAC block -- block %u -- following a " + "timestamp discontinuity, timestamp %u.", + conn->connection_number, seq_no, timestamp); + mute = 1; + } + } + } + int skip_this_block = 0; + if (timestamp_difference < 0) { + int32_t abs_timestamp_difference = -timestamp_difference; + if ((size_t)abs_timestamp_difference > get_ssrc_block_length(payload_ssrc)) { + skip_this_block = 1; + debug(2, + "skipping block %u because it was too far in the past. Timestamp " + "difference: %d, length of block: %u.", + seq_no, timestamp_difference, get_ssrc_block_length(payload_ssrc)); + } + } + if (skip_this_block == 0) { + uint32_t packet_size = player_put_packet( + payload_ssrc, sequence_number_for_player, timestamp, payload_pointer, + payload_length, mute, timestamp_difference, conn); + debug(3, "block %u, timestamp %u, length %u sent to the player.", seq_no, + timestamp, packet_size); + sequence_number_for_player++; // simply increment + expected_timestamp = timestamp + packet_size; // for the next time + packets_played_in_this_sequence++; + } + } + } else { + debug(3, + "skipped deciphering block %u with timestamp %u because its lead time is " + "out of range at %f " + "seconds.", + seq_no, timestamp, lead_time * 1.0E-9); + uint32_t currentAnchorRTP = 0; + uint64_t currentAnchorLocalTime = 0; + if (get_ptp_anchor_local_time_info(conn, ¤tAnchorRTP, + ¤tAnchorLocalTime) == clock_ok) { + debug(3, "anchorRTP: %u, anchorLocalTime: % " PRIu64 ".", currentAnchorRTP, + currentAnchorLocalTime); + } else { + debug(3, "Clock not okay"); } } } else { - debug(3, "Unrecognised or invalid ssrc: %s, packet length %d.", - get_ssrc_name(payload_ssrc), nread); + debug(2, "Unrecognised or invalid ssrc: %s.", get_ssrc_name(payload_ssrc)); } } else { - if (seq_no % 10 == 0) - debug(3, "Dropping seqno %u, seqno16 %u, with timestamp %u, leadtime %f.", seq_no, - seq_no & 0xffff, timestamp, lead_time * 0.000000001); + debug(1, "dropping buffer that should have played before the last actually played."); } + new_audio_block_needed = 1; // the block has been used up and is no longer current + } else { + usleep(20000); // wait for a while } } - - if (nread == 0) { - // nread is 0 -- the port has been closed - debug(3, "buffered audio port closed!"); - finished = 1; - } else if (nread < 0) { - char errorstring[1024]; - strerror_r(errno, (char *)errorstring, sizeof(errorstring)); - debug(1, "error in rtp_buffered_audio_processor %d: \"%s\". Could not recv a data_len .", - errno, errorstring); - finished = 1; - } - } - - if ((play_enabled != 0) && (payload_pointer != NULL) && (finished == 0) && - (new_buffer_needed != 0)) { - - // it seems that some garbage blocks can be left after the flush, so - // only accept them if they have sensible lead times - if ((lead_time < (int64_t)30000000000L) && (lead_time >= 0)) { - // if it's the very first block (thus no priming needed) - // if ((blocks_read == 1) || (blocks_read_in_sequence > 3)) { - if ((lead_time >= (int64_t)(requested_lead_time * 1000000000L)) || - (packets_played_in_this_sequence != 0)) { - int mute = ((packets_played_in_this_sequence == 0) && (ssrc_is_aac(payload_ssrc))); - // if (mute) { - // debug(1, "Muting first AAC block, timestamp %u.", timestamp); - // } - int32_t timestamp_difference = 0; - if (packets_played_in_this_sequence == 0) { - // first_block_in_this_sequence = seq_no; - first_timestamp_in_this_sequence = timestamp; - } else { - timestamp_difference = timestamp - expected_timestamp; - if (timestamp_difference != 0) { - debug( - 1, - "Connection %d: " - "unexpected timestamp in packet %u. Actual: %u, expected: %u difference: %d, " - "%f ms. " - "Positive means later, i.e. a gap. First timestamp was %u, payload type: \"%s\".", - conn->connection_number, seq_no & 0xffff, timestamp, expected_timestamp, - timestamp_difference, 1000.0 * timestamp_difference / conn->input_rate, - first_timestamp_in_this_sequence, get_ssrc_name(payload_ssrc)); - // mute the first packet after a discontinuity - if (ssrc_is_aac(payload_ssrc)) { - // debug(1, "Muting first AAC block after a timestamp discontinuity, timestamp %u.", - // timestamp); - mute = 1; - } - } - } - - // frames_per_packet should be set during setup. - // if ((timestamp_difference >= 0) || (conn->frames_per_packet + timestamp_difference > - // 0)) { - if (timestamp_difference < 0) - debug(3, - "The next %d frames are late, even though the sequence numbers are in line. The " - "\"late\" frames should be decoded to prime the AAC decoder and then dropped.", - -timestamp_difference); - uint32_t packet_size = - player_put_packet(payload_ssrc, seq_no & 0xFFFF, timestamp, payload_pointer, - payload_length, mute, timestamp_difference, conn); - // debug(1, "put packet: %u, packets_played_in_this_sequence is %d.", seq_no & 0xFFFF, - // packets_played_in_this_sequence); - if (last_seqno_put != 0) { - uint16_t seqno_expected = last_seqno_put + 1; - if (seqno_expected != (seq_no & 0xffff)) - debug(1, "Packet puts not in sequence. Expected: %u, actual: %u.", seqno_expected, - (seq_no & 0xffff)); - } - last_seqno_put = seq_no & 0xffff; - expected_timestamp = timestamp + packet_size; // for the next time - new_buffer_needed = 0; - packets_played_in_this_sequence++; - /* - } else { - // expected_seq_no = seq_no + 1; - debug(1, - "Connection %d: " - "dropping packet %u, seq_no %u, that arrived %.3f ms later than " - "expected due to a discontinuity in the timestamp sequence. First " - "timestamp was %u.", - conn->connection_number, seq_no, timestamp, - -1000.0 * timestamp_difference / conn->input_rate, - first_timestamp_in_this_sequence); - } - */ - } - // } - } else { - debug(3, - "Dropping packet %u from seqno %u, seqno16 %u, with out-of-range lead_time: %.3f " - "seconds.", - timestamp, seq_no, seq_no & 0xffff, 0.000000001 * lead_time); - expected_timestamp = timestamp + conn->frames_per_packet; // for the next time - } - payload_pointer = NULL; // payload consumed } } while (finished == 0); debug(2, "Buffered Audio Receiver RTP thread \"normal\" exit."); diff --git a/rtsp.c b/rtsp.c index 966a9662..a101bdab 100644 --- a/rtsp.c +++ b/rtsp.c @@ -103,7 +103,6 @@ #include "FFTConvolver/convolver.h" #endif - #ifdef CONFIG_DBUS_INTERFACE #include "dbus-service.h" #endif @@ -1196,11 +1195,11 @@ enum rtsp_read_request_response rtsp_read_request(rtsp_conn_info *conn, rtsp_mes } else if (nread == 0) { if (errno == 0) { // a blocking read that returns zero means eof -- implies connection closed by client - debug(3, "Connection %d closed by client.", conn->connection_number); + debug(1, "Connection %d closed by client.", conn->connection_number); } else { char errorstring[1024]; strerror_r(errno, (char *)errorstring, sizeof(errorstring)); - debug(3, "Connection %d closed by client with error %d: \"%s\".", + debug(1, "Connection %d closed by client with error %d: \"%s\".", conn->connection_number, errno, (char *)errorstring); } reply = rtsp_read_request_response_channel_closed; @@ -1892,40 +1891,46 @@ void handle_flushbuffered(rtsp_conn_info *conn, rtsp_message *req, rtsp_message } debug_mutex_lock(&conn->flush_mutex, 1000, 1); - // a flush with from... components will not be followed by a setanchor (i.e. a play) - // if it's a flush that will be followed by a setanchor (i.e. a play) then stop play now. - if (flushFromValid == 0) - conn->ap2_play_enabled = 0; - - // now, if it's an immediate flush, replace the existing request, if any - // but it if's a deferred flush and there is an existing deferred request, - // only update the flushUntil stuff -- that seems to preserve - // the intended semantics - - // so, always replace these - conn->ap2_flush_until_sequence_number = flushUntilSeq & 0x7fffff; - conn->ap2_flush_until_rtp_timestamp = flushUntilTS; - - if ((conn->ap2_flush_requested != 0) && (conn->ap2_flush_from_valid != 0) && - (flushFromValid != 0)) { - // if there is a request already, and it's a deferred request, and the current request is also - // deferred... do nothing! -- leave the starting point in place. Yeah, yeah, we know de - // Morgan's Law, but this seems clearer + + if (flushFromValid == 0) { + // an immediate flush is requested + conn->ap2_immediate_flush_requested = 1; + conn->ap2_immediate_flush_until_sequence_number = flushUntilSeq & 0x7fffff; + conn->ap2_immediate_flush_until_rtp_timestamp = flushUntilTS; + debug(2, + "Connection %d: immediate flush request created: flushUntilTS: %u, flushUntilSeq: %u.", + conn->connection_number, flushUntilTS, flushUntilSeq & 0x7fffff); + conn->ap2_play_enabled = 0; // stop trying to play audio + ptp_send_control_message_string( + "P"); // signify clock no longer valid and will be restarted by a subsequent play } else { - conn->ap2_flush_from_sequence_number = flushFromSeq & 0x7fffff; - conn->ap2_flush_from_rtp_timestamp = flushFromTS; + // look for a record slot that isn't in use + unsigned int i = 0; + unsigned int found = 0; + while ((i < MAX_DEFERRED_FLUSH_REQUESTS) && (found == 0)) { + if (conn->ap2_deferred_flush_requests[i].inUse == 0) { + found = 1; + } else { + i++; + } + } + if (found != 0) { + conn->ap2_deferred_flush_requests[i].inUse = 1; + conn->ap2_deferred_flush_requests[i].active = 0; + conn->ap2_deferred_flush_requests[i].flushFromSeq = flushFromSeq & 0x7fffff; + conn->ap2_deferred_flush_requests[i].flushFromTS = flushFromTS; + conn->ap2_deferred_flush_requests[i].flushUntilSeq = flushUntilSeq & 0x7fffff; + conn->ap2_deferred_flush_requests[i].flushUntilTS = flushUntilTS; + debug(2, + "Connection %d: deferred flush request created: flushFromSeq: %u, flushUntilSeq: %u.", + conn->connection_number, flushFromSeq, flushUntilSeq); + } else { + debug(1, "Connection %d: no more room for deferred flush request records", + conn->connection_number); + } } - conn->ap2_flush_from_valid = flushFromValid; - conn->ap2_flush_requested = 1; - debug_mutex_unlock(&conn->flush_mutex, 3); - - if (flushFromValid) - debug(2, "Deferred Flush Requested"); - else - debug(2, "Immediate Flush Requested"); - plist_free(messagePlist); } @@ -2077,7 +2082,7 @@ struct pairings { uint8_t public_key[32]; struct pairings *next; -} *pairings; +} * pairings; static struct pairings *pairing_find(const char *device_id) { for (struct pairings *pairing = pairings; pairing; pairing = pairing->next) { @@ -5364,7 +5369,7 @@ static void *rtsp_conversation_thread_func(void *pconn) { hdr = msg_get_header(req, "CSeq"); if (hdr) msg_add_header(resp, "CSeq", hdr); - // msg_add_header(resp, "Audio-Jack-Status", "connected; type=analog"); + // msg_add_header(resp, "Audio-Jack-Status", "connected; type=analog"); #ifdef CONFIG_AIRPLAY_2 char server_string[128]; snprintf(server_string, sizeof(server_string), "AirTunes/%s", config.srcvers); diff --git a/shairport.c b/shairport.c index 7d895e2a..20cb0193 100644 --- a/shairport.c +++ b/shairport.c @@ -1266,7 +1266,8 @@ int parse_options(int argc, char **argv) { if ((value >= 1) && (value <= 64)) { config.convolution_threads = value; } else { - warn("Invalid value \"%u\" for \"convolution_thread_pool_size\". It must be between 1 and 64." + warn("Invalid value \"%u\" for \"convolution_thread_pool_size\". It must be between 1 " + "and 64." "The default of %u will be used instead.", value, config.convolution_threads); }