]> git.ipfire.org Git - thirdparty/tvheadend.git/commitdiff
profile/subscription: use profile chain everywhere, add profile refcounting
authorJaroslav Kysela <perex@perex.cz>
Wed, 22 Oct 2014 14:05:18 +0000 (16:05 +0200)
committerJaroslav Kysela <perex@perex.cz>
Wed, 22 Oct 2014 14:17:29 +0000 (16:17 +0200)
src/dvr/dvr_rec.c
src/htsp_server.c
src/input/mpegts/mpegts_mux.c
src/input/mpegts/mpegts_mux_sched.c
src/input/mpegts/mpegts_mux_sched.h
src/profile.c
src/profile.h
src/service_mapper.c
src/subscriptions.c
src/subscriptions.h
src/webui/webui.c

index cde75a6ab838221bd0e29df419bf69146151adc6..acca4a3f2b89786ef86244850b8e566cefcffb1d 100644 (file)
@@ -78,15 +78,15 @@ dvr_rec_subscribe(dvr_entry_t *de)
 
   pro = de->de_config->dvr_profile;
   prch = malloc(sizeof(*prch));
-  if (profile_chain_open(pro, prch, de->de_channel, &de->de_config->dvr_muxcnf, 0, 0)) {
+  profile_chain_init(prch, pro, de->de_channel);
+  if (profile_chain_open(prch, &de->de_config->dvr_muxcnf, 0, 0)) {
     tvherror("dvr", "unable to create new channel streaming chain for '%s'",
              channel_get_name(de->de_channel));
     return;
   }
 
-  de->de_s = subscription_create_from_channel(de->de_channel, pro, weight,
-                                             buf, prch->prch_st,
-                                             prch->prch_flags,
+  de->de_s = subscription_create_from_channel(prch, weight,
+                                             buf, prch->prch_flags,
                                              NULL, NULL, NULL);
   if (de->de_s == NULL) {
     tvherror("dvr", "unable to create new channel subcription for '%s'",
index 013b8765315cbf4515ce191809661363bbc990c3..8b5a12509f9160f96b582dcf512b7c14b2979674 100644 (file)
@@ -1762,7 +1762,8 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in)
 #endif
 
   pro = profile_find_by_list(htsp->htsp_granted_access->aa_profiles, profile_id, "htsp");
-  if (!profile_work(pro, &hs->hs_prch, ch, &hs->hs_input, timeshiftPeriod, pflags)) {
+  profile_chain_init(&hs->hs_prch, pro, ch);
+  if (!profile_chain_work(&hs->hs_prch, &hs->hs_input, timeshiftPeriod, pflags)) {
     tvhlog(LOG_ERR, "htsp", "unable to create profile chain '%s'", pro->pro_name);
     free(hs);
     return htsp_error("Stream setup error");
@@ -1795,9 +1796,8 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in)
   LIST_INSERT_HEAD(&htsp->htsp_subscriptions, hs, hs_link);
 
   tvhdebug("htsp", "%s - subscribe to %s\n", htsp->htsp_logname, ch->ch_name ?: "");
-  hs->hs_s = subscription_create_from_channel(ch, pro, weight,
+  hs->hs_s = subscription_create_from_channel(&hs->hs_prch, weight,
                                              htsp->htsp_logname,
-                                             hs->hs_prch.prch_st,
                                              SUBSCRIPTION_STREAMING,
                                              htsp->htsp_peername,
                                              htsp->htsp_username,
index 2bcefbc1d012a37e11def7fda695f6550bb5ae1e..9fed7f08f19908b1723c4745c3ac4f7ead06012b 100644 (file)
@@ -23,6 +23,7 @@
 #include "subscriptions.h"
 #include "channels.h"
 #include "access.h"
+#include "profile.h"
 #include "dvb_charset.h"
 
 #include <assert.h>
@@ -1090,8 +1091,11 @@ mpegts_mux_subscribe
   ( mpegts_mux_t *mm, const char *name, int weight )
 {
   int err = 0;
+  profile_chain_t prch;
   th_subscription_t *s;
-  s = subscription_create_from_mux(mm, NULL, weight, name, NULL,
+  memset(&prch, 0, sizeof(prch));
+  prch.prch_id = mm;
+  s = subscription_create_from_mux(&prch, weight, name,
                                    SUBSCRIPTION_NONE,
                                    NULL, NULL, NULL, &err);
   return s ? 0 : err;
index 356b48d7040de64b7b8473cc787f790414f7c1d8..4ee2f44fe95e564e0519c39b05ed4f6c4785ea5b 100644 (file)
@@ -22,6 +22,7 @@
 #include "input/mpegts/mpegts_mux_sched.h"
 #include "streaming.h"
 #include "settings.h"
+#include "profile.h"
 
 static void mpegts_mux_sched_timer ( void *p );
 static void mpegts_mux_sched_input ( void *p, streaming_message_t *sm );
@@ -204,10 +205,14 @@ mpegts_mux_sched_timer ( void *p )
   if (!mms->mms_active) {
     assert(mms->mms_sub == NULL);
 
+    if (!mms->mms_prch)
+      mms->mms_prch = calloc(1, sizeof(mms->mms_prch));
+    mms->mms_prch->prch_id = mm;
+    mms->mms_prch->prch_st = &mms->mms_input;
+
     mms->mms_sub
-      = subscription_create_from_mux(mm, NULL, mms->mms_weight,
+      = subscription_create_from_mux(mms->mms_prch, mms->mms_weight,
                                      mms->mms_creator ?: "",
-                                     &mms->mms_input,
                                      SUBSCRIPTION_NONE,
                                      NULL, NULL, NULL, NULL);
 
@@ -311,6 +316,7 @@ mpegts_mux_sched_delete ( mpegts_mux_sched_t *mms, int delconf )
   free(mms->mms_cronstr);
   free(mms->mms_mux);
   free(mms->mms_creator);
+  free(mms->mms_prch);
   free(mms);
 }
 
index f5dfa12353e0cfa7151d751b3e7f69f573d92135..c7cd85dff941042c410c151250a73b1590c79ae3 100644 (file)
@@ -25,6 +25,8 @@
 #include "idnode.h"
 #include "subscriptions.h"
 
+struct profile_chain;
+
 typedef LIST_HEAD(,mpegts_mux_sched) mpegts_mux_sched_list_t;
 
 extern mpegts_mux_sched_list_t mpegts_mux_sched_all;
@@ -57,8 +59,9 @@ typedef struct mpegts_mux_sched
   /*
    * Subscription
    */
-  th_subscription_t  *mms_sub;      ///< Subscription handler
-  streaming_target_t  mms_input;    ///< Streaming input
+  struct profile_chain *mms_prch;     ///< Dummy profile chain
+  th_subscription_t    *mms_sub;      ///< Subscription handler
+  streaming_target_t    mms_input;    ///< Streaming input
 
 } mpegts_mux_sched_t;
 
index aa6f62eb6b0bd37fcacb53e55e90087d71e7bde9..fe57b78de072a9fb6fe6ad8d4a868b009f8f7600 100644 (file)
@@ -99,6 +99,7 @@ profile_create
     if (!htsmsg_get_bool(conf, "shield", &b))
       pro->pro_shield = !!b;
   }
+  pro->pro_refcount = 1;
   TAILQ_INSERT_TAIL(&profiles, pro, pro_link);
   if (save)
     profile_class_save((idnode_t *)pro);
@@ -107,6 +108,16 @@ profile_create
   return pro;
 }
 
+void
+profile_release_(profile_t *pro)
+{
+  if (pro->pro_free)
+    pro->pro_free(pro);
+  free(pro->pro_name);
+  free(pro->pro_comment);
+  free(pro);
+}
+
 static void
 profile_delete(profile_t *pro, int delconf)
 {
@@ -119,11 +130,7 @@ profile_delete(profile_t *pro, int delconf)
   idnode_unlink(&pro->pro_id);
   dvr_config_destroy_by_profile(pro, delconf);
   access_destroy_by_profile(pro, delconf);
-  if (pro->pro_free)
-    pro->pro_free(pro);
-  free(pro->pro_name);
-  free(pro->pro_comment);
-  free(pro);
+  profile_release(pro);
 }
 
 static void
@@ -427,17 +434,58 @@ profile_get_htsp_list(htsmsg_t *array, htsmsg_t *filter)
   }
 }
 
+/*
+ *
+ */
+void
+profile_chain_init(profile_chain_t *prch, profile_t *pro, void *id)
+{
+  memset(prch, 0, sizeof(*prch));
+  if (pro)
+    profile_grab(pro);
+  prch->prch_pro = pro;
+  prch->prch_id  = id;
+  streaming_queue_init(&prch->prch_sq, 0, 0);
+}
+
 /*
  *
  */
 int
-profile_chain_raw_open(profile_chain_t *prch, size_t qsize)
+profile_chain_work(profile_chain_t *prch, struct streaming_target *dst,
+                   uint32_t timeshift_period, int flags)
+{
+  profile_t *pro = prch->prch_pro;
+  if (pro && pro->pro_work)
+    return pro->pro_work(prch, dst, timeshift_period, flags);
+  return -1;
+}
+
+/*
+ *
+ */
+int
+profile_chain_open(profile_chain_t *prch,
+                   muxer_config_t *m_cfg, int flags, size_t qsize)
+{
+  profile_t *pro = prch->prch_pro;
+  if (pro && pro->pro_open)
+    return pro->pro_open(prch, m_cfg, flags, qsize);
+  return -1;
+}
+
+/*
+ *
+ */
+int
+profile_chain_raw_open(profile_chain_t *prch, void *id, size_t qsize)
 {
   muxer_config_t c;
 
   memset(&c, 0, sizeof(c));
   c.m_type = MC_RAW;
   memset(prch, 0, sizeof(*prch));
+  prch->prch_id    = id;
   prch->prch_flags = SUBSCRIPTION_RAW_MPEGTS;
   streaming_queue_init(&prch->prch_sq, SMT_PACKET, qsize);
   prch->prch_st    = &prch->prch_sq.sq_st;
@@ -467,6 +515,8 @@ profile_chain_close(profile_chain_t *prch)
     muxer_destroy(prch->prch_muxer);
   streaming_queue_deinit(&prch->prch_sq);
   prch->prch_st = NULL;
+  if (prch->prch_pro)
+    profile_release(prch->prch_pro);
 }
 
 /*
@@ -484,13 +534,10 @@ const idclass_t profile_htsp_class =
 };
 
 static int
-profile_htsp_work(profile_t *_pro, profile_chain_t *prch,
-                  void *id, streaming_target_t *dst,
+profile_htsp_work(profile_chain_t *prch,
+                  streaming_target_t *dst,
                   uint32_t timeshift_period, int flags)
 {
-  if (!(flags & PRCH_FLAG_SKIPZEROING))
-    memset(prch, 0, sizeof(*prch));
-
   if (flags & PRCH_FLAG_TSFIX)
     dst = prch->prch_tsfix = tsfix_create(prch->prch_transcoder);
 
@@ -548,10 +595,10 @@ const idclass_t profile_mpegts_pass_class =
 };
 
 static int
-profile_mpegts_pass_open(profile_t *_pro, profile_chain_t *prch, void *id,
+profile_mpegts_pass_open(profile_chain_t *prch,
                          muxer_config_t *m_cfg, int flags, size_t qsize)
 {
-  profile_mpegts_t *pro = (profile_mpegts_t *)_pro;
+  profile_mpegts_t *pro = (profile_mpegts_t *)prch->prch_pro;
   muxer_config_t c;
 
   if (m_cfg)
@@ -563,7 +610,6 @@ profile_mpegts_pass_open(profile_t *_pro, profile_chain_t *prch, void *id,
   c.m_rewrite_pat = pro->pro_rewrite_pat;
   c.m_rewrite_pmt = pro->pro_rewrite_pmt;
 
-  memset(prch, 0, sizeof(*prch));
   prch->prch_flags = SUBSCRIPTION_RAW_MPEGTS;
   streaming_queue_init(&prch->prch_sq, SMT_PACKET, qsize);
   prch->prch_muxer = muxer_create(&c);
@@ -612,10 +658,10 @@ const idclass_t profile_matroska_class =
 };
 
 static int
-profile_matroska_open(profile_t *_pro, profile_chain_t *prch, void *id,
+profile_matroska_open(profile_chain_t *prch,
                       muxer_config_t *m_cfg, int flags, size_t qsize)
 {
-  profile_matroska_t *pro = (profile_matroska_t *)_pro;
+  profile_matroska_t *pro = (profile_matroska_t *)prch->prch_pro;
   muxer_config_t c;
 
   if (m_cfg)
@@ -627,7 +673,6 @@ profile_matroska_open(profile_t *_pro, profile_chain_t *prch, void *id,
   if (pro->pro_webm)
     c.m_type = MC_WEBM;
 
-  memset(prch, 0, sizeof(*prch));
   streaming_queue_init(&prch->prch_sq, 0, qsize);
   prch->prch_gh    = globalheaders_create(&prch->prch_sq.sq_st);
   prch->prch_tsfix = tsfix_create(prch->prch_gh);
@@ -883,11 +928,11 @@ const idclass_t profile_transcode_class =
 };
 
 static int
-profile_transcode_work(profile_t *_pro, profile_chain_t *prch,
-                       void *id, streaming_target_t *dst,
+profile_transcode_work(profile_chain_t *prch,
+                       streaming_target_t *dst,
                        uint32_t timeshift_period, int flags)
 {
-  profile_transcode_t *pro = (profile_transcode_t *)_pro;
+  profile_transcode_t *pro = (profile_transcode_t *)prch->prch_pro;
   transcoder_props_t props;
 
   memset(&props, 0, sizeof(props));
@@ -899,9 +944,6 @@ profile_transcode_work(profile_t *_pro, profile_chain_t *prch,
   props.tp_bandwidth  = pro->pro_bandwidth >= 64 ? pro->pro_bandwidth : 64;
   strncpy(props.tp_language, pro->pro_language ?: "", 3);
 
-  if (!(flags & PRCH_FLAG_SKIPZEROING))
-    memset(prch, 0, sizeof(*prch));
-
 #if ENABLE_TIMESHIFT
   if (timeshift_period > 0)
     dst = prch->prch_timeshift = timeshift_create(dst, timeshift_period);
@@ -935,10 +977,10 @@ profile_transcode_mc_valid(int mc)
 }
 
 static int
-profile_transcode_open(profile_t *_pro, profile_chain_t *prch, void *id,
+profile_transcode_open(profile_chain_t *prch,
                        muxer_config_t *m_cfg, int flags, size_t qsize)
 {
-  profile_transcode_t *pro = (profile_transcode_t *)_pro;
+  profile_transcode_t *pro = (profile_transcode_t *)prch->prch_pro;
   muxer_config_t c;
   int r;
 
@@ -952,12 +994,10 @@ profile_transcode_open(profile_t *_pro, profile_chain_t *prch, void *id,
       c.m_type = MC_MATROSKA;
   }
 
-  memset(prch, 0, sizeof(*prch));
-
   streaming_queue_init(&prch->prch_sq, 0, qsize);
   prch->prch_gh = globalheaders_create(&prch->prch_sq.sq_st);
 
-  r = profile_transcode_work(_pro, prch, prch->prch_gh, id, 0,
+  r = profile_transcode_work(prch, prch->prch_gh, 0,
                              PRCH_FLAG_SKIPZEROING | PRCH_FLAG_TSFIX);
   if (r)
     return r;
index 5fc715772a4562e1f1b44571a26265f1a2a9f21c..7be38cfd5817df67aa7afa695afaed347dc30986 100644 (file)
@@ -51,6 +51,8 @@ extern profile_builders_queue profile_builders;
 #define PRCH_FLAG_TSFIX       (1<<1)
 
 typedef struct profile_chain {
+  struct profile          *prch_pro;
+  void                    *prch_id;
   int                      prch_flags;
   struct streaming_queue   prch_sq;
   struct streaming_target *prch_st;
@@ -69,6 +71,8 @@ typedef struct profile {
   idnode_t pro_id;
   TAILQ_ENTRY(profile) pro_link;
 
+  int pro_refcount;
+
   LIST_HEAD(,dvr_config) pro_dvr_configs;
   LIST_HEAD(,access_entry) pro_accesses;
 
@@ -83,10 +87,9 @@ typedef struct profile {
   void (*pro_conf_changed)(struct profile *pro);
   muxer_container_type_t (*pro_get_mc)(struct profile *pro);
 
-  int (*pro_work)(struct profile *pro, profile_chain_t *prch,
-                  void *id, struct streaming_target *dst,
+  int (*pro_work)(profile_chain_t *prch, struct streaming_target *dst,
                   uint32_t timeshift_period, int flags);
-  int (*pro_open)(struct profile *pro, profile_chain_t *prch, void *id,
+  int (*pro_open)(profile_chain_t *prch,
                   muxer_config_t *m_cfg, int flags, size_t qsize);
 } profile_t;
 
@@ -95,17 +98,24 @@ void profile_register(const idclass_t *clazz, profile_builder_t builder);
 profile_t *profile_create
   (const char *uuid, htsmsg_t *conf, int save);
 
-static inline int
-profile_work(profile_t *pro, profile_chain_t *prch,
-             void *id, struct streaming_target *dst,
-             uint32_t timeshift_period, int flags)
-  { return pro && pro->pro_work ? pro->pro_work(pro, prch, id, dst, timeshift_period, flags) : -1; }
-
-static inline int
-profile_chain_open(profile_t *pro, profile_chain_t *prch, void *id,
-                   muxer_config_t *m_cfg, int flags, size_t qsize)
-  { return pro && pro->pro_open ? pro->pro_open(pro, prch, id, m_cfg, flags, qsize) : -1; }
-int  profile_chain_raw_open(profile_chain_t *prch, size_t qsize);
+static inline void profile_grab( profile_t *pro )
+  { pro->pro_refcount++; }
+
+void profile_release_( profile_t *pro );
+static inline void profile_release( profile_t *pro )
+  {
+    assert(pro->pro_refcount > 0);
+    if (--pro->pro_refcount == 0) profile_release_(pro);
+  }
+
+int
+profile_chain_work(profile_chain_t *prch, struct streaming_target *dst,
+                   uint32_t timeshift_period, int flags);
+int
+profile_chain_open(profile_chain_t *prch,
+                   muxer_config_t *m_cfg, int flags, size_t qsize);
+void profile_chain_init(profile_chain_t *prch, profile_t *pro, void *id);
+int  profile_chain_raw_open(profile_chain_t *prch, void *id, size_t qsize);
 void profile_chain_close(profile_chain_t *prch);
 
 static inline profile_t *profile_find_by_uuid(const char *uuid)
index eb098ce2a40f6681cb4825a47b18b1a07ae5afcd..3409b87834110762733532725810b2f3d9de8db5 100644 (file)
@@ -329,13 +329,15 @@ static void *
 service_mapper_thread ( void *aux )
 {
   service_t *s;
+  profile_chain_t prch;
   th_subscription_t *sub;
   int run, working = 0;
-  streaming_queue_t sq;
+  streaming_queue_t *sq;
   streaming_message_t *sm;
   const char *err = NULL;
 
-  streaming_queue_init(&sq, 0, 0);
+  profile_chain_init(&prch, NULL, NULL);
+  sq = &prch.prch_sq;
 
   pthread_mutex_lock(&global_lock);
 
@@ -362,8 +364,9 @@ service_mapper_thread ( void *aux )
 
     /* Subscribe */
     tvhinfo("service_mapper", "checking %s", s->s_nicename);
-    sub = subscription_create_from_service(s, NULL, SUBSCRIPTION_PRIO_MAPPER,
-                                           "service_mapper", &sq.sq_st,
+    prch.prch_id = s;
+    sub = subscription_create_from_service(&prch, SUBSCRIPTION_PRIO_MAPPER,
+                                           "service_mapper",
                                            0, NULL, NULL, "service_mapper");
 
     /* Failed */
@@ -380,20 +383,20 @@ service_mapper_thread ( void *aux )
 
     /* Wait */
     run = 1;
-    pthread_mutex_lock(&sq.sq_mutex);
+    pthread_mutex_lock(&sq->sq_mutex);
     while(tvheadend_running && run) {
 
       /* Wait for message */
-      while((sm = TAILQ_FIRST(&sq.sq_queue)) == NULL) {
-        pthread_cond_wait(&sq.sq_cond, &sq.sq_mutex);
+      while((sm = TAILQ_FIRST(&sq->sq_queue)) == NULL) {
+        pthread_cond_wait(&sq->sq_cond, &sq->sq_mutex);
         if (!tvheadend_running)
           break;
       }
       if (!tvheadend_running)
         break;
 
-      TAILQ_REMOVE(&sq.sq_queue, sm, sm_link);
-      pthread_mutex_unlock(&sq.sq_mutex);
+      TAILQ_REMOVE(&sq->sq_queue, sm, sm_link);
+      pthread_mutex_unlock(&sq->sq_mutex);
 
       if(sm->sm_type == SMT_PACKET) {
         run = 0;
@@ -411,13 +414,13 @@ service_mapper_thread ( void *aux )
       }
 
       streaming_msg_free(sm);
-      pthread_mutex_lock(&sq.sq_mutex);
+      pthread_mutex_lock(&sq->sq_mutex);
     }
     if (!tvheadend_running)
       break;
 
-    streaming_queue_clear(&sq.sq_queue);
-    pthread_mutex_unlock(&sq.sq_mutex);
+    streaming_queue_clear(&sq->sq_queue);
+    pthread_mutex_unlock(&sq->sq_mutex);
  
     pthread_mutex_lock(&global_lock);
     subscription_unsubscribe(sub);
@@ -434,6 +437,7 @@ service_mapper_thread ( void *aux )
   }
 
   pthread_mutex_unlock(&global_lock);
+  profile_chain_close(&prch);
   return NULL;
 }
 
index 86abfcd709a66537eaf8b8c7b9a2faabdafa0ac4..99bfa48ac978b2a398359999a7239059a7a297f6 100644 (file)
@@ -577,13 +577,16 @@ subscription_unsubscribe(th_subscription_t *s)
  */
 th_subscription_t *
 subscription_create
-  (profile_t *pro, int weight, const char *name, streaming_target_t *st,
+  (profile_chain_t *prch, int weight, const char *name,
    int flags, st_callback_t *cb, const char *hostname,
    const char *username, const char *client)
 {
   th_subscription_t *s = calloc(1, sizeof(th_subscription_t));
+  profile_t *pro = prch ? prch->prch_pro : NULL;
+  streaming_target_t *st = prch ? prch->prch_st : NULL;
   int reject = 0;
   static int tally;
+
   TAILQ_INIT(&s->ths_instances);
 
   if(flags & SUBSCRIPTION_NONE)
@@ -601,6 +604,7 @@ subscription_create
 
   streaming_target_init(&s->ths_input, cb, s, reject);
 
+  s->ths_prch              = prch->prch_st ? prch : NULL;
   s->ths_weight            = weight;
   s->ths_title             = strdup(name);
   s->ths_hostname          = hostname ? strdup(hostname) : NULL;
@@ -633,22 +637,29 @@ subscription_create
  *
  */
 static th_subscription_t *
-subscription_create_from_channel_or_service(channel_t *ch,
-                                            service_t *t,
-                                            profile_t *pro,
+subscription_create_from_channel_or_service(profile_chain_t *prch,
                                             unsigned int weight,
                                             const char *name,
-                                            streaming_target_t *st,
                                             int flags,
                                             const char *hostname,
                                             const char *username,
-                                            const char *client)
+                                            const char *client,
+                                            int service)
 {
   th_subscription_t *s;
-  assert(!ch || !t);
-  assert(st);
+  channel_t *ch = NULL;
+  service_t *t  = NULL;
+
+  assert(prch);
+  assert(prch->prch_id);
+  assert(prch->prch_st);
+
+  if (service)
+    t  = prch->prch_id;
+  else
+    ch = prch->prch_id;
 
-  s = subscription_create(pro, weight, name, st, flags, subscription_input,
+  s = subscription_create(prch, weight, name, flags, subscription_input,
                           hostname, username, client);
   if (ch)
     tvhtrace("subscription", "%04X: creating subscription for %s weight %d",
@@ -666,29 +677,29 @@ subscription_create_from_channel_or_service(channel_t *ch,
 }
 
 th_subscription_t *
-subscription_create_from_channel(channel_t *ch, profile_t *pro,
+subscription_create_from_channel(profile_chain_t *prch,
                                  unsigned int weight,
-                                const char *name, streaming_target_t *st,
+                                const char *name,
                                 int flags, const char *hostname,
                                 const char *username, const char *client)
 {
   return subscription_create_from_channel_or_service
-           (ch, NULL, pro, weight, name, st, flags, hostname, username, client);
+           (prch, weight, name, flags, hostname, username, client, 0);
 }
 
 /**
  *
  */
 th_subscription_t *
-subscription_create_from_service(service_t *t, profile_t *pro,
+subscription_create_from_service(profile_chain_t *prch,
                                  unsigned int weight,
                                  const char *name,
-                                streaming_target_t *st, int flags,
+                                int flags,
                                 const char *hostname, const char *username, 
                                 const char *client)
 {
   return subscription_create_from_channel_or_service
-           (NULL, t, pro, weight, name, st, flags, hostname, username, client);
+           (prch, weight, name, flags, hostname, username, client, 1);
 }
 
 /**
@@ -738,16 +749,16 @@ mux_data_timeout ( void *aux )
 }
 
 th_subscription_t *
-subscription_create_from_mux(mpegts_mux_t *mm, profile_t *pro,
+subscription_create_from_mux(profile_chain_t *prch,
                              unsigned int weight,
                              const char *name,
-                             streaming_target_t *st,
                              int flags,
                              const char *hostname,
                              const char *username,
                              const char *client,
                              int *err)
 {
+  mpegts_mux_t *mm = prch->prch_id;
   th_subscription_t *s;
   streaming_message_t *sm;
   streaming_start_t *ss;
@@ -762,9 +773,9 @@ subscription_create_from_mux(mpegts_mux_t *mm, profile_t *pro,
   }
 
   /* Create subscription */
-  if (!st)
+  if (!prch->prch_st)
     flags |= SUBSCRIPTION_NONE;
-  s = subscription_create(pro, weight, name, st, flags, NULL,
+  s = subscription_create(prch, weight, name, flags, NULL,
                           hostname, username, client);
   s->ths_mmi = mm->mm_active;
 
@@ -784,7 +795,7 @@ subscription_create_from_mux(mpegts_mux_t *mm, profile_t *pro,
   LIST_INSERT_HEAD(&mm->mm_active->mmi_subs, s, ths_mmi_link);
 
   /* Connect */
-  if (st)
+  if (prch->prch_st)
     streaming_target_connect(&s->ths_mmi->mmi_streaming_pad, &s->ths_input);
 
   /* Deliver a start message */
@@ -1057,6 +1068,7 @@ void
 subscription_dummy_join(const char *id, int first)
 {
   service_t *t = service_find_by_identifier(id);
+  profile_chain_t *prch;
   streaming_target_t *st;
   th_subscription_t *s;
 
@@ -1073,9 +1085,12 @@ subscription_dummy_join(const char *id, int first)
     return;
   }
 
-  st = calloc(1, sizeof(streaming_target_t));
+  prch = calloc(1, sizeof(*prch));
+  prch->prch_id = t;
+  st = calloc(1, sizeof(*st));
   streaming_target_init(st, dummy_callback, NULL, 0);
-  s = subscription_create_from_service(t, NULL, 1, "dummy", st, 0, NULL, NULL, "dummy");
+  prch->prch_st = st;
+  s = subscription_create_from_service(prch, 1, "dummy", 0, NULL, NULL, "dummy");
 
   tvhlog(LOG_NOTICE, "subscription",
          "%04X: Dummy join %s ok", shortid(s), id);
index 76461badef84943221604a61a9757b1c2901e994..d6461444fb0391000f769f6b4e81644ff217f5bf 100644 (file)
@@ -21,7 +21,7 @@
 
 #include "service.h"
 
-struct profile;
+struct profile_chain;
 
 extern struct th_subscription_list subscriptions;
 
@@ -46,6 +46,9 @@ typedef struct th_subscription {
   
   LIST_ENTRY(th_subscription) ths_global_link;
   LIST_ENTRY(th_subscription) ths_remove_link;
+
+  struct profile_chain *ths_prch;
+
   int ths_weight;
 
   enum {
@@ -127,11 +130,9 @@ void subscription_set_weight(th_subscription_t *s, unsigned int weight);
 void subscription_reschedule(void);
 
 th_subscription_t *
-subscription_create_from_channel(struct channel *ch,
-                                 struct profile *pro,
+subscription_create_from_channel(struct profile_chain *prch,
                                 unsigned int weight,
                                 const char *name,
-                                streaming_target_t *st,
                                 int flags,
                                 const char *hostname,
                                 const char *username,
@@ -139,11 +140,9 @@ subscription_create_from_channel(struct channel *ch,
 
 
 th_subscription_t *
-subscription_create_from_service(struct service *t,
-                                 struct profile *pro,
+subscription_create_from_service(struct profile_chain *prch,
                                  unsigned int weight,
                                 const char *name,
-                                streaming_target_t *st,
                                 int flags,
                                 const char *hostname,
                                 const char *username,
@@ -152,20 +151,17 @@ subscription_create_from_service(struct service *t,
 #if ENABLE_MPEGTS
 struct mpegts_mux;
 th_subscription_t *
-subscription_create_from_mux(struct mpegts_mux *m,
-                             struct profile *pro,
+subscription_create_from_mux(struct profile_chain *prch,
                              unsigned int weight,
                              const char *name,
-                             streaming_target_t *st,
                              int flags,
                              const char *hostname,
                              const char *username,
                              const char *client, int *err);
 #endif
 
-th_subscription_t *subscription_create(struct profile *pro,
+th_subscription_t *subscription_create(struct profile_chain *prch,
                                        int weight, const char *name,
-                                      streaming_target_t *st,
                                       int flags, st_callback_t *cb,
                                       const char *hostname,
                                       const char *username,
index 32cee231f04b499044288bc1f3f336c7bb7dfc8d..410b9a23da04aebff792ea89074b86cabe94c07c 100644 (file)
@@ -732,12 +732,12 @@ http_stream_service(http_connection_t *hc, service_t *service, int weight)
   else
     qsize = 1500000;
 
-  if (!profile_chain_open(pro, &prch, service, NULL, 0, qsize)) {
+  profile_chain_init(&prch, pro, service);
+  if (!profile_chain_open(&prch, NULL, 0, qsize)) {
 
     tcp_get_ip_str((struct sockaddr*)hc->hc_peer, addrbuf, 50);
 
-    s = subscription_create_from_service(service, pro, weight ?: 100, "HTTP",
-                                         prch.prch_st,
+    s = subscription_create_from_service(&prch, weight ?: 100, "HTTP",
                                          prch.prch_flags | SUBSCRIPTION_STREAMING,
                                          addrbuf,
                                         hc->hc_username,
@@ -786,12 +786,11 @@ http_stream_mux(http_connection_t *hc, mpegts_mux_t *mm, int weight)
   else
     qsize = 10000000;
 
-  if (!profile_chain_raw_open(&prch, qsize)) {
+  if (!profile_chain_raw_open(&prch, mm, qsize)) {
 
     tcp_get_ip_str((struct sockaddr*)hc->hc_peer, addrbuf, 50);
 
-    s = subscription_create_from_mux(mm, NULL, weight ?: 10, "HTTP",
-                                     prch.prch_st,
+    s = subscription_create_from_mux(&prch, weight ?: 10, "HTTP",
                                      prch.prch_flags |
                                      SUBSCRIPTION_FULLMUX |
                                      SUBSCRIPTION_STREAMING,
@@ -846,12 +845,13 @@ http_stream_channel(http_connection_t *hc, channel_t *ch, int weight)
   else
     qsize = 1500000;
 
-  if (!profile_chain_open(pro, &prch, ch, NULL, 0, qsize)) {
+  profile_chain_init(&prch, pro, ch);
+  if (!profile_chain_open(&prch, NULL, 0, qsize)) {
 
     tcp_get_ip_str((struct sockaddr*)hc->hc_peer, addrbuf, 50);
 
-    s = subscription_create_from_channel(ch, pro, weight ?: 100, "HTTP",
-                 prch.prch_st, prch.prch_flags | SUBSCRIPTION_STREAMING,
+    s = subscription_create_from_channel(&prch, weight ?: 100, "HTTP",
+                 prch.prch_flags | SUBSCRIPTION_STREAMING,
                  addrbuf, hc->hc_username,
                  http_arg_get(&hc->hc_args, "User-Agent"));