]> git.ipfire.org Git - thirdparty/tvheadend.git/commitdiff
DVR: rewrite DVR thread to handle better EPG running flag
authorJaroslav Kysela <perex@perex.cz>
Sat, 31 Oct 2015 19:28:12 +0000 (20:28 +0100)
committerJaroslav Kysela <perex@perex.cz>
Sat, 31 Oct 2015 19:28:12 +0000 (20:28 +0100)
src/dvr/dvr_rec.c

index 2216b0c06df8576ea3d125f457367fdb38147da0..4ecd417fef0661f84bb5334bbdb9d423d4afb22c 100644 (file)
@@ -1075,6 +1075,106 @@ dvr_streaming_restart(dvr_entry_t *de, int *run)
   }
 }
 
+/**
+ *
+ */
+static int
+dvr_thread_pkt_stats(dvr_entry_t *de, th_pkt_t *pkt, int payload)
+{
+  th_subscription_t *ts;
+  int ret = 0;
+
+  if ((ts = de->de_s) != NULL) {
+    if (pkt->pkt_err) {
+      de->de_data_errors += pkt->pkt_err;
+      ret = 1;
+    }
+    if (payload && pkt->pkt_payload)
+      subscription_add_bytes_out(ts, pktbuf_len(pkt->pkt_payload));
+  }
+  return ret;
+}
+
+/**
+ *
+ */
+static int
+dvr_thread_mpegts_stats(dvr_entry_t *de, void *sm_data)
+{
+  th_subscription_t *ts;
+  pktbuf_t *pb = sm_data;
+  int ret;
+
+  if (pb == NULL)
+    return 0;
+  if ((ts = de->de_s) != NULL) {
+    if (pb->pb_err) {
+      de->de_data_errors += pb->pb_err;
+      ret = 1;
+    }
+    subscription_add_bytes_out(ts, pktbuf_len(pb));
+  }
+  return ret;
+}
+
+/**
+ *
+ */
+static int
+dvr_thread_rec_start(dvr_entry_t *de, int started,
+                     streaming_start_t *ss, int *run,
+                     int64_t *dts_offset,
+                     int epg_running, const char *postproc)
+{
+  profile_chain_t *prch = de->de_chain;
+  int ret = 0;
+
+  if (started &&
+      muxer_reconfigure(prch->prch_muxer, ss) < 0) {
+    tvhlog(LOG_WARNING,
+           "dvr", "Unable to reconfigure \"%s\"",
+           dvr_get_filename(de) ?: lang_str_get(de->de_title, NULL));
+
+    // Try to restart the recording if the muxer doesn't
+    // support reconfiguration of the streams.
+    dvr_thread_epilog(de, postproc);
+    started = 0;
+    *dts_offset = PTS_UNSET;
+    if (epg_running) {
+      if (!dvr_thread_global_lock(de, run))
+        return 0;
+      if (de->de_config->dvr_clone)
+        de = dvr_entry_clone(de);
+      dvr_thread_global_unlock(de);
+    }
+  }
+
+  if (!started) {
+    if (!dvr_thread_global_lock(de, run))
+      return 0;
+    dvr_rec_set_state(de, DVR_RS_WAIT_PROGRAM_START, 0);
+    if(dvr_rec_start(de, ss) == 0)
+      ret = 1;
+    else
+      dvr_stop_recording(de, SM_CODE_INVALID_TARGET, 1, 0);
+    dvr_thread_global_unlock(de);
+  }
+  return ret;
+}
+
+/**
+ *
+ */
+static void
+dvr_thread_backlog_free(struct streaming_message_queue *backlog)
+{
+  streaming_message_t *sm;
+  while ((sm = TAILQ_FIRST(backlog)) != NULL) {
+    TAILQ_REMOVE(backlog, sm, sm_link);
+    streaming_msg_free(sm);
+  }
+}
+
 /**
  *
  */
