]> git.ipfire.org Git - thirdparty/tvheadend.git/commitdiff
timeshift: merge packet log code to one fcn, many fixes
authorJaroslav Kysela <perex@perex.cz>
Sun, 3 Jan 2016 12:54:52 +0000 (13:54 +0100)
committerJaroslav Kysela <perex@perex.cz>
Sun, 3 Jan 2016 12:54:52 +0000 (13:54 +0100)
src/timeshift.c
src/timeshift/private.h
src/timeshift/timeshift_reader.c
src/timeshift/timeshift_writer.c

index 1ad81716c75a8e371c83cdbbda157b099e3b0fcb..bdb0673eec0b0440a6c86e41b3682c5d378c3b8c 100644 (file)
@@ -38,6 +38,27 @@ static int timeshift_index = 0;
 
 struct timeshift_conf timeshift_conf;
 
+/*
+ * Packet log
+ */
+void
+timeshift_packet_log0
+  ( const char *source, timeshift_t *ts, streaming_message_t *sm )
+{
+  th_pkt_t *pkt = sm->sm_data;
+  tvhtrace("timeshift",
+           "ts %d pkt %s - stream %d type %c pts %10"PRId64
+           " dts %10"PRId64" dur %10d len %6zu time %14"PRId64,
+           ts->id, source,
+           pkt->pkt_componentindex,
+           pkt_frametype_to_char(pkt->pkt_frametype),
+           ts_rescale(pkt->pkt_pts, 1000000),
+           ts_rescale(pkt->pkt_dts, 1000000),
+           pkt->pkt_duration,
+           pktbuf_len(pkt->pkt_payload),
+           sm->sm_time);
+}
+
 /*
  * Safe values for RAM configuration
  */
@@ -214,23 +235,6 @@ const idclass_t timeshift_conf_class = {
 #define MAX_TIME_DELTA (2*1000000) /* 2 seconds */
 #define BACKLOG_COUNT ARRAY_SIZE(timeshift_t->backlog)
 
-static void
-timeshift_packet_deliver ( timeshift_t *ts, streaming_message_t *sm )
-{
-  th_pkt_t *pkt = sm->sm_data;
-  tvhtrace("timeshift",
-           "ts %d pkt buf - stream %d type %c pts %10"PRId64
-           " dts %10"PRId64" dur %10d len %6zu time %14"PRId64,
-           ts->id,
-           pkt->pkt_componentindex,
-           pkt_frametype_to_char(pkt->pkt_frametype),
-           ts_rescale(pkt->pkt_pts, 1000000),
-           ts_rescale(pkt->pkt_dts, 1000000),
-           pkt->pkt_duration,
-           pktbuf_len(pkt->pkt_payload),
-           sm->sm_time);
-  streaming_target_deliver2(&ts->wr_queue.sq_st, sm);
-}
 
 static void
 timeshift_packet_flush ( timeshift_t *ts, int64_t time )
@@ -251,7 +255,9 @@ timeshift_packet_flush ( timeshift_t *ts, int64_t time )
     if (!lowest)
       break;
     TAILQ_REMOVE(sq, lowest, sm_link);
-    timeshift_packet_deliver(ts, lowest);
+    ts->last_wr_time = lowest->sm_time;
+    timeshift_packet_log("wr ", ts, lowest);
+    streaming_target_deliver2(&ts->wr_queue.sq_st, lowest);
   }
 }
 
