]> git.ipfire.org Git - thirdparty/tvheadend.git/commitdiff
profile: add profile work sharing
authorJaroslav Kysela <perex@perex.cz>
Fri, 24 Oct 2014 14:01:33 +0000 (16:01 +0200)
committerJaroslav Kysela <perex@perex.cz>
Sat, 25 Oct 2014 19:38:20 +0000 (21:38 +0200)
src/epggrab/module/eit.c
src/htsp_server.c
src/profile.c
src/profile.h

index 12bcf5af288a51fea865162259767544473160ae..d21f5b27a05bcd84f1cf0dacdb667707fb552164 100644 (file)
@@ -615,8 +615,10 @@ _eit_callback
 
   /* Get service */
   svc = mpegts_mux_find_service(mm, sid);
-  if (!svc)
+  if (!svc) {
+    tvhtrace("eit", "sid %i not found", sid);
     goto done;
+  }
 
   if (map->om_first) {
     map->om_tune_count++;
index 4716edd7d50093c8c1ba2be6670f6bbe1caf7504..9f14b0ba5843bdad16eb8fcc170c6927703c374e 100644 (file)
@@ -1704,7 +1704,6 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in)
   uint32_t chid, sid, weight, req90khz, timeshiftPeriod = 0;
   const char *str, *profile_id;
   channel_t *ch;
-  int pflags = 0;
   htsp_subscription_t *hs;
   profile_t *pro;
 
@@ -1725,8 +1724,6 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in)
 
   weight = htsmsg_get_u32_or_default(in, "weight", 150);
   req90khz = htsmsg_get_u32_or_default(in, "90khz", 0);
-  if (htsmsg_get_u32_or_default(in, "normts", 0))
-    pflags |= PRCH_FLAG_TSFIX;
 
   profile_id = htsmsg_get_str(in, "profile");
 
@@ -1762,7 +1759,7 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in)
 
   pro = profile_find_by_list(htsp->htsp_granted_access->aa_profiles, profile_id, "htsp");
   profile_chain_init(&hs->hs_prch, pro, ch);
-  if (profile_chain_work(&hs->hs_prch, &hs->hs_input, timeshiftPeriod, pflags)) {
+  if (profile_chain_work(&hs->hs_prch, &hs->hs_input, timeshiftPeriod, 0)) {
     tvhlog(LOG_ERR, "htsp", "unable to create profile chain '%s'", pro->pro_name);
     free(hs);
     return htsp_error("Stream setup error");
@@ -1779,7 +1776,7 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in)
   htsmsg_t *rep = htsmsg_create_map();
   if(req90khz)
     htsmsg_add_u32(rep, "90khz", 1);
-  if(hs->hs_prch.prch_tsfix)
+  if(hs->hs_prch.prch_sharer->prsh_tsfix)
     htsmsg_add_u32(rep, "normts", 1);
 
 #if ENABLE_TIMESHIFT
index 28cc59420712f1383f472e3aeaf538e5da6712a6..594783301b23244e4bab6e8368202609d161fa4e 100644 (file)
@@ -35,6 +35,7 @@
 profile_builders_queue profile_builders;
 
 struct profile_entry_queue profiles;
+static LIST_HEAD(,profile_chain) profile_chains;
 
 static profile_t *profile_default;
 
@@ -434,6 +435,218 @@ profile_get_htsp_list(htsmsg_t *array, htsmsg_t *filter)
   }
 }
 
