]> git.ipfire.org Git - thirdparty/tvheadend.git/commitdiff
timeshift: change time source - use PTS as the synchronization source
authorJaroslav Kysela <perex@perex.cz>
Sat, 19 Dec 2015 20:08:00 +0000 (21:08 +0100)
committerJaroslav Kysela <perex@perex.cz>
Wed, 30 Dec 2015 17:33:08 +0000 (18:33 +0100)
src/atomic.h
src/htsp_server.c
src/timeshift.c
src/timeshift.h
src/timeshift/private.h
src/timeshift/timeshift_filemgr.c
src/timeshift/timeshift_reader.c
src/timeshift/timeshift_writer.c
src/tvheadend.h

index 67b972d1f396d43a791dedc110c41bd4ff154bc0..b5b8f9699de7d55a3f83ef666579d6ba400fe0f1 100644 (file)
@@ -48,6 +48,21 @@ atomic_add_u64(volatile uint64_t *ptr, uint64_t incr)
 #endif
 }
 
+static inline int64_t
+atomic_add_s64(volatile int64_t *ptr, int64_t incr)
+{
+#if ENABLE_ATOMIC64
+  return __sync_fetch_and_add(ptr, incr);
+#else
+  uint64_t ret;
+  pthread_mutex_lock(&atomic_lock);
+  ret = *ptr;
+  *ptr += incr;
+  pthread_mutex_unlock(&atomic_lock);
+  return ret;
+#endif
+}
+
 static inline time_t
 atomic_add_time_t(volatile time_t *ptr, time_t incr)
 {
@@ -123,6 +138,12 @@ atomic_exchange_u64(volatile uint64_t *ptr, uint64_t new)
   return  __sync_lock_test_and_set(ptr, new);
 }
 
+static inline int
+atomic_exchange_s64(volatile int64_t *ptr, int64_t new)
+{
+  return  __sync_lock_test_and_set(ptr, new);
+}
+
 static inline int
 atomic_exchange_time_t(volatile time_t *ptr, int new)
 {
index d7302daa0c90822f6bc8deb348cf71a4a9e213ec..33be5f8c82c68a9b01bdeb9c931a8fbf5c137774 100644 (file)
@@ -2459,7 +2459,7 @@ htsp_method_skip(htsp_connection_t *htsp, htsmsg_t *in)
   memset(&skip, 0, sizeof(skip));
   if(!htsmsg_get_s64(in, "time", &s64)) {
     skip.type = abs ? SMT_SKIP_ABS_TIME : SMT_SKIP_REL_TIME;
-    skip.time = hs->hs_90khz ? s64 : ts_rescale_i(s64, 1000000);
+    skip.time = hs->hs_90khz ? s64 : ts_rescale_inv(s64, 1000000);
     tvhtrace("htsp-sub", "skip: %s %"PRId64" (%s)", abs ? "abs" : "rel",
              skip.time, hs->hs_90khz ? "90kHz" : "1MHz");
   } else if (!htsmsg_get_s64(in, "size", &s64)) {
@@ -4005,6 +4005,26 @@ htsp_subscription_speed(htsp_subscription_t *hs, int speed)
   htsp_send_subscription(hs->hs_htsp, m, NULL, hs, 0);
 }
 
+/**
+ *
+ */
+#if ENABLE_TIMESHIFT
+static void
+htsp_subscription_timeshift_status(htsp_subscription_t *hs, timeshift_status_t *status)
+{
+  htsmsg_t *m = htsmsg_create_map();
+  htsmsg_add_str(m, "method", "timeshiftStatus");
+  htsmsg_add_u32(m, "subscriptionId", hs->hs_sid);
+  htsmsg_add_u32(m, "full", status->full);
+  htsmsg_add_s64(m, "shift", hs->hs_90khz ? status->shift : ts_rescale(status->shift, 1000000));
+  if (status->pts_start != PTS_UNSET)
+    htsmsg_add_s64(m, "start", hs->hs_90khz ? status->pts_start : ts_rescale(status->pts_start, 1000000)) ;
+  if (status->pts_end != PTS_UNSET)
+    htsmsg_add_s64(m, "end", hs->hs_90khz ? status->pts_end : ts_rescale(status->pts_end, 1000000)) ;
+  htsp_send_subscription(hs->hs_htsp, m, NULL, hs, 0);
+}
+#endif
+
 /**
  *
  */
@@ -4017,8 +4037,10 @@ htsp_subscription_skip(htsp_subscription_t *hs, streaming_skip_t *skip)
   htsmsg_add_u32(m, "subscriptionId", hs->hs_sid);
 
   /* Flush pkt buffers */
-  if (skip->type != SMT_SKIP_ERROR)
+  if (skip->type != SMT_SKIP_ERROR) {
     htsp_flush_queue(hs->hs_htsp, &hs->hs_q, 0);
+    htsp_subscription_timeshift_status(hs, &skip->timeshift);
+  }
 
   if (skip->type == SMT_SKIP_ABS_TIME || skip->type == SMT_SKIP_ABS_SIZE)
     htsmsg_add_u32(m, "absolute", 1);
@@ -4031,26 +4053,6 @@ htsp_subscription_skip(htsp_subscription_t *hs, streaming_skip_t *skip)
   htsp_send_subscription(hs->hs_htsp, m, NULL, hs, 0);
 }
 
