]> git.ipfire.org Git - thirdparty/tvheadend.git/commitdiff
timeshift: early prototype of the new timeshift feature.
authorAdam Sutton <dev@adamsutton.me.uk>
Fri, 12 Oct 2012 16:24:27 +0000 (17:24 +0100)
committerAdam Sutton <dev@adamsutton.me.uk>
Wed, 9 Jan 2013 21:26:51 +0000 (21:26 +0000)
Currently this supports pause/resume, and speed control. FF up to 4x uses
full frame output, faster than that or reverse uses i-frame only output.
This causes problems with some players and needs work.

Also buffers are done at the subscription level which means the disk space
is not shared even if it holds the same content. And more importantly
this means you cannot yet record the timeshift buffer like on a standard
PVR.

22 files changed:
Makefile
configure
src/config2.c
src/config2.h
src/dvr/dvr_rec.c
src/htsp_server.c
src/main.c
src/packet.h
src/plumbing/globalheaders.c
src/plumbing/tsfix.c
src/streaming.c
src/subscriptions.c
src/subscriptions.h
src/timeshift.c [new file with mode: 0644]
src/timeshift.h [new file with mode: 0644]
src/timeshift/private.h [new file with mode: 0644]
src/timeshift/timeshift_filemgr.c [new file with mode: 0644]
src/timeshift/timeshift_reader.c [new file with mode: 0644]
src/timeshift/timeshift_writer.c [new file with mode: 0644]
src/tvheadend.h
src/webui/static/app/config.js
src/webui/webui.c

index 445586bc35aa133f4bf5dd4f69156eab9107efd0..c42e63d5a56b94828eaa9f1d6d5a01b608229004 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -122,7 +122,7 @@ SRCS-$(CONFIG_LINUXDVB) += src/epggrab/otamux.c\
   src/epggrab/support/freesat_huffman.c \
 
 SRCS += src/plumbing/tsfix.c \
-       src/plumbing/globalheaders.c \
+       src/plumbing/globalheaders.c
 
 SRCS += src/dvr/dvr_db.c \
        src/dvr/dvr_rec.c \
@@ -145,6 +145,13 @@ SRCS += src/muxer.c \
 # Optional code
 #
 
+# Timeshift
+SRCS-${CONFIG_TIMESHIFT} += \
+  src/timeshift.c \
+  src/timeshift/timeshift_filemgr.c \
+  src/timeshift/timeshift_writer.c \
+  src/timeshift/timeshift_reader.c \
+
 # DVB
 SRCS-${CONFIG_LINUXDVB} += \
        src/dvb/dvb.c \
index 5a5ce0ee639d934f2d19174cc7e6e53274ebb54c..6aa50abfe8ec6ff8ecc9f5f5baf2858219bf3bbe 100755 (executable)
--- a/configure
+++ b/configure
@@ -24,6 +24,7 @@ OPTIONS=(
   "avahi:auto"
   "zlib:auto"
   "libav:auto"
+  "timeshift:no"
   "bundle:no"
   "dvbcsa:no"
 )
index 5bcbd0d1a02a00334fb913fe0f310147564708b3..05bb7a3407403cfc22da8d069a8ef11c81e86d87 100644 (file)
@@ -26,11 +26,24 @@ static htsmsg_t *config;
 
 void config_init ( void )
 {
+  int save = 0;
+  uint32_t u32;
+
   config = hts_settings_load("config");
   if (!config) {
     tvhlog(LOG_DEBUG, "config", "no configuration, loading defaults");
     config = htsmsg_create_map();
   }
+
+  /* Defaults */
+  if (htsmsg_get_u32(config, "timeshiftperiod", &u32))
+    save |= config_set_timeshift_period(0);
+  if (htsmsg_get_u32(config, "timeshiftsize", &u32))
+    save |= config_set_timeshift_size(0);
+
+  /* Save defaults */
+  if (save)
+    config_save();
 }
 
 void config_save ( void )
@@ -43,22 +56,39 @@ htsmsg_t *config_get_all ( void )
   return htsmsg_copy(config);
 }
 
-const char *config_get_language ( void )
+static int _config_set_str ( const char *fld, const char *val )
 {
-  return htsmsg_get_str(config, "language");
+  const char *c = htsmsg_get_str(config, fld);
+  if (!c || strcmp(c, val)) {
+    if (c) htsmsg_delete_field(config, fld);
+    htsmsg_add_str(config, fld, val);
+    return 1;
+  }
+  return 0;
 }
 
-int config_set_language ( const char *lang )
+static int _config_set_u32 ( const char *fld, uint32_t val )
 {
-  const char *c = config_get_language();
-  if (!c || strcmp(c, lang)) {
-    if (c) htsmsg_delete_field(config, "language");
-    htsmsg_add_str(config, "language", lang);
+  uint32_t u32;
+  int ret = htsmsg_get_u32(config, fld, &u32);
+  if (ret || (u32 != val)) {
+    if (!ret) htsmsg_delete_field(config, fld);
+    htsmsg_add_u32(config, fld, val);
     return 1;
   }
   return 0;
 }
 
+const char *config_get_language ( void )
+{
+  return htsmsg_get_str(config, "language");
+}
+
+int config_set_language ( const char *lang )
+{
+  return _config_set_str("language", lang);
+}
+
 const char *config_get_muxconfpath ( void )
 {
   return htsmsg_get_str(config, "muxconfpath");
@@ -66,11 +96,35 @@ const char *config_get_muxconfpath ( void )
 
 int config_set_muxconfpath ( const char *path )
 {
-  const char *c = config_get_muxconfpath();
-  if (!c || strcmp(c, path)) {
-    if (c) htsmsg_delete_field(config, "muxconfpath");
-    htsmsg_add_str(config, "muxconfpath", path);
-    return 1;
-  }
-  return 0;
+  return _config_set_str("muxconfpath", path);
+}
+
+const char *config_get_timeshift_path ( void )
+{
+  return htsmsg_get_str(config, "timeshiftpath");
+}
+
+int config_set_timeshift_path ( const char *path )
+{
+  return _config_set_str("timeshiftpath", path);
+}
+
+uint32_t config_get_timeshift_period ( void )
+{
+  return htsmsg_get_u32_or_default(config, "timeshiftperiod", 0);
+}
+
+int config_set_timeshift_period ( uint32_t period )
+{
+  return _config_set_u32("timeshiftperiod", period);
+}
+
+uint32_t config_get_timeshift_size ( void )
+{
+  return htsmsg_get_u32_or_default(config, "timeshiftsize", 0);
+}
+
+int config_set_timeshift_size ( uint32_t size )
+{
+  return _config_set_u32("timeshiftsize", size);
 }
index cd68e30621de19e679970fe49ed61f49708c6e60..7ba28e0a30d9c5dcee0b13c0604dec90aec5c13b 100644 (file)
@@ -36,4 +36,16 @@ const char *config_get_language    ( void );
 int         config_set_language    ( const char *str )
   __attribute__((warn_unused_result));
 
+const char *config_get_timeshift_path ( void );
+int         config_set_timeshift_path ( const char *str )
+  __attribute__((warn_unused_result));
+
+uint32_t    config_get_timeshift_period ( void );
+int         config_set_timeshift_period ( uint32_t val )
+  __attribute__((warn_unused_result));
+
+uint32_t    config_get_timeshift_size ( void );
+int         config_set_timeshift_size ( uint32_t val )
+  __attribute__((warn_unused_result));
+
 #endif /* __TVH_CONFIG__H__ */
index 59daec4359633de6939467408b636b3d9cef65c7..0c98f533fa1a2b0c71bc8dc47418d6ed672f62e8 100755 (executable)
@@ -538,6 +538,8 @@ dvr_thread(void *aux)
       }
       break;
 
+    case SMT_SPEED:
+    case SMT_SKIP:
     case SMT_SIGNAL_STATUS:
       break;
 
index 9ce8c7ffda4dc5b8e5a935b463ad573c1325a123..0c1db9b1fce1505f81b45090c2f7d52189ad5fa6 100644 (file)
@@ -43,6 +43,9 @@
 #include "epg.h"
 #include "plumbing/tsfix.h"
 #include "imagecache.h"
+#if ENABLE_TIMESHIFT
+#include "timeshift.h"
+#endif
 
 #include <sys/statvfs.h>
 #include "settings.h"
@@ -170,6 +173,10 @@ typedef struct htsp_subscription {
   streaming_target_t hs_input;
   streaming_target_t *hs_tsfix;
 
+#if ENABLE_TIMESHIFT
+  streaming_target_t *hs_tshift;
+#endif
+
   htsp_msg_q_t hs_q;
 
   time_t hs_last_report; /* Last queue status report sent */
@@ -274,6 +281,9 @@ htsp_subscription_destroy(htsp_connection_t *htsp, htsp_subscription_t *hs)
   if(hs->hs_tsfix != NULL)
     tsfix_destroy(hs->hs_tsfix);
   htsp_flush_queue(htsp, &hs->hs_q);
+#if ENABLE_TIMESHIFT
+  if(hs->hs_tshift) timeshift_destroy(hs->hs_tshift);
+#endif
   free(hs);
 }
 
@@ -1228,6 +1238,9 @@ static htsmsg_t *
 htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in)
 {
   uint32_t chid, sid, weight, req90khz, normts;
+#if ENABLE_TIMESHIFT
+  uint32_t timeshiftPeriod;
+#endif
   channel_t *ch;
   htsp_subscription_t *hs;
   const char *str;
@@ -1249,6 +1262,11 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in)
   req90khz = htsmsg_get_u32_or_default(in, "90khz", 0);
   normts = htsmsg_get_u32_or_default(in, "normts", 0);
 
