From: Jaroslav Kysela Date: Sat, 19 Dec 2015 20:08:00 +0000 (+0100) Subject: timeshift: change time source - use PTS as the synchronization source X-Git-Tag: v4.2.1~1271 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=582562ff3ef3b0f0b1c5113585b527bbf2048f44;p=thirdparty%2Ftvheadend.git timeshift: change time source - use PTS as the synchronization source --- diff --git a/src/atomic.h b/src/atomic.h index 67b972d1f..b5b8f9699 100644 --- a/src/atomic.h +++ b/src/atomic.h @@ -48,6 +48,21 @@ atomic_add_u64(volatile uint64_t *ptr, uint64_t incr) #endif } +static inline int64_t +atomic_add_s64(volatile int64_t *ptr, int64_t incr) +{ +#if ENABLE_ATOMIC64 + return __sync_fetch_and_add(ptr, incr); +#else + uint64_t ret; + pthread_mutex_lock(&atomic_lock); + ret = *ptr; + *ptr += incr; + pthread_mutex_unlock(&atomic_lock); + return ret; +#endif +} + static inline time_t atomic_add_time_t(volatile time_t *ptr, time_t incr) { @@ -123,6 +138,12 @@ atomic_exchange_u64(volatile uint64_t *ptr, uint64_t new) return __sync_lock_test_and_set(ptr, new); } +static inline int +atomic_exchange_s64(volatile int64_t *ptr, int64_t new) +{ + return __sync_lock_test_and_set(ptr, new); +} + static inline int atomic_exchange_time_t(volatile time_t *ptr, int new) { diff --git a/src/htsp_server.c b/src/htsp_server.c index d7302daa0..33be5f8c8 100644 --- a/src/htsp_server.c +++ b/src/htsp_server.c @@ -2459,7 +2459,7 @@ htsp_method_skip(htsp_connection_t *htsp, htsmsg_t *in) memset(&skip, 0, sizeof(skip)); if(!htsmsg_get_s64(in, "time", &s64)) { skip.type = abs ? SMT_SKIP_ABS_TIME : SMT_SKIP_REL_TIME; - skip.time = hs->hs_90khz ? s64 : ts_rescale_i(s64, 1000000); + skip.time = hs->hs_90khz ? s64 : ts_rescale_inv(s64, 1000000); tvhtrace("htsp-sub", "skip: %s %"PRId64" (%s)", abs ? "abs" : "rel", skip.time, hs->hs_90khz ? "90kHz" : "1MHz"); } else if (!htsmsg_get_s64(in, "size", &s64)) { @@ -4005,6 +4005,26 @@ htsp_subscription_speed(htsp_subscription_t *hs, int speed) htsp_send_subscription(hs->hs_htsp, m, NULL, hs, 0); } +/** + * + */ +#if ENABLE_TIMESHIFT +static void +htsp_subscription_timeshift_status(htsp_subscription_t *hs, timeshift_status_t *status) +{ + htsmsg_t *m = htsmsg_create_map(); + htsmsg_add_str(m, "method", "timeshiftStatus"); + htsmsg_add_u32(m, "subscriptionId", hs->hs_sid); + htsmsg_add_u32(m, "full", status->full); + htsmsg_add_s64(m, "shift", hs->hs_90khz ? status->shift : ts_rescale(status->shift, 1000000)); + if (status->pts_start != PTS_UNSET) + htsmsg_add_s64(m, "start", hs->hs_90khz ? status->pts_start : ts_rescale(status->pts_start, 1000000)) ; + if (status->pts_end != PTS_UNSET) + htsmsg_add_s64(m, "end", hs->hs_90khz ? status->pts_end : ts_rescale(status->pts_end, 1000000)) ; + htsp_send_subscription(hs->hs_htsp, m, NULL, hs, 0); +} +#endif + /** * */ @@ -4017,8 +4037,10 @@ htsp_subscription_skip(htsp_subscription_t *hs, streaming_skip_t *skip) htsmsg_add_u32(m, "subscriptionId", hs->hs_sid); /* Flush pkt buffers */ - if (skip->type != SMT_SKIP_ERROR) + if (skip->type != SMT_SKIP_ERROR) { htsp_flush_queue(hs->hs_htsp, &hs->hs_q, 0); + htsp_subscription_timeshift_status(hs, &skip->timeshift); + } if (skip->type == SMT_SKIP_ABS_TIME || skip->type == SMT_SKIP_ABS_SIZE) htsmsg_add_u32(m, "absolute", 1); @@ -4031,26 +4053,6 @@ htsp_subscription_skip(htsp_subscription_t *hs, streaming_skip_t *skip) htsp_send_subscription(hs->hs_htsp, m, NULL, hs, 0); } -/** - * - */ -#if ENABLE_TIMESHIFT -static void -htsp_subscription_timeshift_status(htsp_subscription_t *hs, timeshift_status_t *status) -{ - htsmsg_t *m = htsmsg_create_map(); - htsmsg_add_str(m, "method", "timeshiftStatus"); - htsmsg_add_u32(m, "subscriptionId", hs->hs_sid); - htsmsg_add_u32(m, "full", status->full); - htsmsg_add_s64(m, "shift", hs->hs_90khz ? status->shift : ts_rescale(status->shift, 1000000)); - if (status->pts_start != PTS_UNSET) - htsmsg_add_s64(m, "start", hs->hs_90khz ? status->pts_start : ts_rescale(status->pts_start, 1000000)) ; - if (status->pts_end != PTS_UNSET) - htsmsg_add_s64(m, "end", hs->hs_90khz ? status->pts_end : ts_rescale(status->pts_end, 1000000)) ; - htsp_send_subscription(hs->hs_htsp, m, NULL, hs, 0); -} -#endif - /** * */ diff --git a/src/timeshift.c b/src/timeshift.c index 39550bdf6..bd5d1865a 100644 --- a/src/timeshift.c +++ b/src/timeshift.c @@ -24,6 +24,7 @@ #include "settings.h" #include "atomic.h" #include "access.h" +#include "atomic.h" #include #include @@ -205,34 +206,80 @@ const idclass_t timeshift_conf_class = { }; /* - * Decode initial time diff - * - * Gather some packets and select the lowest pts to identify - * the correct start. Note that for timeshift, the tsfix - * stream plugin is applied, so the starting pts should be - * near zero. If not - it's a bug. + * Process a packet with time sorting */ + +#define MAX_TIME_DELTA (2*1000000) /* 2 seconds */ +#define BACKLOG_COUNT ARRAY_SIZE(timeshift_t->backlog) + static void -timeshift_set_pts_delta ( timeshift_t *ts, int64_t pts ) +timeshift_packet_deliver ( timeshift_t *ts, streaming_message_t *sm ) { + th_pkt_t *pkt = sm->sm_data; + tvhtrace("timeshift", + "ts %d pkt buf - stream %d type %c pts %10"PRId64 + " dts %10"PRId64" dur %10d len %6zu time %14"PRId64, + ts->id, + pkt->pkt_componentindex, + pkt_frametype_to_char(pkt->pkt_frametype), + ts_rescale(pkt->pkt_pts, 1000000), + ts_rescale(pkt->pkt_dts, 1000000), + pkt->pkt_duration, + pktbuf_len(pkt->pkt_payload), + sm->sm_time); + streaming_target_deliver2(&ts->wr_queue.sq_st, sm); +} + +static void +timeshift_packet_flush ( timeshift_t *ts, int64_t time ) +{ + streaming_message_t *lowest, *sm; + struct streaming_message_queue *sq; int i; - int64_t smallest = INT64_MAX; - if (pts == PTS_UNSET) + while (1) { + lowest = NULL; + for (i = 0; i < ts->backlog_max; i++) { + sq = &ts->backlog[i]; + sm = TAILQ_FIRST(sq); + if (sm && sm->sm_time + MAX_TIME_DELTA < time) + if (lowest == NULL || lowest->sm_time > sm->sm_time) + lowest = sm; + } + if (!lowest) + break; + TAILQ_REMOVE(sq, lowest, sm_link); + timeshift_packet_deliver(ts, lowest); + } +} + +static void +timeshift_packet( timeshift_t *ts, th_pkt_t *pkt ) +{ + streaming_message_t *sm; + int64_t time; + + if (pkt->pkt_componentindex >= TIMESHIFT_BACKLOG_MAX) { + pkt_ref_dec(pkt); return; + } - for (i = 0; i < ARRAY_SIZE(ts->pts_val); i++) { - int64_t i64 = ts->pts_val[i]; - if (i64 == PTS_UNSET) { - ts->pts_val[i] = pts; - break; - } - if (i64 < smallest) - smallest = i64; + sm = streaming_msg_create_pkt(pkt); + + time = ts_rescale(pkt->pkt_pts, 1000000); + if (time > ts->last_time) { + atomic_exchange_s64(&ts->last_time, time); + timeshift_packet_flush(ts, time); } - if (i >= ARRAY_SIZE(ts->pts_val)) - ts->pts_delta = getmonoclock() - ts_rescale(smallest, 1000000); + sm->sm_time = time; + if (time + MAX_TIME_DELTA < ts->last_time) { + timeshift_packet_deliver(ts, sm); + } else { + if (pkt->pkt_componentindex >= ts->backlog_max) + ts->backlog_max = pkt->pkt_componentindex + 1; + TAILQ_INSERT_TAIL(&ts->backlog[pkt->pkt_componentindex], sm, sm_link); + } } /* @@ -281,25 +328,26 @@ static void timeshift_input (sm->sm_type == SMT_STOP && sm->sm_code == 0)) exit = 1; - /* Record (one-off) PTS delta */ - if (sm->sm_type == SMT_PACKET && ts->pts_delta == 0) - timeshift_set_pts_delta(ts, pkt->pkt_pts); + if (sm->sm_type == SMT_MPEGTS) + ts->packet_mode = 0; /* Buffer to disk */ if ((ts->state > TS_LIVE) || (!ts->ondemand && (ts->state == TS_LIVE))) { - sm->sm_time = getmonoclock(); - if (sm->sm_type == SMT_PACKET) { - tvhtrace("timeshift", - "ts %d pkt buf - stream %d type %c pts %10"PRId64 - " dts %10"PRId64" dur %10d len %6zu time %14"PRId64, - ts->id, - pkt->pkt_componentindex, - pkt_frametype_to_char(pkt->pkt_frametype), - ts_rescale(pkt->pkt_pts, 1000000), - ts_rescale(pkt->pkt_dts, 1000000), - pkt->pkt_duration, - pktbuf_len(pkt->pkt_payload), - sm->sm_time - ts->pts_delta); + if (ts->packet_mode) { + sm->sm_time = ts->last_time; + if (sm->sm_type == SMT_PACKET) { + timeshift_packet(ts, pkt); + sm->sm_data = NULL; + streaming_msg_free(sm); + goto pktcont; + } + } else { + if (ts->ref_time == 0) { + ts->ref_time = getmonoclock(); + sm->sm_time = 0; + } else { + sm->sm_time = getmonoclock() - ts->ref_time; + } } streaming_target_deliver2(&ts->wr_queue.sq_st, sm); } else { @@ -317,6 +365,7 @@ static void timeshift_input } streaming_msg_free(sm); } +pktcont: /* Exit/Stop */ if (exit) { @@ -336,6 +385,7 @@ timeshift_destroy(streaming_target_t *pad) { timeshift_t *ts = (timeshift_t*)pad; streaming_message_t *sm; + int i; /* Must hold global lock */ lock_assert(&global_lock); @@ -355,6 +405,8 @@ timeshift_destroy(streaming_target_t *pad) /* Shut stuff down */ streaming_queue_deinit(&ts->wr_queue); + for (i = 0; i < TIMESHIFT_BACKLOG_MAX; i++) + streaming_queue_clear(&ts->backlog[i]); close(ts->rd_pipe.rd); close(ts->rd_pipe.wr); @@ -396,9 +448,11 @@ streaming_target_t *timeshift_create ts->vididx = -1; ts->id = timeshift_index; ts->ondemand = timeshift_conf.ondemand; - ts->pts_delta = 0; - for (i = 0; i < ARRAY_SIZE(ts->pts_val); i++) - ts->pts_val[i] = PTS_UNSET; + ts->packet_mode= 1; + ts->last_time = 0; + ts->ref_time = 0; + for (i = 0; i < TIMESHIFT_BACKLOG_MAX; i++) + TAILQ_INIT(&ts->backlog[i]); pthread_mutex_init(&ts->rdwr_mutex, NULL); pthread_mutex_init(&ts->state_mutex, NULL); diff --git a/src/timeshift.h b/src/timeshift.h index d91c62ca4..baa2e932f 100644 --- a/src/timeshift.h +++ b/src/timeshift.h @@ -40,14 +40,6 @@ typedef struct timeshift_conf { extern struct timeshift_conf timeshift_conf; extern const idclass_t timeshift_conf_class; -typedef struct timeshift_status -{ - int full; - int64_t shift; - int64_t pts_start; - int64_t pts_end; -} timeshift_status_t; - void timeshift_init ( void ); void timeshift_term ( void ); diff --git a/src/timeshift/private.h b/src/timeshift/private.h index b112b060e..7cd409367 100644 --- a/src/timeshift/private.h +++ b/src/timeshift/private.h @@ -19,8 +19,9 @@ #ifndef __TVH_TIMESHIFT_PRIVATE_H__ #define __TVH_TIMESHIFT_PRIVATE_H__ -#define TIMESHIFT_PLAY_BUF 200000 // us to buffer in TX -#define TIMESHIFT_FILE_PERIOD 60 // number of secs in each buffer file +#define TIMESHIFT_PLAY_BUF 2000000 //< us to buffer in TX +#define TIMESHIFT_FILE_PERIOD 60 //< number of secs in each buffer file +#define TIMESHIFT_BACKLOG_MAX 16 //< maximum elementary streams /** * Indexes of import data in the stream @@ -55,7 +56,7 @@ typedef struct timeshift_file int rfd; ///< Read descriptor char *path; ///< Full path to file - time_t time; ///< Files coarse timestamp + int64_t time; ///< Files coarse timestamp size_t size; ///< Current file size; int64_t last; ///< Latest timestamp off_t woff; ///< Write offset @@ -90,8 +91,11 @@ typedef struct timeshift { char *path; ///< Directory containing buffer time_t max_time; ///< Maximum period to shift int ondemand; ///< Whether this is an on-demand timeshift - int64_t pts_delta; ///< Delta between system clock and PTS - int64_t pts_val[6]; ///< Decision PTS values for multiple packets + int packet_mode;///< Packet mode (otherwise MPEG-TS data mode) + int64_t last_time; ///< Last time in us (PTS conversion) + int64_t ref_time; ///< Start time in us (monoclock) + struct streaming_message_queue backlog[TIMESHIFT_BACKLOG_MAX]; ///< Queued packets for time sorting + int backlog_max;///< Maximum component index in backlog enum { TS_INIT, @@ -153,7 +157,7 @@ void timeshift_filemgr_term ( void ); int timeshift_filemgr_makedirs ( int ts_index, char *buf, size_t len ); timeshift_file_t *timeshift_filemgr_get - ( timeshift_t *ts, int create ); + ( timeshift_t *ts, int64_t start_time ); timeshift_file_t *timeshift_filemgr_oldest ( timeshift_t *ts ); timeshift_file_t *timeshift_filemgr_newest diff --git a/src/timeshift/timeshift_filemgr.c b/src/timeshift/timeshift_filemgr.c index 620423229..56f347eb6 100644 --- a/src/timeshift/timeshift_filemgr.c +++ b/src/timeshift/timeshift_filemgr.c @@ -210,13 +210,13 @@ void timeshift_filemgr_flush ( timeshift_t *ts, timeshift_file_t *end ) * */ static timeshift_file_t * timeshift_filemgr_file_init - ( timeshift_t *ts, time_t time ) + ( timeshift_t *ts, int64_t start_time ) { timeshift_file_t *tsf; tsf = calloc(1, sizeof(timeshift_file_t)); - tsf->time = time; - tsf->last = getmonoclock(); + tsf->time = start_time / (1000000LL * TIMESHIFT_FILE_PERIOD); + tsf->last = start_time; tsf->wfd = -1; tsf->rfd = -1; TAILQ_INIT(&tsf->iframes); @@ -229,17 +229,16 @@ static timeshift_file_t * timeshift_filemgr_file_init /* * Get current / new file */ -timeshift_file_t *timeshift_filemgr_get ( timeshift_t *ts, int create ) +timeshift_file_t *timeshift_filemgr_get ( timeshift_t *ts, int64_t start_time ) { int fd; - struct timespec tp; timeshift_file_t *tsf_tl, *tsf_hd, *tsf_tmp; timeshift_index_data_t *ti; char path[PATH_MAX]; - time_t time; + int64_t time; /* Return last file */ - if (!create) + if (start_time < 0) return timeshift_filemgr_newest(ts); /* No space */ @@ -247,10 +246,9 @@ timeshift_file_t *timeshift_filemgr_get ( timeshift_t *ts, int create ) return NULL; /* Store to file */ - clock_gettime(CLOCK_MONOTONIC_COARSE, &tp); - time = tp.tv_sec / TIMESHIFT_FILE_PERIOD; tsf_tl = TAILQ_LAST(&ts->files, timeshift_file_list); - if (!tsf_tl || tsf_tl->time != time || + time = start_time / (1000000LL * TIMESHIFT_FILE_PERIOD); + if (!tsf_tl || tsf_tl->time < time || (tsf_tl->ram && tsf_tl->woff >= timeshift_conf.ram_segment_size)) { tsf_hd = TAILQ_FIRST(&ts->files); @@ -298,15 +296,15 @@ timeshift_file_t *timeshift_filemgr_get ( timeshift_t *ts, int create ) if (timeshift_conf.ram_size >= 8*1024*1024 && atomic_pre_add_u64(×hift_total_ram_size, 0) < timeshift_conf.ram_size + (timeshift_conf.ram_segment_size / 2)) { - tsf_tmp = timeshift_filemgr_file_init(ts, time); + tsf_tmp = timeshift_filemgr_file_init(ts, start_time); tsf_tmp->ram_size = MIN(16*1024*1024, timeshift_conf.ram_segment_size); tsf_tmp->ram = malloc(tsf_tmp->ram_size); if (!tsf_tmp->ram) { free(tsf_tmp); tsf_tmp = NULL; } else { - tvhtrace("timeshift", "ts %d create RAM segment with %"PRId64" bytes (time %"PRItime_t")", - ts->id, tsf_tmp->ram_size, time); + tvhtrace("timeshift", "ts %d create RAM segment with %"PRId64" bytes (time %"PRId64")", + ts->id, tsf_tmp->ram_size, start_time); } } @@ -319,10 +317,10 @@ timeshift_file_t *timeshift_filemgr_get ( timeshift_t *ts, int create ) } /* Create File */ - snprintf(path, sizeof(path), "%s/tvh-%"PRItime_t, ts->path, time); + snprintf(path, sizeof(path), "%s/tvh-%"PRId64, ts->path, start_time); tvhtrace("timeshift", "ts %d create file %s", ts->id, path); if ((fd = open(path, O_WRONLY | O_CREAT, 0600)) > 0) { - tsf_tmp = timeshift_filemgr_file_init(ts, time); + tsf_tmp = timeshift_filemgr_file_init(ts, start_time); tsf_tmp->wfd = fd; tsf_tmp->path = strdup(path); } diff --git a/src/timeshift/timeshift_reader.c b/src/timeshift/timeshift_reader.c index c6b5ce601..405ab575a 100644 --- a/src/timeshift/timeshift_reader.c +++ b/src/timeshift/timeshift_reader.c @@ -236,7 +236,7 @@ static timeshift_index_iframe_t *_timeshift_last_frame { int end; timeshift_index_iframe_t *tsi = NULL; - timeshift_file_t *tsf = timeshift_filemgr_get(ts, 0); + timeshift_file_t *tsf = timeshift_filemgr_get(ts, -1); while (tsf && !tsi) { if (!(tsi = TAILQ_LAST(&tsf->iframes, timeshift_index_iframe_list))) { tsf = timeshift_filemgr_prev(tsf, &end, 0); @@ -317,7 +317,7 @@ static int _timeshift_skip tsf = tsf_last; end = -1; } else { - tsf = tsf_last = timeshift_filemgr_get(ts, 0); + tsf = tsf_last = timeshift_filemgr_newest(ts); tsi = NULL; while (tsf && !tsi) { tsf_last = tsf; @@ -348,7 +348,7 @@ static int _timeshift_read { timeshift_file_t *tsf = *cur_file; ssize_t r; - off_t off, ooff; + off_t off; if (tsf) { @@ -365,7 +365,6 @@ static int _timeshift_read ts->id, tsf->path, (int64_t)tsf->roff, (int64_t)off, strerror(errno)); /* Read msg */ - ooff = tsf->roff; r = _read_msg(tsf, -1, sm); if (r < 0) { streaming_message_t *e = streaming_msg_create_code(SMT_STOP, SM_CODE_UNDEFINED_ERROR); @@ -376,20 +375,8 @@ static int _timeshift_read } tvhtrace("timeshift", "ts %d seek to %jd (fd %i) read msg %p (%"PRId64")", ts->id, (intmax_t)tsf->roff, tsf->rfd, *sm, (int64_t)r); - /* Incomplete */ - if (r == 0) { - if (tsf->rfd >= 0) { - tvhtrace("timeshift", "ts %d seek to %jd (fd %i) (incomplete)", ts->id, (intmax_t)tsf->roff, tsf->rfd); - if ((off = lseek(tsf->rfd, ooff, SEEK_SET)) != ooff) - tvherror("timeshift", "seek to %s failed (off %"PRId64" != %"PRId64"): %s", - tsf->path, (int64_t)ooff, (int64_t)off, strerror(errno)); - } - tsf->roff = ooff; - return 0; - } - /* Special case - EOF */ - if (r == sizeof(size_t) || tsf->roff > tsf->size) { + if (r <= sizeof(size_t) || tsf->roff > tsf->size || *sm == NULL) { if (tsf->rfd >= 0) close(tsf->rfd); tsf->rfd = -1; @@ -398,6 +385,7 @@ static int _timeshift_read pthread_mutex_unlock(&ts->rdwr_mutex); tsf->roff = 0; // reset *wait = 0; + tvhtrace("timeshift", "ts %d eof, cur_file %p", ts->id, *cur_file); /* Check SMT_START index */ } else { @@ -438,6 +426,62 @@ static int _timeshift_flush_to_live return 0; } +/* + * Send the status message + */ +static void timeshift_fill_status + ( timeshift_t *ts, timeshift_status_t *status, int64_t current_time ) +{ + timeshift_index_iframe_t *fst, *lst; + int64_t shift; + + fst = _timeshift_first_frame(ts); + lst = _timeshift_last_frame(ts); + status->full = ts->full; + tvhtrace("timeshift", "status last->time %"PRId64" current time %"PRId64" state %d", + lst ? lst->time : -1, current_time, ts->state); + shift = lst ? ts_rescale_inv(lst->time - current_time, 1000000) : -1; + status->shift = (ts->state <= TS_LIVE || shift < 0 || !lst) ? 0 : shift; + if (lst && fst && lst != fst) { + status->pts_start = ts_rescale_inv(fst->time, 1000000); + status->pts_end = ts_rescale_inv(lst->time, 1000000); + } else { + status->pts_start = PTS_UNSET; + status->pts_end = PTS_UNSET; + } +} + +static void timeshift_status + ( timeshift_t *ts, int64_t current_time ) +{ + streaming_message_t *tsm; + timeshift_status_t *status; + + status = calloc(1, sizeof(timeshift_status_t)); + timeshift_fill_status(ts, status, current_time); + tsm = streaming_msg_create_data(SMT_TIMESHIFT_STATUS, status); + streaming_target_deliver2(ts->output, tsm); +} + +/* + * Trace packet + */ +static void timeshift_trace_pkt + ( timeshift_t *ts, streaming_message_t *sm ) +{ + th_pkt_t *pkt = sm->sm_data; + tvhtrace("timeshift", + "ts %d pkt out - stream %d type %c pts %10"PRId64 + " dts %10"PRId64 " dur %10d len %6zu time %14"PRItime_t, + ts->id, + pkt->pkt_componentindex, + pkt_frametype_to_char(pkt->pkt_frametype), + ts_rescale(pkt->pkt_pts, 1000000), + ts_rescale(pkt->pkt_dts, 1000000), + pkt->pkt_duration, + pktbuf_len(pkt->pkt_payload), sm->sm_time); +} + /* ************************************************************************** * Thread * *************************************************************************/ @@ -452,12 +496,11 @@ void *timeshift_reader ( void *p ) int nfds, end, run = 1, wait = -1; timeshift_file_t *cur_file = NULL; int cur_speed = 100, keyframe_mode = 0; - int64_t pause_time = 0, play_time = 0, last_time = 0; - int64_t now, deliver, skip_time = 0; + int64_t mono_now, mono_play_time = 0, mono_last_status = 0; + int64_t deliver, deliver0, pause_time = 0, last_time = 0, skip_time = 0; streaming_message_t *sm = NULL, *ctrl = NULL; timeshift_index_iframe_t *tsi = NULL; streaming_skip_t *skip = NULL; - time_t last_status = 0; tvhpoll_t *pd; tvhpoll_event_t ev = { 0 }; @@ -482,7 +525,7 @@ void *timeshift_reader ( void *p ) wait = -1; end = 0; skip = NULL; - now = getmonoclock(); + mono_now = getmonoclock(); /* Control */ pthread_mutex_lock(&ts->state_mutex); @@ -527,12 +570,19 @@ void *timeshift_reader ( void *p ) ts->id); timeshift_writer_flush(ts); pthread_mutex_lock(&ts->rdwr_mutex); - if ((cur_file = timeshift_filemgr_get(ts, 1))) { + cur_file = timeshift_filemgr_newest(ts); + cur_file = timeshift_filemgr_get(ts, cur_file ? cur_file->last : + atomic_add_s64(&ts->last_time, 0)); + if (cur_file != NULL) { cur_file->roff = cur_file->size; pause_time = cur_file->last; last_time = pause_time; + } else { + last_time = atomic_add_s64(&ts->last_time, 0); } pthread_mutex_unlock(&ts->rdwr_mutex); + mono_play_time = mono_now; + tvhtrace("timeshift", "update play time TS_LIVE - %"PRId64, mono_now); } /* Buffer playback */ @@ -555,10 +605,11 @@ void *timeshift_reader ( void *p ) } /* Update */ - play_time = getmonoclock(); cur_speed = speed; - if (speed != 100 || ts->state != TS_LIVE) + if (speed != 100 || ts->state != TS_LIVE) { ts->state = speed == 0 ? TS_PAUSE : TS_PLAY; + tvhtrace("timeshift", "reader - set %s", speed == 0 ? "TS_PAUSE" : "TS_PLAY"); + } tvhlog(LOG_DEBUG, "timeshift", "ts %d change speed %d", ts->id, speed); } @@ -594,11 +645,6 @@ void *timeshift_reader ( void *p ) break; case SMT_SKIP_ABS_TIME: - if (ts->pts_delta == 0) { - tvhlog(LOG_ERR, "timeshift", "ts %d abs skip not possible no PTS delta", ts->id); - skip = NULL; - break; - } /* -fallthrough */ case SMT_SKIP_REL_TIME: @@ -609,7 +655,8 @@ void *timeshift_reader ( void *p ) /* Live playback (stage1) */ if (ts->state == TS_LIVE) { pthread_mutex_lock(&ts->rdwr_mutex); - if ((cur_file = timeshift_filemgr_get(ts, !ts->ondemand))) { + cur_file = timeshift_filemgr_newest(ts); + if (cur_file && (cur_file = timeshift_filemgr_get(ts, cur_file->last)) != NULL) { cur_file->roff = cur_file->size; last_time = cur_file->last; } else { @@ -621,17 +668,18 @@ void *timeshift_reader ( void *p ) /* May have failed */ if (skip) { - skip_time += (skip->type == SMT_SKIP_ABS_TIME) ? ts->pts_delta : last_time; - tvhlog(LOG_DEBUG, "timeshift", "ts %d skip last_time %"PRId64" pts_delta %"PRId64, - ts->id, skip_time - ts->pts_delta, ts->pts_delta); + if (skip->type == SMT_SKIP_REL_TIME) + skip_time += last_time; + tvhlog(LOG_DEBUG, "timeshift", "ts %d skip time %"PRId64, ts->id, skip_time); - /* Live (stage2) */ + /* Live (stage2) */ if (ts->state == TS_LIVE) { - if (skip_time >= now) { + if (skip_time >= atomic_add_s64(&ts->last_time, 0) - TIMESHIFT_PLAY_BUF) { tvhlog(LOG_DEBUG, "timeshift", "ts %d skip ignored, already live", ts->id); skip = NULL; } else { ts->state = TS_PLAY; + tvhtrace("timeshift", "reader - set TS_PLAY"); } } } @@ -640,8 +688,10 @@ void *timeshift_reader ( void *p ) if (skip) { /* Adjust time */ - play_time = now; - pause_time = skip_time; + if (mono_play_time != mono_now) + tvhtrace("timeshift", "update play time skip - %"PRId64, mono_now); + mono_play_time = mono_now; + pause_time = atomic_add_s64(&ts->last_time, 0); tsi = NULL; /* Clear existing packet */ @@ -671,38 +721,23 @@ void *timeshift_reader ( void *p ) } } - /* Status message */ - if (now >= (last_status + 1000000)) { - streaming_message_t *tsm; - timeshift_status_t *status; - timeshift_index_iframe_t *fst, *lst; - status = calloc(1, sizeof(timeshift_status_t)); - fst = _timeshift_first_frame(ts); - lst = _timeshift_last_frame(ts); - status->full = ts->full; - status->shift = ts->state <= TS_LIVE ? 0 : ts_rescale_i(now - last_time, 1000000); - if (lst && fst && lst != fst && ts->pts_delta != PTS_UNSET) { - status->pts_start = ts_rescale_i(fst->time - ts->pts_delta, 1000000); - status->pts_end = ts_rescale_i(lst->time - ts->pts_delta, 1000000); - } else { - status->pts_start = PTS_UNSET; - status->pts_end = PTS_UNSET; - } - tsm = streaming_msg_create_data(SMT_TIMESHIFT_STATUS, status); - streaming_target_deliver2(ts->output, tsm); - last_status = now; - } /* Done */ if (!run || !cur_file || ((ts->state != TS_PLAY && !skip))) { + if (mono_now >= (mono_last_status + 1000000)) { + timeshift_status(ts, last_time); + mono_last_status = mono_now; + } pthread_mutex_unlock(&ts->state_mutex); continue; } /* Calculate delivery time */ - deliver = (now - play_time) + TIMESHIFT_PLAY_BUF; - deliver = (deliver * cur_speed) / 100; + deliver0 = (mono_now - mono_play_time) + TIMESHIFT_PLAY_BUF; + deliver = (deliver0 * cur_speed) / 100; deliver = (deliver + pause_time); + tvhtrace("timeshift", "speed %d now %"PRId64" play_time %"PRId64" deliver %"PRId64" deliver0 %"PRId64, + cur_speed, mono_now, mono_play_time, deliver, deliver0); /* Determine next packet */ if (!sm) { @@ -718,7 +753,7 @@ void *timeshift_reader ( void *p ) else req_time = skip_time; tvhlog(LOG_DEBUG, "timeshift", "ts %d skip to %"PRId64" from %"PRId64, - ts->id, req_time - ts->pts_delta, last_time - ts->pts_delta); + ts->id, req_time, last_time); /* Find */ pthread_mutex_lock(&ts->rdwr_mutex); @@ -727,7 +762,7 @@ void *timeshift_reader ( void *p ) pthread_mutex_unlock(&ts->rdwr_mutex); if (tsi) tvhlog(LOG_DEBUG, "timeshift", "ts %d skip found pkt @ %"PRId64, - ts->id, tsi->time - ts->pts_delta); + ts->id, tsi->time); /* File changed (close) */ if ((tsf != cur_file) && cur_file && cur_file->rfd >= 0) { @@ -755,12 +790,14 @@ void *timeshift_reader ( void *p ) /* Send skip response */ if (skip) { - if (sm && sm->sm_type == SMT_PACKET) { - th_pkt_t *pkt = sm->sm_data; - skip->time = pkt->pkt_pts; + if (sm) { + /* Status message */ + skip->time = ts_rescale_inv(sm->sm_time, 1000000); skip->type = SMT_SKIP_ABS_TIME; - tvhlog(LOG_DEBUG, "timeshift", "ts %d skip to pts %"PRId64" ok, time %"PRId64, - ts->id, ts_rescale(skip->time, 1000000), sm->sm_time - ts->pts_delta); + tvhlog(LOG_DEBUG, "timeshift", "ts %d skip to pts %"PRId64" ok", ts->id, sm->sm_time); + /* Update timeshift status */ + timeshift_fill_status(ts, &skip->timeshift, sm->sm_time); + mono_last_status = mono_now; } else { /* Report error */ skip->type = SMT_SKIP_ERROR; @@ -774,33 +811,30 @@ void *timeshift_reader ( void *p ) /* Deliver */ if (sm && (skip || (((cur_speed < 0) && (sm->sm_time >= deliver)) || - ((cur_speed > 0) && (sm->sm_time <= deliver))))) { - - if (sm->sm_type == SMT_PACKET && tvhtrace_enabled()) { - th_pkt_t *pkt = sm->sm_data; - tvhtrace("timeshift", - "ts %d pkt out - stream %d type %c pts %10"PRId64 - " dts %10"PRId64 " dur %10d len %6zu time %14"PRItime_t, - ts->id, - pkt->pkt_componentindex, - pkt_frametype_to_char(pkt->pkt_frametype), - ts_rescale(pkt->pkt_pts, 1000000), - ts_rescale(pkt->pkt_dts, 1000000), - pkt->pkt_duration, - pktbuf_len(pkt->pkt_payload), sm->sm_time - ts->pts_delta); - } + ((cur_speed > 0) && (sm->sm_time <= deliver))))) { + + if (sm->sm_type == SMT_PACKET && tvhtrace_enabled()) + timeshift_trace_pkt(ts, sm); last_time = sm->sm_time; streaming_target_deliver2(ts->output, sm); sm = NULL; wait = 0; + } else if (sm) { + if (cur_speed > 0) wait = (sm->sm_time - deliver) / 1000; else wait = (deliver - sm->sm_time) / 1000; if (wait == 0) wait = 1; - tvhtrace("timeshift", "ts %d wait %d", - ts->id, wait); + tvhtrace("timeshift", "ts %d wait %d", ts->id, wait); + + } + + /* Periodic timeshift status */ + if (mono_now >= (mono_last_status + 1000000)) { + timeshift_status(ts, last_time); + mono_last_status = mono_now; } /* Terminate */ @@ -811,11 +845,12 @@ void *timeshift_reader ( void *p ) /* Back to live (unless buffer is full) */ if (end == 1 && !ts->full) { tvhlog(LOG_DEBUG, "timeshift", "ts %d eob revert to live mode", ts->id); - ts->state = TS_LIVE; cur_speed = 100; ctrl = streaming_msg_create_code(SMT_SPEED, cur_speed); streaming_target_deliver2(ts->output, ctrl); ctrl = NULL; + tvhtrace("timeshift", "reader - set TS_LIVE"); + ts->state = TS_LIVE; /* Flush timeshift buffer to live */ if (_timeshift_flush_to_live(ts, &cur_file, &sm, &wait) == -1) @@ -835,13 +870,17 @@ void *timeshift_reader ( void *p ) } else { if (cur_speed <= 0) { cur_speed = 0; + tvhtrace("timeshift", "reader - set TS_PAUSE"); ts->state = TS_PAUSE; } else { cur_speed = 100; + tvhtrace("timeshift", "reader - set TS_PLAY"); ts->state = TS_PLAY; - play_time = now; + if (mono_play_time != mono_now) + tvhtrace("timeshift", "update play time (pause) - %"PRId64, mono_now); + mono_play_time = mono_now; } - tvhlog(LOG_DEBUG, "timeshift", "ts %d sob speed %d", ts->id, cur_speed); + tvhlog(LOG_DEBUG, "timeshift", "ts %d sob speed %d last time %"PRId64, ts->id, cur_speed, last_time); pause_time = last_time; ctrl = streaming_msg_create_code(SMT_SPEED, cur_speed); streaming_target_deliver2(ts->output, ctrl); diff --git a/src/timeshift/timeshift_writer.c b/src/timeshift/timeshift_writer.c index b27fe5143..40355bcf0 100644 --- a/src/timeshift/timeshift_writer.c +++ b/src/timeshift/timeshift_writer.c @@ -320,7 +320,7 @@ static void _process_msg case SMT_MPEGTS: case SMT_PACKET: pthread_mutex_lock(&ts->rdwr_mutex); - if ((tsf = timeshift_filemgr_get(ts, 1)) && (tsf->wfd >= 0 || tsf->ram)) { + if ((tsf = timeshift_filemgr_get(ts, sm->sm_time)) && (tsf->wfd >= 0 || tsf->ram)) { if ((err = _process_msg0(ts, tsf, &sm)) < 0) { timeshift_filemgr_close(tsf); tsf->bad = 1; diff --git a/src/tvheadend.h b/src/tvheadend.h index 35ca3344d..5cbb58063 100644 --- a/src/tvheadend.h +++ b/src/tvheadend.h @@ -309,6 +309,17 @@ typedef struct descramble_info { char protocol [128]; } descramble_info_t; +/** + * + */ +typedef struct timeshift_status +{ + int full; + int64_t shift; + int64_t pts_start; + int64_t pts_end; +} timeshift_status_t; + /** * Streaming skip */ @@ -326,6 +337,9 @@ typedef struct streaming_skip off_t size; int64_t time; }; +#if ENABLE_TIMESHIFT + timeshift_status_t timeshift; +#endif } streaming_skip_t; @@ -700,7 +714,7 @@ static inline int64_t ts_rescale(int64_t ts, int tb) return (ts * tb ) / 90000LL; } -static inline int64_t ts_rescale_i(int64_t ts, int tb) +static inline int64_t ts_rescale_inv(int64_t ts, int tb) { return (ts * 90000LL) / tb; }