From: Mike Brady Date: Sat, 12 May 2018 14:35:51 +0000 (+0100) Subject: Add a read-write mutex to protect a player thread when it's being created or deleted... X-Git-Tag: 3.2RC8~2^2~28 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=b6fcabca0d829a2b9ea0f912a2a7b9ac18748406;p=thirdparty%2Fshairport-sync.git Add a read-write mutex to protect a player thread when it's being created or deleted; make player_put_packet get a read lock. --- diff --git a/player.c b/player.c index 062e63b5..1950c80b 100644 --- a/player.c +++ b/player.c @@ -464,147 +464,155 @@ static void free_audio_buffers(rtsp_conn_info *conn) { void player_put_packet(seq_t seqno, uint32_t actual_timestamp, int64_t timestamp, uint8_t *data, int len, rtsp_conn_info *conn) { + pthread_rwlock_rdlock(&conn->player_thread_lock); + if (conn->player_thread != NULL) { - // all timestamps are done at the output rate - // the "actual_timestamp" is the one that comes in the packet, and is carried over for debugging - // and checking only. + // all timestamps are done at the output rate + // the "actual_timestamp" is the one that comes in the packet, and is carried over for debugging + // and checking only. - int64_t ltimestamp = timestamp * conn->output_sample_ratio; + int64_t ltimestamp = timestamp * conn->output_sample_ratio; - // ignore a request to flush that has been made before the first packet... - if (conn->packet_count == 0) { - pthread_mutex_lock(&conn->flush_mutex); - conn->flush_requested = 0; - conn->flush_rtp_timestamp = 0; - pthread_mutex_unlock(&conn->flush_mutex); - } + // ignore a request to flush that has been made before the first packet... + if (conn->packet_count == 0) { + pthread_mutex_lock(&conn->flush_mutex); + conn->flush_requested = 0; + conn->flush_rtp_timestamp = 0; + pthread_mutex_unlock(&conn->flush_mutex); + } - pthread_mutex_lock(&conn->ab_mutex); - conn->packet_count++; - conn->time_of_last_audio_packet = get_absolute_time_in_fp(); - if (conn->connection_state_to_output) { // if we are supposed to be processing these packets - - // if (flush_rtp_timestamp != 0) - // debug(1,"Flush_rtp_timestamp is %u",flush_rtp_timestamp); - - if ((conn->flush_rtp_timestamp != 0) && (ltimestamp <= conn->flush_rtp_timestamp)) { - debug(3, - "Dropping flushed packet in player_put_packet, seqno %u, timestamp %lld, flushing to " - "timestamp: %lld.", - seqno, ltimestamp, conn->flush_rtp_timestamp); - } else { - if ((conn->flush_rtp_timestamp != 0x0) && - (ltimestamp > conn->flush_rtp_timestamp)) // if we have gone past the flush boundary time - conn->flush_rtp_timestamp = 0x0; + pthread_mutex_lock(&conn->ab_mutex); + conn->packet_count++; + conn->time_of_last_audio_packet = get_absolute_time_in_fp(); + if (conn->connection_state_to_output) { // if we are supposed to be processing these packets - abuf_t *abuf = 0; + // if (flush_rtp_timestamp != 0) + // debug(1,"Flush_rtp_timestamp is %u",flush_rtp_timestamp); - if (!conn->ab_synced) { - debug(3, "syncing to seqno %u.", seqno); - conn->ab_write = seqno; - conn->ab_read = seqno; - conn->ab_synced = 1; - } + if ((conn->flush_rtp_timestamp != 0) && (ltimestamp <= conn->flush_rtp_timestamp)) { + debug(3, + "Dropping flushed packet in player_put_packet, seqno %u, timestamp %lld, flushing to " + "timestamp: %lld.", + seqno, ltimestamp, conn->flush_rtp_timestamp); + } else { + if ((conn->flush_rtp_timestamp != 0x0) && + (ltimestamp > + conn->flush_rtp_timestamp)) // if we have gone past the flush boundary time + conn->flush_rtp_timestamp = 0x0; + + abuf_t *abuf = 0; + + if (!conn->ab_synced) { + debug(3, "syncing to seqno %u.", seqno); + conn->ab_write = seqno; + conn->ab_read = seqno; + conn->ab_synced = 1; + } - // here, we should check for missing frames - if (!conn->ab_buffering) { - int j; - for (j = 1; j <= 8; j++) { - // check j times, after a short period of has elapsed, assuming 352 frames per packet - int back_step = (((250 * 44100) / 352) / 1000) * (j); // approx 250 ms intervals - int k; - for (k = -2; k <= 2; k++) { - if (back_step < - seq_diff(conn->ab_read, conn->ab_write, - conn->ab_read)) { // if it's within the range of frames in use... - int item_to_check = (conn->ab_write - back_step) & 0xffff; - seq_t next = item_to_check; - abuf_t *check_buf = conn->audio_buffer + BUFIDX(next); - if ((!check_buf->ready) && - (check_buf->resend_level < - j)) { // prevent multiple requests from the same level of lookback - check_buf->resend_level = j; - if (config.disable_resend_requests == 0) { - rtp_request_resend(next, 1, conn); - if (j >= 6) - debug(2, "Resend request level #%d for packet %u in range %u to %u.", j, next, - conn->ab_read, conn->ab_write); - conn->resend_requests++; + // here, we should check for missing frames + if (!conn->ab_buffering) { + int j; + for (j = 1; j <= 8; j++) { + // check j times, after a short period of has elapsed, assuming 352 frames per packet + int back_step = (((250 * 44100) / 352) / 1000) * (j); // approx 250 ms intervals + int k; + for (k = -2; k <= 2; k++) { + if (back_step < + seq_diff(conn->ab_read, conn->ab_write, + conn->ab_read)) { // if it's within the range of frames in use... + int item_to_check = (conn->ab_write - back_step) & 0xffff; + seq_t next = item_to_check; + abuf_t *check_buf = conn->audio_buffer + BUFIDX(next); + if ((!check_buf->ready) && + (check_buf->resend_level < + j)) { // prevent multiple requests from the same level of lookback + check_buf->resend_level = j; + if (config.disable_resend_requests == 0) { + rtp_request_resend(next, 1, conn); + if (j >= 6) + debug(2, "Resend request level #%d for packet %u in range %u to %u.", j, next, + conn->ab_read, conn->ab_write); + conn->resend_requests++; + } } } } } } - } - if (conn->ab_write == seqno) { // expected packet - abuf = conn->audio_buffer + BUFIDX(seqno); - conn->ab_write = SUCCESSOR(seqno); - } else if (seq_order(conn->ab_write, seqno, conn->ab_read)) { // newer than expected - // if (ORDINATE(seqno)>(BUFFER_FRAMES*7)/8) - // debug(1,"An interval of %u frames has opened, with ab_read: %u, ab_write: %u and seqno: - // %u.",seq_diff(ab_read,seqno),ab_read,ab_write,seqno); - int32_t gap = seq_diff(conn->ab_write, seqno, conn->ab_read); - if (gap <= 0) - debug(1, "Unexpected gap size: %d.", gap); - int i; - for (i = 0; i < gap; i++) { - abuf = conn->audio_buffer + BUFIDX(seq_sum(conn->ab_write, i)); - abuf->ready = 0; // to be sure, to be sure - abuf->resend_level = 0; - abuf->timestamp = 0; - abuf->given_timestamp = 0; - abuf->sequence_number = 0; + if (conn->ab_write == seqno) { // expected packet + abuf = conn->audio_buffer + BUFIDX(seqno); + conn->ab_write = SUCCESSOR(seqno); + } else if (seq_order(conn->ab_write, seqno, conn->ab_read)) { // newer than expected + // if (ORDINATE(seqno)>(BUFFER_FRAMES*7)/8) + // debug(1,"An interval of %u frames has opened, with ab_read: %u, ab_write: %u and seqno: + // %u.",seq_diff(ab_read,seqno),ab_read,ab_write,seqno); + int32_t gap = seq_diff(conn->ab_write, seqno, conn->ab_read); + if (gap <= 0) + debug(1, "Unexpected gap size: %d.", gap); + int i; + for (i = 0; i < gap; i++) { + abuf = conn->audio_buffer + BUFIDX(seq_sum(conn->ab_write, i)); + abuf->ready = 0; // to be sure, to be sure + abuf->resend_level = 0; + abuf->timestamp = 0; + abuf->given_timestamp = 0; + abuf->sequence_number = 0; + } + // debug(1,"N %d s %u.",seq_diff(ab_write,PREDECESSOR(seqno))+1,ab_write); + abuf = conn->audio_buffer + BUFIDX(seqno); + // rtp_request_resend(ab_write, gap); + // resend_requests++; + conn->ab_write = SUCCESSOR(seqno); + } else if (seq_order(conn->ab_read, seqno, conn->ab_read)) { // late but not yet played + conn->late_packets++; + abuf = conn->audio_buffer + BUFIDX(seqno); + /* + if (abuf->ready) + debug(1,"Late apparently duplicate packet received that is %d packets + late.",seq_diff(seqno, conn->ab_write, conn->ab_read)); + else + debug(1,"Late packet received that is %d packets late.",seq_diff(seqno, + conn->ab_write, conn->ab_read)); + */ + } else { // too late. + + // debug(1,"Too late packet received that is %d packets late.",seq_diff(seqno, + // conn->ab_write, conn->ab_read)); + conn->too_late_packets++; } - // debug(1,"N %d s %u.",seq_diff(ab_write,PREDECESSOR(seqno))+1,ab_write); - abuf = conn->audio_buffer + BUFIDX(seqno); - // rtp_request_resend(ab_write, gap); - // resend_requests++; - conn->ab_write = SUCCESSOR(seqno); - } else if (seq_order(conn->ab_read, seqno, conn->ab_read)) { // late but not yet played - conn->late_packets++; - abuf = conn->audio_buffer + BUFIDX(seqno); - /* - if (abuf->ready) - debug(1,"Late apparently duplicate packet received that is %d packets - late.",seq_diff(seqno, conn->ab_write, conn->ab_read)); - else - debug(1,"Late packet received that is %d packets late.",seq_diff(seqno, - conn->ab_write, conn->ab_read)); - */ - } else { // too late. - - // debug(1,"Too late packet received that is %d packets late.",seq_diff(seqno, - // conn->ab_write, conn->ab_read)); - conn->too_late_packets++; - } - // pthread_mutex_unlock(&ab_mutex); - - if (abuf) { - int datalen = conn->max_frames_per_packet; - if (alac_decode(abuf->data, &datalen, data, len, conn) == 0) { - abuf->ready = 1; - abuf->length = datalen; - abuf->timestamp = ltimestamp; - abuf->given_timestamp = actual_timestamp; - abuf->sequence_number = seqno; - } else { - debug(1, "Bad audio packet detected and discarded."); - abuf->ready = 0; - abuf->resend_level = 0; - abuf->timestamp = 0; - abuf->given_timestamp = 0; - abuf->sequence_number = 0; + // pthread_mutex_unlock(&ab_mutex); + + if (abuf) { + int datalen = conn->max_frames_per_packet; + if (alac_decode(abuf->data, &datalen, data, len, conn) == 0) { + abuf->ready = 1; + abuf->length = datalen; + abuf->timestamp = ltimestamp; + abuf->given_timestamp = actual_timestamp; + abuf->sequence_number = seqno; + } else { + debug(1, "Bad audio packet detected and discarded."); + abuf->ready = 0; + abuf->resend_level = 0; + abuf->timestamp = 0; + abuf->given_timestamp = 0; + abuf->sequence_number = 0; + } } - } - // pthread_mutex_lock(&ab_mutex); + // pthread_mutex_lock(&ab_mutex); + } + int rc = pthread_cond_signal(&conn->flowcontrol); + if (rc) + debug(1, "Error signalling flowcontrol."); } - int rc = pthread_cond_signal(&conn->flowcontrol); - if (rc) - debug(1, "Error signalling flowcontrol."); + pthread_mutex_unlock(&conn->ab_mutex); + } else { + debug(1, + "No player thread while adding a player_put_packet calle was made -- packet discarded."); } - pthread_mutex_unlock(&conn->ab_mutex); + pthread_rwlock_unlock(&conn->player_thread_lock); } int32_t rand_in_range(int32_t exclusive_range_limit) { @@ -749,8 +757,6 @@ static inline void process_sample(int32_t sample, char **outp, enum sps_format_t } // get the next frame, when available. return 0 if underrun/stream reset. -// remember that this function will be running in the audio receiver thread. - static abuf_t *buffer_get_frame(rtsp_conn_info *conn) { // int16_t buf_fill; uint64_t local_time_now;