+#if ENABLE_TIMESHIFT
+  timeshiftPeriod = htsmsg_get_u32_or_default(in, "timeshiftPeriod", 0);
+  timeshiftPeriod = MIN(timeshiftPeriod, config_get_timeshift_period());
+#endif
+
   /*
    * We send the reply now to avoid the user getting the 'subscriptionStart'
    * async message before the reply to 'subscribe'.
@@ -1279,14 +1297,19 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in)
   LIST_INSERT_HEAD(&htsp->htsp_subscriptions, hs, hs_link);
   streaming_target_init(&hs->hs_input, htsp_streaming_input, hs, 0);
 
-  streaming_target_t *st;
+  streaming_target_t *st = &hs->hs_input;
 
-  if(normts) {
-    hs->hs_tsfix = tsfix_create(&hs->hs_input);
-    st = hs->hs_tsfix;
-  } else {
-    st = &hs->hs_input;
+  if(normts)
+    st = hs->hs_tsfix = tsfix_create(st);
+#if ENABLE_TIMESHIFT
+  if (timeshiftPeriod != 0) {
+    if (timeshiftPeriod == ~0)
+      tvhlog(LOG_DEBUG, "htsp", "using timeshift buffer (unlimited)");
+    else
+      tvhlog(LOG_DEBUG, "htsp", "using timeshift buffer (%u mins)", timeshiftPeriod / 60);
+    st = hs->hs_tshift = timeshift_create(st, timeshiftPeriod);
   }
+#endif
 
   hs->hs_s = subscription_create_from_channel(ch, weight,
                                              htsp->htsp_logname,
@@ -1353,6 +1376,72 @@ htsp_method_change_weight(htsp_connection_t *htsp, htsmsg_t *in)
   return NULL;
 }
 
+/**
+ * Skip stream
+ */
+static htsmsg_t *
+htsp_method_skip(htsp_connection_t *htsp, htsmsg_t *in)
+{
+  htsp_subscription_t *hs;
+  uint32_t sid, abs;
+  int64_t s64;
+  streaming_skip_t skip;
+
+  if(htsmsg_get_u32(in, "subscriptionId", &sid))
+    return htsp_error("Missing argument 'subscriptionId'");
+
+  abs = htsmsg_get_u32_or_default(in, "absolute", 0);
+
+  if(!htsmsg_get_s64(in, "time", &s64)) {
+    skip.type = abs ? SMT_SKIP_ABS_TIME : SMT_SKIP_REL_TIME;
+    skip.time = s64;
+  } else if (!htsmsg_get_s64(in, "size", &s64)) {
+    skip.type = abs ? SMT_SKIP_ABS_SIZE : SMT_SKIP_REL_SIZE;
+    skip.size = s64;
+  } else {
+    return htsp_error("Missing argument 'time' or 'size'");
+  }
+
+  LIST_FOREACH(hs, &htsp->htsp_subscriptions, hs_link)
+    if(hs->hs_sid == sid)
+      break;
+
+  if(hs == NULL)
+    return htsp_error("Requested subscription does not exist");
+
+  subscription_set_skip(hs->hs_s, &skip);
+
+  htsp_reply(htsp, in, htsmsg_create_map());
+  return NULL;
+}
+
+/*
+ * Set stream speed
+ */
+static htsmsg_t *
+htsp_method_speed(htsp_connection_t *htsp, htsmsg_t *in)
+{
+  htsp_subscription_t *hs;
+  uint32_t sid;
+  int32_t speed;
+
+  if(htsmsg_get_u32(in, "subscriptionId", &sid))
+    return htsp_error("Missing argument 'subscriptionId'");
+  if(htsmsg_get_s32(in, "speed", &speed))
+    return htsp_error("Missing argument 'speed'");
+
+  LIST_FOREACH(hs, &htsp->htsp_subscriptions, hs_link)
+    if(hs->hs_sid == sid)
+      break;
+
+  if(hs == NULL)
+    return htsp_error("Requested subscription does not exist");
+
+  subscription_set_speed(hs->hs_s, speed);
+
+  htsp_reply(htsp, in, htsmsg_create_map());
+  return NULL;
+}
 
 /**
  * Open file
@@ -1529,6 +1618,8 @@ struct {
   { "subscribe",                htsp_method_subscribe,      ACCESS_STREAMING},
   { "unsubscribe",              htsp_method_unsubscribe,    ACCESS_STREAMING},
   { "subscriptionChangeWeight", htsp_method_change_weight,  ACCESS_STREAMING},
+  { "subscriptionSkip",         htsp_method_skip,           ACCESS_STREAMING},
+  { "subscriptionSpeed",        htsp_method_speed,          ACCESS_STREAMING},
   { "fileOpen",                 htsp_method_file_open,      ACCESS_RECORDER},
   { "fileRead",                 htsp_method_file_read,      ACCESS_RECORDER},
   { "fileClose",                htsp_method_file_close,     ACCESS_RECORDER},
@@ -2065,7 +2156,11 @@ const static char frametypearray[PKT_NTYPES] = {
  * Build a htsmsg from a th_pkt and enqueue it on our HTSP service
  */
 static void
+#if ENABLE_TIMESHIFT
+htsp_stream_deliver(htsp_subscription_t *hs, th_pkt_t *pkt, uint64_t timeshift)
+#else
 htsp_stream_deliver(htsp_subscription_t *hs, th_pkt_t *pkt)
+#endif
 {
   htsmsg_t *m;
   htsp_msg_t *hm;
@@ -2093,6 +2188,11 @@ htsp_stream_deliver(htsp_subscription_t *hs, th_pkt_t *pkt)
   htsmsg_add_u32(m, "stream", pkt->pkt_componentindex);
   htsmsg_add_u32(m, "com", pkt->pkt_commercial);
 
+#if ENABLE_TIMESHIFT
+  if (timeshift)
+    htsmsg_add_s64(m, "timeshift", timeshift);
+#endif
+
 
   if(pkt->pkt_pts != PTS_UNSET) {
     int64_t pts = hs->hs_90khz ? pkt->pkt_pts : ts_rescale(pkt->pkt_pts, 1000000);
@@ -2305,6 +2405,19 @@ htsp_subscription_signal_status(htsp_subscription_t *hs, signal_status_t *sig)
   htsp_send_message(hs->hs_htsp, m, &hs->hs_htsp->htsp_hmq_qstatus);
 }
 
+/**
+ *
+ */
+static void
+htsp_subscription_speed(htsp_subscription_t *hs, int speed)
+{
+  htsmsg_t *m = htsmsg_create_map();
+  htsmsg_add_str(m, "method", "subscriptionSpeed");
+  htsmsg_add_u32(m, "subscriptionId", hs->hs_sid);
+  htsmsg_add_u32(m, "speed", speed);
+  htsp_send(hs->hs_htsp, m, NULL, &hs->hs_q, 0);
+}
+
 /**
  *
  */
@@ -2315,7 +2428,12 @@ htsp_streaming_input(void *opaque, streaming_message_t *sm)
 
   switch(sm->sm_type) {
   case SMT_PACKET:
-    htsp_stream_deliver(hs, sm->sm_data); // reference is transfered
+#if ENABLE_TIMESHIFT
+    htsp_stream_deliver(hs, sm->sm_data, sm->sm_timeshift);
+#else
+    htsp_stream_deliver(hs, sm->sm_data);
+#endif
+    // reference is transfered
     sm->sm_data = NULL;
     break;
 
@@ -2344,6 +2462,13 @@ htsp_streaming_input(void *opaque, streaming_message_t *sm)
 
   case SMT_EXIT:
     abort();
+
+  case SMT_SKIP:
+    break;
+
+  case SMT_SPEED:
+    htsp_subscription_speed(hs, sm->sm_code);
+    break;
   }
   streaming_msg_free(sm);
 }
index 710e736c60b788abfa3499b95f3c6bd587175ae3..6cff04cb38f97ce5828b1b4fc1acd77bb1d923fb 100644 (file)
@@ -61,6 +61,7 @@
 #include "muxes.h"
 #include "config2.h"
 #include "imagecache.h"
+#include "timeshift.h"
 #if ENABLE_LIBAV
 #include "libav.h"
 #endif
@@ -598,6 +599,10 @@ main(int argc, char **argv)
   v4l_init();
 #endif
 
+#if ENABLE_TIMESHIFT
+  timeshift_init();
+#endif
+
   tcp_server_init();
   http_server_init();
   webui_init();
@@ -659,6 +664,10 @@ main(int argc, char **argv)
 
   epg_save();
 
+#if ENABLE_TIMESHIFT
+  timeshift_term();
+#endif
+
   tvhlog(LOG_NOTICE, "STOP", "Exiting HTS Tvheadend");
 
   if(opt_fork)
