]> git.ipfire.org Git - thirdparty/tvheadend.git/commitdiff
timeshift: reduce and improve the logic - move more packet handling logic to writer...
authorJaroslav Kysela <perex@perex.cz>
Sun, 3 Jan 2016 10:19:27 +0000 (11:19 +0100)
committerJaroslav Kysela <perex@perex.cz>
Sun, 3 Jan 2016 10:24:57 +0000 (11:24 +0100)
src/packet.h
src/timeshift.c
src/timeshift/private.h
src/timeshift/timeshift_reader.c
src/timeshift/timeshift_writer.c

index fb12be1c5847bb304af4db7b6ea177245bcad31e..14d3c6c697096308473ccddf7371827cd4fea00e 100644 (file)
@@ -56,9 +56,6 @@ typedef struct th_pkt {
   uint8_t pkt_componentindex;
   uint8_t pkt_frametype;
   uint8_t pkt_field;  // Set if packet is only a half frame (a field)
-#if ENABLE_TIMESHIFT
-  uint8_t pkt_delivered;
-#endif
 
   uint8_t pkt_channels;
   uint8_t pkt_sri;
index 19e956426c7e67b559692be7482a5b4f88b27b80..1ad81716c75a8e371c83cdbbda157b099e3b0fcb 100644 (file)
@@ -233,7 +233,7 @@ timeshift_packet_deliver ( timeshift_t *ts, streaming_message_t *sm )
 }
 
 static void
-timeshift_packet_flush ( timeshift_t *ts, int64_t time, int deliver )
+timeshift_packet_flush ( timeshift_t *ts, int64_t time )
 {
   streaming_message_t *lowest, *sm;
   struct streaming_message_queue *sq;
@@ -251,15 +251,12 @@ timeshift_packet_flush ( timeshift_t *ts, int64_t time, int deliver )
     if (!lowest)
       break;
     TAILQ_REMOVE(sq, lowest, sm_link);
-    if (deliver)
-      timeshift_packet_deliver(ts, lowest);
-    else
-      streaming_msg_free(lowest);
+    timeshift_packet_deliver(ts, lowest);
   }
 }
 
 static void
-timeshift_packet( timeshift_t *ts, th_pkt_t *pkt, int deliver )
+timeshift_packet( timeshift_t *ts, th_pkt_t *pkt )
 {
   streaming_message_t *sm;
   int64_t time;
@@ -272,61 +269,16 @@ timeshift_packet( timeshift_t *ts, th_pkt_t *pkt, int deliver )
   time = ts_rescale(pkt->pkt_pts, 1000000);
   if (time > ts->last_time) {
     ts->last_time = time;
-    timeshift_packet_flush(ts, time, deliver);
+    timeshift_packet_flush(ts, time);
   }
 
   sm->sm_time = time;
   if (time + MAX_TIME_DELTA < ts->last_time) {
-    if (deliver)
-      timeshift_packet_deliver(ts, sm);
+    timeshift_packet_deliver(ts, sm);
   } else {
     if (pkt->pkt_componentindex >= ts->backlog_max)
       ts->backlog_max = pkt->pkt_componentindex + 1;
     TAILQ_INSERT_TAIL(&ts->backlog[pkt->pkt_componentindex], sm, sm_link);
-    pkt->pkt_delivered = ts->state <= TS_LIVE;
-  }
-}
-
-void
-timeshift_packets_clone
-  ( timeshift_t *ts, struct streaming_message_queue *dst, int delivered )
-{
-  streaming_message_t *lowest, *sm, *sm2;
-  struct streaming_message_queue *sq, *sq2, *backlogs;
-  th_pkt_t *pkt;
-  int i;
-
-  lock_assert(&ts->state_mutex);
-
-  /* init temporary queues and copy the backlog data */
-  backlogs = alloca(ts->backlog_max * sizeof(*backlogs));
-  for (i = 0; i < ts->backlog_max; i++) {
-    sq = &backlogs[i];
-    sq2 = &ts->backlog[i];
-    TAILQ_INIT(sq);
-    TAILQ_FOREACH(sm, sq2, sm_link) {
-      if (!delivered) {
-        pkt = sm->sm_data;
-        if (pkt->pkt_delivered)
-          continue;
-      }
-      sm2 = streaming_msg_clone(sm);
-      TAILQ_INSERT_TAIL(sq, sm2, sm_link);
-    }
-  }
-  /* push to destination (pts sorted) */
-  while (1) {
-    lowest = NULL;
-    for (i = 0; i < ts->backlog_max; i++) {
-      sq = &backlogs[i];
-      sm = TAILQ_FIRST(sq);
-      if (sm && (lowest == NULL || lowest->sm_time > sm->sm_time))
-        lowest = sm;
-    }
-    if (!lowest)
-      break;
-    TAILQ_REMOVE(sq, lowest, sm_link);
-    TAILQ_INSERT_TAIL(dst, lowest, sm_link);
   }
 }
 