@@ -1084,12 +1184,14 @@ dvr_thread(void *aux)
   dvr_entry_t *de = aux;
   profile_chain_t *prch = de->de_chain;
   streaming_queue_t *sq = &prch->prch_sq;
-  streaming_message_t *sm;
-  th_subscription_t *ts;
-  th_pkt_t *pkt;
-  int run = 1, started = 0, comm_skip, epg_running, rs;
+  struct streaming_message_queue backlog;
+  streaming_message_t *sm, *sm2;
+  th_pkt_t *pkt, *pkt2, *pkt3;
+  streaming_start_t *ss = NULL;
+  int run = 1, started = 0, muxing = 0, comm_skip, epg_running = 0, rs;
   int commercial = COMMERCIAL_UNKNOWN;
-  int64_t packets = 0;
+  int64_t packets = 0, dts_offset = PTS_UNSET;
+  time_t start_time = 0;
   char *postproc;
 
   if (!dvr_thread_global_lock(de, &run))
@@ -1098,39 +1200,35 @@ dvr_thread(void *aux)
   postproc  = de->de_config->dvr_postproc ? strdup(de->de_config->dvr_postproc) : NULL;
   dvr_thread_global_unlock(de);
 
-  pthread_mutex_lock(&sq->sq_mutex);
+  TAILQ_INIT(&backlog);
 