@@ -274,7 +280,9 @@ timeshift_packet( timeshift_t *ts, th_pkt_t *pkt )
 
   sm->sm_time = time;
   if (time + MAX_TIME_DELTA < ts->last_time) {
-    timeshift_packet_deliver(ts, sm);
+    ts->last_wr_time = time;
+    timeshift_packet_log("wr2", ts, sm);
+    streaming_target_deliver2(&ts->wr_queue.sq_st, sm);
   } else {
     if (pkt->pkt_componentindex >= ts->backlog_max)
       ts->backlog_max = pkt->pkt_componentindex + 1;
@@ -323,10 +331,11 @@ static void timeshift_input
 
     /* Send to the writer thread */
     if (ts->packet_mode) {
-      sm->sm_time = ts->last_time;
+      sm->sm_time = ts->last_wr_time;
       if (type == SMT_PACKET) {
         timeshift_packet(ts, pkt);
-        goto msg_free;
+        streaming_msg_free(sm);
+        goto _exit;
       }
     } else {
       if (ts->ref_time == 0) {
@@ -337,10 +346,9 @@ static void timeshift_input
       }
     }
     streaming_target_deliver2(&ts->wr_queue.sq_st, sm);
-msg_free:
-    streaming_msg_free(sm);
 
     /* Exit/Stop */
+_exit:
     if (ts->exit)
       timeshift_write_exit(ts->rd_pipe.wr);
   }
@@ -417,6 +425,7 @@ streaming_target_t *timeshift_create
   ts->dobuf      = ts->ondemand ? 0 : 1;
   ts->packet_mode= 1;
   ts->last_time  = 0;
+  ts->last_wr_time = 0;
   ts->buf_time   = 0;
   ts->start_pts  = 0;
   ts->ref_time   = 0;
index b094289cde3dbc6f66bf2eeed93bf2ae8b920a0d..f668060a128d9d16a3612434792954a8e0414d9b 100644 (file)
@@ -19,7 +19,7 @@
 #ifndef __TVH_TIMESHIFT_PRIVATE_H__
 #define __TVH_TIMESHIFT_PRIVATE_H__
 
-#define TIMESHIFT_PLAY_BUF         500000  //< us to buffer in TX
+#define TIMESHIFT_PLAY_BUF         1000000  //< us to buffer in TX
 #define TIMESHIFT_FILE_PERIOD      60      //< number of secs in each buffer file
 #define TIMESHIFT_BACKLOG_MAX      16      //< maximum elementary streams
 
@@ -94,6 +94,7 @@ typedef struct timeshift {
   int                         packet_mode;///< Packet mode (otherwise MPEG-TS data mode)
   int                         dobuf;      ///< Buffer packets (store)
   int64_t                     last_time;  ///< Last time in us (PTS conversion)
+  int64_t                     last_wr_time;///< Last write time in us (PTS conversion)
   int64_t                     start_pts;  ///< Start time for packets (PTS)
   int64_t                     ref_time;   ///< Start time in us (monoclock)
   int64_t                     buf_time;   ///< Last buffered time in us (PTS conversion)
@@ -128,6 +129,16 @@ typedef struct timeshift {
 extern uint64_t timeshift_total_size;
 extern uint64_t timeshift_total_ram_size;
 
+void timeshift_packet_log0
+  ( const char *prefix, timeshift_t *ts, streaming_message_t *sm );
+
+static inline void timeshift_packet_log
+  ( const char *prefix, timeshift_t *ts, streaming_message_t *sm )
+{
+  if (sm->sm_type == SMT_PACKET && tvhtrace_enabled())
+    timeshift_packet_log0(prefix, ts, sm);
+}
+
 /*
  * Write functions
  */
@@ -141,9 +152,6 @@ ssize_t timeshift_write_stop    ( int fd, int code );
 ssize_t timeshift_write_exit    ( int fd );
 ssize_t timeshift_write_eof     ( timeshift_file_t *tsf );
 
-void timeshift_writer_flush ( timeshift_t *ts );
-void timeshift_writer_clone ( timeshift_t *ts, struct streaming_message_queue *dst );
-
 /*
  * Threads
  */
index 1b0a19e8445dd4120dece56f5c1977237b1a4ce0..0668e5dd51d2a505125aaee4948bf4015cdb79b9 100644 (file)
@@ -413,25 +413,6 @@ static int _timeshift_read
   return 0;
 }
 
-/*
- * Trace packet
- */
-static void timeshift_trace_pkt
-  ( timeshift_t *ts, streaming_message_t *sm )
-{
-  th_pkt_t *pkt = sm->sm_data;
-  tvhtrace("timeshift",
-           "ts %d pkt out - stream %d type %c pts %10"PRId64
-           " dts %10"PRId64 " dur %10d len %6zu time %14"PRId64,
-           ts->id,
-           pkt->pkt_componentindex,
-           pkt_frametype_to_char(pkt->pkt_frametype),
-           ts_rescale(pkt->pkt_pts, 1000000),
-           ts_rescale(pkt->pkt_dts, 1000000),
-           pkt->pkt_duration,
-           pktbuf_len(pkt->pkt_payload), sm->sm_time);
-}
-
 /*
  * Flush all data to live
  */
@@ -444,8 +425,7 @@ static int _timeshift_flush_to_live
     if (_timeshift_read(ts, cur_file, &sm, wait) == -1)
       return -1;
     if (!sm) break;
-    if (tvhtrace_enabled() && sm->sm_type == SMT_PACKET)
-      timeshift_trace_pkt(ts, sm);
+    timeshift_packet_log("ouf", ts, sm);
     streaming_target_deliver2(ts->output, sm);
   }
   return 0;