@@ -336,31 +288,24 @@ timeshift_packets_clone
 static void timeshift_input
   ( void *opaque, streaming_message_t *sm )
 {
-  int exit = 0, type = sm->sm_type;
+  int type = sm->sm_type;
   timeshift_t *ts = opaque;
   th_pkt_t *pkt = sm->sm_data, *pkt2;
 
-  pthread_mutex_lock(&ts->state_mutex);
-
   /* Control */
   if (type == SMT_SKIP) {
-    if (ts->state >= TS_LIVE)
-      timeshift_write_skip(ts->rd_pipe.wr, sm->sm_data);
+    timeshift_write_skip(ts->rd_pipe.wr, sm->sm_data);
     streaming_msg_free(sm);
   } else if (type == SMT_SPEED) {
-    if (ts->state >= TS_LIVE)
-      timeshift_write_speed(ts->rd_pipe.wr, sm->sm_code);
+    timeshift_write_speed(ts->rd_pipe.wr, sm->sm_code);
     streaming_msg_free(sm);
-  }
-
-  else {
+  } else {
 
-    /* Start */
-    if (type == SMT_START && ts->state == TS_INIT)
-      ts->state = TS_LIVE;
+    if (type == SMT_START)
+      timeshift_packet_flush(ts, ts->last_time + MAX_TIME_DELTA);
 
     /* Change PTS/DTS offsets */
-    if (ts->packet_mode && ts->start_pts && type == SMT_PACKET) {
+    else if (ts->packet_mode && ts->start_pts && type == SMT_PACKET) {
       pkt2 = pkt_copy_shallow(pkt);
       pkt_ref_dec(pkt);
       sm->sm_data = pkt = pkt2;
@@ -368,73 +313,37 @@ static void timeshift_input
       pkt->pkt_dts += ts->start_pts;
     }
 
-    /* Pass-thru */
-    if (ts->state <= TS_LIVE) {
-      if (type == SMT_START) {
-        if (ts->smt_start)
-          streaming_start_unref(ts->smt_start);
-        ts->smt_start = sm->sm_data;
-        atomic_add(&ts->smt_start->ss_refcount, 1);
-        if (ts->packet_mode) {
-          timeshift_packet_flush(ts, ts->last_time + MAX_TIME_DELTA + 1000, ts->dobuf);
-          if (ts->last_time)
-            ts->start_pts = ts->last_time + 1000;
-        }
-      }
-      streaming_target_deliver2(ts->output, streaming_msg_clone(sm));
-    }
-
     /* Check for exit */
-    if (type == SMT_EXIT ||
+    else if (type == SMT_EXIT ||
         (type == SMT_STOP && sm->sm_code != SM_CODE_SOURCE_RECONFIGURED))
-      exit = 1;
+      ts->exit = 1;
 
-    if (type == SMT_MPEGTS)
+    else if (type == SMT_MPEGTS)
       ts->packet_mode = 0;
 
-    /* Buffer to disk */
-    if ((ts->state > TS_LIVE) || (ts->dobuf && (ts->state == TS_LIVE))) {
-      if (ts->packet_mode) {
-        sm->sm_time = ts->last_time;
-        if (type == SMT_PACKET) {
-          timeshift_packet(ts, pkt, 1);
-          goto msg_free;
-        }
-      } else {
-        if (ts->ref_time == 0) {
-          ts->ref_time = getmonoclock();
-          sm->sm_time = 0;
-        } else {
-          sm->sm_time = getmonoclock() - ts->ref_time;
-        }
+    /* Send to the writer thread */
+    if (ts->packet_mode) {
+      sm->sm_time = ts->last_time;
+      if (type == SMT_PACKET) {
+        timeshift_packet(ts, pkt);
+        goto msg_free;
       }
-      streaming_target_deliver2(&ts->wr_queue.sq_st, sm);
     } else {
-      if (type == SMT_PACKET) {
-        timeshift_packet(ts, pkt, 0);
-        tvhtrace("timeshift",
-                 "ts %d pkt in  - stream %d type %c pts %10"PRId64
-                 " dts %10"PRId64" dur %10d len %6zu",
-                 ts->id,
-                 pkt->pkt_componentindex,
-                 pkt_frametype_to_char(pkt->pkt_frametype),
-                 ts_rescale(pkt->pkt_pts, 1000000),
-                 ts_rescale(pkt->pkt_dts, 1000000),
-                 pkt->pkt_duration,
-                 pktbuf_len(pkt->pkt_payload));
+      if (ts->ref_time == 0) {
+        ts->ref_time = getmonoclock();
+        sm->sm_time = 0;
+      } else {
+        sm->sm_time = getmonoclock() - ts->ref_time;
       }
-msg_free:
-      streaming_msg_free(sm);
     }
+    streaming_target_deliver2(&ts->wr_queue.sq_st, sm);
+msg_free:
+    streaming_msg_free(sm);
 
     /* Exit/Stop */
-    if (exit) {
+    if (ts->exit)
       timeshift_write_exit(ts->rd_pipe.wr);
-      ts->state = TS_EXIT;
-    }
   }
-
-  pthread_mutex_unlock(&ts->state_mutex);
 }
 
 /**
@@ -474,10 +383,6 @@ timeshift_destroy(streaming_target_t *pad)
   /* Flush files */
   timeshift_filemgr_flush(ts, NULL);
 
-  /* Release SMT_START index */
-  if (ts->smt_start)
-    streaming_start_unref(ts->smt_start);
-
   if (ts->path)
     free(ts->path);
   free(ts);
@@ -503,7 +408,8 @@ streaming_target_t *timeshift_create
   ts->output     = out;
   ts->path       = NULL;
   ts->max_time   = max_time;
-  ts->state      = TS_INIT;
+  ts->state      = TS_LIVE;
+  ts->exit       = 0;
   ts->full       = 0;
   ts->vididx     = -1;
   ts->id         = timeshift_index;
@@ -511,6 +417,7 @@ streaming_target_t *timeshift_create
   ts->dobuf      = ts->ondemand ? 0 : 1;
   ts->packet_mode= 1;
   ts->last_time  = 0;
+  ts->buf_time   = 0;
   ts->start_pts  = 0;
   ts->ref_time   = 0;
   for (i = 0; i < TIMESHIFT_BACKLOG_MAX; i++)
index cce7feee61dfc81010dca1e40015d4788e64e558..b094289cde3dbc6f66bf2eeed93bf2ae8b920a0d 100644 (file)
@@ -19,7 +19,7 @@
 #ifndef __TVH_TIMESHIFT_PRIVATE_H__
 #define __TVH_TIMESHIFT_PRIVATE_H__
 
-#define TIMESHIFT_PLAY_BUF         2000000 //< us to buffer in TX
+#define TIMESHIFT_PLAY_BUF         500000  //< us to buffer in TX
 #define TIMESHIFT_FILE_PERIOD      60      //< number of secs in each buffer file
 #define TIMESHIFT_BACKLOG_MAX      16      //< maximum elementary streams
 
@@ -96,21 +96,20 @@ typedef struct timeshift {
   int64_t                     last_time;  ///< Last time in us (PTS conversion)
   int64_t                     start_pts;  ///< Start time for packets (PTS)
   int64_t                     ref_time;   ///< Start time in us (monoclock)
+  int64_t                     buf_time;   ///< Last buffered time in us (PTS conversion)
   struct streaming_message_queue backlog[TIMESHIFT_BACKLOG_MAX]; ///< Queued packets for time sorting
   int                         backlog_max;///< Maximum component index in backlog
 
   enum {
-    TS_INIT,
     TS_EXIT,
     TS_LIVE,
     TS_PAUSE,
     TS_PLAY,
   }                           state;       ///< Play state
   pthread_mutex_t             state_mutex; ///< Protect state changes
+  uint8_t                     exit;        ///< Exit from the main input thread
   uint8_t                     full;        ///< Buffer is full
   
-  streaming_start_t          *smt_start;   ///< Current stream makeup
-
   streaming_queue_t           wr_queue;   ///< Writer queue
   pthread_t                   wr_thread;  ///< Writer thread
 
@@ -129,11 +128,6 @@ typedef struct timeshift {
 extern uint64_t timeshift_total_size;
 extern uint64_t timeshift_total_ram_size;
 
-/*
- *
- */
-void timeshift_packets_clone ( timeshift_t *ts, struct streaming_message_queue *dst, int delivered );
-
 /*
  * Write functions
  */
index 4efbe49f1b09d0885aef8bf590a959952dbb9372..1b0a19e8445dd4120dece56f5c1977237b1a4ce0 100644 (file)
@@ -202,19 +202,6 @@ static ssize_t _read_msg ( timeshift_file_t *tsf, int fd, streaming_message_t **
  * Utilities
  * *************************************************************************/
 
-static streaming_message_t *_timeshift_find_sstart
-  ( timeshift_file_t *tsf, int64_t time )
-{
-  timeshift_index_data_t *ti;
-
-  /* Find the SMT_START message that relates (comes before) the given time */
-  ti = TAILQ_LAST(&tsf->sstart, timeshift_index_data_list);
-  while (ti && ti->data->sm_time > time)
-    ti = TAILQ_PREV(ti, timeshift_index_data_list, link);
-
-  return ti ? ti->data : NULL;
-}
-
 static int64_t _timeshift_first_time
   ( timeshift_t *ts, int *active )
 { 
@@ -421,21 +408,29 @@ static int _timeshift_read
       *wait     = 0;
       tvhtrace("timeshift", "ts %d eof, cur_file %p", ts->id, *cur_file);
 
-    /* Check SMT_START index */
-    } else {
-      streaming_message_t *ssm = _timeshift_find_sstart(*cur_file, (*sm)->sm_time);
-      if (ssm && ssm->sm_data != ts->smt_start) {
-        streaming_target_deliver2(ts->output, streaming_msg_clone(ssm));
-        if (ts->smt_start)
-          streaming_start_unref(ts->smt_start);
-        ts->smt_start = ssm->sm_data;
-        atomic_add(&ts->smt_start->ss_refcount, 1);
-      }
     }
   }
   return 0;
 }
 
+/*
+ * Trace packet
+ */
+static void timeshift_trace_pkt
+  ( timeshift_t *ts, streaming_message_t *sm )
+{
+  th_pkt_t *pkt = sm->sm_data;
+  tvhtrace("timeshift",
+           "ts %d pkt out - stream %d type %c pts %10"PRId64
+           " dts %10"PRId64 " dur %10d len %6zu time %14"PRId64,
+           ts->id,
+           pkt->pkt_componentindex,
+           pkt_frametype_to_char(pkt->pkt_frametype),
+           ts_rescale(pkt->pkt_pts, 1000000),
+           ts_rescale(pkt->pkt_dts, 1000000),
+           pkt->pkt_duration,
+           pktbuf_len(pkt->pkt_payload), sm->sm_time);
+}
 
 /*
  * Flush all data to live
@@ -445,39 +440,17 @@ static int _timeshift_flush_to_live
 {
   streaming_message_t *sm;
 
-  time_t pts = 0;
   while (*cur_file) {
     if (_timeshift_read(ts, cur_file, &sm, wait) == -1)
       return -1;
     if (!sm) break;
-    if (sm->sm_type == SMT_PACKET) {
-      pts = ((th_pkt_t*)sm->sm_data)->pkt_pts;
-      tvhlog(LOG_DEBUG, "timeshift", "ts %d deliver %"PRId64" pts=%"PRItime_t,
-             ts->id, sm->sm_time, pts);
-    }
+    if (tvhtrace_enabled() && sm->sm_type == SMT_PACKET)
+      timeshift_trace_pkt(ts, sm);
     streaming_target_deliver2(ts->output, sm);
   }
   return 0;
 }
 
-/*
- * Write packets from temporary queues
- */
-static void _timeshift_write_queues
-  ( timeshift_t *ts )
-{
-  struct streaming_message_queue sq;
-  streaming_message_t *sm;
-
-  TAILQ_INIT(&sq);
-  timeshift_writer_clone(ts, &sq);
-  timeshift_packets_clone(ts, &sq, 0);
-  while ((sm = TAILQ_FIRST(&sq)) != NULL) {
-    TAILQ_REMOVE(&sq, sm, sm_link);
-    streaming_target_deliver2(ts->output, sm);
-  }
-}
-
 /*
  * Send the status message
  */
@@ -488,7 +461,7 @@ static void timeshift_fill_status
   int64_t start, end;
 
   start = _timeshift_first_time(ts, &active);
-  end   = ts->last_time;
+  end   = ts->buf_time;
   if (current_time < 0)
     current_time = 0;
   if (current_time > end)
@@ -519,25 +492,6 @@ static void timeshift_status
   streaming_target_deliver2(ts->output, tsm);
 }
 
-/*
- * Trace packet
- */
-static void timeshift_trace_pkt
-  ( timeshift_t *ts, streaming_message_t *sm )
-{
-  th_pkt_t *pkt = sm->sm_data;
-  tvhtrace("timeshift",
-           "ts %d pkt out - stream %d type %c pts %10"PRId64
-           " dts %10"PRId64 " dur %10d len %6zu time %14"PRId64,
-           ts->id,
-           pkt->pkt_componentindex,
-           pkt_frametype_to_char(pkt->pkt_frametype),
-           ts_rescale(pkt->pkt_pts, 1000000),
-           ts_rescale(pkt->pkt_dts, 1000000),
-           pkt->pkt_duration,
-           pktbuf_len(pkt->pkt_payload), sm->sm_time);
-}
-
 /* **************************************************************************
  * Thread
  * *************************************************************************/
@@ -549,7 +503,7 @@ static void timeshift_trace_pkt
 void *timeshift_reader ( void *p )
 {
   timeshift_t *ts = p;
-  int nfds, end, run = 1, wait = -1, skip_delivered = 0;
+  int nfds, end, run = 1, wait = -1;
   timeshift_file_t *cur_file = NULL, *tmp_file;
   int cur_speed = 100, keyframe_mode = 0;
   int64_t mono_now, mono_play_time = 0, mono_last_status = 0;
@@ -560,7 +514,6 @@ void *timeshift_reader ( void *p )
   streaming_skip_t *skip = NULL;
   tvhpoll_t *pd;
   tvhpoll_event_t ev = { 0 };
-  th_pkt_t *pkt;
 
   pd = tvhpoll_create(1);
   ev.fd     = ts->rd_pipe.rd;
@@ -628,13 +581,12 @@ void *timeshift_reader ( void *p )
                        ts->id);
                 timeshift_writer_flush(ts);
                 ts->dobuf = 1;
-                skip_delivered = 1;
                 tmp_file = timeshift_filemgr_newest(ts);
                 if (tmp_file != NULL) {
                   i64 = tmp_file->last;
                   tmp_file->refcount--;
                 } else {
-                  i64 = ts->last_time;
+                  i64 = ts->buf_time;
                 }
                 cur_file = timeshift_filemgr_get(ts, i64);
                 if (cur_file != NULL) {
@@ -669,7 +621,6 @@ void *timeshift_reader ( void *p )
               mono_play_time = mono_now;
               tvhtrace("timeshift", "update play time TS_LIVE - %"PRId64" play buffer from %"PRId64, mono_now, pause_time);
             } else if (ts->state == TS_PAUSE) {
-              skip_delivered = 1;
               pause_time = last_time;
             }
             tvhlog(LOG_DEBUG, "timeshift", "ts %d change speed %d", ts->id, speed);
@@ -722,9 +673,8 @@ void *timeshift_reader ( void *p )
                   cur_file->roff = cur_file->size;
                   last_time      = cur_file->last;
                 } else {
-                  last_time      = ts->last_time;
+                  last_time      = ts->buf_time;
                 }
-                skip_delivered = 0;
               }
 
               /* May have failed */
@@ -735,7 +685,7 @@ void *timeshift_reader ( void *p )
 
                 /* Live (stage2) */
                 if (ts->state == TS_LIVE) {
-                  if (skip_time >= ts->last_time - TIMESHIFT_PLAY_BUF) {
+                  if (skip_time >= ts->buf_time - TIMESHIFT_PLAY_BUF) {
                     tvhlog(LOG_DEBUG, "timeshift", "ts %d skip ignored, already live", ts->id);
                     skip = NULL;
                   } else {
@@ -865,17 +815,9 @@ void *timeshift_reader ( void *p )
                 ((cur_speed > 0) && (sm->sm_time <= deliver))))) {
 
       last_time = sm->sm_time;
-      if (sm->sm_type == SMT_PACKET) {
-        pkt = sm->sm_data;
-        if (skip_delivered && pkt->pkt_delivered) {
-          streaming_msg_free(sm);
-          goto skip_pkt;
-        }
-        if (tvhtrace_enabled())
-          timeshift_trace_pkt(ts, sm);
-      }
+      if (sm->sm_type == SMT_PACKET && tvhtrace_enabled())
+        timeshift_trace_pkt(ts, sm);
       streaming_target_deliver2(ts->output, sm);
-skip_pkt:
       sm        = NULL;
       wait      = 0;
 
@@ -915,9 +857,6 @@ skip_pkt:
         if (_timeshift_flush_to_live(ts, &cur_file, &wait) == -1)
           break;
 
-        /* Flush write / backlog queues */
-        _timeshift_write_queues(ts);
-        
         ts->state = TS_LIVE;
 
         /* Close file (if open) */
index 7959657647f1f38f3853a78213e7e5fbb8d34480..ac9dc298a7d748a12a78625b5937b2b7df7800a2 100644 (file)
@@ -248,18 +248,6 @@ static void _handle_sstart ( timeshift_t *ts, timeshift_file_t *tsf, streaming_m
     }
 }
 
-static void _copy_last_sstart ( timeshift_t *ts, timeshift_file_t *tsf )
-{
-  streaming_message_t *sm;
-  streaming_start_t *ss = ts->smt_start;
-
-  if (ss) {
-    atomic_add(&ss->ss_refcount, 1);
-    sm = streaming_msg_create_data(SMT_START, ss);
-    _handle_sstart(ts, tsf, sm);
-  }
-}
-
 /* **************************************************************************
  * Thread
  * *************************************************************************/
@@ -277,8 +265,6 @@ static inline ssize_t _process_msg0
   } else if (sm->sm_type == SMT_SIGNAL_STATUS)
     err = timeshift_write_sigstat(tsf, sm->sm_time, sm->sm_data);
   else if (sm->sm_type == SMT_PACKET) {
-    if (TAILQ_EMPTY(&tsf->sstart))
-      _copy_last_sstart(ts, tsf);
     err = timeshift_write_packet(tsf, sm->sm_time, sm->sm_data);
     if (err > 0) {
       th_pkt_t *pkt = sm->sm_data;
@@ -293,8 +279,6 @@ static inline ssize_t _process_msg0
       }
     }
   } else if (sm->sm_type == SMT_MPEGTS) {
-    if (TAILQ_EMPTY(&tsf->sstart))
-      _copy_last_sstart(ts, tsf);
     err = timeshift_write_mpegts(tsf, sm->sm_time, sm->sm_data);
   }
   else
@@ -323,11 +307,11 @@ static void _process_msg
     /* Terminate */
     case SMT_EXIT:
       if (run) *run = 0;
-      break;
+      goto live;
     case SMT_STOP:
       if (sm->sm_code != SM_CODE_SOURCE_RECONFIGURED && run)
         *run = 0;
-      break;
+      goto live;
 
     /* Timeshifting */
     case SMT_SKIP:
@@ -341,7 +325,7 @@ static void _process_msg
     case SMT_SERVICE_STATUS:
     case SMT_TIMESHIFT_STATUS:
     case SMT_DESCRAMBLE_INFO:
-      break;
+      goto live;
 
     /* Store */
     case SMT_SIGNAL_STATUS:
@@ -349,21 +333,31 @@ static void _process_msg
     case SMT_MPEGTS:
     case SMT_PACKET:
       pthread_mutex_lock(&ts->state_mutex);
-      if ((tsf = timeshift_filemgr_get(ts, sm->sm_time)) && (tsf->wfd >= 0 || tsf->ram)) {
-        if ((err = _process_msg0(ts, tsf, &sm)) < 0) {
-          timeshift_filemgr_close(tsf);
-          tsf->bad = 1;
-          ts->full = 1; ///< Stop any more writing
+      ts->buf_time = sm->sm_time;
+      if (ts->state == TS_LIVE)
+        streaming_target_deliver2(ts->output, streaming_msg_clone(sm));
+      if (ts->dobuf) {
+        if ((tsf = timeshift_filemgr_get(ts, sm->sm_time)) && (tsf->wfd >= 0 || tsf->ram)) {
+          if ((err = _process_msg0(ts, tsf, &sm)) < 0) {
+            timeshift_filemgr_close(tsf);
+            tsf->bad = 1;
+            ts->full = 1; ///< Stop any more writing
+          }
+          tsf->refcount--;
         }
-        tsf->refcount--;
       }
       pthread_mutex_unlock(&ts->state_mutex);
       break;
   }
 
   /* Next */
-  if (sm)
-    streaming_msg_free(sm);
+  streaming_msg_free(sm);
+
+live:
+  pthread_mutex_lock(&ts->state_mutex);
+  if (ts->state == TS_LIVE)
+    streaming_target_deliver2(ts->output, sm);
+  pthread_mutex_unlock(&ts->state_mutex);
 }
 
 void *timeshift_writer ( void *aux )