+  pthread_mutex_lock(&sq->sq_mutex);
   while(run) {
     sm = TAILQ_FIRST(&sq->sq_queue);
     if(sm == NULL) {
       pthread_cond_wait(&sq->sq_cond, &sq->sq_mutex);
       continue;
     }
+    streaming_queue_remove(sq, sm);
 
-    if ((ts = de->de_s) != NULL && started) {
-      pktbuf_t *pb = NULL;
-      if (sm->sm_type == SMT_PACKET) {
-        pb = ((th_pkt_t*)sm->sm_data)->pkt_payload;
-        if (((th_pkt_t*)sm->sm_data)->pkt_err) {
-          de->de_data_errors += ((th_pkt_t*)sm->sm_data)->pkt_err;
-          dvr_notify(de);
-        }
-      }
-      else if (sm->sm_type == SMT_MPEGTS) {
-        pb = sm->sm_data;
-        if (pb->pb_err) {
-          de->de_data_errors += pb->pb_err;
-          dvr_notify(de);
+    if (sm->sm_type == SMT_PACKET || sm->sm_type == SMT_MPEGTS) {
+      if (de->de_running_start > de->de_running_stop) {
+        epg_running = 1;
+      } else if (de->de_running_start == 0 && de->de_running_stop == 0) {
+        if (start_time + 2 >= dispatch_clock) {
+          epg_running = 0;
+          TAILQ_INSERT_TAIL(&backlog, sm, sm_link);
+          continue;
+        } else {
+          if (TAILQ_FIRST(&backlog))
+            dvr_thread_backlog_free(&backlog);
+          epg_running = 1;
         }
+      } else {
+        epg_running = 0;
       }
-      if (pb)
-        subscription_add_bytes_out(ts, pktbuf_len(pb));
     }
 
-    streaming_queue_remove(sq, sm);
-
-    epg_running = de->de_running_start > de->de_running_stop ||
-                  (de->de_running_start == 0 && de->de_running_stop == 0);
     pthread_mutex_unlock(&sq->sq_mutex);
 
     switch(sm->sm_type) {
@@ -1161,67 +1259,97 @@ dvr_thread(void *aux)
 
       commercial = pkt->pkt_commercial;
 
-      if (started) {
-       muxer_write_pkt(prch->prch_muxer, sm->sm_type, sm->sm_data);
-       sm->sm_data = NULL;
-       dvr_notify(de);
-       packets++;
+      if (!started)
+        break;
+
+      if (muxing == 0 &&
+          !dvr_thread_rec_start(de, started, ss, &run, &dts_offset,
+                                epg_running, postproc))
+        break;
+
+      muxing = 1;
+      while ((sm2 = TAILQ_FIRST(&backlog)) != NULL) {
+        TAILQ_REMOVE(&backlog, sm2, sm_link);
+        if (pkt->pkt_dts != PTS_UNSET) {
+          if (dts_offset == PTS_UNSET) {
+            pkt2 = sm2->sm_data;
+            dts_offset = pkt2->pkt_dts;
+          }
+          pkt3 = (th_pkt_t *)sm2->sm_data;
+          if (dts_offset != PTS_UNSET && pkt->pkt_dts >= dts_offset) {
+            pkt3 = pkt_copy_shallow(pkt3);
+            pkt3->pkt_dts -= dts_offset;
+            if (pkt3->pkt_pts != PTS_UNSET)
+              pkt3->pkt_pts -= dts_offset;
+            dvr_thread_pkt_stats(de, pkt3, 1);
+            muxer_write_pkt(prch->prch_muxer, sm2->sm_type, pkt3);
+          } else {
+            dvr_thread_pkt_stats(de, pkt3, 0);
+          }
+        }
+        streaming_msg_free(sm2);
+      }
+      if (dts_offset == PTS_UNSET && pkt->pkt_dts != PTS_UNSET)
+        dts_offset = pkt->pkt_dts;
+      if (pkt->pkt_dts != PTS_UNSET && dts_offset != PTS_UNSET &&
+          pkt->pkt_dts >= dts_offset) {
+        pkt3 = pkt_copy_shallow(pkt);
+        pkt3->pkt_dts -= dts_offset;
+        if (pkt3->pkt_pts != PTS_UNSET)
+          pkt3->pkt_pts -= dts_offset;
+        dvr_thread_pkt_stats(de, pkt3, 1);
+        muxer_write_pkt(prch->prch_muxer, sm->sm_type, pkt3);
+      } else {
+        dvr_thread_pkt_stats(de, pkt, 0);
       }
+      dvr_notify(de);
+      packets++;
       break;
 
     case SMT_MPEGTS:
       dvr_rec_set_state(de, !epg_running ? DVR_RS_EPG_WAIT : DVR_RS_RUNNING, 0);
-      if(started) {
-       if (!epg_running) {
-         if (packets) {
-           dvr_streaming_restart(de, &run);
-           packets = 0;
-           started = 0;
-          }
-          break;
+
+      if (!started)
+        break;
+
+      if (!epg_running) {
+        if (packets) {
+          dvr_streaming_restart(de, &run);
+          packets = 0;
+          started = 0;
         }
-       muxer_write_pkt(prch->prch_muxer, sm->sm_type, sm->sm_data);
-       sm->sm_data = NULL;
-       dvr_notify(de);
-       packets++;
+        break;
       }
-      break;
 
-    case SMT_START:
-      packets = 0;
-      if(started &&
-        muxer_reconfigure(prch->prch_muxer, sm->sm_data) < 0) {
-       tvhlog(LOG_WARNING,
-              "dvr", "Unable to reconfigure \"%s\"",
-              dvr_get_filename(de) ?: lang_str_get(de->de_title, NULL));
+      if (muxing == 0 &&
+          !dvr_thread_rec_start(de, started, ss, &run, &dts_offset,
+                                epg_running, postproc))
+        break;
 
-       // Try to restart the recording if the muxer doesn't
-       // support reconfiguration of the streams.
-       dvr_thread_epilog(de, postproc);
-       started = 0;
-       if (epg_running) {
-         if (!dvr_thread_global_lock(de, &run))
-           break;
-         if (de->de_config->dvr_clone)
-           de = dvr_entry_clone(de);
-         dvr_thread_global_unlock(de);
-        }
+      muxing = 1;
+      while ((sm2 = TAILQ_FIRST(&backlog)) != NULL) {
+        TAILQ_REMOVE(&backlog, sm2, sm_link);
+        dvr_thread_mpegts_stats(de, sm2->sm_data);
+        muxer_write_pkt(prch->prch_muxer, sm2->sm_type, sm2->sm_data);
+        sm2->sm_data = NULL;
+        streaming_msg_free(sm2);
       }
+      dvr_thread_mpegts_stats(de, sm->sm_data);
+      muxer_write_pkt(prch->prch_muxer, sm->sm_type, sm->sm_data);
+      sm->sm_data = NULL;
+      dvr_notify(de);
+      packets++;
+      break;
 
-      if(!started) {
-        if (!dvr_thread_global_lock(de, &run))
-          break;
-        dvr_rec_set_state(de, DVR_RS_WAIT_PROGRAM_START, 0);
-        if(dvr_rec_start(de, sm->sm_data) == 0)
-          started = 1;
-        else
-          dvr_stop_recording(de, SM_CODE_INVALID_TARGET, 1, 0);
-        dvr_thread_global_unlock(de);
-      } 
+    case SMT_START:
+      start_time = dispatch_clock;
+      packets = 0;
+      started = 1;
+      ss = streaming_start_copy((streaming_start_t *)sm->sm_data);
       break;
 
     case SMT_STOP:
-       if(sm->sm_code == SM_CODE_SOURCE_RECONFIGURED) {
+       if (sm->sm_code == SM_CODE_SOURCE_RECONFIGURED) {
         // Subscription is restarting, wait for SMT_START
 
        } else if(sm->sm_code == 0) {
@@ -1232,27 +1360,32 @@ dvr_thread(void *aux)
               "dvr", "Recording completed: \"%s\"",
               dvr_get_filename(de) ?: lang_str_get(de->de_title, NULL));
 
-       dvr_thread_epilog(de, postproc);
-       started = 0;
+        goto fin;
 
-      }else if(de->de_last_error != sm->sm_code) {
+      } else if (de->de_last_error != sm->sm_code) {
         // Error during recording
 
-        dvr_rec_set_state(de, DVR_RS_ERROR, sm->sm_code);
-        tvhlog(LOG_ERR,
-               "dvr", "Recording stopped: \"%s\": %s",
-               dvr_get_filename(de) ?: lang_str_get(de->de_title, NULL),
-               streaming_code2txt(sm->sm_code));
+       dvr_rec_set_state(de, DVR_RS_ERROR, sm->sm_code);
+       tvhlog(LOG_ERR,
+               "dvr", "Recording stopped: \"%s\": %s",
+               dvr_get_filename(de) ?: lang_str_get(de->de_title, NULL),
+               streaming_code2txt(sm->sm_code));
 
-        dvr_thread_epilog(de, postproc);
-        started = 0;
+fin:
+        dvr_thread_backlog_free(&backlog);
+       dvr_thread_epilog(de, postproc);
+       start_time = 0;
+       started = 0;
+       muxing = 0;
+       streaming_start_unref(ss);
+       ss = NULL;
       }
       break;
 
     case SMT_SERVICE_STATUS:
-      if(sm->sm_code & TSS_PACKETS) {
+      if (sm->sm_code & TSS_PACKETS) {
        
-      } else if(sm->sm_code & TSS_ERRORS) {
+      } else if (sm->sm_code & TSS_ERRORS) {
 
        int code = SM_CODE_UNDEFINED_ERROR;
 
@@ -1274,7 +1407,7 @@ dvr_thread(void *aux)
 
     case SMT_NOSTART:
 
-      if(de->de_last_error != sm->sm_code) {
+      if (de->de_last_error != sm->sm_code) {
        dvr_rec_set_state(de, DVR_RS_PENDING, sm->sm_code);
 
        tvhlog(LOG_ERR,
@@ -1303,7 +1436,9 @@ dvr_thread(void *aux)
   }
   pthread_mutex_unlock(&sq->sq_mutex);
 
-  if(prch->prch_muxer)
+  dvr_thread_backlog_free(&backlog);
+
+  if (prch->prch_muxer)
     dvr_thread_epilog(de, postproc);
 
   free(postproc);