index fedc1402c0ad27e8e535771272cc1e7673a7c850..2aecce7b8e3fbe78fbac3ecbb21b07d094a0afd1 100644 (file)
@@ -36,6 +36,14 @@ typedef struct pktbuf {
 #define PKT_B_FRAME 3
 #define PKT_NTYPES  4 
 
+static inline char pkt_frametype_to_char ( int frametype )
+{
+  if (frametype == PKT_I_FRAME) return 'I';
+  if (frametype == PKT_P_FRAME) return 'P';
+  if (frametype == PKT_B_FRAME) return 'B';
+  return ' ';
+}
+
 typedef struct th_pkt {
   int64_t pkt_dts;
   int64_t pkt_pts;
index d776f34c858539f82fa5a86d38d0ba9e17e7bded..509da87b15c22b36a66136041fe99daaeec13586 100644 (file)
@@ -255,6 +255,8 @@ gh_hold(globalheaders_t *gh, streaming_message_t *sm)
   case SMT_SIGNAL_STATUS:
   case SMT_NOSTART:
   case SMT_MPEGTS:
+  case SMT_SPEED:
+  case SMT_SKIP:
     streaming_target_deliver2(gh->gh_output, sm);
     break;
   }
@@ -283,6 +285,8 @@ gh_pass(globalheaders_t *gh, streaming_message_t *sm)
   case SMT_SIGNAL_STATUS:
   case SMT_NOSTART:
   case SMT_MPEGTS:
+  case SMT_SKIP:
+  case SMT_SPEED:
     streaming_target_deliver2(gh->gh_output, sm);
     break;
 
index a811e67c4d5c33edb7cd67907f9e11fbc5d58593..c54b76fc6d94cd8a9cff6571aaa2339b90a15f5c 100644 (file)
@@ -369,6 +369,8 @@ tsfix_input(void *opaque, streaming_message_t *sm)
   case SMT_SIGNAL_STATUS:
   case SMT_NOSTART:
   case SMT_MPEGTS:
+  case SMT_SPEED:
+  case SMT_SKIP:
     break;
   }
 
index bf1614f8827485803ec7881222973743bfc9e1a2..3173d3b0cd48f17dc810e7db16cd6d88736a430d 100755 (executable)
@@ -137,6 +137,10 @@ streaming_msg_create(streaming_message_type_t type)
 {
   streaming_message_t *sm = malloc(sizeof(streaming_message_t));
   sm->sm_type = type;
+#if ENABLE_TIMESHIFT
+  sm->sm_time      = 0;
+  sm->sm_timeshift = 0;
+#endif
   return sm;
 }
 
@@ -188,7 +192,11 @@ streaming_msg_clone(streaming_message_t *src)
   streaming_message_t *dst = malloc(sizeof(streaming_message_t));
   streaming_start_t *ss;
 
-  dst->sm_type = src->sm_type;
+  dst->sm_type      = src->sm_type;
+#if ENABLE_TIMESHIFT
+  dst->sm_time      = src->sm_time;
+  dst->sm_timeshift = src->sm_timeshift;
+#endif
 
   switch(src->sm_type) {
 
@@ -202,11 +210,17 @@ streaming_msg_clone(streaming_message_t *src)
     atomic_add(&ss->ss_refcount, 1);
     break;
 
+  case SMT_SKIP:
+    dst->sm_data = malloc(sizeof(streaming_skip_t));
+    memcpy(dst->sm_data, src->sm_data, sizeof(streaming_skip_t));
+    break;
+
   case SMT_SIGNAL_STATUS:
     dst->sm_data = malloc(sizeof(signal_status_t));
     memcpy(dst->sm_data, src->sm_data, sizeof(signal_status_t));
     break;
 
+  case SMT_SPEED:
   case SMT_STOP:
   case SMT_SERVICE_STATUS:
   case SMT_NOSTART:
@@ -264,17 +278,13 @@ streaming_msg_free(streaming_message_t *sm)
     break;
 
   case SMT_STOP:
-    break;
-
   case SMT_EXIT:
-    break;
-
   case SMT_SERVICE_STATUS:
-    break;
-
   case SMT_NOSTART:
+  case SMT_SPEED:
     break;
 
+  case SMT_SKIP:
   case SMT_SIGNAL_STATUS:
     free(sm->sm_data);
     break;
index 5dd5837900a8f69cc100306f074a0fa255069079..8dff4acf687aaadfdb298dc707cd4f597c0482b5 100644 (file)
@@ -545,8 +545,6 @@ subscription_dummy_join(const char *id, int first)
         "Dummy join %s ok", id);
 }
 
-
-
 /**
  *
  */
@@ -635,3 +633,41 @@ subscription_init(void)
 {
   gtimer_arm(&every_sec, every_sec_cb, NULL, 1);
 }
