]> git.ipfire.org Git - thirdparty/tvheadend.git/commitdiff
profile: run transcoder in another thread
authorJaroslav Kysela <perex@perex.cz>
Fri, 16 Jun 2017 13:33:20 +0000 (15:33 +0200)
committerJaroslav Kysela <perex@perex.cz>
Fri, 16 Jun 2017 13:33:20 +0000 (15:33 +0200)
src/profile.c
src/profile.h

index 1c045d7d7237c4230f8252caf1e226f783020327..6bf7e40f2965bebacc661beb0752bf69faa9ca7a 100644 (file)
@@ -645,6 +645,11 @@ profile_input(void *opaque, streaming_message_t *sm)
   profile_chain_t *prch = opaque, *prch2;
   profile_sharer_t *prsh = prch->prch_sharer;
 
+  if (prsh == NULL) {
+    streaming_msg_free(sm);
+    return;
+  }
+
   if (sm->sm_type == SMT_START) {
     if (!prsh->prsh_master)
       prsh->prsh_master = prch;
@@ -700,6 +705,42 @@ static streaming_ops_t profile_input_ops = {
   .st_info = profile_input_info
 };
 
+/*
+ *
+ */
+static void
+profile_input_queue(void *opaque, streaming_message_t *sm)
+{
+  profile_chain_t *prch = opaque;
+  profile_sharer_t *prsh = prch->prch_sharer;
+  profile_sharer_message_t *psm = malloc(sizeof(*psm));
+  psm->psm_prch = prch;
+  psm->psm_sm = sm;
+  pthread_mutex_lock(&prsh->prsh_queue_mutex);
+  if (prsh->prsh_queue_run) {
+    if (TAILQ_FIRST(&prsh->prsh_queue))
+      tvh_cond_signal(&prsh->prsh_queue_cond, 0);
+    TAILQ_INSERT_TAIL(&prsh->prsh_queue, psm, psm_link);
+  } else {
+    streaming_msg_free(sm);
+    free(psm);
+  }
+  pthread_mutex_unlock(&prsh->prsh_queue_mutex);
+}
+
+static htsmsg_t *
+profile_input_queue_info(void *opaque, htsmsg_t *list)
+{
+  htsmsg_add_str(list, NULL, "profile queue input");
+  profile_input_info(opaque, list);
+  return list;
+}
+
+static streaming_ops_t profile_input_queue_ops = {
+  .st_cb   = profile_input_queue,
+  .st_info = profile_input_queue_info
+};
+
 /*
  *
  */
@@ -798,6 +839,7 @@ profile_sharer_find(profile_chain_t *prch)
 {
   profile_sharer_t *prsh = NULL;
   profile_chain_t *prch2;
+  int do_queue = prch->prch_can_share != NULL;
 
   LIST_FOREACH(prch2, &profile_chains, prch_link) {
     if (prch2->prch_id != prch->prch_id)
@@ -811,12 +853,70 @@ profile_sharer_find(profile_chain_t *prch)
   }
   if (!prsh) {
     prsh = calloc(1, sizeof(*prsh));
+    prsh->prsh_do_queue = do_queue;
+    if (do_queue) {
+      pthread_mutex_init(&prsh->prsh_queue_mutex, NULL);
+      tvh_cond_init(&prsh->prsh_queue_cond);
+      TAILQ_INIT(&prsh->prsh_queue);
+    }
     streaming_target_init(&prsh->prsh_input, &profile_sharer_input_ops, prsh, 0);
     LIST_INIT(&prsh->prsh_chains);
   }
   return prsh;
 }
 
+/*
+ *
+ */
+static void *
+profile_sharer_thread(void *aux)
+{
+  profile_sharer_t *prsh = aux;
+  profile_sharer_message_t *psm;
+  int run = 1;
+
+  while (run) {
+    pthread_mutex_lock(&prsh->prsh_queue_mutex);
+    run = prsh->prsh_queue_run;
+    psm = TAILQ_FIRST(&prsh->prsh_queue);
+    if (run && psm == NULL) {
+      tvh_cond_wait(&prsh->prsh_queue_cond, &prsh->prsh_queue_mutex);
+      run = prsh->prsh_queue_run;
+      psm = TAILQ_FIRST(&prsh->prsh_queue);
+    }
+    if (run && psm) {
+      if (psm) {
+        profile_input(psm->psm_prch, psm->psm_sm);
+        TAILQ_REMOVE(&prsh->prsh_queue, psm, psm_link);
+        free(psm);
+      }
+    }
+    pthread_mutex_unlock(&prsh->prsh_queue_mutex);
+  }
+  return NULL;
+}
+
+/*
+ *
+ */
+static int
+profile_sharer_postinit(profile_sharer_t *prsh)
+{
+  int r;
+
+  if (!prsh->prsh_do_queue)
+    return 0;
+  if (prsh->prsh_queue_run)
+    return 0;
+  r = tvhthread_create(&prsh->prsh_queue_thread, NULL,
+                       profile_sharer_thread, prsh, "sharer");
+  if (!r)
+    prsh->prsh_queue_run = 1;
+  else
+    tvherror(LS_PROFILE, "unable to create sharer thread");
+  return r;
+}
+
 /*
  *
  */
@@ -841,13 +941,24 @@ static void
 profile_sharer_destroy(profile_chain_t *prch)
 {
   profile_sharer_t *prsh = prch->prch_sharer;
+  profile_sharer_message_t *psm, *psm2;
 
   if (prsh == NULL)
     return;
   LIST_REMOVE(prch, prch_sharer_link);
-  prch->prch_sharer = NULL;
-  prch->prch_post_share = NULL;
   if (LIST_EMPTY(&prsh->prsh_chains)) {
+    if (prsh->prsh_queue_run) {
+      pthread_mutex_lock(&prsh->prsh_queue_mutex);
+      prsh->prsh_queue_run = 0;
+      tvh_cond_signal(&prsh->prsh_queue_cond, 0);
+      pthread_mutex_unlock(&prsh->prsh_queue_mutex);
+      pthread_join(prsh->prsh_queue_thread, NULL);
+      while ((psm = TAILQ_FIRST(&prsh->prsh_queue)) != NULL) {
+        streaming_msg_free(psm->psm_sm);
+        TAILQ_REMOVE(&prsh->prsh_queue, psm, psm_link);
+        free(psm);
+      }
+    }
     if (prsh->prsh_tsfix)
       tsfix_destroy(prsh->prsh_tsfix);
 #if ENABLE_LIBAV
@@ -857,6 +968,29 @@ profile_sharer_destroy(profile_chain_t *prch)
     if (prsh->prsh_start_msg)
       streaming_start_unref(prsh->prsh_start_msg);
     free(prsh);
+    prch->prch_sharer = NULL;
+    prch->prch_post_share = NULL;
+  } else {
+    if (prsh->prsh_queue_run) {
+      pthread_mutex_lock(&prsh->prsh_queue_mutex);
+      for (psm = TAILQ_FIRST(&prsh->prsh_queue); psm; psm = psm2) {
+        psm2 = TAILQ_NEXT(psm, psm_link);
+        if (psm->psm_prch != prch) continue;
+        if (psm->psm_sm->sm_type == SMT_PACKET ||
+            psm->psm_sm->sm_type == SMT_MPEGTS)
+          streaming_msg_free(psm->psm_sm);
+        else
+          profile_input(psm->psm_prch, psm->psm_sm);
+        TAILQ_REMOVE(&prsh->prsh_queue, psm, psm_link);
+        free(psm);
+      }
+    }
+    prch->prch_sharer = NULL;
+    prch->prch_post_share = NULL;
+    if (prsh->prsh_master == prch)
+      prsh->prsh_master = NULL;
+    if (prsh->prsh_queue_run)
+      pthread_mutex_unlock(&prsh->prsh_queue_mutex);
   }
 }
 
@@ -1064,8 +1198,13 @@ profile_htsp_work(profile_chain_t *prch,
 
   prch->prch_share = prsh->prsh_tsfix;
   prch->prch_flags = SUBSCRIPTION_PACKET;
-  streaming_target_init(&prch->prch_input, &profile_input_ops, prch, 0);
+  streaming_target_init(&prch->prch_input,
+                        prsh->prsh_do_queue ?
+                          &profile_input_queue_ops : &profile_input_ops, prch,
+                        0);
   prch->prch_st = &prch->prch_input;
+  if (profile_sharer_postinit(prsh))
+    goto fail;
   return 0;
 
 fail:
@@ -2178,8 +2317,6 @@ profile_transcode_work(profile_chain_t *prch,
   if (!prsh)
     goto fail;
 
-  prch->prch_can_share = profile_transcode_can_share;
-
   memset(&props, 0, sizeof(props));
   strncpy(props.tp_vcodec, pro->pro_vcodec ?: "", sizeof(props.tp_vcodec)-1);
   strncpy(props.tp_vcodec_preset, pro->pro_vcodec_preset ?: "", sizeof(props.tp_vcodec_preset)-1);
@@ -2216,8 +2353,13 @@ profile_transcode_work(profile_chain_t *prch,
     prsh->prsh_tsfix = tsfix_create(dst);
   }
   prch->prch_share = prsh->prsh_tsfix;
-  streaming_target_init(&prch->prch_input, &profile_input_ops, prch, 0);
+  streaming_target_init(&prch->prch_input,
+                        prsh->prsh_do_queue ?
+                          &profile_input_queue_ops : &profile_input_ops, prch,
+                        0);
   prch->prch_st = &prch->prch_input;
+  if (profile_sharer_postinit(prsh))
+    goto fail;
   return 0;
 fail:
   profile_chain_close(prch);
@@ -2272,6 +2414,8 @@ profile_transcode_open(profile_chain_t *prch,
 {
   int r;
 
+  prch->prch_can_share = profile_transcode_can_share;
+
   prch->prch_flags = SUBSCRIPTION_PACKET;
   prch->prch_sq.sq_maxsize = qsize;
 
index cd518c52e76948f8da470c98a252bdb34b5a6567..a5c1925074be06f3a7f88ee3152e0c53e9247351 100644 (file)
@@ -69,17 +69,6 @@ typedef LIST_HEAD(, profile_build) profile_builders_queue;
 
 extern profile_builders_queue profile_builders;
 
-typedef struct profile_sharer {
-  streaming_target_t        prsh_input;
-  LIST_HEAD(,profile_chain) prsh_chains;
-  struct profile_chain     *prsh_master;
-  struct streaming_start   *prsh_start_msg;
-  struct streaming_target  *prsh_tsfix;
-#if ENABLE_LIBAV
-  struct streaming_target  *prsh_transcoder;
-#endif
-} profile_sharer_t;
-
 typedef struct profile_chain {
   LIST_ENTRY(profile_chain) prch_link;
   int                       prch_linked;
@@ -147,6 +136,29 @@ typedef struct profile {
                   muxer_config_t *m_cfg, int flags, size_t qsize);
 } profile_t;
 
+typedef struct profile_sharer_message {
+  TAILQ_ENTRY(profile_sharer_message) psm_link;
+  profile_chain_t *psm_prch;
+  streaming_message_t *psm_sm;
+} profile_sharer_message_t;
+
+typedef struct profile_sharer {
+  uint32_t                  prsh_do_queue: 1;
+  uint32_t                  prsh_queue_run: 1;
+  pthread_t                 prsh_queue_thread;
+  pthread_mutex_t           prsh_queue_mutex;
+  tvh_cond_t                prsh_queue_cond;
+  TAILQ_HEAD(,profile_sharer_message) prsh_queue;
+  streaming_target_t        prsh_input;
+  LIST_HEAD(,profile_chain) prsh_chains;
+  struct profile_chain     *prsh_master;
+  struct streaming_start   *prsh_start_msg;
+  struct streaming_target  *prsh_tsfix;
+#if ENABLE_LIBAV
+  struct streaming_target  *prsh_transcoder;
+#endif
+} profile_sharer_t;
+
 void profile_register(const idclass_t *clazz, profile_builder_t builder);
 
 profile_t *profile_create