+/*
+ *
+ */
+static void
+profile_deliver(profile_chain_t *prch, streaming_message_t *sm)
+{
+  if (prch->prch_start_pending) {
+    profile_sharer_t *prsh = prch->prch_sharer;
+    streaming_message_t *sm2;
+    if (!prsh->prsh_start_msg) {
+      streaming_msg_free(sm);
+      return;
+    }
+    sm2 = streaming_msg_create_data(SMT_START,
+                                   streaming_start_copy(prsh->prsh_start_msg));
+    streaming_target_deliver(prch->prch_post_share, sm2);
+    prch->prch_start_pending = 0;
+  }
+  if (sm)
+    streaming_target_deliver(prch->prch_post_share, sm);
+}
+
+/*
+ *
+ */
+static void
+profile_input(void *opaque, streaming_message_t *sm)
+{
+  profile_chain_t *prch = opaque, *prch2;
+  profile_sharer_t *prsh = prch->prch_sharer;
+
+  if (sm->sm_type == SMT_START)
+    prch->prch_stop = 0;
+
+  if (prch == prsh->prsh_master) {
+    if (sm->sm_type == SMT_STOP) {
+      prch->prch_stop = 1;
+      /* elect new master */
+      prsh->prsh_master = NULL;
+      LIST_FOREACH(prch2, &prsh->prsh_chains, prch_sharer_link)
+        if (!prch2->prch_stop) {
+          prsh->prsh_master = prch2;
+          break;
+        }
+      if (prsh->prsh_master)
+        goto direct;
+    }
+    streaming_target_deliver(prch->prch_share, sm);
+    return;
+  }
+
+  if (sm->sm_type == SMT_STOP) {
+    prch->prch_stop = 1;
+  } else if (sm->sm_type == SMT_START) {
+    prch->prch_stop = 0;
+    prch->prch_start_pending = 1;
+    streaming_msg_free(sm);
+    sm = NULL;
+  } else if (sm->sm_type == SMT_PACKET || sm->sm_type == SMT_MPEGTS) {
+    streaming_msg_free(sm);
+    return;
+  }
+
+direct:
+  profile_deliver(prch, sm);
+}
+
+/*
+ *
+ */
+static void
+profile_sharer_deliver(profile_chain_t *prch, streaming_message_t *sm)
+{
+  if (sm->sm_type == SMT_PACKET) {
+    if (!prch->prch_ts_delta)
+      goto deliver;
+    th_pkt_t *pkt = sm->sm_data;
+    if (prch->prch_ts_delta == PTS_UNSET) {
+      prch->prch_ts_delta = MAX(0, pkt->pkt_dts - 10000);
+      printf("ts delta: %li\n", (long)prch->prch_ts_delta);
+    }
+    /*
+     * time correction here
+     */
+    if (pkt->pkt_pts >= prch->prch_ts_delta &&
+        pkt->pkt_dts >= prch->prch_ts_delta) {
+      th_pkt_t *n = pkt_copy_shallow(pkt);
+      pkt_ref_dec(pkt);
+      n->pkt_pts -= prch->prch_ts_delta;
+      n->pkt_dts -= prch->prch_ts_delta;
+      sm->sm_data = n;
+    } else {
+      streaming_msg_free(sm);
+      return;
+    }
+  }
+deliver:
+  profile_deliver(prch, sm);
+}
+
+/*
+ *
+ */
+static void
+profile_sharer_input(void *opaque, streaming_message_t *sm)
+{
+  profile_sharer_t *prsh = opaque;
+  profile_chain_t *prch, *next, *run = NULL;
+
+  if (sm->sm_type == SMT_STOP) {
+    if (prsh->prsh_start_msg)
+      streaming_start_unref(prsh->prsh_start_msg);
+    prsh->prsh_start_msg = NULL;
+  }
+  for (prch = LIST_FIRST(&prsh->prsh_chains); prch; prch = next) {
+    next = LIST_NEXT(prch, prch_sharer_link);
+    if (prch == prsh->prsh_master) {
+      if (sm->sm_type == SMT_START) {
+        if (prsh->prsh_start_msg)
+          streaming_start_unref(prsh->prsh_start_msg);
+        prsh->prsh_start_msg = streaming_start_copy(sm->sm_data);
+      }
+      if (run)
+        profile_sharer_deliver(run, streaming_msg_clone(sm));
+      run = prch;
+      continue;
+    }
+    if (sm->sm_type != SMT_PACKET && sm->sm_type != SMT_MPEGTS)
+      continue;
+    if (prch->prch_stop)
+      continue;
+    if (run)
+      profile_sharer_deliver(run, streaming_msg_clone(sm));
+    run = prch;
+  }
+
+  if (run)
+    profile_sharer_deliver(run, sm);
+  else
+    streaming_msg_free(sm);
+}
+
+/*
+ *
+ */
+static profile_sharer_t *
+profile_sharer_find(profile_chain_t *prch)
+{
+  profile_sharer_t *prsh = NULL;
+  profile_chain_t *prch2;
+
+  LIST_FOREACH(prch2, &profile_chains, prch_link) {
+    if (prch2->prch_id != prch->prch_id)
+      continue;
+    if (prch2 == prch)
+      continue;
+    if (prch2->prch_can_share && prch2->prch_can_share(prch2, prch)) {
+      prsh = prch2->prch_sharer;
+      break;
+    }
+  }
+  if (!prsh) {
+    prsh = calloc(1, sizeof(*prsh));
+    streaming_target_init(&prsh->prsh_input, profile_sharer_input, prsh, 0);
+    LIST_INIT(&prsh->prsh_chains);
+  }
+  return prsh;
+}
+
+/*
+ *
+ */
+static int
+profile_sharer_create(profile_sharer_t *prsh,
+                      profile_chain_t *prch,
+                      streaming_target_t *dst)
+{
+  prch->prch_post_share = dst;
+  prch->prch_ts_delta = LIST_EMPTY(&prsh->prsh_chains) ? 0 : PTS_UNSET;
+  LIST_INSERT_HEAD(&prsh->prsh_chains, prch, prch_sharer_link);
+  prch->prch_sharer = prsh;
+  if (!prsh->prsh_master)
+    prsh->prsh_master = prch;
+  return 0;
+}
+
+/*
+ *
+ */
+static void
+profile_sharer_destroy(profile_chain_t *prch)
+{
+  profile_sharer_t *prsh = prch->prch_sharer;
+
+  if (prsh == NULL)
+    return;
+  LIST_REMOVE(prch, prch_sharer_link);
+  prch->prch_sharer = NULL;
+  prch->prch_post_share = NULL;
+  if (LIST_EMPTY(&prsh->prsh_chains)) {
+    if (prsh->prsh_tsfix)
+      tsfix_destroy(prsh->prsh_tsfix);
+#if ENABLE_LIBAV
+    if (prsh->prsh_transcoder)
+      transcoder_destroy(prsh->prsh_transcoder);
+#endif
+    if (prsh->prsh_start_msg)
+      streaming_start_unref(prsh->prsh_start_msg);
+    free(prsh);
+  }
+}
+
 /*
  *
  */