+
+/**
+ * Set speed
+ */
+void
+subscription_set_speed ( th_subscription_t *s, int speed )
+{
+  streaming_message_t *sm;
+  service_t *t = s->ths_service;
+
+  pthread_mutex_lock(&t->s_stream_mutex);
+
+  sm = streaming_msg_create_code(SMT_SPEED, speed);
+
+  streaming_target_deliver(s->ths_output, sm);
+
+  pthread_mutex_unlock(&t->s_stream_mutex);
+}
+
+/**
+ * Set skip
+ */
+void
+subscription_set_skip ( th_subscription_t *s, const streaming_skip_t *skip )
+{
+  streaming_message_t *sm;
+  service_t *t = s->ths_service;
+
+  pthread_mutex_lock(&t->s_stream_mutex);
+
+  sm = streaming_msg_create(SMT_SKIP);
+  sm->sm_data = malloc(sizeof(streaming_skip_t));
+  memcpy(sm->sm_data, skip, sizeof(streaming_skip_t));
+
+  streaming_target_deliver(s->ths_output, sm);
+
+  pthread_mutex_unlock(&t->s_stream_mutex);
+}
index a73605f398e770ee209837dcb4f8efe0b780e34a..28b84af8917e73bd5de9fd6f5df63542378a79c7 100644 (file)
@@ -111,6 +111,14 @@ th_subscription_t *subscription_create(int weight, const char *name,
 
 void subscription_change_weight(th_subscription_t *s, int weight);
 
+void subscription_set_speed
+  (th_subscription_t *s, int32_t speed );
+
+void subscription_set_skip
+  (th_subscription_t *s, const streaming_skip_t *skip);
+
+void subscription_stop(th_subscription_t *s);
+
 void subscription_unlink_service(th_subscription_t *s, int reason);
 
 void subscription_dummy_join(const char *id, int first);
diff --git a/src/timeshift.c b/src/timeshift.c
new file mode 100644 (file)
index 0000000..ebbb0c6
--- /dev/null
@@ -0,0 +1,184 @@
+/**
+ *  TV headend - Timeshift
+ *  Copyright (C) 2012 Adam Sutton
+ *
+ *  This program is free software: you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation, either version 3 of the License, or
+ *  (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "tvheadend.h"
+#include "streaming.h"
+#include "timeshift.h"
+#include "timeshift/private.h"
+#include "config2.h"
+#include "settings.h"
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/epoll.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <string.h>
+#include <assert.h>
+#include <stdio.h>
+
+static int timeshift_index = 0;
+
+/*
+ * Intialise global file manager
+ */
+void timeshift_init ( void )
+{
+  timeshift_filemgr_init();
+}
+
+/*
+ * Terminate global file manager
+ */
+void timeshift_term ( void )
+{
+  timeshift_filemgr_term();
+}
+
+/*
+ * Receive data
+ */
+static void timeshift_input
+  ( void *opaque, streaming_message_t *sm )
+{
+  timeshift_t *ts = opaque;
+
+  pthread_mutex_lock(&ts->state_mutex);
+
+  /* Control */
+  if (sm->sm_type == SMT_SKIP) {
+    if (ts->state >= TS_LIVE)
+      timeshift_write_skip(ts->rd_pipe.wr, sm->sm_data);
+  } else if (sm->sm_type == SMT_SPEED) {
+    if (ts->state >= TS_LIVE)
+      timeshift_write_speed(ts->rd_pipe.wr, sm->sm_code);
+  }
+
+  else {
+
+    /* Start */
+    if (sm->sm_type == SMT_START && ts->state == TS_INIT) {
+      ts->state  = TS_LIVE;
+    }
+
+    /* Pass-thru */
+    if (ts->state <= TS_LIVE) {
+      streaming_target_deliver2(ts->output, streaming_msg_clone(sm));
+    }
+
+    /* Buffer to disk */
+    if (ts->state >= TS_LIVE) {
+      sm->sm_time = getmonoclock();
+      streaming_target_deliver2(&ts->wr_queue.sq_st, sm);
+    } else
+      streaming_msg_free(sm);
+
+    /* Exit/Stop */
+    if (sm->sm_type == SMT_EXIT ||
+        (sm->sm_type == SMT_STOP && sm->sm_code == 0)) {
+      timeshift_write_exit(ts->rd_pipe.wr);
+      ts->state = TS_EXIT;
+    }
+  }
+
+  pthread_mutex_unlock(&ts->state_mutex);
+}
+
+/**
+ *
+ */
+void
+timeshift_destroy(streaming_target_t *pad)
+{
+  timeshift_t *ts = (timeshift_t*)pad;
+  timeshift_file_t *tsf;
+  streaming_message_t *sm;
+
+  /* Must hold global lock */
+  lock_assert(&global_lock);
+
+  /* Ensure the thread exits */
+  // Note: this is a workaround for the fact the Q might have been flushed
+  //       in reader thread (VERY unlikely)
+  sm = streaming_msg_create(SMT_EXIT);
+  streaming_target_deliver2(&ts->wr_queue.sq_st, sm);
+
+  /* Wait for all threads */
+  pthread_join(ts->rd_thread, NULL);
+  pthread_join(ts->wr_thread, NULL);
+  pthread_join(ts->rm_thread, NULL);
+
+  /* Shut stuff down */
+  streaming_queue_deinit(&ts->wr_queue);
+
+  close(ts->rd_pipe.rd);
+  close(ts->rd_pipe.wr);
+
+  /* Flush files */
+  while ((tsf = TAILQ_FIRST(&ts->files)))
+    timeshift_filemgr_remove(ts, tsf, 1);
+
+  free(ts->path);
+  free(ts);
+}
+
+/**
+ * Create timeshift buffer
+ *
+ * max_period of buffer in seconds (0 = unlimited)
+ * max_size   of buffer in bytes   (0 = unlimited)
+ */
+streaming_target_t *timeshift_create
+  (streaming_target_t *out, time_t max_time)
+{
+  char buf[512];
+  timeshift_t *ts = calloc(1, sizeof(timeshift_t));
+
+  /* Must hold global lock */
+  lock_assert(&global_lock);
+
+  /* Create directories */
+  if (timeshift_filemgr_makedirs(timeshift_index, buf, sizeof(buf)))
+    return NULL;
+
+  /* Setup structure */
+  TAILQ_INIT(&ts->files);
+  ts->output     = out;
+  ts->path       = strdup(buf);
+  ts->max_time   = max_time;
+  ts->state      = TS_INIT;
+  ts->full       = 0;
+  ts->vididx     = -1;
+  ts->id         = timeshift_index;
+  pthread_mutex_init(&ts->rdwr_mutex, NULL);
+  pthread_mutex_init(&ts->state_mutex, NULL);
+
+  /* Initialise output */
+  tvh_pipe(O_NONBLOCK, &ts->rd_pipe);
+
+  /* Initialise input */
+  streaming_queue_init(&ts->wr_queue, 0);
+  streaming_target_init(&ts->input, timeshift_input, ts, 0);
+  pthread_create(&ts->wr_thread, NULL, timeshift_writer, ts);
+  pthread_create(&ts->rd_thread, NULL, timeshift_reader, ts);
+
+  /* Update index */
+  timeshift_index++;
+
+  return &ts->input;
+}
diff --git a/src/timeshift.h b/src/timeshift.h
new file mode 100644 (file)
index 0000000..69574bc
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ *  TV headend - Timeshift
+ *  Copyright (C) 2012 Adam Sutton
+ *
+ *  This program is free software: you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation, either version 3 of the License, or
+ *  (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef __TVH_TIMESHIFT_H__
+#define __TVH_TIMESHIFT_H__
+
+void timeshift_init ( void );
+void timeshift_term ( void );
+
+streaming_target_t *timeshift_create
+  (streaming_target_t *out, time_t max_period);
+
+void timeshift_destroy(streaming_target_t *pad);
+
+#endif /* __TVH_TIMESHIFT_H__ */
diff --git a/src/timeshift/private.h b/src/timeshift/private.h
new file mode 100644 (file)
index 0000000..455fd93
--- /dev/null
@@ -0,0 +1,139 @@
+/*
+ *  TV headend - Timeshift
+ *  Copyright (C) 2012 Adam Sutton
+ *
+ *  This program is free software: you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation, either version 3 of the License, or
+ *  (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef __TVH_TIMESHIFT_PRIVATE_H__
+#define __TVH_TIMESHIFT_PRIVATE_H__
+
+#define TS_PLAY_BUF 100000 // us to buffer in TX
+
+/**
+ * Indexes of import data in the stream
+ */
+typedef struct timeshift_index
+{
+  off_t                        pos;    ///< Position in the file
+  union {
+    int64_t                    time;   ///< Packet time
+    void                      *data;   ///< Associated data
+  };
+  TAILQ_ENTRY(timeshift_index) link;   ///< List entry
+} timeshift_index_t;
+
+typedef TAILQ_HEAD(timeshift_index_list,timeshift_index) timeshift_index_list_t;
+
+/**
+ * Timeshift file
+ */
+typedef struct timeshift_file
+{
+  int                         fd;       ///< Write descriptor
+  char                        *path;    ///< Full path to file
+
+  time_t                      time;     ///< Files coarse timestamp
+  size_t                      size;     ///< Current file size;
+  int64_t                     last;     ///< Latest timestamp
+
+  uint8_t                     bad;      ///< File is broken
+
+  int                         refcount; ///< Reader ref count
+
+  timeshift_index_list_t      iframes;  ///< I-frame indexing
+  timeshift_index_list_t      sstart;   ///< Stream start messages
+
+  TAILQ_ENTRY(timeshift_file) link;     ///< List entry
+} timeshift_file_t;
+
+typedef TAILQ_HEAD(timeshift_file_list,timeshift_file) timeshift_file_list_t;
+
+/**
+ *
+ */
+typedef struct timeshift {
+  // Note: input MUST BE FIRST in struct
+  streaming_target_t          input;      ///< Input source
+  streaming_target_t          *output;    ///< Output dest
+
+  int                         id;         ///< Reference number
+  char                        *path;      ///< Directory containing buffer
+  time_t                      max_time;   ///< Maximum period to shift
+
+  enum {
+    TS_INIT,
+    TS_EXIT,
+    TS_LIVE,
+    TS_PAUSE,
+    TS_PLAY,
+  }                           state;       ///< Play state
+  pthread_mutex_t             state_mutex; ///< Protect state changes
+  uint8_t                     full;        ///< Buffer is full
+
+  streaming_queue_t           wr_queue;   ///< Writer queue
+  pthread_t                   wr_thread;  ///< Writer thread
+
+  pthread_t                   rd_thread;  ///< Reader thread
+  th_pipe_t                   rd_pipe;    ///< Message passing to reader
+
+  pthread_t                   rm_thread;  ///< Reaper thread
+  timeshift_file_list_t       rm_list;    ///< Remove files
+
+  pthread_mutex_t             rdwr_mutex; ///< Buffer protection
+  timeshift_file_list_t       files;      ///< List of files
+
+  int                         vididx;     ///< Index of (current) video stream
+
+} timeshift_t;
+
+/*
+ * Write functions
+ */
+ssize_t timeshift_write_start   ( int fd, int64_t time, streaming_start_t *ss );
+ssize_t timeshift_write_sigstat ( int fd, int64_t time, signal_status_t *ss );
+ssize_t timeshift_write_packet  ( int fd, int64_t time, th_pkt_t *pkt );
+ssize_t timeshift_write_mpegts  ( int fd, int64_t time, void *data );
+ssize_t timeshift_write_skip    ( int fd, streaming_skip_t *skip );
+ssize_t timeshift_write_speed   ( int fd, int speed );
+ssize_t timeshift_write_stop    ( int fd, int code );
+ssize_t timeshift_write_exit    ( int fd );
+ssize_t timeshift_write_eof     ( int fd );
+
+void timeshift_writer_flush ( timeshift_t *ts );
+
+/*
+ * Threads
+ */
+void *timeshift_reader ( void *p );
+void *timeshift_writer ( void *p );
+
+/*
+ * File management
+ */
+void timeshift_filemgr_init     ( void );
+void timeshift_filemgr_term     ( void );
+int  timeshift_filemgr_makedirs ( int ts_index, char *buf, size_t len );
+
+timeshift_file_t *timeshift_filemgr_get
+  ( timeshift_t *ts, int create );
+timeshift_file_t *timeshift_filemgr_prev
+  ( timeshift_file_t *ts, int *end, int keep );
+timeshift_file_t *timeshift_filemgr_next
+  ( timeshift_file_t *ts, int *end, int keep );
+void timeshift_filemgr_remove
+  ( timeshift_t *ts, timeshift_file_t *tsf, int force );
+void timeshift_filemgr_close ( timeshift_file_t *tsf );
+
+#endif /* __TVH_TIMESHIFT_PRIVATE_H__ */
diff --git a/src/timeshift/timeshift_filemgr.c b/src/timeshift/timeshift_filemgr.c
new file mode 100644 (file)
index 0000000..ff00ff8
--- /dev/null
@@ -0,0 +1,306 @@
+/**
+ *  TV headend - Timeshift File Manager
+ *  Copyright (C) 2012 Adam Sutton
+ *
+ *  This program is free software: you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation, either version 3 of the License, or
+ *  (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/epoll.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <string.h>
+#include <assert.h>
+#include <pthread.h>
+
+#include "tvheadend.h"
+#include "streaming.h"
+#include "timeshift.h"
+#include "timeshift/private.h"
+#include "config2.h"
+#include "settings.h"
+
+static int                   timeshift_reaper_run;
+static timeshift_file_list_t timeshift_reaper_list;
+static pthread_t             timeshift_reaper_thread;
+static pthread_mutex_t       timeshift_reaper_lock;
+static pthread_cond_t        timeshift_reaper_cond;
+
+/* **************************************************************************
+ * File reaper thread
+ * *************************************************************************/
+
+static void* timeshift_reaper_callback ( void *p )
+{
+  char *dpath;
+  timeshift_file_t *tsf;
+  timeshift_index_t *ti;
+  streaming_message_t *sm;
+  pthread_mutex_lock(&timeshift_reaper_lock);
+  while (timeshift_reaper_run) {
+
+    /* Get next */
+    tsf = TAILQ_FIRST(&timeshift_reaper_list);
+    if (!tsf) {
+      pthread_cond_wait(&timeshift_reaper_cond, &timeshift_reaper_lock);
+      continue;
+    }
+    TAILQ_REMOVE(&timeshift_reaper_list, tsf, link);
+    pthread_mutex_unlock(&timeshift_reaper_lock);
+
+#ifdef TSHFT_TRACE
+    tvhlog(LOG_DEBUG, "timeshift", "remove file %s", tsf->path);
+#endif
+
+    /* Remove */
+    unlink(tsf->path);
+    dpath = dirname(tsf->path);
+    if (rmdir(dpath) == -1)
+      if (errno != ENOTEMPTY)
+        tvhlog(LOG_ERR, "timeshift", "failed to remove %s [e=%s]",
+               dpath, strerror(errno));
+
+    /* Free memory */
+    while ((ti = TAILQ_FIRST(&tsf->iframes))) {
+      TAILQ_REMOVE(&tsf->iframes, ti, link);
+      free(ti);
+    }
+    while ((ti = TAILQ_FIRST(&tsf->sstart))) {
+      TAILQ_REMOVE(&tsf->sstart, ti, link);
+      sm = ti->data;
+      streaming_msg_free(sm);
+      free(ti);
+    }
+    free(tsf->path);
+    free(tsf);
+  }
+  pthread_mutex_unlock(&timeshift_reaper_lock);
+#ifdef TSHFT_TRACE
+  tvhlog(LOG_DEBUG, "timeshift", "reaper thread exit");
+#endif
+  return NULL;
+}
+
+static void timeshift_reaper_remove ( timeshift_file_t *tsf )
+{
+#ifdef TSHFT_TRACE
+  tvhlog(LOG_DEBUG, "timeshift", "queue file for removal %s", tsf->path);
+#endif
+  pthread_mutex_lock(&timeshift_reaper_lock);
+  TAILQ_INSERT_TAIL(&timeshift_reaper_list, tsf, link);
+  pthread_cond_signal(&timeshift_reaper_cond);
+  pthread_mutex_unlock(&timeshift_reaper_lock);
+}
+
+/* **************************************************************************
+ * File Handling
+ * *************************************************************************/
+
+/*
+ * Get root directory
+ */
+static void timeshift_filemgr_get_root ( char *buf, size_t len )
+{
+  const char *path = config_get_timeshift_path();
+  if (!path || !*path)
+    path = hts_settings_get_root();
+  snprintf(buf, len, "%s/timeshift", path);
+}
+
+/*
+ * Create timeshift directories (for a given instance)
+ */
+int timeshift_filemgr_makedirs ( int index, char *buf, size_t len )
+{
+  timeshift_filemgr_get_root(buf, len);
+  snprintf(buf+strlen(buf), len-strlen(buf), "/%d", index);
+  return makedirs(buf, 0700);
+}
+
+/*
+ * Close file
+ */
+void timeshift_filemgr_close ( timeshift_file_t *tsf )
+{
+  ssize_t r = timeshift_write_eof(tsf->fd);
+  if (r > 0)
+    tsf->size += r;
+  close(tsf->fd);
+  tsf->fd = -1;
+}
+
+/*
+ * Remove file
+ */
+void timeshift_filemgr_remove
+  ( timeshift_t *ts, timeshift_file_t *tsf, int force )
+{
+  if (tsf->fd != -1)
+    close(tsf->fd);
+  TAILQ_REMOVE(&ts->files, tsf, link);
+  timeshift_reaper_remove(tsf);
+}
+
+/*
+ * Get current / new file
+ */
+timeshift_file_t *timeshift_filemgr_get ( timeshift_t *ts, int create )
+{
+  int fd;
+  struct timespec tp;
+  timeshift_file_t *tsf_tl, *tsf_hd, *tsf_tmp;
+  timeshift_index_t *ti;
+  char path[512];
+
+  /* Return last file */
+  if (!create)
+    return TAILQ_LAST(&ts->files, timeshift_file_list);
+
+  /* No space */
+  if (ts->full)
+    return NULL;
+
+  /* Store to file */
+  clock_gettime(CLOCK_MONOTONIC_COARSE, &tp);
+  tsf_tl = TAILQ_LAST(&ts->files, timeshift_file_list);
+  if (!tsf_tl || tsf_tl->time != tp.tv_sec) {
+    tsf_hd = TAILQ_FIRST(&ts->files);
+
+    /* Close existing */
+    if (tsf_tl && tsf_tl->fd != -1)
+      timeshift_filemgr_close(tsf_tl);
+
+    /* Check period */
+    if (ts->max_time && tsf_hd && tsf_tl) {
+      time_t d = tsf_tl->time - tsf_hd->time;
+      if (d > (ts->max_time+5)) {
+        if (!tsf_hd->refcount) {
+          timeshift_filemgr_remove(ts, tsf_hd, 0);
+        } else {
+#ifdef TSHFT_TRACE
+          tvhlog(LOG_DEBUG, "timeshift", "ts %d buffer full", ts->id);
+#endif
+          ts->full = 1;
+        }
+      }
+    }
+
+    /* Check size */
+    // TODO: need to implement this
+
+    /* Create new file */
+    tsf_tmp = NULL;
+    if (!ts->full) {
+      snprintf(path, sizeof(path), "%s/tvh-%"PRItime_t, ts->path, tp.tv_sec);
+#ifdef TSHFT_TRACE
+      tvhlog(LOG_DEBUG, "timeshift", "ts %d create file %s", ts->id, path);
+#endif
+      if ((fd = open(path, O_WRONLY | O_CREAT, 0600)) > 0) {
+        tsf_tmp = calloc(1, sizeof(timeshift_file_t));
+        tsf_tmp->time     = tp.tv_sec;
+        tsf_tmp->fd       = fd;
+        tsf_tmp->path     = strdup(path);
+        tsf_tmp->refcount = 0;
+        TAILQ_INIT(&tsf_tmp->iframes);
+        TAILQ_INIT(&tsf_tmp->sstart);
+        TAILQ_INSERT_TAIL(&ts->files, tsf_tmp, link);
+
+        /* Copy across last start message */
+        if (tsf_tl && (ti = TAILQ_LAST(&tsf_tl->sstart, timeshift_index_list))) {
+#ifdef TSHFT_TRACE
+          tvhlog(LOG_DEBUG, "timeshift", "ts %d copy smt_start to new file",
+                 ts->id);
+#endif
+          timeshift_index_t *ti2 = calloc(1, sizeof(timeshift_index_t));
+          ti2->pos  = ti->pos;
+          ti2->data = streaming_msg_clone(ti->data);
+          TAILQ_INSERT_TAIL(&tsf_tmp->sstart, ti2, link);
+        }
+      }
+    }
+    tsf_tl = tsf_tmp;
+  }
+
+  return tsf_tl;
+}
+
+timeshift_file_t *timeshift_filemgr_next
+  ( timeshift_file_t *tsf, int *end, int keep )
+{
+  timeshift_file_t *nxt = TAILQ_NEXT(tsf, link);
+  if (!nxt && end)  *end = 1;
+  if (!nxt && keep) return tsf;
+  tsf->refcount--;
+  if (nxt)
+    nxt->refcount++;
+  return nxt;
+}
+
+timeshift_file_t *timeshift_filemgr_prev
+  ( timeshift_file_t *tsf, int *end, int keep )
+{
+  timeshift_file_t *nxt = TAILQ_PREV(tsf, timeshift_file_list, link);
+  if (!nxt && end)  *end = 1;
+  if (!nxt && keep) return tsf;
+  tsf->refcount--;
+  if (nxt)
+    nxt->refcount++;
+  return nxt;
+}
+
+/* **************************************************************************
+ * Setup / Teardown
+ * *************************************************************************/
+
+/*
+ * Initialise global structures
+ */
+void timeshift_filemgr_init ( void )
+{
+  char path[512];
+
+  /* Try to remove any rubbish left from last run */
+  timeshift_filemgr_get_root(path, sizeof(path));
+  rmtree(path);
+
+  /* Start the reaper thread */
+  timeshift_reaper_run = 1;
+  pthread_mutex_init(&timeshift_reaper_lock, NULL);
+  pthread_cond_init(&timeshift_reaper_cond, NULL);
+  TAILQ_INIT(&timeshift_reaper_list);
+  pthread_create(&timeshift_reaper_thread, NULL,
+                 timeshift_reaper_callback, NULL);
+}
+
+/*
+ * Terminate
+ */
+void timeshift_filemgr_term ( void )
+{
+  char path[512];
+
+  /* Wait for thread */
+  pthread_mutex_lock(&timeshift_reaper_lock);
+  timeshift_reaper_run = 0;
+  pthread_cond_signal(&timeshift_reaper_cond);
+  pthread_mutex_unlock(&timeshift_reaper_lock);
+  pthread_join(timeshift_reaper_thread, NULL);
+
+  /* Remove the lot */
+  timeshift_filemgr_get_root(path, sizeof(path));
+  rmtree(path);
+}
+
+
diff --git a/src/timeshift/timeshift_reader.c b/src/timeshift/timeshift_reader.c
new file mode 100644 (file)
index 0000000..c3e57d7
--- /dev/null
@@ -0,0 +1,490 @@
+/**
+ *  TV headend - Timeshift Reader
+ *  Copyright (C) 2012 Adam Sutton
+ *
+ *  This program is free software: you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation, either version 3 of the License, or
+ *  (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "tvheadend.h"
+#include "streaming.h"
+#include "timeshift.h"
+#include "timeshift/private.h"
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/epoll.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <string.h>
+#include <assert.h>
+
+/* **************************************************************************
+ * File Reading
+ * *************************************************************************/
+
+static ssize_t _read_pktbuf ( int fd, pktbuf_t **pktbuf )
+{
+  ssize_t r, cnt = 0;
+  size_t sz;
+
+  /* Size */
+  r = read(fd, &sz, sizeof(sz));
+  if (r < 0) return -1;
+  if (r != sizeof(sz)) return 0;
+  cnt += r;
+
+  /* Empty */
+  if (!sz) {
+    *pktbuf = NULL;
+    return cnt;
+  }
+
+  /* Data */
+  *pktbuf = pktbuf_alloc(NULL, sz);
+  r = read(fd, (*pktbuf)->pb_data, sz);
+  if (r != sz) {
+    free((*pktbuf)->pb_data);
+    free(*pktbuf);
+    return r < 0 ? -1 : 0;
+  }
+  cnt += r;
+
+  return cnt;
+}
+
+
+static ssize_t _read_msg ( int fd, streaming_message_t **sm )
+{
+  ssize_t r, cnt = 0;
+  size_t sz;
+  streaming_message_type_t type;
+  int64_t time;
+  void *data;
+  int code;
+
+  /* Clear */
+  *sm = NULL;
+
+  /* Size */
+  r = read(fd, &sz, sizeof(sz));
+  if (r < 0) return -1;
+  if (r != sizeof(sz)) return 0;
+  cnt += r;
+
+  /* EOF */
+  if (sz == 0) return cnt;
+
+  /* Type */
+  r = read(fd, &type, sizeof(type));
+  if (r < 0) return -1;
+  if (r != sizeof(type)) return 0;
+  cnt += r;
+
+  /* Time */
+  r = read(fd, &time, sizeof(time));
+  if (r < 0) return -1;
+  if (r != sizeof(time)) return 0;
+  cnt += r;
+
+  /* Adjust size */
+  sz -= sizeof(type) + sizeof(time);
+  cnt += sz;
+
+  /* Standard messages */
+  switch (type) {
+
+    /* Unhandled */
+    case SMT_START:
+    case SMT_NOSTART:
+    case SMT_SERVICE_STATUS:
+      break;
+
+    /* Code */
+    case SMT_STOP:
+    case SMT_EXIT:
+    case SMT_SPEED:
+      if (sz != sizeof(code)) return -1;
+      r = read(fd, &code, sz);
+      if (r != sz) {
+        if (r < 0) return -1;
+        return 0;
+      }
+      *sm = streaming_msg_create_code(type, code);
+      break;
+
+    /* Data */
+    case SMT_SKIP:
+    case SMT_SIGNAL_STATUS:
+    case SMT_MPEGTS:
+    case SMT_PACKET:
+      data = malloc(sz);
+      r = read(fd, data, sz);
+      if (r != sz) {
+        free(data);
+        if (r < 0) return -1;
+        return 0;
+      }
+      if (type == SMT_PACKET) {
+        th_pkt_t *pkt = data;
+        pkt->pkt_payload = pkt->pkt_header = NULL;
+        *sm = streaming_msg_create_pkt(pkt);
+        r   = _read_pktbuf(fd, &pkt->pkt_header);
+        if (r < 0) {
+          streaming_msg_free(*sm);
+          return r;
+        }
+        cnt += r;
+        r   = _read_pktbuf(fd, &pkt->pkt_payload);
+        if (r < 0) {
+          streaming_msg_free(*sm);
+          return r;
+        }
+        cnt += r;
+      } else {
+        *sm = streaming_msg_create_data(type, data);
+      }
+      (*sm)->sm_time = time;
+      break;
+  }
+
+  /* OK */
+  return cnt;
+}
+
+/* **************************************************************************
+ * Thread
+ * *************************************************************************/
+
+/*
+ * Timeshift thread
+ */
+void *timeshift_reader ( void *p )
+{
+  timeshift_t *ts = p;
+  int efd, nfds, end, fd = -1, run = 1, wait = -1;
+  off_t cur_off = 0;
+  int cur_speed = 100, keyframe_mode = 0;
+  int64_t pause_time = 0, play_time = 0, last_time = 0, tx_time = 0;
+  int64_t now, deliver;
+  streaming_message_t *sm = NULL, *ctrl;
+  timeshift_file_t *cur_file = NULL, *tsi_file = NULL;
+  timeshift_index_t *tsi = NULL;
+
+  /* Poll */
+  struct epoll_event ev;
+  efd = epoll_create(1);
+  ev.events  = EPOLLIN;
+  ev.data.fd = ts->rd_pipe.rd;
+  epoll_ctl(efd, EPOLL_CTL_ADD, ev.data.fd, &ev);
+
+  /* Output */
+  while (run) {
+
+    /* Wait for data */
+    if(wait)
+      nfds = epoll_wait(efd, &ev, 1, wait);
+    else
+      nfds = 0;
+    wait = -1;
+    end  = 0;
+
+    /* Control */
+    pthread_mutex_lock(&ts->state_mutex);
+    if (nfds == 1) {
+      if (_read_msg(ev.data.fd, &ctrl) > 0) {
+
+        /* Exit */
+        if (ctrl->sm_type == SMT_EXIT) {
+#ifdef TSHFT_TRACE
+          tvhlog(LOG_DEBUG, "timeshift", "ts %d read exit request", ts->id);
+#endif
+          run = 0;
+          streaming_msg_free(ctrl);
+
+        /* Speed */
+        // TODO: currently just pause
+        } else if (ctrl->sm_type == SMT_SPEED) {
+          int speed = ctrl->sm_code;
+          int keyframe;
+
+          /* Bound it */
+          if (speed > 3200)  speed = 3200;
+          if (speed < -3200) speed = -3200;
+
+          /* Process */
+          if (cur_speed != speed) {
+
+            /* Live playback */
+            if (ts->state == TS_LIVE) {
+
+              /* Reject */
+              if (speed >= 100) {
+                tvhlog(LOG_DEBUG, "timeshift", "ts %d reject 1x+ in live mode",
+                       ts->id);
+                speed = 100;
+
+              /* Set position */
+              } else {
+                tvhlog(LOG_DEBUG, "timeshift", "ts %d enter timeshift mode",
+                       ts->id);
+                timeshift_writer_flush(ts);
+                pthread_mutex_lock(&ts->rdwr_mutex);
+                if ((cur_file   = timeshift_filemgr_get(ts, 0))) {
+                  cur_off    = cur_file->size;
+                  pause_time = cur_file->last;
+                  last_time  = pause_time;
+                }
+                pthread_mutex_unlock(&ts->rdwr_mutex);
+              }
+
+            /* Buffer playback */
+            } else if (ts->state == TS_PLAY) {
+              pause_time = last_time;
+
+            /* Paused */
+            } else {
+            }
+
+            /* Check keyframe mode */
+            keyframe      = (speed < 0) || (speed > 400);
+            if (keyframe != keyframe_mode) {
+              tvhlog(LOG_DEBUG, "timeshift", "using keyframe mode? %s",
+                     keyframe ? "yes" : "no");
+              keyframe_mode = keyframe;
+              if (keyframe) {
+                tsi      = NULL;
+                tsi_file = cur_file;
+              }
+            }
+
+            /* Update */
+            play_time  = getmonoclock();
+            cur_speed  = speed;
+            if (speed != 100 || ts->state != TS_LIVE)
+              ts->state = speed == 0 ? TS_PAUSE : TS_PLAY;
+            tvhlog(LOG_DEBUG, "timeshift", "ts %d change speed %d",
+                   ts->id, speed);
+          }
+
+          /* Send on the message */
+          ctrl->sm_code = speed;
+          streaming_target_deliver2(ts->output, ctrl);
+
+        /* Skip */
+        } else {
+          streaming_msg_free(ctrl);
+        }
+
+        ctrl = NULL;
+      }
+    }
+
+    /* Done */
+    if (!run || ts->state != TS_PLAY || !cur_file) {
+      pthread_mutex_unlock(&ts->state_mutex);
+      continue;
+    }
+
+    /* Calculate delivery time */
+    now     = getmonoclock();
+    deliver = (now - play_time) + TS_PLAY_BUF;
+    deliver = (deliver * cur_speed) / 100;
+    deliver = (deliver + pause_time);
+
+    /* Rewind or Fast forward (i-frame only) */
+    if (keyframe_mode) {
+      wait = 0;
+
+      /* Find next index */
+      if (cur_speed < 0) {
+        if (!tsi) {
+          TAILQ_FOREACH_REVERSE(tsi, &tsi_file->iframes,
+                                timeshift_index_list, link) {
+            if (tsi->time < last_time) break;
+          }
+        }
+      } else {
+        if (!tsi) {
+          TAILQ_FOREACH(tsi, &tsi_file->iframes, link) {
+            if (tsi->time > last_time) break;
+          }
+        }
+      }
+
+      /* Next file */
+      if (!tsi) {
+        if (fd != -1)
+          close(fd);
+        wait     = 0; // immediately cycle around
+        fd       = -1;
+        if (cur_speed < 0)
+          tsi_file = timeshift_filemgr_prev(tsi_file, &end, 1);
+        else
+          tsi_file = timeshift_filemgr_next(tsi_file, &end, 0);
+      }
+
+      /* Deliver */
+      if (tsi && (((cur_speed < 0) && (tsi->time >= deliver)) ||
+                  ((cur_speed > 0) && (tsi->time <= deliver)))) {
+
+        /* Keep delivery to 5fps max */
+        if ((now - tx_time) >= 200000) {
+
+          /* Open */
+          if (fd == -1) {
+#ifdef TSHFT_TRACE
+            tvhlog(LOG_DEBUG, "timeshift", "ts %d open file %s",
+                 ts->id, tsi_file->path);
+#endif
+            fd = open(tsi_file->path, O_RDONLY);
+          }
+
+          /* Read */
+          off_t ret = lseek(fd, tsi->pos, SEEK_SET);
+          assert(ret == tsi->pos);
+          ssize_t r = _read_msg(fd, &sm);
+
+          /* Send */
+          if (r > 0) {
+#ifdef TSHFT_TRACE
+            tvhlog(LOG_DEBUG, "timeshift", "ts %d deliver %"PRItime_t,
+                   ts->id, sm->sm_time);
+#endif
+            sm->sm_timeshift = now - sm->sm_time;
+            streaming_target_deliver2(ts->output, sm);
+            cur_file  = tsi_file;
+            cur_off   = tsi->pos + r;
+            last_time = sm->sm_time;
+            tx_time   = now;
+            sm        = NULL;
+          } else {
+            wait      = -1;
+            close(fd);
+            fd = -1;
+          }
+        }
+
+        /* Next index */
+        if (cur_speed < 0)
+          tsi = TAILQ_PREV(tsi, timeshift_index_list, link);
+        else
+          tsi = TAILQ_NEXT(tsi, link);
+
+      /* Not yet! */
+      } else if (tsi) {
+        if (cur_speed > 0)
+          wait = (tsi->time - deliver) / 1000;
+        else
+          wait = (deliver - tsi->time) / 1000;
+        if (wait == 0) wait = 1;
+      }
+
+    /* Full frame delivery */
+    } else {
+
+      /* Open file */
+      if (fd == -1) {
+#ifdef TSHFT_TRACE
+        tvhlog(LOG_DEBUG, "timeshift", "ts %d open file %s",
+               ts->id, cur_file->path);
+#endif
+        fd = open(cur_file->path, O_RDONLY);
+        if (cur_off) lseek(fd, cur_off, SEEK_SET);
+      }
+
+      /* Process */
+      pthread_mutex_lock(&ts->rdwr_mutex);
+      end = 1;
+      while (cur_file && cur_off < cur_file->size) {
+
+        /* Read msg */
+        if (!sm) {
+          ssize_t r = _read_msg(fd, &sm);
+          assert(r != -1);
+
+          /* Incomplete */
+          if (r == 0) {
+            lseek(fd, cur_off, SEEK_SET);
+            break;
+          }
+
+          cur_off += r;
+
+          /* Special case - EOF */
+          if (r == sizeof(size_t) || cur_off > cur_file->size) {
+            close(fd);
+            wait     = 0; // immediately cycle around
+            cur_off  = 0; // reset
+            fd       = -1;
+            cur_file = timeshift_filemgr_next(cur_file, NULL, 0);
+            break;
+          }
+        }
+
+        assert(sm);
+        end = 0;
+
+        /* Deliver */
+        if (sm->sm_time <= deliver) {
+#ifdef TSHFT_TRACE
+          tvhlog(LOG_DEBUG, "timeshift", "ts %d deliver %"PRItime_t,
+                 ts->id, sm->sm_time);
+#endif
+          sm->sm_timeshift = now - sm->sm_time;
+          streaming_target_deliver2(ts->output, sm);
+          tx_time   = now;
+          last_time = sm->sm_time;
+          sm        = NULL;
+          wait      = 0;
+        } else {
+          wait = (sm->sm_time - deliver) / 1000;
+          if (wait == 0) wait = 1;
+          break;
+        }
+      }
+    }
+
+    /* Terminate */
+    if (!cur_file || end) {
+
+      /* Back to live */
+      if (cur_speed > 0) {
+        tvhlog(LOG_DEBUG, "timeshift", "ts %d eob revert to live mode", ts->id);
+        ts->state = TS_LIVE;
+        cur_speed = 100;
+        ctrl      = streaming_msg_create_code(SMT_SPEED, cur_speed);
+        streaming_target_deliver2(ts->output, ctrl);
+
+      /* Pause */
+      } else if (cur_speed < 0) {
+        tvhlog(LOG_DEBUG, "timeshift", "ts %d sob pause stream", ts->id);
+        cur_speed = 0;
+        ts->state = TS_PAUSE;
+        ctrl      = streaming_msg_create_code(SMT_SPEED, cur_speed);
+        streaming_target_deliver2(ts->output, ctrl);
+      }
+    }
+
+    pthread_mutex_unlock(&ts->rdwr_mutex);
+    pthread_mutex_unlock(&ts->state_mutex);
+  }
+
+  /* Cleanup */
+  if (sm) streaming_msg_free(sm);
+#ifdef TSHFT_TRACE
+  tvhlog(LOG_DEBUG, "timeshift", "ts %d exit reader thread", ts->id);
+#endif
+
+  return NULL;
+}
diff --git a/src/timeshift/timeshift_writer.c b/src/timeshift/timeshift_writer.c
new file mode 100644 (file)
index 0000000..a66e6e1
--- /dev/null
@@ -0,0 +1,327 @@
+/**
+ *  TV headend - Timeshift Write Handler
+ *  Copyright (C) 2012 Adam Sutton
+ *
+ *  This program is free software: you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation, either version 3 of the License, or
+ *  (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "tvheadend.h"
+#include "streaming.h"
+#include "timeshift.h"
+#include "timeshift/private.h"
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/epoll.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <string.h>
+#include <assert.h>
+
+/* **************************************************************************
+ * File Writing
+ * *************************************************************************/
+
+/*
+ * Write data (retry on EAGAIN)
+ */
+static ssize_t _write
+  ( int fd, const void *buf, size_t count )
+{
+  ssize_t r;
+  size_t  n = 0;
+  while ( n < count ) {
+    r = write(fd, buf+n, count-n);
+    if (r == -1) {
+      if (errno == EAGAIN)
+        continue;
+      else
+        return -1;
+    }
+    n += r;
+  }
+  return count == n ? n : -1;
+}
+
+/*
+ * Write message
+ */
+static ssize_t _write_msg
+  ( int fd, streaming_message_type_t type, int64_t time,
+    const void *buf, size_t len )
+{
+  size_t len2 = len + sizeof(type) + sizeof(time);
+  ssize_t err, ret;
+  ret = err = _write(fd, &len2, sizeof(len2));
+  if (err < 0) return err;
+  err = _write(fd, &type, sizeof(type));
+  if (err < 0) return err;
+  ret += err;
+  err = _write(fd, &time, sizeof(time));
+  if (err < 0) return err;
+  ret += err;
+  if (len) {
+    err = _write(fd, buf, len);
+    if (err < 0) return err;
+    ret += err;
+  }
+  return ret;
+}
+
+/*
+ * Write packet buffer
+ */
+static int _write_pktbuf ( int fd, pktbuf_t *pktbuf )
+{
+  ssize_t ret, err;
+  if (pktbuf) {
+    ret = err = _write(fd, &pktbuf->pb_size, sizeof(pktbuf->pb_size));
+    if (err < 0) return err;
+    err = _write(fd, pktbuf->pb_data, pktbuf->pb_size);
+    if (err < 0) return err;
+    ret += err;
+  } else {
+    size_t sz = 0;
+    ret = _write(fd, &sz, sizeof(sz));
+  }
+  return ret;
+}
+
+/*
+ * Write signal status
+ */
+ssize_t timeshift_write_sigstat
+  ( int fd, int64_t time, signal_status_t *sigstat )
+{
+  return _write_msg(fd, SMT_SIGNAL_STATUS, time, sigstat,
+                    sizeof(signal_status_t));
+}
+
+/*
+ * Write packet
+ */
+ssize_t timeshift_write_packet ( int fd, int64_t time, th_pkt_t *pkt )
+{
+  ssize_t ret = 0, err;
+  ret = err = _write_msg(fd, SMT_PACKET, time, pkt, sizeof(th_pkt_t));
+  if (err <= 0) return err;
+  err = _write_pktbuf(fd, pkt->pkt_header);
+  if (err <= 0) return err;
+  ret += err;
+  err = _write_pktbuf(fd, pkt->pkt_payload);
+  if (err <= 0) return err;
+  ret += err;
+  return ret;
+}
+
+/*
+ * Write MPEGTS data
+ */
+ssize_t timeshift_write_mpegts ( int fd, int64_t time, void *data )
+{
+  return _write_msg(fd, SMT_MPEGTS, time, data, 188);
+}
+
+/*
+ * Write skip message
+ */
+ssize_t timeshift_write_skip ( int fd, streaming_skip_t *skip )
+{
+  return _write_msg(fd, SMT_SKIP, 0, skip, sizeof(streaming_skip_t));
+}
+
+/*
+ * Write speed message
+ */
+ssize_t timeshift_write_speed ( int fd, int speed )
+{
+  return _write_msg(fd, SMT_SPEED, 0, &speed, sizeof(speed));
+}
+
+/*
+ * Stop
+ */
+ssize_t timeshift_write_stop ( int fd, int code )
+{
+  return _write_msg(fd, SMT_STOP, 0, &code, sizeof(code));
+}
+
+/*
+ * Exit
+ */
+ssize_t timeshift_write_exit ( int fd )
+{
+  int code = 0;
+  return _write_msg(fd, SMT_EXIT, 0, &code, sizeof(code));
+}
+
+/*
+ * Write end of file (special internal message)
+ */
+ssize_t timeshift_write_eof ( int fd )
+{
+  size_t sz = 0;
+  return _write(fd, &sz, sizeof(sz));
+}
+
+/* **************************************************************************
+ * Thread
+ * *************************************************************************/
+
+static inline ssize_t _process_msg0
+  ( timeshift_t *ts, timeshift_file_t *tsf, streaming_message_t **smp )
+{
+  int i;
+  ssize_t err;
+  streaming_start_t *ss;
+  streaming_message_t *sm = *smp;
+  if (sm->sm_type == SMT_START) {
+    err = 0;
+    timeshift_index_t *ti = calloc(1, sizeof(timeshift_index_t));
+    ti->pos  = tsf->size;
+    ti->data = sm;
+    *smp = NULL;
+    TAILQ_INSERT_TAIL(&tsf->sstart, ti, link);
+
+    /* Update video index */
+    ss = sm->sm_data;
+    for (i = 0; i < ss->ss_num_components; i++)
+      if (SCT_ISVIDEO(ss->ss_components[i].ssc_type))
+        ts->vididx = ss->ss_components[i].ssc_index;
+  } else if (sm->sm_type == SMT_SIGNAL_STATUS)
+    err = timeshift_write_sigstat(tsf->fd, sm->sm_time, sm->sm_data);
+  else if (sm->sm_type == SMT_PACKET) {
+    err = timeshift_write_packet(tsf->fd, sm->sm_time, sm->sm_data);
+    if (err > 0) {
+      th_pkt_t *pkt = sm->sm_data;
+
+      /* Index video iframes */
+      if (pkt->pkt_componentindex == ts->vididx &&
+          pkt->pkt_frametype      == PKT_I_FRAME) {
+        timeshift_index_t *ti = calloc(1, sizeof(timeshift_index_t));
+        ti->pos  = tsf->size;
+        ti->time = sm->sm_time;
+        TAILQ_INSERT_TAIL(&tsf->iframes, ti, link);
+      }
+    }
+  } else if (sm->sm_type == SMT_MPEGTS)
+    err = timeshift_write_mpegts(tsf->fd, sm->sm_time, sm->sm_data);
+  else
+    err = 0;
+
+  /* OK */
+  if (err > 0) {
+    tsf->last  = sm->sm_time;
+    tsf->size += err;
+  }
+  return err;
+}
+
+static void _process_msg
+  ( timeshift_t *ts, streaming_message_t *sm, int *run )
+{
+  int err;
+  timeshift_file_t *tsf;
+
+  /* Process */
+  switch (sm->sm_type) {
+
+    /* Terminate */
+    case SMT_EXIT:
+      if (run) *run = 0;
+      break;
+    case SMT_STOP:
+      if (sm->sm_code == 0 && run)
+        *run = 0;
+      break;
+
+    /* Timeshifting */
+    case SMT_SKIP:
+    case SMT_SPEED:
+      break;
+
+    /* Status */
+    case SMT_NOSTART:
+    case SMT_SERVICE_STATUS:
+      break;
+
+    /* Store */
+    case SMT_SIGNAL_STATUS:
+    case SMT_START:
+    case SMT_MPEGTS:
+    case SMT_PACKET:
+      pthread_mutex_lock(&ts->rdwr_mutex);
+      if ((tsf = timeshift_filemgr_get(ts, 1)) && (tsf->fd != -1)) {
+        if ((err = _process_msg0(ts, tsf, &sm)) < 0) {
+          timeshift_filemgr_close(tsf);
+          tsf->bad = 1;
+          ts->full = 1; ///< Stop any more writing
+        }
+      }
+      pthread_mutex_unlock(&ts->rdwr_mutex);
+      break;
+  }
+
+  /* Next */
+  if (sm)
+    streaming_msg_free(sm);
+}
+
+void *timeshift_writer ( void *aux )
+{
+  int run = 1;
+  timeshift_t *ts = aux;
+  streaming_queue_t *sq = &ts->wr_queue;
+  streaming_message_t *sm;
+
+  pthread_mutex_lock(&sq->sq_mutex);
+
+  while (run) {
+
+    /* Get message */
+    sm = TAILQ_FIRST(&sq->sq_queue);
+    if (sm == NULL) {
+      pthread_cond_wait(&sq->sq_cond, &sq->sq_mutex);
+      continue;
+    }
+    TAILQ_REMOVE(&sq->sq_queue, sm, sm_link);
+    pthread_mutex_unlock(&sq->sq_mutex);
+
+    _process_msg(ts, sm, &run);
+
+    pthread_mutex_lock(&sq->sq_mutex);
+  }
+
+  pthread_mutex_unlock(&sq->sq_mutex);
+  return NULL;
+}
+
+/* **************************************************************************
+ * Utilities
+ * *************************************************************************/
+
+void timeshift_writer_flush ( timeshift_t *ts )
+
+{
+  streaming_message_t *sm;
+  streaming_queue_t *sq = &ts->wr_queue;
+
+  pthread_mutex_lock(&sq->sq_mutex);
+  while ((sm = TAILQ_FIRST(&sq->sq_queue))) {
+    TAILQ_REMOVE(&sq->sq_queue, sm, sm_link);
+    _process_msg(ts, sm, NULL);
+  }
+  pthread_mutex_unlock(&sq->sq_mutex);
+}
+
index c578a6edbc2f67e3721c2d24d65113b2f6e35739..2f6adcf9958bc00b218d4cc09a3f9add594297c2 100644 (file)
@@ -210,6 +210,24 @@ typedef struct signal_status {
   int unc;      /* uncorrected blocks */
 } signal_status_t;
 
+/**
+ * Streaming skip
+ */
+typedef struct streaming_skip
+{
+  enum {
+    SMT_SKIP_REL_TIME,
+    SMT_SKIP_ABS_TIME,
+    SMT_SKIP_REL_SIZE,
+    SMT_SKIP_ABS_SIZE
+  } type;
+  union {
+    off_t   size;
+    time_t  time;
+  };
+} streaming_skip_t;
+
+
 /**
  * A streaming pad generates data.
  * It has one or more streaming targets attached to it.
@@ -234,6 +252,7 @@ TAILQ_HEAD(streaming_message_queue, streaming_message);
  * Streaming messages types
  */
 typedef enum {
+
   /**
    * Packet with data.
    *
@@ -291,6 +310,17 @@ typedef enum {
    * Internal message to exit receiver
    */
   SMT_EXIT,
+
+  /**
+   * Set stream speed
+   */
+  SMT_SPEED,
+
+  /**
+   * Skip the stream
+   */
+  SMT_SKIP,
+
 } streaming_message_type_t;
 
 #define SMT_TO_MASK(x) (1 << ((unsigned int)x))
@@ -326,6 +356,10 @@ typedef enum {
 typedef struct streaming_message {
   TAILQ_ENTRY(streaming_message) sm_link;
   streaming_message_type_t sm_type;
+#if ENABLE_TIMESHIFT
+  int64_t  sm_time;
+  uint64_t sm_timeshift;
+#endif
   union {
     void *sm_data;
     int sm_code;
index 42eb6082b6b8df3c2a5e397710995990b6e8e12a..a7e4d968e5d772f7b51bd8a4cba2d72c3fe637d3 100644 (file)
@@ -103,6 +103,49 @@ tvheadend.miscconf = function() {
   if (tvheadend.capabilities.indexOf('imagecache') == -1)
     imagecachePanel.hide();
 
+       /* ****************************************************************
+        * Timeshift
+        * ***************************************************************/
+
+       var timeshiftPath = new Ext.form.TextField({
+               fieldLabel  : 'Temp. storage path',
+               name        : 'timeshiftpath',
+               allowBlank  : true,
+               width       : 400
+       });
+
+       var timeshiftPeriod = new Ext.form.NumberField({
+               fieldLabel  : 'Max period (minutes, per stream)',
+               name        : 'timeshiftperiod',
+               allowBlank  : false,
+               width       : 400
+       });
+
+       var timeshiftPeriodU = new Ext.form.Checkbox({
+               fieldLabel  : '(unlimited)',
+               name        : 'timeshiftperiod_unlimited',
+               allowBlank  : false,
+               width       : 400
+       });
+       timeshiftPeriodU.on('check', function(e, c) {
+               timeshiftPeriod.setDisabled(c);
+       });
+
+       var timeshiftSize = new Ext.form.NumberField({
+               fieldLabel  : 'Max size (MB, global)',
+               name        : 'timeshiftsize',
+               allowBlank  : false,
+               width       : 400
+       });
+
+       var timeshiftFields = new Ext.form.FieldSet({
+               title       : 'Timeshift',
+               width       : 700,
+               autoHeight  : true,
+               collapsible : true,
+               items       : [ timeshiftPath, timeshiftPeriod, timeshiftPeriodU ]//, timeshiftSize ]
+       });
+
        /* ****************************************************************
         * Form
         * ***************************************************************/
@@ -127,7 +170,7 @@ tvheadend.miscconf = function() {
                border : false,
                bodyStyle : 'padding:15px',
                labelAlign : 'left',
-               labelWidth : 150,
+               labelWidth : 200,
                waitMsgTarget : true,
                reader : confreader,
                layout : 'form',
@@ -149,6 +192,14 @@ tvheadend.miscconf = function() {
                                op : 'loadSettings'
                        },
                        success : function(form, action) {
+                               v = parseInt(timeshiftPeriod.getValue());
+                               if (v == 4294967295) {
+                                       timeshiftPeriodU.setValue(true);
+                                       timeshiftPeriod.setValue("");
+                                       timeshiftPeriod.setDisabled(true); // TODO: this isn't working
+                               } else {
+                                       timeshiftPeriod.setValue(v / 60);
+                               }
                                confpanel.enable();
                        }
                });
index 0257010e14b280c0098025275d3ed10f3bebc88e..3e1e525c3f95c4496cd53af3cade1a8b9ab64f00 100644 (file)
@@ -244,6 +244,8 @@ http_stream_run(http_connection_t *hc, streaming_queue_t *sq,
       }
       break;
 
+    case SMT_SKIP:
+    case SMT_SPEED:
     case SMT_SIGNAL_STATUS:
       break;