]> git.ipfire.org Git - thirdparty/tvheadend.git/commitdiff
timeshift: write all 'temporary' packets (outside storage) when going to live
authorJaroslav Kysela <perex@perex.cz>
Wed, 30 Dec 2015 10:14:49 +0000 (11:14 +0100)
committerJaroslav Kysela <perex@perex.cz>
Wed, 30 Dec 2015 17:33:08 +0000 (18:33 +0100)
src/timeshift.c
src/timeshift/private.h
src/timeshift/timeshift_reader.c
src/timeshift/timeshift_writer.c

index bd5d1865aa2b360c1df7c88df2d3540169ad5973..94ff2af83ed0fd65b597c5fc447b93715659b0fa 100644 (file)
@@ -282,6 +282,35 @@ timeshift_packet( timeshift_t *ts, th_pkt_t *pkt )
   }
 }
 
+void
+timeshift_packets_clone( timeshift_t *ts, struct streaming_message_queue *dst )
+{
+  streaming_message_t *lowest, *sm;
+  struct streaming_message_queue *sq, *backlogs;
+  int i;
+
+  lock_assert(&ts->buffering_mutex);
+
+  /* init temporary queues */
+  backlogs = alloca(ts->backlog_max * sizeof(*backlogs));
+  for (i = 0; i < ts->backlog_max; i++)
+    TAILQ_INIT(&backlogs[i]);
+  /* push to destination (pts sorted) */
+  while (1) {
+    lowest = NULL;
+    for (i = 0; i < ts->backlog_max; i++) {
+      sq = &backlogs[i];
+      sm = TAILQ_FIRST(sq);
+      if (sm && (lowest == NULL || lowest->sm_time > sm->sm_time))
+        lowest = sm;
+    }
+    if (!lowest)
+      break;
+    TAILQ_REMOVE(sq, lowest, sm_link);
+    TAILQ_INSERT_TAIL(dst, lowest, sm_link);
+  }
+}
+
 /*
  * Receive data
  */
@@ -312,6 +341,8 @@ static void timeshift_input
       ts->state  = TS_LIVE;
     }
 
+    pthread_mutex_lock(&ts->buffering_mutex);
+
     /* Pass-thru */
     if (ts->state <= TS_LIVE) {
       if (sm->sm_type == SMT_START) {
@@ -367,6 +398,8 @@ static void timeshift_input
     }
 pktcont:
 
+    pthread_mutex_unlock(&ts->buffering_mutex);
+
     /* Exit/Stop */
     if (exit) {
       timeshift_write_exit(ts->rd_pipe.wr);
@@ -455,6 +488,7 @@ streaming_target_t *timeshift_create
     TAILQ_INIT(&ts->backlog[i]);
   pthread_mutex_init(&ts->rdwr_mutex, NULL);
   pthread_mutex_init(&ts->state_mutex, NULL);
+  pthread_mutex_init(&ts->buffering_mutex, NULL);
 
   /* Initialise output */
   tvh_pipe(O_NONBLOCK, &ts->rd_pipe);
index 7cd409367c074ddeabd3b42759f573c37e40f6c6..7a3165b6afa50088f4e172f8d6c8d96c12cb1912 100644 (file)
@@ -96,6 +96,7 @@ typedef struct timeshift {
   int64_t                     ref_time;   ///< Start time in us (monoclock)
   struct streaming_message_queue backlog[TIMESHIFT_BACKLOG_MAX]; ///< Queued packets for time sorting
   int                         backlog_max;///< Maximum component index in backlog
+  pthread_mutex_t             buffering_mutex;///< Protect backlog / write queues
 
   enum {
     TS_INIT,
@@ -128,6 +129,11 @@ typedef struct timeshift {
 extern uint64_t timeshift_total_size;
 extern uint64_t timeshift_total_ram_size;
 
+/*
+ *
+ */
+void timeshift_packets_clone ( timeshift_t *ts, struct streaming_message_queue *dst );
+
 /*
  * Write functions
  */
@@ -142,6 +148,7 @@ ssize_t timeshift_write_exit    ( int fd );
 ssize_t timeshift_write_eof     ( timeshift_file_t *tsf );
 
 void timeshift_writer_flush ( timeshift_t *ts );
+void timeshift_writer_clone ( timeshift_t *ts, struct streaming_message_queue *dst );
 
 /*
  * Threads
index 405ab575aeaec92f27ffc56511ad38bc9ad50b83..c8e1faf253374c7f45ab71d027d9c1d30af1dc40 100644 (file)
@@ -426,6 +426,24 @@ static int _timeshift_flush_to_live
   return 0;
 }
 
+/*
+ * Write packets from temporary queues
+ */
+static void _timeshift_write_queues
+  ( timeshift_t *ts )
+{
+  struct streaming_message_queue sq;
+  streaming_message_t *sm;
+
+  TAILQ_INIT(&sq);
+  timeshift_writer_clone(ts, &sq);
+  timeshift_packets_clone(ts, &sq);
+  while ((sm = TAILQ_FIRST(&sq)) != NULL) {
+    TAILQ_REMOVE(&sq, sm, sm_link);
+    streaming_target_deliver2(ts->output, sm);
+  }
+}
+
 /*
  * Send the status message
  */
@@ -850,11 +868,21 @@ void *timeshift_reader ( void *p )
         streaming_target_deliver2(ts->output, ctrl);
         ctrl      = NULL;
         tvhtrace("timeshift", "reader - set TS_LIVE");
-        ts->state = TS_LIVE;
+
+        /* Critical section - protect write / backlog queues */
+        pthread_mutex_lock(&ts->buffering_mutex);
 
         /* Flush timeshift buffer to live */
-        if (_timeshift_flush_to_live(ts, &cur_file, &sm, &wait) == -1)
+        if (_timeshift_flush_to_live(ts, &cur_file, &sm, &wait) == -1) {
+          pthread_mutex_unlock(&ts->buffering_mutex);
           break;
+        }
+
+        /* Flush write / backlog queues */
+        _timeshift_write_queues(ts);
+        
+        ts->state = TS_LIVE;
+        pthread_mutex_unlock(&ts->buffering_mutex);
 
         /* Close file (if open) */
         if (cur_file && cur_file->rfd >= 0) {
index 40355bcf081d749d07dbbd9dea86d9d405573433..3107c55492564e75049a538039cdfbbd566ba4ce 100644 (file)
@@ -384,3 +384,15 @@ void timeshift_writer_flush ( timeshift_t *ts )
   pthread_mutex_unlock(&sq->sq_mutex);
 }
 
+void timeshift_writer_clone ( timeshift_t *ts, struct streaming_message_queue *dst )
+{
+  streaming_message_t *sm, *sm2;
+  streaming_queue_t *sq = &ts->wr_queue;
+
+  pthread_mutex_lock(&sq->sq_mutex);
+  TAILQ_FOREACH(sm, &sq->sq_queue, sm_link) {
+    sm2 = streaming_msg_clone(sm);
+    TAILQ_INSERT_TAIL(dst, sm2, sm_link);
+  }
+  pthread_mutex_unlock(&sq->sq_mutex);
+}