-/**
- *
- */
-#if ENABLE_TIMESHIFT
-static void
-htsp_subscription_timeshift_status(htsp_subscription_t *hs, timeshift_status_t *status)
-{
-  htsmsg_t *m = htsmsg_create_map();
-  htsmsg_add_str(m, "method", "timeshiftStatus");
-  htsmsg_add_u32(m, "subscriptionId", hs->hs_sid);
-  htsmsg_add_u32(m, "full", status->full);
-  htsmsg_add_s64(m, "shift", hs->hs_90khz ? status->shift : ts_rescale(status->shift, 1000000));
-  if (status->pts_start != PTS_UNSET)
-    htsmsg_add_s64(m, "start", hs->hs_90khz ? status->pts_start : ts_rescale(status->pts_start, 1000000)) ;
-  if (status->pts_end != PTS_UNSET)
-    htsmsg_add_s64(m, "end", hs->hs_90khz ? status->pts_end : ts_rescale(status->pts_end, 1000000)) ;
-  htsp_send_subscription(hs->hs_htsp, m, NULL, hs, 0);
-}
-#endif
-
 /**
  *
  */
index 39550bdf69ee4f599cd4f7348b214392f7e2b934..bd5d1865aa2b360c1df7c88df2d3540169ad5973 100644 (file)
@@ -24,6 +24,7 @@
 #include "settings.h"
 #include "atomic.h"
 #include "access.h"
+#include "atomic.h"
 
 #include <sys/types.h>
 #include <sys/stat.h>
@@ -205,34 +206,80 @@ const idclass_t timeshift_conf_class = {
 };
 
 /*
- * Decode initial time diff
- *
- * Gather some packets and select the lowest pts to identify
- * the correct start. Note that for timeshift, the tsfix
- * stream plugin is applied, so the starting pts should be
- * near zero. If not - it's a bug.
+ * Process a packet with time sorting
  */
+
+#define MAX_TIME_DELTA (2*1000000) /* 2 seconds */
+#define BACKLOG_COUNT ARRAY_SIZE(timeshift_t->backlog)
+
 static void