@@ -446,6 +659,9 @@ profile_chain_init(profile_chain_t *prch, profile_t *pro, void *id)
   prch->prch_pro = pro;
   prch->prch_id  = id;
   streaming_queue_init(&prch->prch_sq, 0, 0);
+  LIST_INSERT_HEAD(&profile_chains, prch, prch_link);
+  prch->prch_linked = 1;
+  prch->prch_stop = 1;
 }
 
 /*
@@ -499,24 +715,39 @@ profile_chain_raw_open(profile_chain_t *prch, void *id, size_t qsize)
 void
 profile_chain_close(profile_chain_t *prch)
 {
+  profile_sharer_destroy(prch);
+
 #if ENABLE_TIMESHIFT
-  if (prch->prch_timeshift)
+  if (prch->prch_timeshift) {
     timeshift_destroy(prch->prch_timeshift);
+    prch->prch_timeshift = NULL;
+  }
 #endif
-  if (prch->prch_tsfix)
-    tsfix_destroy(prch->prch_tsfix);
-  if (prch->prch_gh)
+  if (prch->prch_gh) {
     globalheaders_destroy(prch->prch_gh);
-#if ENABLE_LIBAV
-  if (prch->prch_transcoder)
-    transcoder_destroy(prch->prch_transcoder);
-#endif
-  if (prch->prch_muxer)
+    prch->prch_gh = NULL;
+  }
+  if (prch->prch_tsfix) {
+    globalheaders_destroy(prch->prch_tsfix);
+    prch->prch_tsfix = NULL;
+  }
+  if (prch->prch_muxer) {
     muxer_destroy(prch->prch_muxer);
-  streaming_queue_deinit(&prch->prch_sq);
+    prch->prch_muxer = NULL;
+  }
+
   prch->prch_st = NULL;
-  if (prch->prch_pro)
+
+  if (prch->prch_linked) {
+    streaming_queue_deinit(&prch->prch_sq);
+    LIST_REMOVE(prch, prch_link);
+    prch->prch_linked = 0;
+  }
+
+  if (prch->prch_pro) {
     profile_release(prch->prch_pro);
+    prch->prch_pro = NULL;
+  }
 }
 
 /*
@@ -538,19 +769,31 @@ profile_htsp_work(profile_chain_t *prch,
                   streaming_target_t *dst,
                   uint32_t timeshift_period, int flags)
 {
+  profile_sharer_t *prsh;
+
+  prsh = profile_sharer_find(prch);
+  if (!prsh)
+    goto fail;
+
 #if ENABLE_TIMESHIFT
-  if (timeshift_period > 0) {
+  if (timeshift_period > 0)
     dst = prch->prch_timeshift = timeshift_create(dst, timeshift_period);
-    flags |= PRCH_FLAG_TSFIX;
-  }
 #endif
 
-  if (flags & PRCH_FLAG_TSFIX)
-    dst = prch->prch_tsfix = tsfix_create(dst);
+  if (profile_sharer_create(prsh, prch, dst))
+    goto fail;
 
-  prch->prch_st = dst;
+  if (!prsh->prsh_tsfix)
+    prsh->prsh_tsfix = tsfix_create(&prsh->prsh_input);
 
+  prch->prch_share = prsh->prsh_tsfix;
+  streaming_target_init(&prch->prch_input, profile_input, prch, 0);
+  prch->prch_st = &prch->prch_input;
   return 0;
+
+fail:
+  profile_chain_close(prch);
+  return -1;
 }
 
 static muxer_container_type_t
@@ -618,7 +861,10 @@ profile_mpegts_pass_open(profile_chain_t *prch,
   c.m_rewrite_pmt = pro->pro_rewrite_pmt;
 
   prch->prch_flags = SUBSCRIPTION_RAW_MPEGTS;
-  streaming_queue_init(&prch->prch_sq, SMT_PACKET, qsize);
+
+  prch->prch_sq.sq_st.st_reject_filter = SMT_PACKET;
+  prch->prch_sq.sq_maxsize = qsize;
+
   prch->prch_muxer = muxer_create(&c);
   prch->prch_st    = &prch->prch_sq.sq_st;
   return 0;
@@ -669,6 +915,7 @@ profile_matroska_open(profile_chain_t *prch,
                       muxer_config_t *m_cfg, int flags, size_t qsize)
 {
   profile_matroska_t *pro = (profile_matroska_t *)prch->prch_pro;
+  streaming_target_t *dst;
   muxer_config_t c;
 
   if (m_cfg)
@@ -680,11 +927,12 @@ profile_matroska_open(profile_chain_t *prch,
   if (pro->pro_webm)
     c.m_type = MC_WEBM;
 
-  streaming_queue_init(&prch->prch_sq, 0, qsize);
-  prch->prch_gh    = globalheaders_create(&prch->prch_sq.sq_st);
-  prch->prch_tsfix = tsfix_create(prch->prch_gh);
+  prch->prch_sq.sq_maxsize = qsize;
+
+  dst = prch->prch_gh    = globalheaders_create(&prch->prch_sq.sq_st);
+  dst = prch->prch_tsfix = tsfix_create(dst);
+  prch->prch_st    = dst;
   prch->prch_muxer = muxer_create(&c);
-  prch->prch_st    = prch->prch_tsfix;
   return 0;
 }
 
@@ -934,38 +1182,92 @@ const idclass_t profile_transcode_class =
   }
 };
 
+static int
+profile_transcode_resolution(profile_transcode_t *pro)
+{
+  return pro->pro_resolution >= 240 ? pro->pro_resolution : 240;
+}
+
+static int
+profile_transcode_bandwidth(profile_transcode_t *pro)
+{
+  return pro->pro_bandwidth >= 64 ? pro->pro_bandwidth : 64;
+}
+
+static int
+profile_transcode_can_share(profile_chain_t *prch,
+                            profile_chain_t *joiner)
+{
+  profile_transcode_t *pro1 = (profile_transcode_t *)prch->prch_pro;
+  profile_transcode_t *pro2 = (profile_transcode_t *)joiner->prch_pro;
+  if (pro1 == pro2)
+    return 1;
+  if (!idnode_is_instance(&pro2->pro_id, &profile_transcode_class))
+    return 0;
+  /*
+   * Do full params check here, note that profiles might differ
+   * only in the muxer setup.
+   */
+  if (strcmp(pro1->pro_vcodec ?: "", pro2->pro_vcodec ?: ""))
+    return 0;
+  if (strcmp(pro1->pro_acodec ?: "", pro2->pro_acodec ?: ""))
+    return 0;
+  if (strcmp(pro1->pro_scodec ?: "", pro2->pro_scodec ?: ""))
+    return 0;
+  if (profile_transcode_resolution(pro1) != profile_transcode_resolution(pro2))
+    return 0;
+  if (profile_transcode_bandwidth(pro1) != profile_transcode_bandwidth(pro2))
+    return 0;
+  if (strcmp(pro1->pro_language ?: "", pro2->pro_language ?: ""))
+    return 0;
+  return 1;
+}
+
 static int
 profile_transcode_work(profile_chain_t *prch,
                        streaming_target_t *dst,
                        uint32_t timeshift_period, int flags)
 {
+  profile_sharer_t *prsh;
   profile_transcode_t *pro = (profile_transcode_t *)prch->prch_pro;
   transcoder_props_t props;
 
+  prsh = profile_sharer_find(prch);
+  if (!prsh)
+    goto fail;
+
+  prch->prch_can_share = profile_transcode_can_share;
+
   memset(&props, 0, sizeof(props));
   strncpy(props.tp_vcodec, pro->pro_vcodec ?: "", sizeof(props.tp_vcodec)-1);
   strncpy(props.tp_acodec, pro->pro_acodec ?: "", sizeof(props.tp_acodec)-1);
   strncpy(props.tp_scodec, pro->pro_scodec ?: "", sizeof(props.tp_scodec)-1);
-  props.tp_resolution = pro->pro_resolution >= 240 ? pro->pro_resolution : 240;
+  props.tp_resolution = profile_transcode_resolution(pro);
   props.tp_channels   = pro->pro_channels;
-  props.tp_bandwidth  = pro->pro_bandwidth >= 64 ? pro->pro_bandwidth : 64;
+  props.tp_bandwidth  = profile_transcode_bandwidth(pro);
   strncpy(props.tp_language, pro->pro_language ?: "", 3);
 
 #if ENABLE_TIMESHIFT
   if (timeshift_period > 0)
     dst = prch->prch_timeshift = timeshift_create(dst, timeshift_period);
 #endif
-  dst = prch->prch_transcoder = transcoder_create(dst);
-  if (!dst) {
-    profile_chain_close(prch);
-    return -1;
+  if (profile_sharer_create(prsh, prch, dst))
+    goto fail;
+  if (!prsh->prsh_transcoder) {
+    assert(!prsh->prsh_tsfix);
+    dst = prsh->prsh_transcoder = transcoder_create(&prsh->prsh_input);
+    if (!dst)
+      goto fail;
+    transcoder_set_properties(dst, &props);
+    prsh->prsh_tsfix = tsfix_create(dst);
   }
-  transcoder_set_properties(dst, &props);
-  prch->prch_tsfix = tsfix_create(dst);
-
-  prch->prch_st = prch->prch_tsfix;
-
+  prch->prch_share = prsh->prsh_tsfix;
+  streaming_target_init(&prch->prch_input, profile_input, prch, 0);
+  prch->prch_st = &prch->prch_input;
   return 0;
+fail:
+  profile_chain_close(prch);
+  return -1;
 }
 
 static int
@@ -1001,13 +1303,15 @@ profile_transcode_open(profile_chain_t *prch,
       c.m_type = MC_MATROSKA;
   }
 
-  streaming_queue_init(&prch->prch_sq, 0, qsize);
+  prch->prch_sq.sq_maxsize = qsize;
+
   prch->prch_gh = globalheaders_create(&prch->prch_sq.sq_st);
 
-  r = profile_transcode_work(prch, prch->prch_gh, 0,
-                             PRCH_FLAG_SKIPZEROING | PRCH_FLAG_TSFIX);
-  if (r)
+  r = profile_transcode_work(prch, prch->prch_gh, 0, 0);
+  if (r) {
+    profile_chain_close(prch);
     return r;
+  }
 
   prch->prch_muxer = muxer_create(&c);
   return 0;
@@ -1055,6 +1359,7 @@ profile_init(void)
 
   LIST_INIT(&profile_builders);
   TAILQ_INIT(&profiles);
+  LIST_INIT(&profile_chains);
 
   profile_register(&profile_mpegts_pass_class, profile_mpegts_pass_builder);
   profile_register(&profile_matroska_class, profile_matroska_builder);
index 7be38cfd5817df67aa7afa695afaed347dc30986..e18701ddc4b6daa4c8836a5c65c7e25623eba850 100644 (file)
@@ -26,6 +26,7 @@
 struct profile;
 struct muxer;
 struct streaming_target;
+struct streaming_start;
 
 extern const idclass_t profile_class;
 extern const idclass_t profile_mpegts_pass_class;
@@ -47,24 +48,46 @@ typedef LIST_HEAD(, profile_build) profile_builders_queue;
 
 extern profile_builders_queue profile_builders;
 
-#define PRCH_FLAG_SKIPZEROING (1<<0)
-#define PRCH_FLAG_TSFIX       (1<<1)
-
-typedef struct profile_chain {
-  struct profile          *prch_pro;
-  void                    *prch_id;
-  int                      prch_flags;
-  struct streaming_queue   prch_sq;
-  struct streaming_target *prch_st;
-  struct muxer            *prch_muxer;
-  struct streaming_target *prch_gh;
-  struct streaming_target *prch_tsfix;
+typedef struct profile_sharer {
+  streaming_target_t        prsh_input;
+  LIST_HEAD(,profile_chain) prsh_chains;
+  struct profile_chain     *prsh_master;
+  struct streaming_start   *prsh_start_msg;
+  struct streaming_target  *prsh_tsfix;
 #if ENABLE_LIBAV
-  struct streaming_target *prch_transcoder;
+  struct streaming_target  *prsh_transcoder;
 #endif
+} profile_sharer_t;
+
+typedef struct profile_chain {
+  LIST_ENTRY(profile_chain) prch_link;
+  int                       prch_linked;
+
+  struct profile_sharer    *prch_sharer;
+  LIST_ENTRY(profile_chain) prch_sharer_link;
+
+  struct profile           *prch_pro;
+  void                     *prch_id;
+
+  int64_t                   prch_ts_delta;
+
+  int                       prch_flags;
+  int                       prch_stop;
+  int                       prch_start_pending;
+  struct streaming_queue    prch_sq;
+  struct streaming_target  *prch_post_share;
+  struct streaming_target  *prch_st;
+  struct muxer             *prch_muxer;
+  struct streaming_target  *prch_gh;
+  struct streaming_target  *prch_tsfix;
 #if ENABLE_TIMESHIFT
-  struct streaming_target *prch_timeshift;
+  struct streaming_target  *prch_timeshift;
 #endif
+  struct streaming_target   prch_input;
+  struct streaming_target  *prch_share;
+
+  int (*prch_can_share)(struct profile_chain *prch,
+                        struct profile_chain *joiner);
 } profile_chain_t;
 
 typedef struct profile {