]> git.ipfire.org Git - thirdparty/tvheadend.git/commitdiff
timeshift: implement timeshift to RAM, fixes #2626
authorJaroslav Kysela <perex@perex.cz>
Mon, 19 Jan 2015 20:08:15 +0000 (21:08 +0100)
committerJaroslav Kysela <perex@perex.cz>
Mon, 19 Jan 2015 20:08:15 +0000 (21:08 +0100)
docs/html/config_timeshift.html
src/timeshift.c
src/timeshift.h
src/timeshift/private.h
src/timeshift/timeshift_filemgr.c
src/timeshift/timeshift_reader.c
src/timeshift/timeshift_writer.c
src/webui/extjs.c
src/webui/static/app/timeshift.js

index 6f20cbdc63d32dd7836e3dbe82f78e077237a286..84821950002455e415df4b664b67eff9f169cba1 100644 (file)
       specify an unlimited period its highly recommended you specifying a value
       here.
 
+  <dt>Max. RAM Size (MegaBytes)
+  <dd>Specifies the maximum RAM (system memory) size for timeshift buffers.
+      When free RAM buffers are available, they are used instead storage to
+      save the timeshift data.
+
   <dt>Unlimited:
   <dd>If checked, this allows the combined size of all timeshift buffers to
       potentially grow unbounded until your storage media runs out of space
index c677531243502072d5540064cddcb15a5ebecc26..70de729e3ef1863bbcbf47975a7c15be43e6b026 100644 (file)
@@ -41,6 +41,8 @@ int       timeshift_unlimited_period;
 uint32_t  timeshift_max_period;
 int       timeshift_unlimited_size;
 uint64_t  timeshift_max_size;
+uint64_t  timeshift_ram_size;
+uint64_t  timeshift_ram_segment_size;
 
 /*
  * Intialise global file manager
@@ -61,6 +63,8 @@ void timeshift_init ( void )
   timeshift_max_period       = 3600;                    // 1Hr
   timeshift_unlimited_size   = 0;
   timeshift_max_size         = 10000 * (size_t)1048576; // 10G
+  timeshift_ram_size         = 0;
+  timeshift_ram_segment_size = 0;
 
   /* Load settings */
   if ((m = hts_settings_load("timeshift/config"))) {
@@ -77,6 +81,10 @@ void timeshift_init ( void )
       timeshift_unlimited_size = u32 ? 1 : 0;
     if (!htsmsg_get_u32(m, "max_size", &u32))
       timeshift_max_size = 1048576LL * u32;
+    if (!htsmsg_get_u32(m, "ram_size", &u32)) {
+      timeshift_ram_size = 1048576LL * u32;
+      timeshift_ram_segment_size = timeshift_ram_size / 10;
+    }
     htsmsg_destroy(m);
   }
 }
@@ -107,6 +115,7 @@ void timeshift_save ( void )
   htsmsg_add_u32(m, "max_period", timeshift_max_period);
   htsmsg_add_u32(m, "unlimited_size", timeshift_unlimited_size);
   htsmsg_add_u32(m, "max_size", timeshift_max_size / 1048576);
+  htsmsg_add_u32(m, "ram_size", timeshift_ram_size / 1048576);
 
   hts_settings_save(m, "timeshift/config");
 }
index db1736256ec1669d7fab13095af1446e2e550d39..908fceaee19d1e7f5a4e007dfc78b544a5d18cd4 100644 (file)
@@ -27,6 +27,9 @@ extern uint32_t  timeshift_max_period;
 extern int       timeshift_unlimited_size;
 extern uint64_t  timeshift_max_size;
 extern uint64_t  timeshift_total_size;