-timeshift_set_pts_delta ( timeshift_t *ts, int64_t pts )
+timeshift_packet_deliver ( timeshift_t *ts, streaming_message_t *sm )
 {
+  th_pkt_t *pkt = sm->sm_data;
+  tvhtrace("timeshift",
+           "ts %d pkt buf - stream %d type %c pts %10"PRId64
+           " dts %10"PRId64" dur %10d len %6zu time %14"PRId64,
+           ts->id,
+           pkt->pkt_componentindex,
+           pkt_frametype_to_char(pkt->pkt_frametype),
+           ts_rescale(pkt->pkt_pts, 1000000),
+           ts_rescale(pkt->pkt_dts, 1000000),
+           pkt->pkt_duration,
+           pktbuf_len(pkt->pkt_payload),
+           sm->sm_time);
+  streaming_target_deliver2(&ts->wr_queue.sq_st, sm);
+}
+
+static void
+timeshift_packet_flush ( timeshift_t *ts, int64_t time )
+{
+  streaming_message_t *lowest, *sm;
+  struct streaming_message_queue *sq;
   int i;
-  int64_t smallest = INT64_MAX;
 
-  if (pts == PTS_UNSET)
+  while (1) {
+    lowest = NULL;
+    for (i = 0; i < ts->backlog_max; i++) {
+      sq = &ts->backlog[i];
+      sm = TAILQ_FIRST(sq);
+      if (sm && sm->sm_time + MAX_TIME_DELTA < time)
+        if (lowest == NULL || lowest->sm_time > sm->sm_time)
+          lowest = sm;
+    }
+    if (!lowest)
+      break;
+    TAILQ_REMOVE(sq, lowest, sm_link);
+    timeshift_packet_deliver(ts, lowest);
+  }
+}
+
+static void
+timeshift_packet( timeshift_t *ts, th_pkt_t *pkt )
+{
+  streaming_message_t *sm;
+  int64_t time;
+
+  if (pkt->pkt_componentindex >= TIMESHIFT_BACKLOG_MAX) {
+    pkt_ref_dec(pkt);
     return;
+  }
 
-  for (i = 0; i < ARRAY_SIZE(ts->pts_val); i++) {
-    int64_t i64 = ts->pts_val[i];
-    if (i64 == PTS_UNSET) {
-      ts->pts_val[i] = pts;
-      break;
-    }
-    if (i64 < smallest)
-      smallest = i64;
+  sm = streaming_msg_create_pkt(pkt);
+
+  time = ts_rescale(pkt->pkt_pts, 1000000);
+  if (time > ts->last_time) {
+    atomic_exchange_s64(&ts->last_time, time);
+    timeshift_packet_flush(ts, time);
   }
 
-  if (i >= ARRAY_SIZE(ts->pts_val))
-    ts->pts_delta = getmonoclock() - ts_rescale(smallest, 1000000);
+  sm->sm_time = time;
+  if (time + MAX_TIME_DELTA < ts->last_time) {
+    timeshift_packet_deliver(ts, sm);
+  } else {
+    if (pkt->pkt_componentindex >= ts->backlog_max)
+      ts->backlog_max = pkt->pkt_componentindex + 1;
+    TAILQ_INSERT_TAIL(&ts->backlog[pkt->pkt_componentindex], sm, sm_link);
+  }
 }
 
 /*
@@ -281,25 +328,26 @@ static void timeshift_input
         (sm->sm_type == SMT_STOP && sm->sm_code == 0))
       exit = 1;
 
-    /* Record (one-off) PTS delta */
-    if (sm->sm_type == SMT_PACKET && ts->pts_delta == 0)
-      timeshift_set_pts_delta(ts, pkt->pkt_pts);
+    if (sm->sm_type == SMT_MPEGTS)
+      ts->packet_mode = 0;
 
     /* Buffer to disk */
     if ((ts->state > TS_LIVE) || (!ts->ondemand && (ts->state == TS_LIVE))) {
-      sm->sm_time = getmonoclock();
-      if (sm->sm_type == SMT_PACKET) {
-        tvhtrace("timeshift",
-                 "ts %d pkt buf - stream %d type %c pts %10"PRId64
-                 " dts %10"PRId64" dur %10d len %6zu time %14"PRId64,
-                 ts->id,
-                 pkt->pkt_componentindex,
-                 pkt_frametype_to_char(pkt->pkt_frametype),
-                 ts_rescale(pkt->pkt_pts, 1000000),
-                 ts_rescale(pkt->pkt_dts, 1000000),
-                 pkt->pkt_duration,
-                 pktbuf_len(pkt->pkt_payload),
-                 sm->sm_time - ts->pts_delta);
+      if (ts->packet_mode) {
+        sm->sm_time = ts->last_time;
+        if (sm->sm_type == SMT_PACKET) {
+          timeshift_packet(ts, pkt);
+          sm->sm_data = NULL;
+          streaming_msg_free(sm);
+          goto pktcont;
+        }
+      } else {
+        if (ts->ref_time == 0) {
+          ts->ref_time = getmonoclock();
+          sm->sm_time = 0;
+        } else {
+          sm->sm_time = getmonoclock() - ts->ref_time;
+        }
       }
       streaming_target_deliver2(&ts->wr_queue.sq_st, sm);
     } else {
@@ -317,6 +365,7 @@ static void timeshift_input
       }
       streaming_msg_free(sm);
     }
+pktcont:
 
     /* Exit/Stop */
     if (exit) {
@@ -336,6 +385,7 @@ timeshift_destroy(streaming_target_t *pad)
 {
   timeshift_t *ts = (timeshift_t*)pad;
   streaming_message_t *sm;
+  int i;
 
   /* Must hold global lock */
   lock_assert(&global_lock);
@@ -355,6 +405,8 @@ timeshift_destroy(streaming_target_t *pad)
 
   /* Shut stuff down */
   streaming_queue_deinit(&ts->wr_queue);
+  for (i = 0; i < TIMESHIFT_BACKLOG_MAX; i++)
+    streaming_queue_clear(&ts->backlog[i]);
 
   close(ts->rd_pipe.rd);
   close(ts->rd_pipe.wr);
@@ -396,9 +448,11 @@ streaming_target_t *timeshift_create
   ts->vididx     = -1;
   ts->id         = timeshift_index;
   ts->ondemand   = timeshift_conf.ondemand;
-  ts->pts_delta  = 0;
-  for (i = 0; i < ARRAY_SIZE(ts->pts_val); i++)
-    ts->pts_val[i] = PTS_UNSET;
+  ts->packet_mode= 1;
+  ts->last_time  = 0;
+  ts->ref_time   = 0;
+  for (i = 0; i < TIMESHIFT_BACKLOG_MAX; i++)
+    TAILQ_INIT(&ts->backlog[i]);
   pthread_mutex_init(&ts->rdwr_mutex, NULL);
   pthread_mutex_init(&ts->state_mutex, NULL);
 
index d91c62ca469cccb82d051664313a4965fe3e78f1..baa2e932f0082c96cbf97b5be9832d5127a11f9d 100644 (file)
@@ -40,14 +40,6 @@ typedef struct timeshift_conf {
 extern struct timeshift_conf timeshift_conf;
 extern const idclass_t timeshift_conf_class;
 
-typedef struct timeshift_status
-{
-  int     full;
-  int64_t shift;
-  int64_t pts_start;
-  int64_t pts_end;
-} timeshift_status_t;
-
 void timeshift_init ( void );
 void timeshift_term ( void );
 
index b112b060e1d1ba3fb7b8091357e8658fa7bed351..7cd409367c074ddeabd3b42759f573c37e40f6c6 100644 (file)
@@ -19,8 +19,9 @@
 #ifndef __TVH_TIMESHIFT_PRIVATE_H__
 #define __TVH_TIMESHIFT_PRIVATE_H__
 
-#define TIMESHIFT_PLAY_BUF     200000 // us to buffer in TX
-#define TIMESHIFT_FILE_PERIOD      60 // number of secs in each buffer file
+#define TIMESHIFT_PLAY_BUF         2000000 //< us to buffer in TX
+#define TIMESHIFT_FILE_PERIOD      60      //< number of secs in each buffer file
+#define TIMESHIFT_BACKLOG_MAX      16      //< maximum elementary streams
 
 /**
  * Indexes of import data in the stream
@@ -55,7 +56,7 @@ typedef struct timeshift_file
   int                           rfd;      ///< Read descriptor
   char                          *path;    ///< Full path to file
 
-  time_t                        time;     ///< Files coarse timestamp
+  int64_t                       time;     ///< Files coarse timestamp
   size_t                        size;     ///< Current file size;
   int64_t                       last;     ///< Latest timestamp
   off_t                         woff;     ///< Write offset
@@ -90,8 +91,11 @@ typedef struct timeshift {
   char                        *path;      ///< Directory containing buffer
   time_t                      max_time;   ///< Maximum period to shift
   int                         ondemand;   ///< Whether this is an on-demand timeshift
-  int64_t                     pts_delta;  ///< Delta between system clock and PTS
-  int64_t                     pts_val[6]; ///< Decision PTS values for multiple packets
+  int                         packet_mode;///< Packet mode (otherwise MPEG-TS data mode)
+  int64_t                     last_time;  ///< Last time in us (PTS conversion)
+  int64_t                     ref_time;   ///< Start time in us (monoclock)
+  struct streaming_message_queue backlog[TIMESHIFT_BACKLOG_MAX]; ///< Queued packets for time sorting
+  int                         backlog_max;///< Maximum component index in backlog
 
   enum {
     TS_INIT,
@@ -153,7 +157,7 @@ void timeshift_filemgr_term     ( void );
 int  timeshift_filemgr_makedirs ( int ts_index, char *buf, size_t len );
 
 timeshift_file_t *timeshift_filemgr_get
-  ( timeshift_t *ts, int create );
+  ( timeshift_t *ts, int64_t start_time );
 timeshift_file_t *timeshift_filemgr_oldest
   ( timeshift_t *ts );
 timeshift_file_t *timeshift_filemgr_newest
index 6204232292ca07b821f961c66ea5e34cf0d81785..56f347eb659c521f54f6de009ee7a437cd274158 100644 (file)
@@ -210,13 +210,13 @@ void timeshift_filemgr_flush ( timeshift_t *ts, timeshift_file_t *end )
  *
  */
 static timeshift_file_t * timeshift_filemgr_file_init
-  ( timeshift_t *ts, time_t time )
+  ( timeshift_t *ts, int64_t start_time )
 {
   timeshift_file_t *tsf;
 
   tsf = calloc(1, sizeof(timeshift_file_t));
-  tsf->time     = time;
-  tsf->last     = getmonoclock();
+  tsf->time     = start_time / (1000000LL * TIMESHIFT_FILE_PERIOD);
+  tsf->last     = start_time;
   tsf->wfd      = -1;
   tsf->rfd      = -1;
   TAILQ_INIT(&tsf->iframes);
@@ -229,17 +229,16 @@ static timeshift_file_t * timeshift_filemgr_file_init
 /*
  * Get current / new file
  */
-timeshift_file_t *timeshift_filemgr_get ( timeshift_t *ts, int create )
+timeshift_file_t *timeshift_filemgr_get ( timeshift_t *ts, int64_t start_time )
 {
   int fd;
-  struct timespec tp;
   timeshift_file_t *tsf_tl, *tsf_hd, *tsf_tmp;
   timeshift_index_data_t *ti;
   char path[PATH_MAX];
-  time_t time;
+  int64_t time;
 
   /* Return last file */
-  if (!create)
+  if (start_time < 0)
     return timeshift_filemgr_newest(ts);
 
   /* No space */
@@ -247,10 +246,9 @@ timeshift_file_t *timeshift_filemgr_get ( timeshift_t *ts, int create )
     return NULL;
 
   /* Store to file */
-  clock_gettime(CLOCK_MONOTONIC_COARSE, &tp);
-  time   = tp.tv_sec / TIMESHIFT_FILE_PERIOD;
   tsf_tl = TAILQ_LAST(&ts->files, timeshift_file_list);
-  if (!tsf_tl || tsf_tl->time != time ||
+  time = start_time / (1000000LL * TIMESHIFT_FILE_PERIOD);
+  if (!tsf_tl || tsf_tl->time < time ||
       (tsf_tl->ram && tsf_tl->woff >= timeshift_conf.ram_segment_size)) {
     tsf_hd = TAILQ_FIRST(&ts->files);
 
@@ -298,15 +296,15 @@ timeshift_file_t *timeshift_filemgr_get ( timeshift_t *ts, int create )
       if (timeshift_conf.ram_size >= 8*1024*1024 &&
           atomic_pre_add_u64(&timeshift_total_ram_size, 0) <
             timeshift_conf.ram_size + (timeshift_conf.ram_segment_size / 2)) {
-        tsf_tmp = timeshift_filemgr_file_init(ts, time);
+        tsf_tmp = timeshift_filemgr_file_init(ts, start_time);
         tsf_tmp->ram_size = MIN(16*1024*1024, timeshift_conf.ram_segment_size);
         tsf_tmp->ram = malloc(tsf_tmp->ram_size);
         if (!tsf_tmp->ram) {
           free(tsf_tmp);
           tsf_tmp = NULL;
         } else {
-          tvhtrace("timeshift", "ts %d create RAM segment with %"PRId64" bytes (time %"PRItime_t")",
-                   ts->id, tsf_tmp->ram_size, time);
+          tvhtrace("timeshift", "ts %d create RAM segment with %"PRId64" bytes (time %"PRId64")",
+                   ts->id, tsf_tmp->ram_size, start_time);
         }
       }
       
@@ -319,10 +317,10 @@ timeshift_file_t *timeshift_filemgr_get ( timeshift_t *ts, int create )
         }
 
         /* Create File */
-        snprintf(path, sizeof(path), "%s/tvh-%"PRItime_t, ts->path, time);
+        snprintf(path, sizeof(path), "%s/tvh-%"PRId64, ts->path, start_time);
         tvhtrace("timeshift", "ts %d create file %s", ts->id, path);
         if ((fd = open(path, O_WRONLY | O_CREAT, 0600)) > 0) {
-          tsf_tmp = timeshift_filemgr_file_init(ts, time);
+          tsf_tmp = timeshift_filemgr_file_init(ts, start_time);
           tsf_tmp->wfd = fd;
           tsf_tmp->path = strdup(path);
         }
index c6b5ce6013339fe2b8ec673347a53a0b207df559..405ab575aeaec92f27ffc56511ad38bc9ad50b83 100644 (file)
@@ -236,7 +236,7 @@ static timeshift_index_iframe_t *_timeshift_last_frame
 {
   int end;
   timeshift_index_iframe_t *tsi = NULL;
-  timeshift_file_t *tsf = timeshift_filemgr_get(ts, 0);
+  timeshift_file_t *tsf = timeshift_filemgr_get(ts, -1);
   while (tsf && !tsi) {
     if (!(tsi = TAILQ_LAST(&tsf->iframes, timeshift_index_iframe_list))) {
       tsf = timeshift_filemgr_prev(tsf, &end, 0);
@@ -317,7 +317,7 @@ static int _timeshift_skip
         tsf = tsf_last;
       end = -1;
     } else {
-      tsf = tsf_last = timeshift_filemgr_get(ts, 0);
+      tsf = tsf_last = timeshift_filemgr_newest(ts);
       tsi = NULL;
       while (tsf && !tsi) {
         tsf_last = tsf;
@@ -348,7 +348,7 @@ static int _timeshift_read
 {
   timeshift_file_t *tsf = *cur_file;
   ssize_t r;
-  off_t off, ooff;
+  off_t off;
 
   if (tsf) {
 
@@ -365,7 +365,6 @@ static int _timeshift_read
                  ts->id, tsf->path, (int64_t)tsf->roff, (int64_t)off, strerror(errno));
 
     /* Read msg */
-    ooff = tsf->roff;
     r = _read_msg(tsf, -1, sm);
     if (r < 0) {
       streaming_message_t *e = streaming_msg_create_code(SMT_STOP, SM_CODE_UNDEFINED_ERROR);
@@ -376,20 +375,8 @@ static int _timeshift_read
     }
     tvhtrace("timeshift", "ts %d seek to %jd (fd %i) read msg %p (%"PRId64")", ts->id, (intmax_t)tsf->roff, tsf->rfd, *sm, (int64_t)r);
 
-    /* Incomplete */
-    if (r == 0) {
-      if (tsf->rfd >= 0) {
-        tvhtrace("timeshift", "ts %d seek to %jd (fd %i) (incomplete)", ts->id, (intmax_t)tsf->roff, tsf->rfd);
-        if ((off = lseek(tsf->rfd, ooff, SEEK_SET)) != ooff)
-          tvherror("timeshift", "seek to %s failed (off %"PRId64" != %"PRId64"): %s",
-                   tsf->path, (int64_t)ooff, (int64_t)off, strerror(errno));
-      }
-      tsf->roff = ooff;
-      return 0;
-    }
-
     /* Special case - EOF */
-    if (r == sizeof(size_t) || tsf->roff > tsf->size) {
+    if (r <= sizeof(size_t) || tsf->roff > tsf->size || *sm == NULL) {
       if (tsf->rfd >= 0)
         close(tsf->rfd);
       tsf->rfd  = -1;
@@ -398,6 +385,7 @@ static int _timeshift_read
       pthread_mutex_unlock(&ts->rdwr_mutex);
       tsf->roff = 0; // reset
       *wait     = 0;
+      tvhtrace("timeshift", "ts %d eof, cur_file %p", ts->id, *cur_file);
 
     /* Check SMT_START index */
     } else {
@@ -438,6 +426,62 @@ static int _timeshift_flush_to_live
   return 0;
 }
 
+/*
+ * Send the status message
+ */
+static void timeshift_fill_status
+  ( timeshift_t *ts, timeshift_status_t *status, int64_t current_time )
+{
+  timeshift_index_iframe_t *fst, *lst;
+  int64_t shift;
+
+  fst    = _timeshift_first_frame(ts);
+  lst    = _timeshift_last_frame(ts);
+  status->full  = ts->full;
+  tvhtrace("timeshift", "status last->time %"PRId64" current time %"PRId64" state %d",
+           lst ? lst->time : -1, current_time, ts->state);
+  shift  = lst ? ts_rescale_inv(lst->time - current_time, 1000000) : -1;
+  status->shift = (ts->state <= TS_LIVE || shift < 0 || !lst) ? 0 : shift;
+  if (lst && fst && lst != fst) {
+    status->pts_start = ts_rescale_inv(fst->time, 1000000);
+    status->pts_end   = ts_rescale_inv(lst->time, 1000000);
+  } else {
+    status->pts_start = PTS_UNSET;
+    status->pts_end   = PTS_UNSET;
+  }
+}
+
+static void timeshift_status
+  ( timeshift_t *ts, int64_t current_time )
+{
+  streaming_message_t *tsm;
+  timeshift_status_t *status;
+
+  status = calloc(1, sizeof(timeshift_status_t));
+  timeshift_fill_status(ts, status, current_time);
+  tsm = streaming_msg_create_data(SMT_TIMESHIFT_STATUS, status);
+  streaming_target_deliver2(ts->output, tsm);
+}
+
+/*
+ * Trace packet
+ */
+static void timeshift_trace_pkt
+  ( timeshift_t *ts, streaming_message_t *sm )
+{
+  th_pkt_t *pkt = sm->sm_data;
+  tvhtrace("timeshift",
+           "ts %d pkt out - stream %d type %c pts %10"PRId64
+           " dts %10"PRId64 " dur %10d len %6zu time %14"PRItime_t,
+           ts->id,
+           pkt->pkt_componentindex,
+           pkt_frametype_to_char(pkt->pkt_frametype),
+           ts_rescale(pkt->pkt_pts, 1000000),
+           ts_rescale(pkt->pkt_dts, 1000000),
+           pkt->pkt_duration,
+           pktbuf_len(pkt->pkt_payload), sm->sm_time);
+}
+
 /* **************************************************************************
  * Thread
  * *************************************************************************/
@@ -452,12 +496,11 @@ void *timeshift_reader ( void *p )
   int nfds, end, run = 1, wait = -1;
   timeshift_file_t *cur_file = NULL;
   int cur_speed = 100, keyframe_mode = 0;
-  int64_t pause_time = 0, play_time = 0, last_time = 0;
-  int64_t now, deliver, skip_time = 0;
+  int64_t mono_now, mono_play_time = 0, mono_last_status = 0;
+  int64_t deliver, deliver0, pause_time = 0, last_time = 0, skip_time = 0;
   streaming_message_t *sm = NULL, *ctrl = NULL;
   timeshift_index_iframe_t *tsi = NULL;
   streaming_skip_t *skip = NULL;
-  time_t last_status = 0;
   tvhpoll_t *pd;
   tvhpoll_event_t ev = { 0 };
 
@@ -482,7 +525,7 @@ void *timeshift_reader ( void *p )
     wait      = -1;
     end       = 0;
     skip      = NULL;
-    now       = getmonoclock();
+    mono_now  = getmonoclock();
 
     /* Control */
     pthread_mutex_lock(&ts->state_mutex);
@@ -527,12 +570,19 @@ void *timeshift_reader ( void *p )
                        ts->id);
                 timeshift_writer_flush(ts);
                 pthread_mutex_lock(&ts->rdwr_mutex);
-                if ((cur_file    = timeshift_filemgr_get(ts, 1))) {
+                cur_file = timeshift_filemgr_newest(ts);
+                cur_file = timeshift_filemgr_get(ts, cur_file ? cur_file->last :
+                                                     atomic_add_s64(&ts->last_time, 0));
+                if (cur_file != NULL) {
                   cur_file->roff = cur_file->size;
                   pause_time     = cur_file->last;
                   last_time      = pause_time;
+                } else {
+                  last_time      = atomic_add_s64(&ts->last_time, 0);
                 }
                 pthread_mutex_unlock(&ts->rdwr_mutex);
+                mono_play_time = mono_now;
+                tvhtrace("timeshift", "update play time TS_LIVE - %"PRId64, mono_now);
               }
 
             /* Buffer playback */
@@ -555,10 +605,11 @@ void *timeshift_reader ( void *p )
             }
 
             /* Update */
-            play_time  = getmonoclock();
             cur_speed  = speed;
-            if (speed != 100 || ts->state != TS_LIVE)
+            if (speed != 100 || ts->state != TS_LIVE) {
               ts->state = speed == 0 ? TS_PAUSE : TS_PLAY;
+              tvhtrace("timeshift", "reader - set %s", speed == 0 ? "TS_PAUSE" : "TS_PLAY");
+            }
             tvhlog(LOG_DEBUG, "timeshift", "ts %d change speed %d",
                    ts->id, speed);
           }
@@ -594,11 +645,6 @@ void *timeshift_reader ( void *p )
               break;
 
             case SMT_SKIP_ABS_TIME:
-              if (ts->pts_delta == 0) {
-                tvhlog(LOG_ERR, "timeshift", "ts %d abs skip not possible no PTS delta", ts->id);
-                skip = NULL;
-                break;
-              }
               /* -fallthrough */
             case SMT_SKIP_REL_TIME:
 
@@ -609,7 +655,8 @@ void *timeshift_reader ( void *p )
               /* Live playback (stage1) */
               if (ts->state == TS_LIVE) {
                 pthread_mutex_lock(&ts->rdwr_mutex);
-                if ((cur_file    = timeshift_filemgr_get(ts, !ts->ondemand))) {
+                cur_file = timeshift_filemgr_newest(ts);
+                if (cur_file && (cur_file = timeshift_filemgr_get(ts, cur_file->last)) != NULL) {
                   cur_file->roff = cur_file->size;
                   last_time      = cur_file->last;
                 } else {
@@ -621,17 +668,18 @@ void *timeshift_reader ( void *p )
 
               /* May have failed */
               if (skip) {
-                skip_time += (skip->type == SMT_SKIP_ABS_TIME) ? ts->pts_delta : last_time;
-                tvhlog(LOG_DEBUG, "timeshift", "ts %d skip last_time %"PRId64" pts_delta %"PRId64,
-                       ts->id, skip_time - ts->pts_delta, ts->pts_delta);
+                if (skip->type == SMT_SKIP_REL_TIME)
+                  skip_time += last_time;
+                tvhlog(LOG_DEBUG, "timeshift", "ts %d skip time %"PRId64, ts->id, skip_time);
 
-               /* Live (stage2) */
+                /* Live (stage2) */
                 if (ts->state == TS_LIVE) {
-                  if (skip_time >= now) {
+                  if (skip_time >= atomic_add_s64(&ts->last_time, 0) - TIMESHIFT_PLAY_BUF) {
                     tvhlog(LOG_DEBUG, "timeshift", "ts %d skip ignored, already live", ts->id);
                     skip = NULL;
                   } else {
                     ts->state = TS_PLAY;
+                    tvhtrace("timeshift", "reader - set TS_PLAY");
                   }
                 }
               }
@@ -640,8 +688,10 @@ void *timeshift_reader ( void *p )
               if (skip) {
 
                 /* Adjust time */
-                play_time  = now;
-                pause_time = skip_time;
+                if (mono_play_time != mono_now)
+                  tvhtrace("timeshift", "update play time skip - %"PRId64, mono_now);
+                mono_play_time = mono_now;
+                pause_time = atomic_add_s64(&ts->last_time, 0);
                 tsi        = NULL;
 
                 /* Clear existing packet */
@@ -671,38 +721,23 @@ void *timeshift_reader ( void *p )
       }
     }
 
-    /* Status message */
-    if (now >= (last_status + 1000000)) {
-      streaming_message_t *tsm;
-      timeshift_status_t *status;
-      timeshift_index_iframe_t *fst, *lst;
-      status = calloc(1, sizeof(timeshift_status_t));
-      fst    = _timeshift_first_frame(ts);
-      lst    = _timeshift_last_frame(ts);
-      status->full  = ts->full;
-      status->shift = ts->state <= TS_LIVE ? 0 : ts_rescale_i(now - last_time, 1000000);
-      if (lst && fst && lst != fst && ts->pts_delta != PTS_UNSET) {
-        status->pts_start = ts_rescale_i(fst->time - ts->pts_delta, 1000000);
-        status->pts_end   = ts_rescale_i(lst->time - ts->pts_delta, 1000000);
-      } else {
-        status->pts_start = PTS_UNSET;
-        status->pts_end   = PTS_UNSET;
-      }
-      tsm = streaming_msg_create_data(SMT_TIMESHIFT_STATUS, status);
-      streaming_target_deliver2(ts->output, tsm);
-      last_status = now;
-    }
 
     /* Done */
     if (!run || !cur_file || ((ts->state != TS_PLAY && !skip))) {
+      if (mono_now >= (mono_last_status + 1000000)) {
+        timeshift_status(ts, last_time);
+        mono_last_status = mono_now;
+      }
       pthread_mutex_unlock(&ts->state_mutex);
       continue;
     }
 
     /* Calculate delivery time */
-    deliver = (now - play_time) + TIMESHIFT_PLAY_BUF;
-    deliver = (deliver * cur_speed) / 100;
+    deliver0 = (mono_now - mono_play_time) + TIMESHIFT_PLAY_BUF;
+    deliver = (deliver0 * cur_speed) / 100;
     deliver = (deliver + pause_time);
+    tvhtrace("timeshift", "speed %d now %"PRId64" play_time %"PRId64" deliver %"PRId64" deliver0 %"PRId64,
+             cur_speed, mono_now, mono_play_time, deliver, deliver0);
 
     /* Determine next packet */
     if (!sm) {
@@ -718,7 +753,7 @@ void *timeshift_reader ( void *p )
         else
           req_time = skip_time;
         tvhlog(LOG_DEBUG, "timeshift", "ts %d skip to %"PRId64" from %"PRId64,
-               ts->id, req_time - ts->pts_delta, last_time - ts->pts_delta);
+               ts->id, req_time, last_time);
 
         /* Find */
         pthread_mutex_lock(&ts->rdwr_mutex);
@@ -727,7 +762,7 @@ void *timeshift_reader ( void *p )
         pthread_mutex_unlock(&ts->rdwr_mutex);
         if (tsi)
           tvhlog(LOG_DEBUG, "timeshift", "ts %d skip found pkt @ %"PRId64,
-                 ts->id, tsi->time - ts->pts_delta);
+                 ts->id, tsi->time);
 
         /* File changed (close) */
         if ((tsf != cur_file) && cur_file && cur_file->rfd >= 0) {
@@ -755,12 +790,14 @@ void *timeshift_reader ( void *p )
 
     /* Send skip response */
     if (skip) {
-      if (sm && sm->sm_type == SMT_PACKET) {
-        th_pkt_t *pkt = sm->sm_data;
-        skip->time = pkt->pkt_pts;
+      if (sm) {
+        /* Status message */
+        skip->time = ts_rescale_inv(sm->sm_time, 1000000);
         skip->type = SMT_SKIP_ABS_TIME;
-        tvhlog(LOG_DEBUG, "timeshift", "ts %d skip to pts %"PRId64" ok, time %"PRId64,
-               ts->id, ts_rescale(skip->time, 1000000), sm->sm_time - ts->pts_delta);
+        tvhlog(LOG_DEBUG, "timeshift", "ts %d skip to pts %"PRId64" ok", ts->id, sm->sm_time);
+        /* Update timeshift status */
+        timeshift_fill_status(ts, &skip->timeshift, sm->sm_time);
+        mono_last_status = mono_now;
       } else {
         /* Report error */
         skip->type = SMT_SKIP_ERROR;
@@ -774,33 +811,30 @@ void *timeshift_reader ( void *p )
     /* Deliver */
     if (sm && (skip ||
                (((cur_speed < 0) && (sm->sm_time >= deliver)) ||
-               ((cur_speed > 0) && (sm->sm_time <= deliver))))) {
-
-      if (sm->sm_type == SMT_PACKET && tvhtrace_enabled()) {
-        th_pkt_t *pkt = sm->sm_data;
-        tvhtrace("timeshift",
-                 "ts %d pkt out - stream %d type %c pts %10"PRId64
-                 " dts %10"PRId64 " dur %10d len %6zu time %14"PRItime_t,
-                 ts->id,
-                 pkt->pkt_componentindex,
-                 pkt_frametype_to_char(pkt->pkt_frametype),
-                 ts_rescale(pkt->pkt_pts, 1000000),
-                 ts_rescale(pkt->pkt_dts, 1000000),
-                 pkt->pkt_duration,
-                 pktbuf_len(pkt->pkt_payload), sm->sm_time - ts->pts_delta);
-      }
+                ((cur_speed > 0) && (sm->sm_time <= deliver))))) {
+
+      if (sm->sm_type == SMT_PACKET && tvhtrace_enabled())
+        timeshift_trace_pkt(ts, sm);
       last_time = sm->sm_time;
       streaming_target_deliver2(ts->output, sm);
       sm        = NULL;
       wait      = 0;
+
     } else if (sm) {
+
       if (cur_speed > 0)
         wait = (sm->sm_time - deliver) / 1000;
       else
         wait = (deliver - sm->sm_time) / 1000;
       if (wait == 0) wait = 1;
-      tvhtrace("timeshift", "ts %d wait %d",
-               ts->id, wait);
+      tvhtrace("timeshift", "ts %d wait %d", ts->id, wait);
+
+    }
+
+    /* Periodic timeshift status */
+    if (mono_now >= (mono_last_status + 1000000)) {
+      timeshift_status(ts, last_time);
+      mono_last_status = mono_now;
     }
 
     /* Terminate */
@@ -811,11 +845,12 @@ void *timeshift_reader ( void *p )
       /* Back to live (unless buffer is full) */
       if (end == 1 && !ts->full) {
         tvhlog(LOG_DEBUG, "timeshift", "ts %d eob revert to live mode", ts->id);
-        ts->state = TS_LIVE;
         cur_speed = 100;
         ctrl      = streaming_msg_create_code(SMT_SPEED, cur_speed);
         streaming_target_deliver2(ts->output, ctrl);
         ctrl      = NULL;
+        tvhtrace("timeshift", "reader - set TS_LIVE");
+        ts->state = TS_LIVE;
 
         /* Flush timeshift buffer to live */
         if (_timeshift_flush_to_live(ts, &cur_file, &sm, &wait) == -1)
@@ -835,13 +870,17 @@ void *timeshift_reader ( void *p )
       } else {
         if (cur_speed <= 0) {
           cur_speed = 0;
+          tvhtrace("timeshift", "reader - set TS_PAUSE");
           ts->state = TS_PAUSE;
         } else {
           cur_speed = 100;
+          tvhtrace("timeshift", "reader - set TS_PLAY");
           ts->state = TS_PLAY;
-          play_time = now;
+          if (mono_play_time != mono_now)
+            tvhtrace("timeshift", "update play time (pause) - %"PRId64, mono_now);
+          mono_play_time = mono_now;
         }
-        tvhlog(LOG_DEBUG, "timeshift", "ts %d sob speed %d", ts->id, cur_speed);
+        tvhlog(LOG_DEBUG, "timeshift", "ts %d sob speed %d last time %"PRId64, ts->id, cur_speed, last_time);
         pause_time = last_time;
         ctrl       = streaming_msg_create_code(SMT_SPEED, cur_speed);
         streaming_target_deliver2(ts->output, ctrl);
index b27fe5143068651fd5c5352e48366d151e94df49..40355bcf081d749d07dbbd9dea86d9d405573433 100644 (file)
@@ -320,7 +320,7 @@ static void _process_msg
     case SMT_MPEGTS:
     case SMT_PACKET:
       pthread_mutex_lock(&ts->rdwr_mutex);
-      if ((tsf = timeshift_filemgr_get(ts, 1)) && (tsf->wfd >= 0 || tsf->ram)) {
+      if ((tsf = timeshift_filemgr_get(ts, sm->sm_time)) && (tsf->wfd >= 0 || tsf->ram)) {
         if ((err = _process_msg0(ts, tsf, &sm)) < 0) {
           timeshift_filemgr_close(tsf);
           tsf->bad = 1;
index 35ca3344d5a5de782b2ff93c833b09138ee63f4e..5cbb58063e7f0fc9ab42d5f0629ad86ce37657b5 100644 (file)
@@ -309,6 +309,17 @@ typedef struct descramble_info {
   char protocol  [128];
 } descramble_info_t;
 
+/**
+ *
+ */
+typedef struct timeshift_status
+{
+  int     full;
+  int64_t shift;
+  int64_t pts_start;
+  int64_t pts_end;
+} timeshift_status_t;
+
 /**
  * Streaming skip
  */
@@ -326,6 +337,9 @@ typedef struct streaming_skip
     off_t   size;
     int64_t time;
   };
+#if ENABLE_TIMESHIFT
+  timeshift_status_t timeshift;
+#endif
 } streaming_skip_t;
 
 
@@ -700,7 +714,7 @@ static inline int64_t ts_rescale(int64_t ts, int tb)
   return (ts * tb ) / 90000LL;
 }
 
-static inline int64_t ts_rescale_i(int64_t ts, int tb)
+static inline int64_t ts_rescale_inv(int64_t ts, int tb)
 {
   return (ts * 90000LL) / tb;
 }