From: Jaroslav Kysela Date: Wed, 30 Dec 2015 10:14:49 +0000 (+0100) Subject: timeshift: write all 'temporary' packets (outside storage) when going to live X-Git-Tag: v4.2.1~1270 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=83ce30a2e1ecd90aa53e78162104d029dd1182aa;p=thirdparty%2Ftvheadend.git timeshift: write all 'temporary' packets (outside storage) when going to live --- diff --git a/src/timeshift.c b/src/timeshift.c index bd5d1865a..94ff2af83 100644 --- a/src/timeshift.c +++ b/src/timeshift.c @@ -282,6 +282,35 @@ timeshift_packet( timeshift_t *ts, th_pkt_t *pkt ) } } +void +timeshift_packets_clone( timeshift_t *ts, struct streaming_message_queue *dst ) +{ + streaming_message_t *lowest, *sm; + struct streaming_message_queue *sq, *backlogs; + int i; + + lock_assert(&ts->buffering_mutex); + + /* init temporary queues */ + backlogs = alloca(ts->backlog_max * sizeof(*backlogs)); + for (i = 0; i < ts->backlog_max; i++) + TAILQ_INIT(&backlogs[i]); + /* push to destination (pts sorted) */ + while (1) { + lowest = NULL; + for (i = 0; i < ts->backlog_max; i++) { + sq = &backlogs[i]; + sm = TAILQ_FIRST(sq); + if (sm && (lowest == NULL || lowest->sm_time > sm->sm_time)) + lowest = sm; + } + if (!lowest) + break; + TAILQ_REMOVE(sq, lowest, sm_link); + TAILQ_INSERT_TAIL(dst, lowest, sm_link); + } +} + /* * Receive data */ @@ -312,6 +341,8 @@ static void timeshift_input ts->state = TS_LIVE; } + pthread_mutex_lock(&ts->buffering_mutex); + /* Pass-thru */ if (ts->state <= TS_LIVE) { if (sm->sm_type == SMT_START) { @@ -367,6 +398,8 @@ static void timeshift_input } pktcont: + pthread_mutex_unlock(&ts->buffering_mutex); + /* Exit/Stop */ if (exit) { timeshift_write_exit(ts->rd_pipe.wr); @@ -455,6 +488,7 @@ streaming_target_t *timeshift_create TAILQ_INIT(&ts->backlog[i]); pthread_mutex_init(&ts->rdwr_mutex, NULL); pthread_mutex_init(&ts->state_mutex, NULL); + pthread_mutex_init(&ts->buffering_mutex, NULL); /* Initialise output */ tvh_pipe(O_NONBLOCK, &ts->rd_pipe); diff --git a/src/timeshift/private.h b/src/timeshift/private.h index 7cd409367..7a3165b6a 100644 --- a/src/timeshift/private.h +++ b/src/timeshift/private.h @@ -96,6 +96,7 @@ typedef struct timeshift { int64_t ref_time; ///< Start time in us (monoclock) struct streaming_message_queue backlog[TIMESHIFT_BACKLOG_MAX]; ///< Queued packets for time sorting int backlog_max;///< Maximum component index in backlog + pthread_mutex_t buffering_mutex;///< Protect backlog / write queues enum { TS_INIT, @@ -128,6 +129,11 @@ typedef struct timeshift { extern uint64_t timeshift_total_size; extern uint64_t timeshift_total_ram_size; +/* + * + */ +void timeshift_packets_clone ( timeshift_t *ts, struct streaming_message_queue *dst ); + /* * Write functions */ @@ -142,6 +148,7 @@ ssize_t timeshift_write_exit ( int fd ); ssize_t timeshift_write_eof ( timeshift_file_t *tsf ); void timeshift_writer_flush ( timeshift_t *ts ); +void timeshift_writer_clone ( timeshift_t *ts, struct streaming_message_queue *dst ); /* * Threads diff --git a/src/timeshift/timeshift_reader.c b/src/timeshift/timeshift_reader.c index 405ab575a..c8e1faf25 100644 --- a/src/timeshift/timeshift_reader.c +++ b/src/timeshift/timeshift_reader.c @@ -426,6 +426,24 @@ static int _timeshift_flush_to_live return 0; } +/* + * Write packets from temporary queues + */ +static void _timeshift_write_queues + ( timeshift_t *ts ) +{ + struct streaming_message_queue sq; + streaming_message_t *sm; + + TAILQ_INIT(&sq); + timeshift_writer_clone(ts, &sq); + timeshift_packets_clone(ts, &sq); + while ((sm = TAILQ_FIRST(&sq)) != NULL) { + TAILQ_REMOVE(&sq, sm, sm_link); + streaming_target_deliver2(ts->output, sm); + } +} + /* * Send the status message */ @@ -850,11 +868,21 @@ void *timeshift_reader ( void *p ) streaming_target_deliver2(ts->output, ctrl); ctrl = NULL; tvhtrace("timeshift", "reader - set TS_LIVE"); - ts->state = TS_LIVE; + + /* Critical section - protect write / backlog queues */ + pthread_mutex_lock(&ts->buffering_mutex); /* Flush timeshift buffer to live */ - if (_timeshift_flush_to_live(ts, &cur_file, &sm, &wait) == -1) + if (_timeshift_flush_to_live(ts, &cur_file, &sm, &wait) == -1) { + pthread_mutex_unlock(&ts->buffering_mutex); break; + } + + /* Flush write / backlog queues */ + _timeshift_write_queues(ts); + + ts->state = TS_LIVE; + pthread_mutex_unlock(&ts->buffering_mutex); /* Close file (if open) */ if (cur_file && cur_file->rfd >= 0) { diff --git a/src/timeshift/timeshift_writer.c b/src/timeshift/timeshift_writer.c index 40355bcf0..3107c5549 100644 --- a/src/timeshift/timeshift_writer.c +++ b/src/timeshift/timeshift_writer.c @@ -384,3 +384,15 @@ void timeshift_writer_flush ( timeshift_t *ts ) pthread_mutex_unlock(&sq->sq_mutex); } +void timeshift_writer_clone ( timeshift_t *ts, struct streaming_message_queue *dst ) +{ + streaming_message_t *sm, *sm2; + streaming_queue_t *sq = &ts->wr_queue; + + pthread_mutex_lock(&sq->sq_mutex); + TAILQ_FOREACH(sm, &sq->sq_queue, sm_link) { + sm2 = streaming_msg_clone(sm); + TAILQ_INSERT_TAIL(dst, sm2, sm_link); + } + pthread_mutex_unlock(&sq->sq_mutex); +}