+extern uint64_t  timeshift_ram_size;
+extern uint64_t  timeshift_ram_segment_size;
+extern uint64_t  timeshift_total_ram_size;
 
 typedef struct timeshift_status
 {
index db72d402782c141b29b94488d5ca8931ea43e2ce..5f193fbc0af866d99ba1b49178ad45f5c8549c58 100644 (file)
@@ -51,12 +51,18 @@ typedef TAILQ_HEAD(timeshift_index_data_list,timeshift_index_data) timeshift_ind
  */
 typedef struct timeshift_file
 {
-  int                           fd;       ///< Write descriptor
+  int                           wfd;      ///< Write descriptor
+  int                           rfd;      ///< Read descriptor
   char                          *path;    ///< Full path to file
 
   time_t                        time;     ///< Files coarse timestamp
   size_t                        size;     ///< Current file size;
   int64_t                       last;     ///< Latest timestamp
+  off_t                         woff;     ///< Write offset
+  off_t                         roff;     ///< Read offset
+
+  uint8_t                      *ram;      ///< RAM area
+  int64_t                       ram_size; ///< RAM area size in bytes
 
   uint8_t                       bad;      ///< File is broken
 
@@ -66,6 +72,8 @@ typedef struct timeshift_file
   timeshift_index_data_list_t   sstart;   ///< Stream start messages
 
   TAILQ_ENTRY(timeshift_file) link;     ///< List entry
+
+  pthread_mutex_t               ram_lock; ///< Mutex for the ram array access
 } timeshift_file_t;
 
 typedef TAILQ_HEAD(timeshift_file_list,timeshift_file) timeshift_file_list_t;
@@ -113,15 +121,15 @@ typedef struct timeshift {
 /*
  * Write functions
  */
-ssize_t timeshift_write_start   ( int fd, int64_t time, streaming_start_t *ss );
-ssize_t timeshift_write_sigstat ( int fd, int64_t time, signal_status_t *ss );
-ssize_t timeshift_write_packet  ( int fd, int64_t time, th_pkt_t *pkt );
-ssize_t timeshift_write_mpegts  ( int fd, int64_t time, void *data );
+ssize_t timeshift_write_start   ( timeshift_file_t *tsf, int64_t time, streaming_start_t *ss );
+ssize_t timeshift_write_sigstat ( timeshift_file_t *tsf, int64_t time, signal_status_t *ss );
+ssize_t timeshift_write_packet  ( timeshift_file_t *tsf, int64_t time, th_pkt_t *pkt );
+ssize_t timeshift_write_mpegts  ( timeshift_file_t *tsf, int64_t time, void *data );
 ssize_t timeshift_write_skip    ( int fd, streaming_skip_t *skip );
 ssize_t timeshift_write_speed   ( int fd, int speed );
 ssize_t timeshift_write_stop    ( int fd, int code );
 ssize_t timeshift_write_exit    ( int fd );
-ssize_t timeshift_write_eof     ( int fd );
+ssize_t timeshift_write_eof     ( timeshift_file_t *tsf );
 
 void timeshift_writer_flush ( timeshift_t *ts );
 
index 2cf8e00ca7aea7115af48278d445120657c91b98..33cf39b63c50030056ce413a138f929b1df31991 100644 (file)
@@ -39,6 +39,7 @@ static pthread_mutex_t       timeshift_reaper_lock;
 static pthread_cond_t        timeshift_reaper_cond;
 
 uint64_t                     timeshift_total_size;
+uint64_t                     timeshift_total_ram_size;
 
 /* **************************************************************************
  * File reaper thread
@@ -63,15 +64,19 @@ static void* timeshift_reaper_callback ( void *p )
     TAILQ_REMOVE(&timeshift_reaper_list, tsf, link);
     pthread_mutex_unlock(&timeshift_reaper_lock);
 
-    tvhtrace("timeshift", "remove file %s", tsf->path);
-
-    /* Remove */
-    unlink(tsf->path);
-    dpath = dirname(tsf->path);
-    if (rmdir(dpath) == -1)
-      if (errno != ENOTEMPTY)
-        tvhlog(LOG_ERR, "timeshift", "failed to remove %s [e=%s]",
-               dpath, strerror(errno));
+    if (tsf->path) {
+      tvhtrace("timeshift", "remove file %s", tsf->path);
+
+      /* Remove */
+      unlink(tsf->path);
+      dpath = dirname(tsf->path);
+      if (rmdir(dpath) == -1)
+        if (errno != ENOTEMPTY)
+          tvhlog(LOG_ERR, "timeshift", "failed to remove %s [e=%s]",
+                 dpath, strerror(errno));
+    } else {
+      tvhtrace("timeshift", "remove RAM segment (time %li)", (long)tsf->time);
+    }
 
     /* Free memory */
     while ((ti = TAILQ_FIRST(&tsf->iframes))) {
@@ -85,6 +90,7 @@ static void* timeshift_reaper_callback ( void *p )
       free(tid);
     }
     free(tsf->path);
+    free(tsf->ram);
     free(tsf);
 
     pthread_mutex_lock(&timeshift_reaper_lock);
@@ -96,7 +102,12 @@ static void* timeshift_reaper_callback ( void *p )
 
 static void timeshift_reaper_remove ( timeshift_file_t *tsf )
 {
-  tvhtrace("timeshift", "queue file for removal %s", tsf->path);
+#if ENABLE_TRACE
+  if (tsf->path)
+    tvhtrace("timeshift", "queue file for removal %s", tsf->path);
+  else
+    tvhtrace("timeshift", "queue file for removal - RAM segment time %li", (long)tsf->time);
+#endif
   pthread_mutex_lock(&timeshift_reaper_lock);
   TAILQ_INSERT_TAIL(&timeshift_reaper_list, tsf, link);
   pthread_cond_signal(&timeshift_reaper_cond);
@@ -140,14 +151,17 @@ int timeshift_filemgr_makedirs ( int index, char *buf, size_t len )
  */
 void timeshift_filemgr_close ( timeshift_file_t *tsf )
 {
-  ssize_t r = timeshift_write_eof(tsf->fd);
+  ssize_t r = timeshift_write_eof(tsf);
   if (r > 0)
   {
     tsf->size += r;
     atomic_add_u64(&timeshift_total_size, r);
+    if (tsf->ram)
+      atomic_add_u64(&timeshift_total_ram_size, r);
   }
-  close(tsf->fd);
-  tsf->fd = -1;
+  if (tsf->wfd >= 0)
+    close(tsf->wfd);
+  tsf->wfd = -1;
 }
 
 /*
@@ -156,11 +170,19 @@ void timeshift_filemgr_close ( timeshift_file_t *tsf )
 void timeshift_filemgr_remove
   ( timeshift_t *ts, timeshift_file_t *tsf, int force )
 {
-  if (tsf->fd != -1)
-    close(tsf->fd);
-  tvhlog(LOG_DEBUG, "timeshift", "ts %d remove %s", ts->id, tsf->path);
+  if (tsf->wfd >= 0)
+    close(tsf->wfd);
+  assert(tsf->rfd < 0);
+#if ENABLE_TRACE
+  if (tsf->path)
+    tvhdebug("timeshift", "ts %d remove %s", ts->id, tsf->path);
+  else
+    tvhdebug("timeshift", "ts %d RAM segment remove time %li", ts->id, (long)tsf->time);
+#endif
   TAILQ_REMOVE(&ts->files, tsf, link);
   atomic_add_u64(&timeshift_total_size, -tsf->size);
+  if (tsf->ram)
+    atomic_add_u64(&timeshift_total_ram_size, -tsf->size);
   timeshift_reaper_remove(tsf);
 }
 
@@ -176,6 +198,26 @@ void timeshift_filemgr_flush ( timeshift_t *ts, timeshift_file_t *end )
   }
 }
 
+/*
+ *
+ */
+static timeshift_file_t * timeshift_filemgr_file_init
+  ( timeshift_t *ts, time_t time )
+{
+  timeshift_file_t *tsf;
+
+  tsf = calloc(1, sizeof(timeshift_file_t));
+  tsf->time     = time;
+  tsf->last     = getmonoclock();
+  tsf->wfd      = -1;
+  tsf->rfd      = -1;
+  TAILQ_INIT(&tsf->iframes);
+  TAILQ_INIT(&tsf->sstart);
+  TAILQ_INSERT_TAIL(&ts->files, tsf, link);
+  pthread_mutex_init(&tsf->ram_lock, NULL);
+  return tsf;
+}
+
 /*
  * Get current / new file
  */
@@ -185,7 +227,7 @@ timeshift_file_t *timeshift_filemgr_get ( timeshift_t *ts, int create )
   struct timespec tp;
   timeshift_file_t *tsf_tl, *tsf_hd, *tsf_tmp;
   timeshift_index_data_t *ti;
-  char path[512];
+  char path[PATH_MAX];
   time_t time;
 
   /* Return last file */
@@ -200,11 +242,12 @@ timeshift_file_t *timeshift_filemgr_get ( timeshift_t *ts, int create )
   clock_gettime(CLOCK_MONOTONIC_COARSE, &tp);
   time   = tp.tv_sec / TIMESHIFT_FILE_PERIOD;
   tsf_tl = TAILQ_LAST(&ts->files, timeshift_file_list);
-  if (!tsf_tl || tsf_tl->time != time) {
+  if (!tsf_tl || tsf_tl->time != time ||
+      (tsf_tl->ram && tsf_tl->woff >= timeshift_ram_segment_size)) {
     tsf_hd = TAILQ_FIRST(&ts->files);
 
     /* Close existing */
-    if (tsf_tl && tsf_tl->fd != -1)
+    if (tsf_tl)
       timeshift_filemgr_close(tsf_tl);
 
     /* Check period */
@@ -236,32 +279,48 @@ timeshift_file_t *timeshift_filemgr_get ( timeshift_t *ts, int create )
         ts->full = 1;
       }
     }
-      
+
     /* Create new file */
     tsf_tmp = NULL;
     if (!ts->full) {
 
-      /* Create directories */
-      if (!ts->path) {
-        if (timeshift_filemgr_makedirs(ts->id, path, sizeof(path)))
-          return NULL;
-        ts->path = strdup(path);
+      tvhtrace("timeshift", "ts %d RAM total %"PRId64" requested %"PRId64" segment %"PRId64,
+                   ts->id, atomic_pre_add_u64(&timeshift_total_ram_size, 0),
+                   timeshift_ram_size, timeshift_ram_segment_size);
+      if (timeshift_ram_size >= 8*1024*1024 &&
+          atomic_pre_add_u64(&timeshift_total_ram_size, 0) <
+            timeshift_ram_size + (timeshift_ram_segment_size / 2)) {
+        tsf_tmp = timeshift_filemgr_file_init(ts, time);
+        tsf_tmp->ram_size = MIN(16*1024*1024, timeshift_ram_segment_size);
+        tsf_tmp->ram = malloc(tsf_tmp->ram_size);
+        if (!tsf_tmp->ram) {
+          free(tsf_tmp);
+          tsf_tmp = NULL;
+        } else {
+          tvhtrace("timeshift", "ts %d create RAM segment with %"PRId64" bytes (time %li)",
+                   ts->id, tsf_tmp->ram_size, (long)time);
+        }
       }
+      
+      if (!tsf_tmp) {
+        /* Create directories */
+        if (!ts->path) {
+          if (timeshift_filemgr_makedirs(ts->id, path, sizeof(path)))
+            return NULL;
+          ts->path = strdup(path);
+        }
 
-      /* Create File */
-      snprintf(path, sizeof(path), "%s/tvh-%"PRItime_t, ts->path, time);
-      tvhtrace("timeshift", "ts %d create file %s", ts->id, path);
-      if ((fd = open(path, O_WRONLY | O_CREAT, 0600)) > 0) {
-        tsf_tmp = calloc(1, sizeof(timeshift_file_t));
-        tsf_tmp->time     = time;
-        tsf_tmp->fd       = fd;
-        tsf_tmp->path     = strdup(path);
-        tsf_tmp->refcount = 0;
-        tsf_tmp->last     = getmonoclock();
-        TAILQ_INIT(&tsf_tmp->iframes);
-        TAILQ_INIT(&tsf_tmp->sstart);
-        TAILQ_INSERT_TAIL(&ts->files, tsf_tmp, link);
+        /* Create File */
+        snprintf(path, sizeof(path), "%s/tvh-%"PRItime_t, ts->path, time);
+        tvhtrace("timeshift", "ts %d create file %s", ts->id, path);
+        if ((fd = open(path, O_WRONLY | O_CREAT, 0600)) > 0) {
+          tsf_tmp = timeshift_filemgr_file_init(ts, time);
+          tsf_tmp->wfd = fd;
+          tsf_tmp->path = strdup(path);
+        }
+      }
 
+      if (tsf_tmp) {
         /* Copy across last start message */
         if (tsf_tl && (ti = TAILQ_LAST(&tsf_tl->sstart, timeshift_index_data_list))) {
           tvhtrace("timeshift", "ts %d copy smt_start to new file",
@@ -343,6 +402,7 @@ void timeshift_filemgr_init ( void )
 
   /* Size processing */
   timeshift_total_size = 0;
+  timeshift_ram_size   = 0;
 
   /* Start the reaper thread */
   timeshift_reaper_run = 1;
@@ -371,5 +431,3 @@ void timeshift_filemgr_term ( void )
   if (!timeshift_filemgr_get_root(path, sizeof(path)))
     rmtree(path);
 }
-
-
index 58d744ebaf171abdc29c70e7ad8a9dcdd09c6698..d83ec7fb68007547a39f9cc0e578f823a7d206d7 100644 (file)
  * File Reading
  * *************************************************************************/
 
-static ssize_t _read_pktbuf ( int fd, pktbuf_t **pktbuf )
+static ssize_t _read_buf ( timeshift_file_t *tsf, int fd, void *buf, size_t size )
+{
+  if (tsf && tsf->ram) {
+    if (tsf->roff + size > tsf->woff) return -1;
+    pthread_mutex_lock(&tsf->ram_lock);
+    memcpy(buf, tsf->ram + tsf->roff, size);
+    tsf->roff += size;
+    pthread_mutex_unlock(&tsf->ram_lock);
+    return size;
+  } else {
+    size = read(tsf ? tsf->rfd : fd, buf, size);
+    if (size > 0 && tsf)
+      tsf->roff += size;
+    return size;
+  }
+}
+
+static ssize_t _read_pktbuf ( timeshift_file_t *tsf, int fd, pktbuf_t **pktbuf )
 {
   ssize_t r, cnt = 0;
   size_t sz;
 
   /* Size */
-  r = read(fd, &sz, sizeof(sz));
+  r = _read_buf(tsf, fd, &sz, sizeof(sz));
   if (r < 0) return -1;
   if (r != sizeof(sz)) return 0;
   cnt += r;
@@ -60,7 +77,7 @@ static ssize_t _read_pktbuf ( int fd, pktbuf_t **pktbuf )
 
   /* Data */
   *pktbuf = pktbuf_alloc(NULL, sz);
-  r = read(fd, (*pktbuf)->pb_data, sz);
+  r = _read_buf(tsf, fd, (*pktbuf)->pb_data, sz);
   if (r != sz) {
     free((*pktbuf)->pb_data);
     free(*pktbuf);
@@ -72,7 +89,7 @@ static ssize_t _read_pktbuf ( int fd, pktbuf_t **pktbuf )
 }
 
 
-static ssize_t _read_msg ( int fd, streaming_message_t **sm )
+static ssize_t _read_msg ( timeshift_file_t *tsf, int fd, streaming_message_t **sm )
 {
   ssize_t r, cnt = 0;
   size_t sz;
@@ -85,7 +102,7 @@ static ssize_t _read_msg ( int fd, streaming_message_t **sm )
   *sm = NULL;
 
   /* Size */
-  r = read(fd, &sz, sizeof(sz));
+  r = _read_buf(tsf, fd, &sz, sizeof(sz));
   if (r < 0) return -1;
   if (r != sizeof(sz)) return 0;
   cnt += r;
@@ -97,13 +114,13 @@ static ssize_t _read_msg ( int fd, streaming_message_t **sm )
   if (sz > 1024 * 1024) return -1;
 
   /* Type */
-  r = read(fd, &type, sizeof(type));
+  r = _read_buf(tsf, fd, &type, sizeof(type));
   if (r < 0) return -1;
   if (r != sizeof(type)) return 0;
   cnt += r;
 
   /* Time */
-  r = read(fd, &time, sizeof(time));
+  r = _read_buf(tsf, fd, &time, sizeof(time));
   if (r < 0) return -1;
   if (r != sizeof(time)) return 0;
   cnt += r;
@@ -127,7 +144,7 @@ static ssize_t _read_msg ( int fd, streaming_message_t **sm )
     case SMT_EXIT:
     case SMT_SPEED:
       if (sz != sizeof(code)) return -1;
-      r = read(fd, &code, sz);
+      r = _read_buf(tsf, fd, &code, sz);
       if (r != sz) {
         if (r < 0) return -1;
         return 0;
@@ -141,7 +158,7 @@ static ssize_t _read_msg ( int fd, streaming_message_t **sm )
     case SMT_MPEGTS:
     case SMT_PACKET:
       data = malloc(sz);
-      r = read(fd, data, sz);
+      r = _read_buf(tsf, fd, data, sz);
       if (r != sz) {
         free(data);
         if (r < 0) return -1;
@@ -152,13 +169,13 @@ static ssize_t _read_msg ( int fd, streaming_message_t **sm )
         pkt->pkt_payload  = pkt->pkt_meta = NULL;
         pkt->pkt_refcount = 0;
         *sm = streaming_msg_create_pkt(pkt);
-        r   = _read_pktbuf(fd, &pkt->pkt_meta);
+        r   = _read_pktbuf(tsf, fd, &pkt->pkt_meta);
         if (r < 0) {
           streaming_msg_free(*sm);
           return r;
         }
         cnt += r;
-        r   = _read_pktbuf(fd, &pkt->pkt_payload);
+        r   = _read_pktbuf(tsf, fd, &pkt->pkt_payload);
         if (r < 0) {
           streaming_msg_free(*sm);
           return r;
@@ -317,24 +334,31 @@ static int _timeshift_skip
  * Output packet
  */
 static int _timeshift_read
-  ( timeshift_t *ts, timeshift_file_t **cur_file, off_t *cur_off, int *fd,
+  ( timeshift_t *ts, timeshift_file_t **cur_file,
     streaming_message_t **sm, int *wait )
 {
-  if (*cur_file) {
+  timeshift_file_t *tsf = *cur_file;
+  ssize_t r;
+  off_t off, ooff;
+
+  if (tsf) {
 
     /* Open file */
-    if (*fd < 0) {
-      tvhtrace("timeshift", "ts %d open file %s",
-               ts->id, (*cur_file)->path);
-      *fd = open((*cur_file)->path, O_RDONLY);
-      if (*fd < 0)
+    if (tsf->rfd < 0 && !tsf->ram) {
+      tsf->rfd = open(tsf->path, O_RDONLY);
+      tvhtrace("timeshift", "ts %d open file %s (fd %i)", ts->id, tsf->path, tsf->rfd);
+      if (tsf->rfd < 0)
         return -1;
     }
-    tvhtrace("timeshift", "ts %d seek to %jd", ts->id, (intmax_t)*cur_off);
-    lseek(*fd, *cur_off, SEEK_SET);
+    tvhtrace("timeshift", "ts %d seek to %jd (fd %i)", ts->id, tsf->roff, tsf->rfd);
+    if (tsf->rfd >= 0)
+      if ((off = lseek(tsf->rfd, tsf->roff, SEEK_SET)) != tsf->roff)
+        tvherror("timeshift", "seek to %s failed (off %"PRId64" != %"PRId64"): %s",
+                 tsf->path, (int64_t)tsf->roff, (int64_t)off, strerror(errno));
 
     /* Read msg */
-    ssize_t r = _read_msg(*fd, sm);
+    ooff = tsf->roff;
+    r = _read_msg(tsf, -1, sm);
     if (r < 0) {
       streaming_message_t *e = streaming_msg_create_code(SMT_STOP, SM_CODE_UNDEFINED_ERROR);
       streaming_target_deliver2(ts->output, e);
@@ -349,21 +373,25 @@ static int _timeshift_read
 
     /* Incomplete */
     if (r == 0) {
-      lseek(*fd, *cur_off, SEEK_SET);
+      if (tsf->rfd >= 0) {
+        tvhtrace("timeshift", "ts %d seek to %jd (fd %i) (incomplete)", ts->id, tsf->roff, tsf->rfd);
+        if ((off = lseek(tsf->rfd, ooff, SEEK_SET)) != ooff)
+          tvherror("timeshift", "seek to %s failed (off %"PRId64" != %"PRId64"): %s",
+                   tsf->path, (int64_t)ooff, (int64_t)off, strerror(errno));
+      }
+      tsf->roff = ooff;
       return 0;
     }
 
-    /* Update */
-    *cur_off += r;
-
     /* Special case - EOF */
-    if (r == sizeof(size_t) || *cur_off > (*cur_file)->size) {
-      close(*fd);
-      *fd       = -1;
+    if (r == sizeof(size_t) || tsf->roff > tsf->size) {
+      if (tsf->rfd >= 0)
+        close(tsf->rfd);
+      tsf->rfd  = -1;
       pthread_mutex_lock(&ts->rdwr_mutex);
-      *cur_file = timeshift_filemgr_next(*cur_file, NULL, 0);
+      *cur_file = timeshift_filemgr_next(tsf, NULL, 0);
       pthread_mutex_unlock(&ts->rdwr_mutex);
-      *cur_off  = 0; // reset
+      tsf->roff = 0; // reset
       *wait     = 0;
 
     /* Check SMT_START index */
@@ -386,12 +414,12 @@ static int _timeshift_read
  * Flush all data to live
  */
 static int _timeshift_flush_to_live
-  ( timeshift_t *ts, timeshift_file_t **cur_file, off_t *cur_off, int *fd,
+  ( timeshift_t *ts, timeshift_file_t **cur_file,
     streaming_message_t **sm, int *wait )
 {
   time_t pts = 0;
   while (*cur_file) {
-    if (_timeshift_read(ts, cur_file, cur_off, fd, sm, wait) == -1)
+    if (_timeshift_read(ts, cur_file, sm, wait) == -1)
       return -1;
     if (!*sm) break;
     if ((*sm)->sm_type == SMT_PACKET) {
@@ -409,15 +437,15 @@ static int _timeshift_flush_to_live
  * Thread
  * *************************************************************************/
 
+
 /*
  * Timeshift thread
  */
 void *timeshift_reader ( void *p )
 {
   timeshift_t *ts = p;
-  int nfds, end, fd = -1, run = 1, wait = -1;
+  int nfds, end, run = 1, wait = -1;
   timeshift_file_t *cur_file = NULL;
-  off_t cur_off = 0;
   int cur_speed = 100, keyframe_mode = 0;
   int64_t pause_time = 0, play_time = 0, last_time = 0;
   int64_t now, deliver, skip_time = 0;
@@ -454,7 +482,7 @@ void *timeshift_reader ( void *p )
     /* Control */
     pthread_mutex_lock(&ts->state_mutex);
     if (nfds == 1) {
-      if (_read_msg(ts->rd_pipe.rd, &ctrl) > 0) {
+      if (_read_msg(NULL, ts->rd_pipe.rd, &ctrl) > 0) {
 
         /* Exit */
         if (ctrl->sm_type == SMT_EXIT) {
@@ -494,10 +522,10 @@ void *timeshift_reader ( void *p )
                        ts->id);
                 timeshift_writer_flush(ts);
                 pthread_mutex_lock(&ts->rdwr_mutex);
-                if ((cur_file   = timeshift_filemgr_get(ts, 1))) {
-                  cur_off    = cur_file->size;
-                  pause_time = cur_file->last;
-                  last_time  = pause_time;
+                if ((cur_file    = timeshift_filemgr_get(ts, 1))) {
+                  cur_file->roff = cur_file->size;
+                  pause_time     = cur_file->last;
+                  last_time      = pause_time;
                 }
                 pthread_mutex_unlock(&ts->rdwr_mutex);
               }
@@ -576,9 +604,9 @@ void *timeshift_reader ( void *p )
               /* Live playback (stage1) */
               if (ts->state == TS_LIVE) {
                 pthread_mutex_lock(&ts->rdwr_mutex);
-                if ((cur_file   = timeshift_filemgr_get(ts, !ts->ondemand))) {
-                  cur_off    = cur_file->size;
-                  last_time  = cur_file->last;
+                if ((cur_file    = timeshift_filemgr_get(ts, !ts->ondemand))) {
+                  cur_file->roff = cur_file->size;
+                  last_time      = cur_file->last;
                 } else {
                   tvhlog(LOG_ERR, "timeshift", "ts %d failed to get current file", ts->id);
                   skip = NULL;
@@ -695,23 +723,24 @@ void *timeshift_reader ( void *p )
           tvhlog(LOG_DEBUG, "timeshift", "ts %d skip found pkt @ %"PRId64, ts->id, tsi->time);
 
         /* File changed (close) */
-        if ((tsf != cur_file) && (fd != -1)) {
-          close(fd);
-          fd = -1;
+        if ((tsf != cur_file) && cur_file && cur_file->rfd >= 0) {
+          close(cur_file->rfd);
+          cur_file->rfd = -1;
         }
 
         /* Position */
         if (cur_file)
           cur_file->refcount--;
-        cur_file = tsf;
-        if (tsi)
-          cur_off = tsi->pos;
-        else
-          cur_off = 0;
+        if ((cur_file = tsf) != NULL) {
+          if (tsi)
+            cur_file->roff = tsi->pos;
+          else
+            cur_file->roff = 0;
+        }
       }
 
       /* Find packet */
-      if (_timeshift_read(ts, &cur_file, &cur_off, &fd, &sm, &wait) == -1) {
+      if (_timeshift_read(ts, &cur_file, &sm, &wait) == -1) {
         pthread_mutex_unlock(&ts->state_mutex);
         break;
       }
@@ -782,13 +811,13 @@ void *timeshift_reader ( void *p )
         streaming_target_deliver2(ts->output, ctrl);
 
         /* Flush timeshift buffer to live */
-        if (_timeshift_flush_to_live(ts, &cur_file, &cur_off, &fd, &sm, &wait) == -1)
+        if (_timeshift_flush_to_live(ts, &cur_file, &sm, &wait) == -1)
           break;
 
         /* Close file (if open) */
-        if (fd != -1) {
-          close(fd);
-          fd = -1;
+        if (cur_file && cur_file->rfd >= 0) {
+          close(cur_file->rfd);
+          cur_file->rfd = -1;
         }
 
         /* Flush ALL files */
@@ -823,7 +852,10 @@ void *timeshift_reader ( void *p )
 
   /* Cleanup */
   tvhpoll_destroy(pd);
-  if (fd != -1) close(fd);
+  if (cur_file && cur_file->rfd >= 0) {
+    close(cur_file->rfd);
+    cur_file->rfd = -1;
+  }
   if (sm)       streaming_msg_free(sm);
   if (ctrl)     streaming_msg_free(ctrl);
   tvhtrace("timeshift", "ts %d exit reader thread", ts->id);
index 40a0a6970ab5ecc8979044407954959368c7976a..2714344e8b401ccbe56cb2a2be6fc8bba0bf8de8 100644 (file)
@@ -36,7 +36,7 @@
 /*
  * Write data (retry on EAGAIN)
  */
-static ssize_t _write
+static ssize_t _write_fd
   ( int fd, const void *buf, size_t count )
 {
   ssize_t r;
@@ -54,25 +54,76 @@ static ssize_t _write
   return count == n ? n : -1;
 }
 
+static ssize_t _write
+  ( timeshift_file_t *tsf, const void *buf, size_t count )
+{
+  uint8_t *ram;
+  size_t alloc;
+  if (tsf->ram) {
+    pthread_mutex_lock(&tsf->ram_lock);
+    if (tsf->ram_size < tsf->woff + count) {
+      if (tsf->ram_size >= timeshift_ram_segment_size)
+        alloc = MAX(count, 64*1024);
+      else
+        alloc = MAX(count, 4*1024*1024);
+      ram = realloc(tsf->ram, tsf->ram_size + alloc);
+      if (ram == NULL) {
+        tvhwarn("timeshift", "RAM timeshift memalloc failed");
+        pthread_mutex_unlock(&tsf->ram_lock);
+        return -1;
+      }
+      tsf->ram = ram;
+      tsf->ram_size += alloc;
+    }
+    memcpy(tsf->ram + tsf->woff, buf, count);
+    tsf->woff += count;
+    pthread_mutex_unlock(&tsf->ram_lock);
+    return count;
+  }
+  return _write_fd(tsf->wfd, buf, count);
+}
+
 /*
  * Write message
  */
 static ssize_t _write_msg
+  ( timeshift_file_t *tsf, streaming_message_type_t type, int64_t time,
+    const void *buf, size_t len )
+{
+  size_t len2 = len + sizeof(type) + sizeof(time);
+  ssize_t err, ret;
+  ret = err = _write(tsf, &len2, sizeof(len2));
+  if (err < 0) return err;
+  err = _write(tsf, &type, sizeof(type));
+  if (err < 0) return err;
+  ret += err;
+  err = _write(tsf, &time, sizeof(time));
+  if (err < 0) return err;
+  ret += err;
+  if (len) {
+    err = _write(tsf, buf, len);
+    if (err < 0) return err;
+    ret += err;
+  }
+  return ret;
+}
+
+static ssize_t _write_msg_fd
   ( int fd, streaming_message_type_t type, int64_t time,
     const void *buf, size_t len )
 {
   size_t len2 = len + sizeof(type) + sizeof(time);
   ssize_t err, ret;
-  ret = err = _write(fd, &len2, sizeof(len2));
+  ret = err = _write_fd(fd, &len2, sizeof(len2));
   if (err < 0) return err;
-  err = _write(fd, &type, sizeof(type));
+  err = _write_fd(fd, &type, sizeof(type));
   if (err < 0) return err;
   ret += err;
-  err = _write(fd, &time, sizeof(time));
+  err = _write_fd(fd, &time, sizeof(time));
   if (err < 0) return err;
   ret += err;
   if (len) {
-    err = _write(fd, buf, len);
+    err = _write_fd(fd, buf, len);
     if (err < 0) return err;
     ret += err;
   }
@@ -82,18 +133,18 @@ static ssize_t _write_msg
 /*
  * Write packet buffer
  */
-static int _write_pktbuf ( int fd, pktbuf_t *pktbuf )
+static int _write_pktbuf ( timeshift_file_t *tsf, pktbuf_t *pktbuf )
 {
   ssize_t ret, err;
   if (pktbuf) {
-    ret = err = _write(fd, &pktbuf->pb_size, sizeof(pktbuf->pb_size));
+    ret = err = _write(tsf, &pktbuf->pb_size, sizeof(pktbuf->pb_size));
     if (err < 0) return err;
-    err = _write(fd, pktbuf->pb_data, pktbuf->pb_size);
+    err = _write(tsf, pktbuf->pb_data, pktbuf->pb_size);
     if (err < 0) return err;
     ret += err;
   } else {
     size_t sz = 0;
-    ret = _write(fd, &sz, sizeof(sz));
+    ret = _write(tsf, &sz, sizeof(sz));
   }
   return ret;
 }
@@ -102,24 +153,24 @@ static int _write_pktbuf ( int fd, pktbuf_t *pktbuf )
  * Write signal status
  */
 ssize_t timeshift_write_sigstat
-  ( int fd, int64_t time, signal_status_t *sigstat )
+  ( timeshift_file_t *tsf, int64_t time, signal_status_t *sigstat )
 {
-  return _write_msg(fd, SMT_SIGNAL_STATUS, time, sigstat,
+  return _write_msg(tsf, SMT_SIGNAL_STATUS, time, sigstat,
                     sizeof(signal_status_t));
 }
 
 /*
  * Write packet
  */
-ssize_t timeshift_write_packet ( int fd, int64_t time, th_pkt_t *pkt )
+ssize_t timeshift_write_packet ( timeshift_file_t *tsf, int64_t time, th_pkt_t *pkt )
 {
   ssize_t ret = 0, err;
-  ret = err = _write_msg(fd, SMT_PACKET, time, pkt, sizeof(th_pkt_t));
+  ret = err = _write_msg(tsf, SMT_PACKET, time, pkt, sizeof(th_pkt_t));
   if (err <= 0) return err;
-  err = _write_pktbuf(fd, pkt->pkt_meta);
+  err = _write_pktbuf(tsf, pkt->pkt_meta);
   if (err <= 0) return err;
   ret += err;
-  err = _write_pktbuf(fd, pkt->pkt_payload);
+  err = _write_pktbuf(tsf, pkt->pkt_payload);
   if (err <= 0) return err;
   ret += err;
   return ret;
@@ -128,9 +179,9 @@ ssize_t timeshift_write_packet ( int fd, int64_t time, th_pkt_t *pkt )
 /*
  * Write MPEGTS data
  */
-ssize_t timeshift_write_mpegts ( int fd, int64_t time, void *data )
+ssize_t timeshift_write_mpegts ( timeshift_file_t *tsf, int64_t time, void *data )
 {
-  return _write_msg(fd, SMT_MPEGTS, time, data, 188);
+  return _write_msg(tsf, SMT_MPEGTS, time, data, 188);
 }
 
 /*
@@ -138,7 +189,7 @@ ssize_t timeshift_write_mpegts ( int fd, int64_t time, void *data )
  */
 ssize_t timeshift_write_skip ( int fd, streaming_skip_t *skip )
 {
-  return _write_msg(fd, SMT_SKIP, 0, skip, sizeof(streaming_skip_t));
+  return _write_msg_fd(fd, SMT_SKIP, 0, skip, sizeof(streaming_skip_t));
 }
 
 /*
@@ -146,7 +197,7 @@ ssize_t timeshift_write_skip ( int fd, streaming_skip_t *skip )
  */
 ssize_t timeshift_write_speed ( int fd, int speed )
 {
-  return _write_msg(fd, SMT_SPEED, 0, &speed, sizeof(speed));
+  return _write_msg_fd(fd, SMT_SPEED, 0, &speed, sizeof(speed));
 }
 
 /*
@@ -154,7 +205,7 @@ ssize_t timeshift_write_speed ( int fd, int speed )
  */
 ssize_t timeshift_write_stop ( int fd, int code )
 {
-  return _write_msg(fd, SMT_STOP, 0, &code, sizeof(code));
+  return _write_msg_fd(fd, SMT_STOP, 0, &code, sizeof(code));
 }
 
 /*
@@ -163,16 +214,16 @@ ssize_t timeshift_write_stop ( int fd, int code )
 ssize_t timeshift_write_exit ( int fd )
 {
   int code = 0;
-  return _write_msg(fd, SMT_EXIT, 0, &code, sizeof(code));
+  return _write_msg_fd(fd, SMT_EXIT, 0, &code, sizeof(code));
 }
 
 /*
  * Write end of file (special internal message)
  */
-ssize_t timeshift_write_eof ( int fd )
+ssize_t timeshift_write_eof ( timeshift_file_t *tsf )
 {
   size_t sz = 0;
-  return _write(fd, &sz, sizeof(sz));
+  return _write(tsf, &sz, sizeof(sz));
 }
 
 /* **************************************************************************
@@ -200,9 +251,9 @@ static inline ssize_t _process_msg0
       if (SCT_ISVIDEO(ss->ss_components[i].ssc_type))
         ts->vididx = ss->ss_components[i].ssc_index;
   } else if (sm->sm_type == SMT_SIGNAL_STATUS)
-    err = timeshift_write_sigstat(tsf->fd, sm->sm_time, sm->sm_data);
+    err = timeshift_write_sigstat(tsf, sm->sm_time, sm->sm_data);
   else if (sm->sm_type == SMT_PACKET) {
-    err = timeshift_write_packet(tsf->fd, sm->sm_time, sm->sm_data);
+    err = timeshift_write_packet(tsf, sm->sm_time, sm->sm_data);
     if (err > 0) {
       th_pkt_t *pkt = sm->sm_data;
 
@@ -216,7 +267,7 @@ static inline ssize_t _process_msg0
       }
     }
   } else if (sm->sm_type == SMT_MPEGTS)
-    err = timeshift_write_mpegts(tsf->fd, sm->sm_time, sm->sm_data);
+    err = timeshift_write_mpegts(tsf, sm->sm_time, sm->sm_data);
   else
     err = 0;
 
@@ -225,6 +276,8 @@ static inline ssize_t _process_msg0
     tsf->last  = sm->sm_time;
     tsf->size += err;
     atomic_add_u64(&timeshift_total_size, err);
+    if (tsf->ram)
+      atomic_add_u64(&timeshift_total_ram_size, err);
   }
   return err;
 }
@@ -265,7 +318,7 @@ static void _process_msg
     case SMT_MPEGTS:
     case SMT_PACKET:
       pthread_mutex_lock(&ts->rdwr_mutex);
-      if ((tsf = timeshift_filemgr_get(ts, 1)) && (tsf->fd != -1)) {
+      if ((tsf = timeshift_filemgr_get(ts, 1)) && (tsf->wfd >= 0 || tsf->ram)) {
         if ((err = _process_msg0(ts, tsf, &sm)) < 0) {
           timeshift_filemgr_close(tsf);
           tsf->bad = 1;
index 248f6c1aa588021c36be801eb85f7eb276a1d1a6..0053eaaeeb215fac0adeeb50d00a8bf49669a2c1 100755 (executable)
@@ -677,6 +677,7 @@ extjs_timeshift(http_connection_t *hc, const char *remain, void *opaque)
     htsmsg_add_u32(m, "timeshift_max_period", timeshift_max_period / 60);
     htsmsg_add_u32(m, "timeshift_unlimited_size", timeshift_unlimited_size);
     htsmsg_add_u32(m, "timeshift_max_size", timeshift_max_size / 1048576);
+    htsmsg_add_u32(m, "timeshift_ram_size", timeshift_ram_size / 1048576);
     pthread_mutex_unlock(&global_lock);
     out = json_single_record(m, "config");
 
@@ -696,6 +697,10 @@ extjs_timeshift(http_connection_t *hc, const char *remain, void *opaque)
     timeshift_unlimited_size = http_arg_get(&hc->hc_req_args, "timeshift_unlimited_size") ? 1 : 0;
     if ((str = http_arg_get(&hc->hc_req_args, "timeshift_max_size")))
       timeshift_max_size   = atol(str) * 1048576LL;
+    if ((str = http_arg_get(&hc->hc_req_args, "timeshift_ram_size"))) {
+      timeshift_ram_size         = atol(str) * 1048576LL;
+      timeshift_ram_segment_size = timeshift_ram_size / 10;
+    }
     timeshift_save();
     pthread_mutex_unlock(&global_lock);
 
index 017512ce3e48a471ed1aaca4b25ceb13f1db51d8..ee7ada92ea551441e96599f03ee0f1c608cd9bd1 100644 (file)
@@ -12,7 +12,8 @@ tvheadend.timeshift = function(panel, index) {
         'timeshift_enabled', 'timeshift_ondemand',
         'timeshift_path',
         'timeshift_unlimited_period', 'timeshift_max_period',
-        'timeshift_unlimited_size', 'timeshift_max_size'
+        'timeshift_unlimited_size', 'timeshift_max_size',
+        'timeshift_ram_size'
     ]
             );
 
@@ -59,6 +60,13 @@ tvheadend.timeshift = function(panel, index) {
         width: 300
     });
 
+    var timeshiftRamSize = new Ext.form.NumberField({
+        fieldLabel: 'Max. RAM Size (MB)',
+        name: 'timeshift_ram_size',
+        allowBlank: false,
+        width: 300
+    });
+
     var timeshiftUnlSize = new Ext.form.Checkbox({
         fieldLabel: 'Unlimited size',
         name: 'timeshift_unlimited_size',
@@ -100,7 +108,7 @@ tvheadend.timeshift = function(panel, index) {
         width: 500,
         autoHeight: true,
         border: false,
-           items : [timeshiftMaxPeriod, timeshiftMaxSize]
+           items : [timeshiftMaxPeriod, timeshiftMaxSize, timeshiftRamSize]
     });
 
     var timeshiftPanelB = new Ext.form.FieldSet({