@@ -503,7 +483,7 @@ static void timeshift_status
 void *timeshift_reader ( void *p )
 {
   timeshift_t *ts = p;
-  int nfds, end, run = 1, wait = -1;
+  int nfds, end, run = 1, wait = -1, state;
   timeshift_file_t *cur_file = NULL, *tmp_file;
   int cur_speed = 100, keyframe_mode = 0;
   int64_t mono_now, mono_play_time = 0, mono_last_status = 0;
@@ -567,7 +547,8 @@ void *timeshift_reader ( void *p )
           if (cur_speed != speed) {
 
             /* Live playback */
-            if (ts->state == TS_LIVE) {
+            state = ts->state;
+            if (state == TS_LIVE) {
 
               /* Reject */
               if (speed >= 100) {
@@ -579,7 +560,6 @@ void *timeshift_reader ( void *p )
               } else {
                 tvhlog(LOG_DEBUG, "timeshift", "ts %d enter timeshift mode",
                        ts->id);
-                timeshift_writer_flush(ts);
                 ts->dobuf = 1;
                 tmp_file = timeshift_filemgr_newest(ts);
                 if (tmp_file != NULL) {
@@ -613,14 +593,15 @@ void *timeshift_reader ( void *p )
 
             /* Update */
             cur_speed = speed;
-            if (speed != 100 || ts->state != TS_LIVE) {
+            if (speed != 100 || state != TS_LIVE) {
               ts->state = speed == 0 ? TS_PAUSE : TS_PLAY;
               tvhtrace("timeshift", "reader - set %s", speed == 0 ? "TS_PAUSE" : "TS_PLAY");
             }
-            if (ts->state == TS_PLAY) {
+            if (ts->state == TS_PLAY && state != TS_PLAY) {
               mono_play_time = mono_now;
-              tvhtrace("timeshift", "update play time TS_LIVE - %"PRId64" play buffer from %"PRId64, mono_now, pause_time);
-            } else if (ts->state == TS_PAUSE) {
+              tvhtrace("timeshift", "update play time TS_LIVE - %"PRId64" play buffer from %"PRId64,
+                       mono_now, pause_time);
+            } else if (ts->state == TS_PAUSE && state != TS_PAUSE) {
               pause_time = last_time;
             }
             tvhlog(LOG_DEBUG, "timeshift", "ts %d change speed %d", ts->id, speed);
@@ -815,8 +796,7 @@ void *timeshift_reader ( void *p )
                 ((cur_speed > 0) && (sm->sm_time <= deliver))))) {
 
       last_time = sm->sm_time;
-      if (sm->sm_type == SMT_PACKET && tvhtrace_enabled())
-        timeshift_trace_pkt(ts, sm);
+      timeshift_packet_log("out", ts, sm);
       streaming_target_deliver2(ts->output, sm);
       sm        = NULL;
       wait      = 0;
index ac9dc298a7d748a12a78625b5937b2b7df7800a2..89e30d2b0d46fe76dca8eeeb7381acb019710eb9 100644 (file)
@@ -253,15 +253,13 @@ static void _handle_sstart ( timeshift_t *ts, timeshift_file_t *tsf, streaming_m
  * *************************************************************************/
 
 static inline ssize_t _process_msg0
-  ( timeshift_t *ts, timeshift_file_t *tsf, streaming_message_t **smp )
+  ( timeshift_t *ts, timeshift_file_t *tsf, streaming_message_t *sm )
 {
   ssize_t err;
-  streaming_message_t *sm = *smp;
 
   if (sm->sm_type == SMT_START) {
     err = 0;
-    _handle_sstart(ts, tsf, sm);
-    *smp = NULL;
+    _handle_sstart(ts, tsf, streaming_msg_clone(sm));
   } else if (sm->sm_type == SMT_SIGNAL_STATUS)
     err = timeshift_write_sigstat(tsf, sm->sm_time, sm->sm_data);
   else if (sm->sm_type == SMT_PACKET) {
@@ -307,7 +305,7 @@ static void _process_msg
     /* Terminate */
     case SMT_EXIT:
       if (run) *run = 0;
-      goto live;
+      break;
     case SMT_STOP:
       if (sm->sm_code != SM_CODE_SOURCE_RECONFIGURED && run)
         *run = 0;
@@ -334,14 +332,19 @@ static void _process_msg
     case SMT_PACKET:
       pthread_mutex_lock(&ts->state_mutex);
       ts->buf_time = sm->sm_time;
-      if (ts->state == TS_LIVE)
+      if (ts->state == TS_LIVE) {
         streaming_target_deliver2(ts->output, streaming_msg_clone(sm));
+        if (sm->sm_type == SMT_PACKET)
+          timeshift_packet_log("liv", ts, sm);
+      }
       if (ts->dobuf) {
         if ((tsf = timeshift_filemgr_get(ts, sm->sm_time)) && (tsf->wfd >= 0 || tsf->ram)) {
-          if ((err = _process_msg0(ts, tsf, &sm)) < 0) {
+          if ((err = _process_msg0(ts, tsf, sm)) < 0) {
             timeshift_filemgr_close(tsf);
             tsf->bad = 1;
             ts->full = 1; ///< Stop any more writing
+          } else {
+            timeshift_packet_log("sav", ts, sm);
           }
           tsf->refcount--;
         }
@@ -352,6 +355,7 @@ static void _process_msg
 
   /* Next */
   streaming_msg_free(sm);
+  return;
 
 live:
   pthread_mutex_lock(&ts->state_mutex);
@@ -388,34 +392,3 @@ void *timeshift_writer ( void *aux )
   pthread_mutex_unlock(&sq->sq_mutex);
   return NULL;
 }
-
-/* **************************************************************************
- * Utilities
- * *************************************************************************/
-
-void timeshift_writer_flush ( timeshift_t *ts )
-
-{
-  streaming_message_t *sm;
-  streaming_queue_t *sq = &ts->wr_queue;
-
-  pthread_mutex_lock(&sq->sq_mutex);
-  while ((sm = TAILQ_FIRST(&sq->sq_queue))) {
-    streaming_queue_remove(sq, sm);
-    _process_msg(ts, sm, NULL);
-  }
-  pthread_mutex_unlock(&sq->sq_mutex);
-}
-
-void timeshift_writer_clone ( timeshift_t *ts, struct streaming_message_queue *dst )
-{
-  streaming_message_t *sm, *sm2;
-  streaming_queue_t *sq = &ts->wr_queue;
-
-  pthread_mutex_lock(&sq->sq_mutex);
-  TAILQ_FOREACH(sm, &sq->sq_queue, sm_link) {
-    sm2 = streaming_msg_clone(sm);
-    TAILQ_INSERT_TAIL(dst, sm2, sm_link);
-  }
-  pthread_mutex_unlock(&sq->sq_mutex);
-}