From: Jaroslav Kysela Date: Sun, 3 Jan 2016 10:19:27 +0000 (+0100) Subject: timeshift: reduce and improve the logic - move more packet handling logic to writer... X-Git-Tag: v4.2.1~1240 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=ea752fedef32dfc9501e28df442fa333f4ce0161;p=thirdparty%2Ftvheadend.git timeshift: reduce and improve the logic - move more packet handling logic to writer thread --- diff --git a/src/packet.h b/src/packet.h index fb12be1c5..14d3c6c69 100644 --- a/src/packet.h +++ b/src/packet.h @@ -56,9 +56,6 @@ typedef struct th_pkt { uint8_t pkt_componentindex; uint8_t pkt_frametype; uint8_t pkt_field; // Set if packet is only a half frame (a field) -#if ENABLE_TIMESHIFT - uint8_t pkt_delivered; -#endif uint8_t pkt_channels; uint8_t pkt_sri; diff --git a/src/timeshift.c b/src/timeshift.c index 19e956426..1ad81716c 100644 --- a/src/timeshift.c +++ b/src/timeshift.c @@ -233,7 +233,7 @@ timeshift_packet_deliver ( timeshift_t *ts, streaming_message_t *sm ) } static void -timeshift_packet_flush ( timeshift_t *ts, int64_t time, int deliver ) +timeshift_packet_flush ( timeshift_t *ts, int64_t time ) { streaming_message_t *lowest, *sm; struct streaming_message_queue *sq; @@ -251,15 +251,12 @@ timeshift_packet_flush ( timeshift_t *ts, int64_t time, int deliver ) if (!lowest) break; TAILQ_REMOVE(sq, lowest, sm_link); - if (deliver) - timeshift_packet_deliver(ts, lowest); - else - streaming_msg_free(lowest); + timeshift_packet_deliver(ts, lowest); } } static void -timeshift_packet( timeshift_t *ts, th_pkt_t *pkt, int deliver ) +timeshift_packet( timeshift_t *ts, th_pkt_t *pkt ) { streaming_message_t *sm; int64_t time; @@ -272,61 +269,16 @@ timeshift_packet( timeshift_t *ts, th_pkt_t *pkt, int deliver ) time = ts_rescale(pkt->pkt_pts, 1000000); if (time > ts->last_time) { ts->last_time = time; - timeshift_packet_flush(ts, time, deliver); + timeshift_packet_flush(ts, time); } sm->sm_time = time; if (time + MAX_TIME_DELTA < ts->last_time) { - if (deliver) - timeshift_packet_deliver(ts, sm); + timeshift_packet_deliver(ts, sm); } else { if (pkt->pkt_componentindex >= ts->backlog_max) ts->backlog_max = pkt->pkt_componentindex + 1; TAILQ_INSERT_TAIL(&ts->backlog[pkt->pkt_componentindex], sm, sm_link); - pkt->pkt_delivered = ts->state <= TS_LIVE; - } -} - -void -timeshift_packets_clone - ( timeshift_t *ts, struct streaming_message_queue *dst, int delivered ) -{ - streaming_message_t *lowest, *sm, *sm2; - struct streaming_message_queue *sq, *sq2, *backlogs; - th_pkt_t *pkt; - int i; - - lock_assert(&ts->state_mutex); - - /* init temporary queues and copy the backlog data */ - backlogs = alloca(ts->backlog_max * sizeof(*backlogs)); - for (i = 0; i < ts->backlog_max; i++) { - sq = &backlogs[i]; - sq2 = &ts->backlog[i]; - TAILQ_INIT(sq); - TAILQ_FOREACH(sm, sq2, sm_link) { - if (!delivered) { - pkt = sm->sm_data; - if (pkt->pkt_delivered) - continue; - } - sm2 = streaming_msg_clone(sm); - TAILQ_INSERT_TAIL(sq, sm2, sm_link); - } - } - /* push to destination (pts sorted) */ - while (1) { - lowest = NULL; - for (i = 0; i < ts->backlog_max; i++) { - sq = &backlogs[i]; - sm = TAILQ_FIRST(sq); - if (sm && (lowest == NULL || lowest->sm_time > sm->sm_time)) - lowest = sm; - } - if (!lowest) - break; - TAILQ_REMOVE(sq, lowest, sm_link); - TAILQ_INSERT_TAIL(dst, lowest, sm_link); } } @@ -336,31 +288,24 @@ timeshift_packets_clone static void timeshift_input ( void *opaque, streaming_message_t *sm ) { - int exit = 0, type = sm->sm_type; + int type = sm->sm_type; timeshift_t *ts = opaque; th_pkt_t *pkt = sm->sm_data, *pkt2; - pthread_mutex_lock(&ts->state_mutex); - /* Control */ if (type == SMT_SKIP) { - if (ts->state >= TS_LIVE) - timeshift_write_skip(ts->rd_pipe.wr, sm->sm_data); + timeshift_write_skip(ts->rd_pipe.wr, sm->sm_data); streaming_msg_free(sm); } else if (type == SMT_SPEED) { - if (ts->state >= TS_LIVE) - timeshift_write_speed(ts->rd_pipe.wr, sm->sm_code); + timeshift_write_speed(ts->rd_pipe.wr, sm->sm_code); streaming_msg_free(sm); - } - - else { + } else { - /* Start */ - if (type == SMT_START && ts->state == TS_INIT) - ts->state = TS_LIVE; + if (type == SMT_START) + timeshift_packet_flush(ts, ts->last_time + MAX_TIME_DELTA); /* Change PTS/DTS offsets */ - if (ts->packet_mode && ts->start_pts && type == SMT_PACKET) { + else if (ts->packet_mode && ts->start_pts && type == SMT_PACKET) { pkt2 = pkt_copy_shallow(pkt); pkt_ref_dec(pkt); sm->sm_data = pkt = pkt2; @@ -368,73 +313,37 @@ static void timeshift_input pkt->pkt_dts += ts->start_pts; } - /* Pass-thru */ - if (ts->state <= TS_LIVE) { - if (type == SMT_START) { - if (ts->smt_start) - streaming_start_unref(ts->smt_start); - ts->smt_start = sm->sm_data; - atomic_add(&ts->smt_start->ss_refcount, 1); - if (ts->packet_mode) { - timeshift_packet_flush(ts, ts->last_time + MAX_TIME_DELTA + 1000, ts->dobuf); - if (ts->last_time) - ts->start_pts = ts->last_time + 1000; - } - } - streaming_target_deliver2(ts->output, streaming_msg_clone(sm)); - } - /* Check for exit */ - if (type == SMT_EXIT || + else if (type == SMT_EXIT || (type == SMT_STOP && sm->sm_code != SM_CODE_SOURCE_RECONFIGURED)) - exit = 1; + ts->exit = 1; - if (type == SMT_MPEGTS) + else if (type == SMT_MPEGTS) ts->packet_mode = 0; - /* Buffer to disk */ - if ((ts->state > TS_LIVE) || (ts->dobuf && (ts->state == TS_LIVE))) { - if (ts->packet_mode) { - sm->sm_time = ts->last_time; - if (type == SMT_PACKET) { - timeshift_packet(ts, pkt, 1); - goto msg_free; - } - } else { - if (ts->ref_time == 0) { - ts->ref_time = getmonoclock(); - sm->sm_time = 0; - } else { - sm->sm_time = getmonoclock() - ts->ref_time; - } + /* Send to the writer thread */ + if (ts->packet_mode) { + sm->sm_time = ts->last_time; + if (type == SMT_PACKET) { + timeshift_packet(ts, pkt); + goto msg_free; } - streaming_target_deliver2(&ts->wr_queue.sq_st, sm); } else { - if (type == SMT_PACKET) { - timeshift_packet(ts, pkt, 0); - tvhtrace("timeshift", - "ts %d pkt in - stream %d type %c pts %10"PRId64 - " dts %10"PRId64" dur %10d len %6zu", - ts->id, - pkt->pkt_componentindex, - pkt_frametype_to_char(pkt->pkt_frametype), - ts_rescale(pkt->pkt_pts, 1000000), - ts_rescale(pkt->pkt_dts, 1000000), - pkt->pkt_duration, - pktbuf_len(pkt->pkt_payload)); + if (ts->ref_time == 0) { + ts->ref_time = getmonoclock(); + sm->sm_time = 0; + } else { + sm->sm_time = getmonoclock() - ts->ref_time; } -msg_free: - streaming_msg_free(sm); } + streaming_target_deliver2(&ts->wr_queue.sq_st, sm); +msg_free: + streaming_msg_free(sm); /* Exit/Stop */ - if (exit) { + if (ts->exit) timeshift_write_exit(ts->rd_pipe.wr); - ts->state = TS_EXIT; - } } - - pthread_mutex_unlock(&ts->state_mutex); } /** @@ -474,10 +383,6 @@ timeshift_destroy(streaming_target_t *pad) /* Flush files */ timeshift_filemgr_flush(ts, NULL); - /* Release SMT_START index */ - if (ts->smt_start) - streaming_start_unref(ts->smt_start); - if (ts->path) free(ts->path); free(ts); @@ -503,7 +408,8 @@ streaming_target_t *timeshift_create ts->output = out; ts->path = NULL; ts->max_time = max_time; - ts->state = TS_INIT; + ts->state = TS_LIVE; + ts->exit = 0; ts->full = 0; ts->vididx = -1; ts->id = timeshift_index; @@ -511,6 +417,7 @@ streaming_target_t *timeshift_create ts->dobuf = ts->ondemand ? 0 : 1; ts->packet_mode= 1; ts->last_time = 0; + ts->buf_time = 0; ts->start_pts = 0; ts->ref_time = 0; for (i = 0; i < TIMESHIFT_BACKLOG_MAX; i++) diff --git a/src/timeshift/private.h b/src/timeshift/private.h index cce7feee6..b094289cd 100644 --- a/src/timeshift/private.h +++ b/src/timeshift/private.h @@ -19,7 +19,7 @@ #ifndef __TVH_TIMESHIFT_PRIVATE_H__ #define __TVH_TIMESHIFT_PRIVATE_H__ -#define TIMESHIFT_PLAY_BUF 2000000 //< us to buffer in TX +#define TIMESHIFT_PLAY_BUF 500000 //< us to buffer in TX #define TIMESHIFT_FILE_PERIOD 60 //< number of secs in each buffer file #define TIMESHIFT_BACKLOG_MAX 16 //< maximum elementary streams @@ -96,21 +96,20 @@ typedef struct timeshift { int64_t last_time; ///< Last time in us (PTS conversion) int64_t start_pts; ///< Start time for packets (PTS) int64_t ref_time; ///< Start time in us (monoclock) + int64_t buf_time; ///< Last buffered time in us (PTS conversion) struct streaming_message_queue backlog[TIMESHIFT_BACKLOG_MAX]; ///< Queued packets for time sorting int backlog_max;///< Maximum component index in backlog enum { - TS_INIT, TS_EXIT, TS_LIVE, TS_PAUSE, TS_PLAY, } state; ///< Play state pthread_mutex_t state_mutex; ///< Protect state changes + uint8_t exit; ///< Exit from the main input thread uint8_t full; ///< Buffer is full - streaming_start_t *smt_start; ///< Current stream makeup - streaming_queue_t wr_queue; ///< Writer queue pthread_t wr_thread; ///< Writer thread @@ -129,11 +128,6 @@ typedef struct timeshift { extern uint64_t timeshift_total_size; extern uint64_t timeshift_total_ram_size; -/* - * - */ -void timeshift_packets_clone ( timeshift_t *ts, struct streaming_message_queue *dst, int delivered ); - /* * Write functions */ diff --git a/src/timeshift/timeshift_reader.c b/src/timeshift/timeshift_reader.c index 4efbe49f1..1b0a19e84 100644 --- a/src/timeshift/timeshift_reader.c +++ b/src/timeshift/timeshift_reader.c @@ -202,19 +202,6 @@ static ssize_t _read_msg ( timeshift_file_t *tsf, int fd, streaming_message_t ** * Utilities * *************************************************************************/ -static streaming_message_t *_timeshift_find_sstart - ( timeshift_file_t *tsf, int64_t time ) -{ - timeshift_index_data_t *ti; - - /* Find the SMT_START message that relates (comes before) the given time */ - ti = TAILQ_LAST(&tsf->sstart, timeshift_index_data_list); - while (ti && ti->data->sm_time > time) - ti = TAILQ_PREV(ti, timeshift_index_data_list, link); - - return ti ? ti->data : NULL; -} - static int64_t _timeshift_first_time ( timeshift_t *ts, int *active ) { @@ -421,21 +408,29 @@ static int _timeshift_read *wait = 0; tvhtrace("timeshift", "ts %d eof, cur_file %p", ts->id, *cur_file); - /* Check SMT_START index */ - } else { - streaming_message_t *ssm = _timeshift_find_sstart(*cur_file, (*sm)->sm_time); - if (ssm && ssm->sm_data != ts->smt_start) { - streaming_target_deliver2(ts->output, streaming_msg_clone(ssm)); - if (ts->smt_start) - streaming_start_unref(ts->smt_start); - ts->smt_start = ssm->sm_data; - atomic_add(&ts->smt_start->ss_refcount, 1); - } } } return 0; } +/* + * Trace packet + */ +static void timeshift_trace_pkt + ( timeshift_t *ts, streaming_message_t *sm ) +{ + th_pkt_t *pkt = sm->sm_data; + tvhtrace("timeshift", + "ts %d pkt out - stream %d type %c pts %10"PRId64 + " dts %10"PRId64 " dur %10d len %6zu time %14"PRId64, + ts->id, + pkt->pkt_componentindex, + pkt_frametype_to_char(pkt->pkt_frametype), + ts_rescale(pkt->pkt_pts, 1000000), + ts_rescale(pkt->pkt_dts, 1000000), + pkt->pkt_duration, + pktbuf_len(pkt->pkt_payload), sm->sm_time); +} /* * Flush all data to live @@ -445,39 +440,17 @@ static int _timeshift_flush_to_live { streaming_message_t *sm; - time_t pts = 0; while (*cur_file) { if (_timeshift_read(ts, cur_file, &sm, wait) == -1) return -1; if (!sm) break; - if (sm->sm_type == SMT_PACKET) { - pts = ((th_pkt_t*)sm->sm_data)->pkt_pts; - tvhlog(LOG_DEBUG, "timeshift", "ts %d deliver %"PRId64" pts=%"PRItime_t, - ts->id, sm->sm_time, pts); - } + if (tvhtrace_enabled() && sm->sm_type == SMT_PACKET) + timeshift_trace_pkt(ts, sm); streaming_target_deliver2(ts->output, sm); } return 0; } -/* - * Write packets from temporary queues - */ -static void _timeshift_write_queues - ( timeshift_t *ts ) -{ - struct streaming_message_queue sq; - streaming_message_t *sm; - - TAILQ_INIT(&sq); - timeshift_writer_clone(ts, &sq); - timeshift_packets_clone(ts, &sq, 0); - while ((sm = TAILQ_FIRST(&sq)) != NULL) { - TAILQ_REMOVE(&sq, sm, sm_link); - streaming_target_deliver2(ts->output, sm); - } -} - /* * Send the status message */ @@ -488,7 +461,7 @@ static void timeshift_fill_status int64_t start, end; start = _timeshift_first_time(ts, &active); - end = ts->last_time; + end = ts->buf_time; if (current_time < 0) current_time = 0; if (current_time > end) @@ -519,25 +492,6 @@ static void timeshift_status streaming_target_deliver2(ts->output, tsm); } -/* - * Trace packet - */ -static void timeshift_trace_pkt - ( timeshift_t *ts, streaming_message_t *sm ) -{ - th_pkt_t *pkt = sm->sm_data; - tvhtrace("timeshift", - "ts %d pkt out - stream %d type %c pts %10"PRId64 - " dts %10"PRId64 " dur %10d len %6zu time %14"PRId64, - ts->id, - pkt->pkt_componentindex, - pkt_frametype_to_char(pkt->pkt_frametype), - ts_rescale(pkt->pkt_pts, 1000000), - ts_rescale(pkt->pkt_dts, 1000000), - pkt->pkt_duration, - pktbuf_len(pkt->pkt_payload), sm->sm_time); -} - /* ************************************************************************** * Thread * *************************************************************************/ @@ -549,7 +503,7 @@ static void timeshift_trace_pkt void *timeshift_reader ( void *p ) { timeshift_t *ts = p; - int nfds, end, run = 1, wait = -1, skip_delivered = 0; + int nfds, end, run = 1, wait = -1; timeshift_file_t *cur_file = NULL, *tmp_file; int cur_speed = 100, keyframe_mode = 0; int64_t mono_now, mono_play_time = 0, mono_last_status = 0; @@ -560,7 +514,6 @@ void *timeshift_reader ( void *p ) streaming_skip_t *skip = NULL; tvhpoll_t *pd; tvhpoll_event_t ev = { 0 }; - th_pkt_t *pkt; pd = tvhpoll_create(1); ev.fd = ts->rd_pipe.rd; @@ -628,13 +581,12 @@ void *timeshift_reader ( void *p ) ts->id); timeshift_writer_flush(ts); ts->dobuf = 1; - skip_delivered = 1; tmp_file = timeshift_filemgr_newest(ts); if (tmp_file != NULL) { i64 = tmp_file->last; tmp_file->refcount--; } else { - i64 = ts->last_time; + i64 = ts->buf_time; } cur_file = timeshift_filemgr_get(ts, i64); if (cur_file != NULL) { @@ -669,7 +621,6 @@ void *timeshift_reader ( void *p ) mono_play_time = mono_now; tvhtrace("timeshift", "update play time TS_LIVE - %"PRId64" play buffer from %"PRId64, mono_now, pause_time); } else if (ts->state == TS_PAUSE) { - skip_delivered = 1; pause_time = last_time; } tvhlog(LOG_DEBUG, "timeshift", "ts %d change speed %d", ts->id, speed); @@ -722,9 +673,8 @@ void *timeshift_reader ( void *p ) cur_file->roff = cur_file->size; last_time = cur_file->last; } else { - last_time = ts->last_time; + last_time = ts->buf_time; } - skip_delivered = 0; } /* May have failed */ @@ -735,7 +685,7 @@ void *timeshift_reader ( void *p ) /* Live (stage2) */ if (ts->state == TS_LIVE) { - if (skip_time >= ts->last_time - TIMESHIFT_PLAY_BUF) { + if (skip_time >= ts->buf_time - TIMESHIFT_PLAY_BUF) { tvhlog(LOG_DEBUG, "timeshift", "ts %d skip ignored, already live", ts->id); skip = NULL; } else { @@ -865,17 +815,9 @@ void *timeshift_reader ( void *p ) ((cur_speed > 0) && (sm->sm_time <= deliver))))) { last_time = sm->sm_time; - if (sm->sm_type == SMT_PACKET) { - pkt = sm->sm_data; - if (skip_delivered && pkt->pkt_delivered) { - streaming_msg_free(sm); - goto skip_pkt; - } - if (tvhtrace_enabled()) - timeshift_trace_pkt(ts, sm); - } + if (sm->sm_type == SMT_PACKET && tvhtrace_enabled()) + timeshift_trace_pkt(ts, sm); streaming_target_deliver2(ts->output, sm); -skip_pkt: sm = NULL; wait = 0; @@ -915,9 +857,6 @@ skip_pkt: if (_timeshift_flush_to_live(ts, &cur_file, &wait) == -1) break; - /* Flush write / backlog queues */ - _timeshift_write_queues(ts); - ts->state = TS_LIVE; /* Close file (if open) */ diff --git a/src/timeshift/timeshift_writer.c b/src/timeshift/timeshift_writer.c index 795965764..ac9dc298a 100644 --- a/src/timeshift/timeshift_writer.c +++ b/src/timeshift/timeshift_writer.c @@ -248,18 +248,6 @@ static void _handle_sstart ( timeshift_t *ts, timeshift_file_t *tsf, streaming_m } } -static void _copy_last_sstart ( timeshift_t *ts, timeshift_file_t *tsf ) -{ - streaming_message_t *sm; - streaming_start_t *ss = ts->smt_start; - - if (ss) { - atomic_add(&ss->ss_refcount, 1); - sm = streaming_msg_create_data(SMT_START, ss); - _handle_sstart(ts, tsf, sm); - } -} - /* ************************************************************************** * Thread * *************************************************************************/ @@ -277,8 +265,6 @@ static inline ssize_t _process_msg0 } else if (sm->sm_type == SMT_SIGNAL_STATUS) err = timeshift_write_sigstat(tsf, sm->sm_time, sm->sm_data); else if (sm->sm_type == SMT_PACKET) { - if (TAILQ_EMPTY(&tsf->sstart)) - _copy_last_sstart(ts, tsf); err = timeshift_write_packet(tsf, sm->sm_time, sm->sm_data); if (err > 0) { th_pkt_t *pkt = sm->sm_data; @@ -293,8 +279,6 @@ static inline ssize_t _process_msg0 } } } else if (sm->sm_type == SMT_MPEGTS) { - if (TAILQ_EMPTY(&tsf->sstart)) - _copy_last_sstart(ts, tsf); err = timeshift_write_mpegts(tsf, sm->sm_time, sm->sm_data); } else @@ -323,11 +307,11 @@ static void _process_msg /* Terminate */ case SMT_EXIT: if (run) *run = 0; - break; + goto live; case SMT_STOP: if (sm->sm_code != SM_CODE_SOURCE_RECONFIGURED && run) *run = 0; - break; + goto live; /* Timeshifting */ case SMT_SKIP: @@ -341,7 +325,7 @@ static void _process_msg case SMT_SERVICE_STATUS: case SMT_TIMESHIFT_STATUS: case SMT_DESCRAMBLE_INFO: - break; + goto live; /* Store */ case SMT_SIGNAL_STATUS: @@ -349,21 +333,31 @@ static void _process_msg case SMT_MPEGTS: case SMT_PACKET: pthread_mutex_lock(&ts->state_mutex); - if ((tsf = timeshift_filemgr_get(ts, sm->sm_time)) && (tsf->wfd >= 0 || tsf->ram)) { - if ((err = _process_msg0(ts, tsf, &sm)) < 0) { - timeshift_filemgr_close(tsf); - tsf->bad = 1; - ts->full = 1; ///< Stop any more writing + ts->buf_time = sm->sm_time; + if (ts->state == TS_LIVE) + streaming_target_deliver2(ts->output, streaming_msg_clone(sm)); + if (ts->dobuf) { + if ((tsf = timeshift_filemgr_get(ts, sm->sm_time)) && (tsf->wfd >= 0 || tsf->ram)) { + if ((err = _process_msg0(ts, tsf, &sm)) < 0) { + timeshift_filemgr_close(tsf); + tsf->bad = 1; + ts->full = 1; ///< Stop any more writing + } + tsf->refcount--; } - tsf->refcount--; } pthread_mutex_unlock(&ts->state_mutex); break; } /* Next */ - if (sm) - streaming_msg_free(sm); + streaming_msg_free(sm); + +live: + pthread_mutex_lock(&ts->state_mutex); + if (ts->state == TS_LIVE) + streaming_target_deliver2(ts->output, sm); + pthread_mutex_unlock(&ts->state_mutex); } void *timeshift_writer ( void *aux )