]> git.ipfire.org Git - thirdparty/tvheadend.git/commitdiff
service/mpegts: add raw service type and raw PID handling
authorJaroslav Kysela <perex@perex.cz>
Thu, 5 Mar 2015 14:13:57 +0000 (15:13 +0100)
committerJaroslav Kysela <perex@perex.cz>
Wed, 11 Mar 2015 20:41:13 +0000 (21:41 +0100)
19 files changed:
src/dvr/dvr_rec.c
src/htsp_server.c
src/input/mpegts.h
src/input/mpegts/iptv/iptv.c
src/input/mpegts/mpegts_input.c
src/input/mpegts/mpegts_mux.c
src/input/mpegts/mpegts_mux_sched.c
src/input/mpegts/mpegts_pid.c
src/input/mpegts/mpegts_service.c
src/input/mpegts/tsdemux.c
src/input/mpegts/tsdemux.h
src/satip/rtsp.c
src/service.c
src/service.h
src/service_mapper.c
src/subscriptions.c
src/subscriptions.h
src/tvheadend.h
src/webui/webui.c

index 54ad9080b1bc81e140fd967c6dd9e3fc224bd186..832a78a200c28fa97ad7b524d8ed465da24559d7 100644 (file)
@@ -85,7 +85,7 @@ dvr_rec_subscribe(dvr_entry_t *de)
     return;
   }
 
-  de->de_s = subscription_create_from_channel(prch, weight,
+  de->de_s = subscription_create_from_channel(prch, NULL, weight,
                                              buf, prch->prch_flags,
                                              NULL, NULL, NULL);
   if (de->de_s == NULL) {
index a4ce59fd3346f0018a7f190f3598706504a0af53..e19cb2dc88c63695f10358532b068ab10402f5a1 100644 (file)
@@ -2023,7 +2023,7 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in)
 
   tvhdebug("htsp", "%s - subscribe to %s using profile %s\n",
            htsp->htsp_logname, channel_get_name(ch), pro->pro_name ?: "");
-  hs->hs_s = subscription_create_from_channel(&hs->hs_prch, weight,
+  hs->hs_s = subscription_create_from_channel(&hs->hs_prch, NULL, weight,
                                              htsp->htsp_logname,
                                              SUBSCRIPTION_STREAMING,
                                              htsp->htsp_peername,
index 59e762e54e6893667d6230eac6b897bbabdab48e..c6e9325e41210ca3a0f228ddc2673d98c6bdd9fa 100644 (file)
@@ -146,12 +146,14 @@ typedef struct mpegts_table_state
 typedef struct mpegts_pid_sub
 {
   RB_ENTRY(mpegts_pid_sub) mps_link;
-  LIST_ENTRY(mpegts_pid_sub) mps_svc_link;
-#define MPS_NONE    0x0
-#define MPS_STREAM  0x1
-#define MPS_TABLE   0x2
-#define MPS_FTABLE  0x4
-#define MPS_SERVICE 0x8
+  LIST_ENTRY(mpegts_pid_sub) mps_svcraw_link;
+#define MPS_NONE    0x00
+#define MPS_ALL     0x01
+#define MPS_RAW     0x02
+#define MPS_STREAM  0x04
+#define MPS_SERVICE 0x08
+#define MPS_TABLE   0x10
+#define MPS_FTABLE  0x20
   int                       mps_type;
   void                     *mps_owner;
 } mpegts_pid_sub_t;
@@ -429,11 +431,18 @@ struct mpegts_mux
   LIST_HEAD(, mpegts_mux_instance) mm_instances;
   mpegts_mux_instance_t *mm_active;
 
+  /*
+   * Raw subscriptions
+   */
+
+  LIST_HEAD(, th_subscription) mm_raw_subs;
+
   /*
    * Data processing
    */
 
   RB_HEAD(, mpegts_pid)       mm_pids;
+  LIST_HEAD(, mpegts_pid_sub) mm_all_subs;
   int                         mm_last_pid;
   mpegts_pid_t               *mm_last_mp;
 
@@ -578,8 +587,6 @@ struct mpegts_mux_instance
   mpegts_mux_t   *mmi_mux;
   mpegts_input_t *mmi_input;
 
-  LIST_HEAD(,th_subscription) mmi_subs;
-
   tvh_input_stream_stats_t mmi_stats;
 
   int             mmi_tune_failed;
@@ -745,6 +752,8 @@ int         mpegts_input_class_network_set  ( void *o, const void *p );
 htsmsg_t   *mpegts_input_class_network_enum ( void *o );
 char       *mpegts_input_class_network_rend ( void *o );
 
+int mpegts_mps_cmp( mpegts_pid_sub_t *a, mpegts_pid_sub_t *b );
+
 void mpegts_network_register_builder
   ( const idclass_t *idc,
     mpegts_network_t *(*build)(const idclass_t *idc, htsmsg_t *conf) );
@@ -877,6 +886,9 @@ mpegts_pid_t * mpegts_input_open_pid
 void mpegts_input_close_pid
   ( mpegts_input_t *mi, mpegts_mux_t *mm, int pid, int type, void *owner );
 
+void mpegts_input_close_pids
+  ( mpegts_input_t *mi, mpegts_mux_t *mm, int type, void *owner );
+
 static inline void
 tsdebug_write(mpegts_mux_t *mm, uint8_t *buf, size_t len)
 {
@@ -943,6 +955,8 @@ mpegts_service_t *mpegts_service_create0
   mpegts_service_create0(calloc(1, sizeof(mpegts_service_t)),\
                          &mpegts_service_class, u, m, s, p, c)
 
+mpegts_service_t *mpegts_service_create_raw(mpegts_mux_t *mm);
+
 mpegts_service_t *mpegts_service_find 
   ( mpegts_mux_t *mm, uint16_t sid, uint16_t pmt_pid, int create, int *save );
 
index 9aac168d6483fc2637652ebf71dd08d9ac1ab9d5..b1dda026898d41a6e6546c1c304f178bb97ae7cd 100644 (file)
@@ -140,13 +140,6 @@ iptv_input_get_weight ( mpegts_input_t *mi, int flags )
   if (!iptv_input_is_free(mi)) {
     w = 1000000;
 
-    /* Direct subs */
-    LIST_FOREACH(mmi, &mi->mi_mux_active, mmi_active_link) {
-      LIST_FOREACH(ths, &mmi->mmi_subs, ths_mmi_link) {
-        w = MIN(w, ths->ths_weight);
-      }
-    }
-
     /* Service subs */
     pthread_mutex_lock(&mi->mi_output_lock);
     LIST_FOREACH(s, &mi->mi_transports, s_active_link) {
index 9849da5089e596e42f8dc6493e2591e68222ea83..890120211b0c8099c8bcee75b9d45623d77e7489 100644 (file)
@@ -336,19 +336,10 @@ mpegts_input_is_free ( mpegts_input_t *mi )
 int
 mpegts_input_get_weight ( mpegts_input_t *mi, int flags )
 {
-  const mpegts_mux_instance_t *mmi;
   const service_t *s;
   const th_subscription_t *ths;
   int w = 0, count = 0;
 
-  /* Direct subs */
-  LIST_FOREACH(mmi, &mi->mi_mux_active, mmi_active_link) {
-    LIST_FOREACH(ths, &mmi->mmi_subs, ths_mmi_link) {
-      w = MAX(w, ths->ths_weight);
-      count++;
-    }
-  }
-
   /* Service subs */
   pthread_mutex_lock(&mi->mi_output_lock);
   LIST_FOREACH(s, &mi->mi_transports, s_active_link) {
@@ -401,8 +392,8 @@ mpegts_input_stop_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi )
 {
 }
 
-static int
-mps_cmp ( mpegts_pid_sub_t *a, mpegts_pid_sub_t *b )
+int
+mpegts_mps_cmp ( mpegts_pid_sub_t *a, mpegts_pid_sub_t *b )
 {
   if (a->mps_type != b->mps_type) {
     if (a->mps_type & MPS_SERVICE)
@@ -415,25 +406,65 @@ mps_cmp ( mpegts_pid_sub_t *a, mpegts_pid_sub_t *b )
   return 0;
 }
 
+void
+mpegts_input_close_pids
+  ( mpegts_input_t *mi, mpegts_mux_t *mm, int type, void *owner )
+{
+  mpegts_pid_t *mp, *mp_next;
+  mpegts_pid_sub_t *mps, *mps_next;
+
+  for (mp = RB_FIRST(&mm->mm_pids); mp; mp = mp_next) {
+    mp_next = RB_NEXT(mp, mp_link);
+    if ((mp->mp_type & MPS_RAW) == 0) continue;
+    for (mps = RB_FIRST(&mp->mp_subs); mps; mps = mps_next) {
+      mps_next = RB_NEXT(mps, mps_link);
+      if (mps->mps_owner != owner) continue;
+      LIST_REMOVE(mps, mps_svcraw_link);
+      RB_REMOVE(&mp->mp_subs, mps, mps_link);
+      free(mps);
+      if (!RB_FIRST(&mp->mp_subs)) {
+        RB_REMOVE(&mm->mm_pids, mp, mp_link);
+        free(mp);
+      }
+    }
+  }
+}
+
 mpegts_pid_t *
 mpegts_input_open_pid
   ( mpegts_input_t *mi, mpegts_mux_t *mm, int pid, int type, void *owner )
 {
   char buf[512];
   mpegts_pid_t *mp;
-  mpegts_pid_sub_t *mps;
+  mpegts_pid_sub_t *mps, *mps2;
   assert(owner != NULL);
-  assert((type & (MPS_STREAM|MPS_SERVICE)) == 0 ||
-         ((type & MPS_STREAM) ? 1 : 0) != ((type & MPS_SERVICE) ? 1 : 0));
+  assert((type & (MPS_STREAM|MPS_SERVICE|MPS_RAW)) == 0 ||
+         (((type & MPS_STREAM) ? 1 : 0) +
+          ((type & MPS_SERVICE) ? 1 : 0) +
+          ((type & MPS_RAW) ? 1 : 0)) == 1);
   lock_assert(&mi->mi_output_lock);
+
+  if (pid == MPEGTS_FULLMUX_PID)
+    mpegts_input_close_pids(mi, mm, MPS_RAW, owner);
+
   if ((mp = mpegts_mux_find_pid(mm, pid, 1))) {
     mps = calloc(1, sizeof(*mps));
     mps->mps_type  = type;
     mps->mps_owner = owner;
-    if (!RB_INSERT_SORTED(&mp->mp_subs, mps, mps_link, mps_cmp)) {
+    if (pid == MPEGTS_FULLMUX_PID) {
+      mp->mp_type |= type | MPS_ALL;
+      LIST_FOREACH(mps2, &mm->mm_all_subs, mps_svcraw_link)
+        if (mps2->mps_owner == owner) break;
+      if (mps2 == NULL) {
+        LIST_INSERT_HEAD(&mm->mm_all_subs, mps, mps_svcraw_link);
+        mpegts_mux_nice_name(mm, buf, sizeof(buf));
+        tvhdebug("mpegts", "%s - open PID fullmux subscription [%d/%p]",
+                 buf, type, owner);
+      }
+    } else if (!RB_INSERT_SORTED(&mp->mp_subs, mps, mps_link, mpegts_mps_cmp)) {
       mp->mp_type |= type;
-      if (type & MPS_SERVICE)
-        LIST_INSERT_HEAD(&mp->mp_svc_subs, mps, mps_svc_link);
+      if (type & (MPS_SERVICE|MPS_RAW))
+        LIST_INSERT_HEAD(&mp->mp_svc_subs, mps, mps_svcraw_link);
       mpegts_mux_nice_name(mm, buf, sizeof(buf));
       tvhdebug("mpegts", "%s - open PID %04X (%d) [%d/%p]",
                buf, mp->mp_pid, mp->mp_pid, type, owner);
@@ -456,39 +487,53 @@ mpegts_input_close_pid
   lock_assert(&mi->mi_output_lock);
   if (!(mp = mpegts_mux_find_pid(mm, pid, 0)))
     return;
-  skel.mps_type  = type;
-  skel.mps_owner = owner;
-  mps = RB_FIND(&mp->mp_subs, &skel, mps_link, mps_cmp);
-  if (pid == mm->mm_last_pid) {
-    mm->mm_last_pid = -1;
-    mm->mm_last_mp = NULL;
-  }
-  if (mps) {
+  if (pid == MPEGTS_FULLMUX_PID) {
     mpegts_mux_nice_name(mm, buf, sizeof(buf));
-    tvhdebug("mpegts", "%s - close PID %04X (%d) [%d/%p]",
-             buf, mp->mp_pid, mp->mp_pid, type, owner);
-    if (type & MPS_SERVICE)
-      LIST_REMOVE(mps, mps_svc_link);
-    RB_REMOVE(&mp->mp_subs, mps, mps_link);
+    tvhdebug("mpegts", "%s - close PID fullmux subscription [%d/%p]",
+             buf, type, owner);
+    mpegts_input_close_pids(mi, mm, MPS_RAW, owner);
+    LIST_FOREACH(mps, &mm->mm_all_subs, mps_svcraw_link)
+      if (mps->mps_owner == owner) break;
+    if (mps == NULL) return;
+    LIST_REMOVE(mps, mps_svcraw_link);
     free(mps);
-    if (!RB_FIRST(&mp->mp_subs)) {
-      RB_REMOVE(&mm->mm_pids, mp, mp_link);
-      if (mp->mp_fd != -1)
-        linuxdvb_filter_close(mp->mp_fd);
-      free(mp);
-    } else {
-      type = 0;
-      RB_FOREACH(mps, &mp->mp_subs, mps_link)
-        type |= mps->mps_type;
-      mp->mp_type = type;
+  } else {
+    skel.mps_type  = type;
+    skel.mps_owner = owner;
+    mps = RB_FIND(&mp->mp_subs, &skel, mps_link, mpegts_mps_cmp);
+    if (pid == mm->mm_last_pid) {
+      mm->mm_last_pid = -1;
+      mm->mm_last_mp = NULL;
+    }
+    if (mps) {
+      mpegts_mux_nice_name(mm, buf, sizeof(buf));
+      tvhdebug("mpegts", "%s - close PID %04X (%d) [%d/%p]",
+               buf, mp->mp_pid, mp->mp_pid, type, owner);
+      if (type & (MPS_SERVICE|MPS_RAW))
+        LIST_REMOVE(mps, mps_svcraw_link);
+      RB_REMOVE(&mp->mp_subs, mps, mps_link);
+      free(mps);
     }
   }
+  if (!RB_FIRST(&mp->mp_subs)) {
+    RB_REMOVE(&mm->mm_pids, mp, mp_link);
+    if (mp->mp_fd != -1)
+      linuxdvb_filter_close(mp->mp_fd);
+    free(mp);
+  } else {
+    type = 0;
+    RB_FOREACH(mps, &mp->mp_subs, mps_link)
+      type |= mps->mps_type;
+    mp->mp_type = type;
+  }
 }
 
 void
 mpegts_input_open_service ( mpegts_input_t *mi, mpegts_service_t *s, int init )
 {
   elementary_stream_t *st;
+  mpegts_apids_t *pids;
+  int i;
 
   /* Add to list */
   pthread_mutex_lock(&mi->mi_output_lock);
@@ -499,24 +544,37 @@ mpegts_input_open_service ( mpegts_input_t *mi, mpegts_service_t *s, int init )
 
   /* Register PIDs */
   pthread_mutex_lock(&s->s_stream_mutex);
-  mi->mi_open_pid(mi, s->s_dvb_mux, s->s_pmt_pid, MPS_SERVICE, s);
-  mi->mi_open_pid(mi, s->s_dvb_mux, s->s_pcr_pid, MPS_SERVICE, s);
-  /* Open only filtered components here */
-  TAILQ_FOREACH(st, &s->s_filt_components, es_filt_link) {
-    if (st->es_type != SCT_CA) {
-      st->es_pid_opened = 1;
-      mi->mi_open_pid(mi, s->s_dvb_mux, st->es_pid, MPS_SERVICE, s);
+  if (s->s_type == STYPE_STD) {
+    mi->mi_open_pid(mi, s->s_dvb_mux, s->s_pmt_pid, MPS_SERVICE, s);
+    mi->mi_open_pid(mi, s->s_dvb_mux, s->s_pcr_pid, MPS_SERVICE, s);
+    /* Open only filtered components here */
+    TAILQ_FOREACH(st, &s->s_filt_components, es_filt_link) {
+      if (st->es_type != SCT_CA) {
+        st->es_pid_opened = 1;
+        mi->mi_open_pid(mi, s->s_dvb_mux, st->es_pid, MPS_SERVICE, s);
+      }
+    }
+  } else {
+    if ((pids = s->s_pids) != NULL) {
+      if (pids->all) {
+        mi->mi_open_pid(mi, s->s_dvb_mux, MPEGTS_FULLMUX_PID, MPS_RAW, s);
+      } else {
+        for (i = 0; i < pids->count; i++)
+          mi->mi_open_pid(mi, s->s_dvb_mux, pids->pids[i], MPS_RAW, s);
+      }
     }
   }
 
   pthread_mutex_unlock(&s->s_stream_mutex);
   pthread_mutex_unlock(&mi->mi_output_lock);
 
-   /* Add PMT monitor */
-  s->s_pmt_mon =
-    mpegts_table_add(s->s_dvb_mux, DVB_PMT_BASE, DVB_PMT_MASK,
-                     dvb_pmt_callback, s, "pmt",
-                     MT_CRC, s->s_pmt_pid);
+  /* Add PMT monitor */
+  if(s->s_type == STYPE_STD) {
+    s->s_pmt_mon =
+      mpegts_table_add(s->s_dvb_mux, DVB_PMT_BASE, DVB_PMT_MASK,
+                       dvb_pmt_callback, s, "pmt",
+                       MT_CRC, s->s_pmt_pid);
+  }
 }
 
 void
@@ -525,7 +583,7 @@ mpegts_input_close_service ( mpegts_input_t *mi, mpegts_service_t *s )
   elementary_stream_t *st;
 
   /* Close PMT table */
-  if (s->s_pmt_mon)
+  if (s->s_type == STYPE_STD && s->s_pmt_mon)
     mpegts_table_destroy(s->s_pmt_mon);
   s->s_pmt_mon = NULL;
 
@@ -538,14 +596,18 @@ mpegts_input_close_service ( mpegts_input_t *mi, mpegts_service_t *s )
   
   /* Close PID */
   pthread_mutex_lock(&s->s_stream_mutex);
-  mi->mi_close_pid(mi, s->s_dvb_mux, s->s_pmt_pid, MPS_SERVICE, s);
-  mi->mi_close_pid(mi, s->s_dvb_mux, s->s_pcr_pid, MPS_SERVICE, s);
-  /* Close all opened PIDs (the component filter may be changed at runtime) */
-  TAILQ_FOREACH(st, &s->s_components, es_link) {
-    if (st->es_pid_opened) {
-      st->es_pid_opened = 0;
-      mi->mi_close_pid(mi, s->s_dvb_mux, st->es_pid, MPS_SERVICE, s);
+  if (s->s_type == STYPE_STD) {
+    mi->mi_close_pid(mi, s->s_dvb_mux, s->s_pmt_pid, MPS_SERVICE, s);
+    mi->mi_close_pid(mi, s->s_dvb_mux, s->s_pcr_pid, MPS_SERVICE, s);
+    /* Close all opened PIDs (the component filter may be changed at runtime) */
+    TAILQ_FOREACH(st, &s->s_components, es_link) {
+      if (st->es_pid_opened) {
+        st->es_pid_opened = 0;
+        mi->mi_close_pid(mi, s->s_dvb_mux, st->es_pid, MPS_SERVICE, s);
+      }
     }
+  } else {
+    mpegts_input_close_pids(mi, s->s_dvb_mux, MPS_RAW, s);
   }
 
 
@@ -676,10 +738,16 @@ static int
 mpegts_input_has_subscription ( mpegts_input_t *mi, mpegts_mux_t *mm )
 {
   int ret = 0;
-  service_t *t;
+  const service_t *t;
+  const th_subscription_t *ths;
   pthread_mutex_lock(&mi->mi_output_lock);
   LIST_FOREACH(t, &mi->mi_transports, s_active_link) {
     if (((mpegts_service_t*)t)->s_dvb_mux == mm) {
+      if (t->s_type == STYPE_RAW) {
+        LIST_FOREACH(ths, &t->s_subscriptions, ths_service_link)
+          if (!strcmp(ths->ths_title, "keep")) break;
+        if (ths) continue;
+      }
       ret = 1;
       break;
     }
@@ -940,7 +1008,6 @@ mpegts_input_process
   uint8_t *end = mpkt->mp_data + len;
   mpegts_mux_t          *mm  = mpkt->mp_mux;
   mpegts_mux_instance_t *mmi;
-  th_subscription_t *ths;
 #if ENABLE_TSDEBUG
   off_t tsdebug_pos;
 #endif
@@ -948,9 +1015,6 @@ mpegts_input_process
   if (mm == NULL || (mmi = mm->mm_active) == NULL)
     return;
 
-  LIST_FOREACH(ths, &mmi->mmi_subs, ths_mmi_link)
-    ths->ths_live = 1;
-
   assert(mm == mmi->mmi_mux);
 
 #if ENABLE_TSDEBUG
@@ -959,7 +1023,7 @@ mpegts_input_process
 
   /* Process */
   assert((len % 188) == 0);
-  while ( tsb < end ) {
+  while (tsb < end) {
     pid = (tsb[1] << 8) | tsb[2];
     cc  = tsb[3];
 
@@ -993,10 +1057,18 @@ mpegts_input_process
       }
 
       type = mp->mp_type;
+      
+      /* Stream raw PIDs */
+      if (type & MPS_RAW) {
+        LIST_FOREACH(mps, &mm->mm_all_subs, mps_svcraw_link)
+          ts_recv_raw((mpegts_service_t *)mps->mps_owner, tsb);
+        LIST_FOREACH(mps, &mp->mp_svc_subs, mps_svcraw_link)
+          ts_recv_raw((mpegts_service_t *)mps->mps_owner, tsb);
+      }
 
       /* Stream service data */
       if (type & MPS_SERVICE) {
-        LIST_FOREACH(mps, &mp->mp_svc_subs, mps_svc_link) {
+        LIST_FOREACH(mps, &mp->mp_svc_subs, mps_svcraw_link) {
           s = mps->mps_owner;
           f = (type & (MPS_TABLE|MPS_FTABLE)) ||
               (pid == s->s_pmt_pid) || (pid == s->s_pcr_pid);
@@ -1006,6 +1078,7 @@ mpegts_input_process
       /* Stream table data */
       if (type & MPS_STREAM) {
         LIST_FOREACH(s, &mi->mi_transports, s_active_link) {
+          if (s->s_type != STYPE_STD) continue;
           f = (type & (MPS_TABLE|MPS_FTABLE)) ||
               (pid == s->s_pmt_pid) || (pid == s->s_pcr_pid);
           ts_recv_packet1((mpegts_service_t*)s, tsb, NULL, f);
@@ -1031,6 +1104,13 @@ mpegts_input_process
           //tvhdebug("tsdemux", "%s - SI packet had errors", name);
         }
       }
+
+    } else {
+
+      /* Stream to all fullmux subscribers */
+      LIST_FOREACH(mps, &mm->mm_all_subs, mps_svcraw_link)
+        ts_recv_raw((mpegts_service_t *)mps->mps_owner, tsb);
+
     }
 
 done:
@@ -1205,28 +1285,17 @@ mpegts_input_stream_status
 {
   int s = 0, w = 0;
   char buf[512];
-  th_subscription_t *sub;
+  th_subscription_t *ths;
+  const service_t *t;
   mpegts_mux_t *mm = mmi->mmi_mux;
   mpegts_input_t *mi = mmi->mmi_input;
 
-  /* Get number of subs */
-  // Note: this is a bit of a mess
-  LIST_FOREACH(sub, &mmi->mmi_subs, ths_mmi_link) {
-    s++;
-    w = MAX(w, sub->ths_weight);
-  }
-  // Note: due to satconf acting as proxy for input we can't always
-  //       use mi_transports, so we can in via a convoluted route
-  LIST_FOREACH(sub, &subscriptions, ths_global_link) {
-    if (!sub->ths_service) continue;
-    if (!idnode_is_instance(&sub->ths_service->s_id,
-                            &mpegts_service_class)) continue;
-    mpegts_service_t *ms = (mpegts_service_t*)sub->ths_service;
-    if (ms->s_dvb_mux == mm) {
-      s++;
-      w = MAX(w, sub->ths_weight);
-    }
-  }
+  LIST_FOREACH(t, &mi->mi_transports, s_active_link)
+    if (((mpegts_service_t *)t)->s_dvb_mux == mm)
+      LIST_FOREACH(ths, &t->s_subscriptions, ths_service_link) {
+        s++;
+        w = MAX(w, ths->ths_weight);
+      }
 
   st->uuid        = strdup(idnode_uuid_as_str(&mmi->mmi_id));
   mi->mi_display_name(mi, buf, sizeof(buf));
index fad436a6f7f072ef4c7ba9c38f149c82475020d5..59eddcc7206ac68dea0a258fe503a6a0c715d113 100644 (file)
@@ -99,20 +99,26 @@ static int
 mpegts_mux_keep_exists
   ( mpegts_input_t *mi )
 {
-  mpegts_mux_instance_t *mmi;
-  th_subscription_t *s;
+  const mpegts_mux_instance_t *mmi;
+  const service_t *s;
+  const th_subscription_t *ths;
+  int ret;
+
+  lock_assert(&global_lock);
 
   if (!mi)
     return 0;
 
-  mmi = LIST_FIRST(&mi->mi_mux_active);
-  if (mmi)
-    LIST_FOREACH(mmi, &mi->mi_mux_active, mmi_active_link)
-      LIST_FOREACH(s, &mmi->mmi_subs, ths_mmi_link)
-        if (!strcmp(s->ths_title, "keep"))
-          return 1;
-
-  return 0;
+  ret = 0;
+  LIST_FOREACH(mmi, &mi->mi_mux_active, mmi_active_link)
+    LIST_FOREACH(ths, &mmi->mmi_mux->mm_raw_subs, ths_mux_link) {
+      s = ths->ths_service;
+      if (s && s->s_type == STYPE_RAW && !strcmp(ths->ths_title, "keep")) {
+        ret = 1;
+        break;
+      }
+    }
+  return ret;
 }
 
 static int
@@ -255,11 +261,6 @@ mpegts_mux_instance_weight ( mpegts_mux_instance_t *mmi )
   mpegts_input_t *mi = mmi->mmi_input;
   lock_assert(&mi->mi_output_lock);
 
-  /* Direct subs */
-  LIST_FOREACH(ths, &mmi->mmi_subs, ths_mmi_link) {
-    w = MAX(w, ths->ths_weight);
-  }
-
   /* Service subs */
   LIST_FOREACH(s, &mi->mi_transports, s_active_link) {
     mpegts_service_t *ms = (mpegts_service_t*)s;
@@ -806,15 +807,9 @@ static int
 mpegts_mux_has_subscribers ( mpegts_mux_t *mm, const char *name )
 {
   mpegts_mux_instance_t *mmi = mm->mm_active;
-  th_subscription_t *sub;
   if (mmi) {
-    if ((sub = LIST_FIRST(&mmi->mmi_subs)) != NULL)
-      if (strcmp(sub->ths_title, "keep") || LIST_NEXT(sub, ths_mmi_link)) {
-        tvhtrace("mpegts", "%s - keeping mux (direct subscription)", name);
-        return 1;
-      }
     if (mmi->mmi_input->mi_has_subscription(mmi->mmi_input, mm)) {
-      tvhtrace("mpegts", "%s - keeping mux (service)", name);
+      tvhtrace("mpegts", "%s - keeping mux", name);
       return 1;
     }
   }
@@ -827,7 +822,6 @@ mpegts_mux_stop ( mpegts_mux_t *mm, int force, int reason )
   char buf[256], buf2[256], *s;
   mpegts_mux_instance_t *mmi = mm->mm_active, *mmi2;
   mpegts_input_t *mi = NULL, *mi2;
-  th_subscription_t *sub;
   mpegts_pid_t *mp;
   mpegts_pid_sub_t *mps;
 
@@ -864,8 +858,6 @@ mpegts_mux_stop ( mpegts_mux_t *mm, int force, int reason )
   }
 
   mi->mi_stopping_mux(mi, mmi);
-  LIST_FOREACH(sub, &mmi->mmi_subs, ths_mmi_link)
-    subscription_unlink_mux(sub, SM_CODE_SUBSCRIPTION_OVERRIDDEN);
   mi->mi_stop_mux(mi, mmi);
   mi->mi_stopped_mux(mi, mmi);
 
@@ -885,13 +877,22 @@ mpegts_mux_stop ( mpegts_mux_t *mm, int force, int reason )
   mm->mm_last_mp = NULL;
   while ((mp = RB_FIRST(&mm->mm_pids))) {
     assert(mi);
-    while ((mps = RB_FIRST(&mp->mp_subs))) {
-      tvhdebug("mpegts", "%s - close PID %04X (%d) [%d/%p]", buf,
-               mp->mp_pid, mp->mp_pid, mps->mps_type, mps->mps_owner);
-      RB_REMOVE(&mp->mp_subs, mps, mps_link);
-      if (mps->mps_type & MPS_SERVICE)
-        LIST_REMOVE(mps, mps_svc_link);
-      free(mps);
+    if (mp->mp_pid == MPEGTS_FULLMUX_PID) {
+      while ((mps = LIST_FIRST(&mm->mm_all_subs))) {
+        tvhdebug("mpegts", "%s - close PID fullmux subscription [%d/%p]",
+                 buf, mps->mps_type, mps->mps_owner);
+        LIST_REMOVE(mps, mps_svcraw_link);
+        free(mps);
+      }
+    } else {
+      while ((mps = RB_FIRST(&mp->mp_subs))) {
+        tvhdebug("mpegts", "%s - close PID %04X (%d) [%d/%p]", buf,
+                 mp->mp_pid, mp->mp_pid, mps->mps_type, mps->mps_owner);
+        RB_REMOVE(&mp->mp_subs, mps, mps_link);
+        if (mps->mps_type & (MPS_SERVICE|MPS_RAW|MPS_ALL))
+          LIST_REMOVE(mps, mps_svcraw_link);
+        free(mps);
+      }
     }
     RB_REMOVE(&mm->mm_pids, mp, mp_link);
     if (mp->mp_fd != -1)
@@ -1285,7 +1286,6 @@ mpegts_mux_remove_subscriber
   mpegts_mux_nice_name(mm, buf, sizeof(buf));
   tvhtrace("mpegts", "%s - remove subscriber (reason %i)", buf, reason);
 #endif
-  subscription_unlink_mux(s, reason);
   mm->mm_stop(mm, 0, reason);
 }
 
@@ -1294,7 +1294,6 @@ mpegts_mux_subscribe
   ( mpegts_mux_t *mm, mpegts_input_t *mi,
     const char *name, int weight, int flags )
 {
-  int err = 0;
   profile_chain_t prch;
   th_subscription_t *s;
   memset(&prch, 0, sizeof(prch));
@@ -1302,8 +1301,8 @@ mpegts_mux_subscribe
   s = subscription_create_from_mux(&prch, (tvh_input_t *)mi,
                                    weight, name,
                                    SUBSCRIPTION_NONE | flags,
-                                   NULL, NULL, NULL, &err);
-  return s ? 0 : err;
+                                   NULL, NULL, NULL);
+  return s ? 0 : -EIO;
 }
 
 void
@@ -1311,13 +1310,15 @@ mpegts_mux_unsubscribe_by_name
   ( mpegts_mux_t *mm, const char *name )
 {
   mpegts_mux_instance_t *mmi;
+  const service_t *t;
   th_subscription_t *s, *n;
 
   LIST_FOREACH(mmi, &mm->mm_instances, mmi_mux_link) {
-    s = LIST_FIRST(&mmi->mmi_subs);
+    s = LIST_FIRST(&subscriptions);
     while (s) {
-      n = LIST_NEXT(s, ths_mmi_link);
-      if (!strcmp(s->ths_title, name))
+      n = LIST_NEXT(s, ths_global_link);
+      t = s->ths_service;
+      if (t && t->s_type == STYPE_RAW && !strcmp(s->ths_title, name))
         subscription_unsubscribe(s);
       s = n;
     }
@@ -1328,9 +1329,7 @@ void
 mpegts_mux_tuning_error ( const char *mux_uuid, mpegts_mux_instance_t *mmi_match )
 {
   mpegts_mux_t *mm;
-  th_subscription_t *sub;
   mpegts_mux_instance_t *mmi;
-  streaming_message_t *sm;
   struct timespec timeout;
 
   timeout.tv_sec = 2;
@@ -1339,14 +1338,9 @@ mpegts_mux_tuning_error ( const char *mux_uuid, mpegts_mux_instance_t *mmi_match
   if (!pthread_mutex_timedlock(&global_lock, &timeout)) {
     mm = mpegts_mux_find(mux_uuid);
     if (mm) {
-      if ((mmi = mm->mm_active) != NULL && mmi == mmi_match) {
-        LIST_FOREACH(sub, &mmi->mmi_subs, ths_mmi_link) {
-          sm = streaming_msg_create_code(SMT_SERVICE_STATUS, TSS_TUNING);
-          streaming_target_deliver(sub->ths_output, sm);
-        }
+      if ((mmi = mm->mm_active) != NULL && mmi == mmi_match)
         if (mmi->mmi_input)
           mmi->mmi_input->mi_tuning_error(mmi->mmi_input, mm);
-      }
     }
     pthread_mutex_unlock(&global_lock);
   }
@@ -1380,15 +1374,17 @@ mpegts_mux_find_pid_ ( mpegts_mux_t *mm, int pid, int create )
 
   skel.mp_pid = pid;
   mp = RB_FIND(&mm->mm_pids, &skel, mp_link, mp_cmp);
-  if (mp == NULL && create) {
-    mp = calloc(1, sizeof(*mp));
-    mp->mp_pid = pid;
-    if (!RB_INSERT_SORTED(&mm->mm_pids, mp, mp_link, mp_cmp)) {
-      mp->mp_fd = -1;
-      mp->mp_cc = -1;
-    } else {
-      free(mp);
-      mp = NULL;
+  if (mp == NULL) {
+    if (create) {
+      mp = calloc(1, sizeof(*mp));
+      mp->mp_pid = pid;
+      if (!RB_INSERT_SORTED(&mm->mm_pids, mp, mp_link, mp_cmp)) {
+        mp->mp_fd = -1;
+        mp->mp_cc = -1;
+      } else {
+        free(mp);
+        mp = NULL;
+      }
     }
   }
   if (mp) {
index 3794fad0c6fe3a724f5508bc3487ee1effe72a08..06689f148959e92faea70a6990a924cc2f48bbda 100644 (file)
@@ -214,7 +214,7 @@ mpegts_mux_sched_timer ( void *p )
       = subscription_create_from_mux(mms->mms_prch, NULL, mms->mms_weight,
                                      mms->mms_creator ?: "",
                                      SUBSCRIPTION_NONE,
-                                     NULL, NULL, NULL, NULL);
+                                     NULL, NULL, NULL);
 
     /* Failed (try-again soon) */
     if (!mms->mms_sub) {
index ede0a80b4393351cab28bf2190fcee7bebb2da52..1d645173398884ed03bfe1fb92a78c1091b88dc1 100644 (file)
@@ -159,9 +159,16 @@ mpegts_pid_compare(mpegts_apids_t *dst, mpegts_apids_t *src,
 {
   int i;
 
+  assert(dst);
+  assert(add);
+  assert(del);
   if (mpegts_pid_init(add, NULL, 0) ||
       mpegts_pid_init(del, NULL, 0))
     return -1;
+  if (src == NULL) {
+    mpegts_pid_copy(add, dst);
+    return add->count > 0;
+  }
   for (i = 0; i < src->count; i++)
     if (mpegts_pid_find_index(dst, src->pids[i]) < 0)
       mpegts_pid_add(del, src->pids[i]);
index 49b165f2261bd38f3a27b25a281b1a24061527dc..48e52de712918f0c2134861513aa27a1b949a9cb 100644 (file)
@@ -236,7 +236,8 @@ mpegts_service_config_save ( service_t *t )
  * Service instance list
  */
 static void
-mpegts_service_enlist(service_t *t, struct service_instance_list *sil, int flags)
+mpegts_service_enlist(service_t *t, tvh_input_t *ti,
+                      struct service_instance_list *sil, int flags)
 {
   int p = 0, w;
   mpegts_service_t      *s = (mpegts_service_t*)t;
@@ -256,6 +257,9 @@ mpegts_service_enlist(service_t *t, struct service_instance_list *sil, int flags
 
     mi = mmi->mmi_input;
 
+    if (ti && (tvh_input_t *)mi != ti)
+      continue;
+
     if (!mi->mi_is_enabled(mi, mmi->mmi_mux, flags)) continue;
 
     /* Set weight to -1 (forced) for already active mux */
@@ -495,7 +499,7 @@ mpegts_service_delete ( service_t *t, int delconf )
   mpegts_mux_t     *mm = ms->s_dvb_mux;
 
   /* Remove config */
-  if (delconf)
+  if (delconf && t->s_type == STYPE_STD)
     hts_settings_remove("input/dvb/networks/%s/muxes/%s/services/%s",
                       idnode_uuid_as_str(&mm->mm_network->mn_id),
                       idnode_uuid_as_str(&mm->mm_id),
@@ -531,7 +535,8 @@ mpegts_service_create0
   /* defaults for older version */
   s->s_dvb_created = dispatch_clock;
 
-  if (service_create0((service_t*)s, class, uuid, S_MPEG_TS, conf) == NULL)
+  if (service_create0((service_t*)s, STYPE_STD, class, uuid,
+                      S_MPEG_TS, conf) == NULL)
     return NULL;
 
   /* Create */
@@ -547,7 +552,6 @@ mpegts_service_create0
   
   s->s_delete         = mpegts_service_delete;
   s->s_is_enabled     = mpegts_service_is_enabled;
-  s->s_config_save    = mpegts_service_config_save;
   s->s_enlist         = mpegts_service_enlist;
   s->s_start_feed     = mpegts_service_start;
   s->s_stop_feed      = mpegts_service_stop;
@@ -618,6 +622,128 @@ mpegts_service_find
   return s;
 }
 
+/*
+ * Raw MPEGTS Service
+ */
+
+const idclass_t mpegts_service_raw_class =
+{
+  .ic_super      = &service_raw_class,
+  .ic_class      = "mpegts_raw_service",
+  .ic_caption    = "MPEGTS Raw Service",
+  .ic_properties = NULL
+};
+
+static void
+mpegts_service_raw_setsourceinfo(service_t *t, source_info_t *si)
+{
+  mpegts_service_setsourceinfo(t, si);
+
+  free(si->si_service);
+  si->si_service = strdup("Raw Service");
+}
+
+static int
+mpegts_service_raw_update_pids(service_t *t, mpegts_apids_t *pids)
+{
+  mpegts_service_t *ms = (mpegts_service_t *)t;
+  mpegts_input_t *mi = ms->s_dvb_active_input;
+  mpegts_mux_t *mm = ms->s_dvb_mux;
+  mpegts_apids_t *p, *x;
+  mpegts_apids_t add, del;
+  int i;
+
+  lock_assert(&global_lock);
+  if (pids) {
+    p = calloc(1, sizeof(*p));
+    mpegts_pid_init(p, NULL, 0);
+    mpegts_pid_copy(p, pids);
+  } else
+    p = NULL;
+  if (mi && mm) {
+    pthread_mutex_lock(&mi->mi_output_lock);
+    pthread_mutex_lock(&t->s_stream_mutex);
+    x = t->s_pids;
+    t->s_pids = p;
+    if (!pids->all && x && x->all) {
+      mi->mi_close_pid(mi, mm, MPEGTS_FULLMUX_PID, MPS_RAW, t);
+      mpegts_input_close_pids(mi, mm, MPS_RAW, t);
+      for (i = 0; i < x->count; i++)
+        mi->mi_open_pid(mi, mm, x->pids[i], MPS_RAW, t);
+    } else {
+      if (pids->all) {
+        mpegts_input_close_pids(mi, mm, MPS_RAW, t);
+        mi->mi_open_pid(mi, mm, MPEGTS_FULLMUX_PID, MPS_RAW, t);
+      } else {
+        mpegts_pid_compare(p, x, &add, &del);
+        for (i = 0; i < del.count; i++)
+          mi->mi_close_pid(mi, mm, del.pids[i], MPS_RAW, t);
+        for (i = 0; i < add.count; i++)
+          mi->mi_open_pid(mi, mm, add.pids[i], MPS_RAW, t);
+        mpegts_pid_done(&add);
+        mpegts_pid_done(&del);
+      }
+    }
+    pthread_mutex_unlock(&t->s_stream_mutex);
+    pthread_mutex_unlock(&mi->mi_output_lock);
+  } else {
+    pthread_mutex_lock(&t->s_stream_mutex);
+    x = t->s_pids;
+    t->s_pids = p;
+    pthread_mutex_unlock(&t->s_stream_mutex);
+  }
+  if (x) {
+    mpegts_pid_done(x);
+    free(x);
+  }
+  return 0;
+}
+
+mpegts_service_t *
+mpegts_service_create_raw ( mpegts_mux_t *mm )
+{
+  mpegts_service_t *s = calloc(1, sizeof(*s));
+  char buf[256];
+
+  mpegts_mux_nice_name(mm, buf, sizeof(buf));
+
+  if (service_create0((service_t*)s, STYPE_RAW,
+                      &mpegts_service_raw_class, NULL,
+                      S_MPEG_TS, NULL) == NULL) {
+    free(s);
+    return NULL;
+  }
+
+  sbuf_init(&s->s_tsbuf);
+
+  s->s_dvb_mux        = mm;
+
+  s->s_delete         = mpegts_service_delete;
+  s->s_is_enabled     = mpegts_service_is_enabled;
+  s->s_config_save    = mpegts_service_config_save;
+  s->s_enlist         = mpegts_service_enlist;
+  s->s_start_feed     = mpegts_service_start;
+  s->s_stop_feed      = mpegts_service_stop;
+  s->s_refresh_feed   = mpegts_service_refresh;
+  s->s_setsourceinfo  = mpegts_service_raw_setsourceinfo;
+  s->s_grace_period   = mpegts_service_grace_period;
+  s->s_channel_number = mpegts_service_channel_number;
+  s->s_channel_name   = mpegts_service_channel_name;
+  s->s_provider_name  = mpegts_service_provider_name;
+  s->s_channel_icon   = mpegts_service_channel_icon;
+  s->s_mapped         = mpegts_service_mapped;
+  s->s_update_pids    = mpegts_service_raw_update_pids;
+
+  pthread_mutex_lock(&s->s_stream_mutex);
+  free(s->s_nicename);
+  s->s_nicename = strdup(buf);
+  pthread_mutex_unlock(&s->s_stream_mutex);
+
+  tvhlog(LOG_DEBUG, "mpegts", "%s - add raw service", buf);
+
+  return s;
+}
+
 /******************************************************************************
  * Editor Configuration
  *
index 93ab1c5189aff6ebf49d22386c7f131843e409da..9c8be7e31db0e8d8ebb6fd1e8b645a1266546e17 100644 (file)
@@ -214,6 +214,28 @@ ts_recv_packet2(mpegts_service_t *t, const uint8_t *tsb)
     ts_recv_packet0(t, st, tsb);
 }
 
+/*
+ *
+ */
+void
+ts_recv_raw(mpegts_service_t *t, const uint8_t *tsb)
+{
+  elementary_stream_t *st = NULL;
+  int pid;
+
+  pthread_mutex_lock(&t->s_stream_mutex);
+  if (t->s_parent) {
+    /* If PID is owned by parent, let parent service to
+     * deliver this PID (decrambling)
+     */
+    pid = (tsb[1] & 0x1f) << 8 | tsb[2];
+    st = service_stream_find(t->s_parent, pid);
+  }
+  if(st == NULL)
+    if (streaming_pad_probe_type(&t->s_streaming_pad, SMT_MPEGTS))
+      ts_remux(t, tsb, 0);
+  pthread_mutex_unlock(&t->s_stream_mutex);
+}
 
 /**
  *
index 756f9acbc4431ebc6e78c3fc58033d977a712c56..74d19eca019840fbbf9381daabaee7d1920258b1 100644 (file)
@@ -26,4 +26,6 @@ int ts_recv_packet1
 
 void ts_recv_packet2(struct mpegts_service *t, const uint8_t *tsb);
 
+void ts_recv_raw(struct mpegts_service *t, const uint8_t *tsb);
+
 #endif /* TSDEMUX_H */
index 04a987ea76add3c0152db351b18cb6354b1a17c2..c41fa4e4fbd42503c4827a286c8c671ddc6478bd 100644 (file)
@@ -247,6 +247,7 @@ rtsp_start
   mpegts_network_t *mn, *mn2;
   dvb_network_t *ln;
   dvb_mux_t *mux;
+  service_t *svc;
   char buf[384];
   int res = HTTP_STATUS_SERVICE, qsize = 3000000, created = 0;
 
@@ -290,10 +291,9 @@ rtsp_start
                                    config_get_int("satip_weight", 100),
                                    "SAT>IP",
                                    rs->prch.prch_flags |
-                                   SUBSCRIPTION_FULLMUX |
                                    SUBSCRIPTION_STREAMING,
                                    addrbuf, hc->hc_username,
-                                   http_arg_get(&hc->hc_args, "User-Agent"), NULL);
+                                   http_arg_get(&hc->hc_args, "User-Agent"));
     if (!rs->subs)
       goto endrtp;
     if (rs->run) {
@@ -303,6 +303,8 @@ rtsp_start
     }
   } else {
 pids:
+    svc = rs->subs->ths_service;
+    svc->s_update_pids(svc, &rs->pids);
     satip_rtp_update_pids((void *)(intptr_t)rs->stream, &rs->pids);
   }
   if (!setup && !rs->run) {
@@ -314,6 +316,8 @@ pids:
                     rs->udp_rtp->fd, rs->udp_rtcp->fd,
                     rs->frontend, rs->findex, &rs->mux->lm_tuning,
                     &rs->pids);
+    svc = rs->subs->ths_service;
+    svc->s_update_pids(svc, &rs->pids);
     rs->run = 1;
   }
   pthread_mutex_unlock(&global_lock);
index c78b4aa73bd462600e6af47239f94f733f41d66d..bf2b0e4b593ba3b76e292bdc427ce25a2cd7724c 100644 (file)
@@ -53,6 +53,7 @@ static void service_class_delete(struct idnode *self);
 static void service_class_save(struct idnode *self);
 
 struct service_queue service_all;
+struct service_queue service_raw_all;
 
 static void
 service_class_notify_enabled ( void *obj )
@@ -246,6 +247,17 @@ const idclass_t service_class = {
   }
 };
 
+const idclass_t service_raw_class = {
+  .ic_class      = "service_raw",
+  .ic_caption    = "Service Raw",
+  .ic_event      = "service_raw",
+  .ic_perm_def   = ACCESS_ADMIN,
+  .ic_delete     = service_class_delete,
+  .ic_save       = NULL,
+  .ic_get_title  = service_class_get_title,
+  .ic_properties = NULL
+};
+
 /**
  *
  */
@@ -682,7 +694,8 @@ service_start(service_t *t, int instance, int timeout, int postpone)
  */
 service_instance_t *
 service_find_instance
-  (service_t *s, channel_t *ch, service_instance_list_t *sil,
+  (service_t *s, channel_t *ch, tvh_input_t *ti,
+   service_instance_list_t *sil,
    int *error, int weight, int flags, int timeout, int postpone)
 {
   channel_service_mapping_t *csm;
@@ -703,10 +716,10 @@ service_find_instance
     LIST_FOREACH(csm, &ch->ch_services, csm_chn_link) {
       s = csm->csm_svc;
       if (s->s_is_enabled(s, flags))
-        s->s_enlist(s, sil, flags);
+        s->s_enlist(s, ti, sil, flags);
     }
   } else {
-    s->s_enlist(s, sil, flags);
+    s->s_enlist(s, ti, sil, flags);
   }
 
   /* Clean */
@@ -842,7 +855,10 @@ service_destroy(service_t *t, int delconf)
 
   avgstat_flush(&t->s_rate);
 
-  TAILQ_REMOVE(&service_all, t, s_all_link);
+  if (t->s_type == STYPE_RAW)
+    TAILQ_REMOVE(&service_raw_all, t, s_all_link);
+  else
+    TAILQ_REMOVE(&service_all, t, s_all_link);
 
   service_unref(t);
 }
@@ -882,7 +898,8 @@ service_provider_name ( service_t *s )
  */
 service_t *
 service_create0
-  ( service_t *t, const idclass_t *class, const char *uuid,
+  ( service_t *t, int service_type,
+    const idclass_t *class, const char *uuid,
     int source_type, htsmsg_t *conf )
 {
   if (idnode_insert(&t->s_id, uuid, class, 0)) {
@@ -894,10 +911,14 @@ service_create0
 
   lock_assert(&global_lock);
   
-  TAILQ_INSERT_TAIL(&service_all, t, s_all_link);
+  if (service_type == STYPE_RAW)
+    TAILQ_INSERT_TAIL(&service_raw_all, t, s_all_link);
+  else
+    TAILQ_INSERT_TAIL(&service_all, t, s_all_link);
 
   pthread_mutex_init(&t->s_stream_mutex, NULL);
   pthread_cond_init(&t->s_tss_cond, NULL);
+  t->s_type = service_type;
   t->s_source_type = source_type;
   t->s_refcount = 1;
   t->s_enabled = 1;
@@ -1201,6 +1222,9 @@ service_restart(service_t *t)
 {
   int had_components;
 
+  if(t->s_type != STYPE_STD)
+    goto refresh;
+
   pthread_mutex_lock(&t->s_stream_mutex);
 
   had_components = TAILQ_FIRST(&t->s_filt_components) != NULL &&
@@ -1227,6 +1251,7 @@ service_restart(service_t *t)
 
   pthread_mutex_unlock(&t->s_stream_mutex);
 
+refresh:
   if(t->s_refresh_feed != NULL)
     t->s_refresh_feed(t);
 
@@ -1298,6 +1323,9 @@ static struct service_queue pending_save_queue;
 void
 service_request_save(service_t *t, int restart)
 {
+  if (t->s_type != STYPE_STD && !restart)
+    return;
+
   pthread_mutex_lock(&pending_save_mutex);
 
   if(!t->s_ps_onqueue) {
@@ -1359,7 +1387,7 @@ service_saver(void *aux)
     pthread_mutex_unlock(&pending_save_mutex);
     pthread_mutex_lock(&global_lock);
 
-    if(t->s_status != SERVICE_ZOMBIE)
+    if(t->s_status != SERVICE_ZOMBIE && t->s_config_save)
       t->s_config_save(t);
     if(t->s_status == SERVICE_RUNNING && restart)
       service_restart(t);
@@ -1384,6 +1412,7 @@ service_init(void)
 {
   TAILQ_INIT(&pending_save_queue);
   TAILQ_INIT(&service_all);
+  TAILQ_INIT(&service_raw_all);
   pthread_mutex_init(&pending_save_mutex, NULL);
   pthread_cond_init(&pending_save_cond, NULL);
   tvhthread_create(&service_saver_tid, NULL, service_saver, NULL);
index dd660fec2efc5766d6cdb8edf5dadab69954f227..e9e1b525471ef718e4117873631aa3725460132d 100644 (file)
 #include "descrambler.h"
 
 extern const idclass_t service_class;
+extern const idclass_t service_raw_class;
 
 extern struct service_queue service_all;
+extern struct service_queue service_raw_all;
 
 struct channel;
+struct tvh_input;
+struct mpegts_apids;
 
 /**
  * Stream, one media component for a service.
@@ -232,6 +236,14 @@ typedef struct service {
    */ 
   int s_refcount;
 
+  /**
+   * Service type, standard or raw (for mux or partial mux streaming)
+   */
+  enum {
+    STYPE_STD,
+    STYPE_RAW
+  } s_type;
+
   /**
    * Source type is used to determine if an output requesting
    * MPEG-TS can shortcut all the parsing and remuxing.
@@ -282,7 +294,8 @@ typedef struct service {
 
   int (*s_is_enabled)(struct service *t, int flags);
 
-  void (*s_enlist)(struct service *s, service_instance_list_t *sil, int flags);
+  void (*s_enlist)(struct service *s, struct tvh_input *ti,
+                   service_instance_list_t *sil, int flags);
 
   int (*s_start_feed)(struct service *s, int instance);
 
@@ -298,6 +311,10 @@ typedef struct service {
 
   void (*s_delete)(struct service *t, int delconf);
 
+#if ENABLE_MPEGTS
+  int (*s_update_pids)(struct service *t, struct mpegts_apids *pids);
+#endif
+
   /**
    * Channel info
    */
@@ -441,7 +458,10 @@ typedef struct service {
   struct elementary_stream_queue s_filt_components;
   int s_last_pid;
   elementary_stream_t *s_last_es;
-
+#if ENABLE_MPEGTS
+  struct service *s_parent;
+  struct mpegts_apids *s_pids;
+#endif
 
   /**
    * Delivery pad, this is were we finally deliver all streaming output
@@ -471,10 +491,11 @@ void service_stop(service_t *t);
 
 void service_build_filter(service_t *t);
 
-service_t *service_create0(service_t *t, const idclass_t *idc, const char *uuid, int source_type, htsmsg_t *conf);
+service_t *service_create0(service_t *t, int service_type, const idclass_t *idc,
+                           const char *uuid, int source_type, htsmsg_t *conf);
 
-#define service_create(t, c, u, s, m)\
-  (struct t*)service_create0(calloc(1, sizeof(struct t), &t##_class, c, u, s, m)
+#define service_create(t, y, c, u, s, m)\
+  (struct t*)service_create0(calloc(1, sizeof(struct t), y, &t##_class, c, u, s, m)
 
 void service_unref(service_t *t);
 
@@ -486,6 +507,7 @@ static inline service_t *service_find(const char *identifier)
 
 service_instance_t *service_find_instance(struct service *s,
                                           struct channel *ch,
+                                          struct tvh_input *source,
                                           service_instance_list_t *sil,
                                           int *error, int weight,
                                           int flags, int timeout,
index 777da9ec85a71c622b74d8b545d060146ab178d9..b7d98bf3deaaac20376b8ce78dfeec4dc4842adc 100644 (file)
@@ -375,7 +375,7 @@ service_mapper_thread ( void *aux )
     /* Subscribe */
     tvhinfo("service_mapper", "checking %s", s->s_nicename);
     prch.prch_id = s;
-    sub = subscription_create_from_service(&prch, SUBSCRIPTION_PRIO_MAPPER,
+    sub = subscription_create_from_service(&prch, NULL, SUBSCRIPTION_PRIO_MAPPER,
                                            "service_mapper",
                                            0, NULL, NULL, "service_mapper");
 
index d6c9ca6833b36263425c1f08807abeee2e413313..28abf6231fe75f848812b273c10f8b3b28a4fc1d 100644 (file)
@@ -73,11 +73,13 @@ subscription_link_service(th_subscription_t *s, service_t *t)
   s->ths_service = t;
   LIST_INSERT_HEAD(&t->s_subscriptions, s, ths_service_link);
 
-  tvhtrace("subscription", "%04X: linking sub %p to svc %p", shortid(s), s, t);
+  tvhtrace("subscription", "%04X: linking sub %p to svc %p type %i",
+           shortid(s), s, t, t->s_type);
 
   pthread_mutex_lock(&t->s_stream_mutex);
 
-  if(TAILQ_FIRST(&t->s_filt_components) != NULL) {
+  if(TAILQ_FIRST(&t->s_filt_components) != NULL ||
+     t->s_type != STYPE_STD) {
 
     streaming_msg_free(s->ths_start_message);
 
@@ -148,37 +150,6 @@ subscription_unlink_service(th_subscription_t *s, int reason)
   subscription_unlink_service0(s, reason, 1);
 }
 
-/*
- * Called from mpegts code
- */
-void
-subscription_unlink_mux(th_subscription_t *s, int reason)
-{
-  streaming_message_t *sm;
-  mpegts_mux_instance_t *mmi = s->ths_mmi;
-  mpegts_mux_t   *mm = mmi->mmi_mux;
-  mpegts_input_t *mi = mmi->mmi_input;
-
-  gtimer_disarm(&s->ths_receive_timer);
-
-  assert(mi);
-
-  pthread_mutex_lock(&mi->mi_output_lock);
-  s->ths_mmi = NULL;
-
-  if (!(s->ths_flags & SUBSCRIPTION_NONE))
-    streaming_target_disconnect(&mmi->mmi_streaming_pad, &s->ths_input);
-
-  sm = streaming_msg_create_code(SMT_STOP, reason);
-  streaming_target_deliver(s->ths_output, sm);
-
-  if (s->ths_flags & SUBSCRIPTION_FULLMUX)
-    mi->mi_close_pid(mi, mm, MPEGTS_FULLMUX_PID, MPS_NONE, s);
-  LIST_REMOVE(s, ths_mmi_link);
-
-  pthread_mutex_unlock(&mi->mi_output_lock);
-}
-
 /* **************************************************************************
  * Scheduling
  * *************************************************************************/
@@ -212,10 +183,9 @@ subscription_show_info(th_subscription_t *s)
   size_t buflen;
 
   s->ths_service->s_setsourceinfo(s->ths_service, &si);
-  snprintf(buf, sizeof(buf),
+  buflen = snprintf(buf, sizeof(buf),
           "\"%s\" subscribing on \"%s\", weight: %d, adapter: \"%s\", "
-          "network: \"%s\", mux: \"%s\", provider: \"%s\", "
-          "service: \"%s\"",
+          "network: \"%s\", mux: \"%s\", provider: \"%s\", service: \"%s\"",
           s->ths_title, ch ? channel_get_name(ch) : "none", s->ths_weight,
           si.si_adapter  ?: "<N/A>",
           si.si_network  ?: "<N/A>",
@@ -224,8 +194,12 @@ subscription_show_info(th_subscription_t *s)
           si.si_service  ?: "<N/A>");
   service_source_info_free(&si);
 
+  if (s->ths_prch && s->ths_prch->prch_pro)
+    buflen += snprintf(buf + buflen, sizeof(buf) - buflen,
+                       ", profile=\"%s\"",
+                       s->ths_prch->prch_pro->pro_name ?: "");
+
   if (s->ths_hostname) {
-    buflen = strlen(buf);
     snprintf(buf + buflen, sizeof(buf) - buflen,
              ", hostname=\"%s\", username=\"%s\", client=\"%s\"",
              s->ths_hostname ?: "<N/A>",
@@ -264,7 +238,6 @@ subscription_reschedule(void)
   lock_assert(&global_lock);
 
   LIST_FOREACH(s, &subscriptions, ths_global_link) {
-    if (s->ths_mmi) continue;
     if (!s->ths_service && !s->ths_channel) continue;
 
     /* Postpone the tuner decision */
@@ -312,6 +285,7 @@ subscription_reschedule(void)
       tvhtrace("subscription", "%04X: find instance for %s weight %d",
                shortid(s), s->ths_service->s_nicename, s->ths_weight);
     si = service_find_instance(s->ths_service, s->ths_channel,
+                               s->ths_source,
                                &s->ths_instances, &error, s->ths_weight,
                                s->ths_flags, s->ths_timeout,
                                dispatch_clock > s->ths_postpone_end ?
@@ -527,7 +501,7 @@ subscription_unsubscribe(th_subscription_t *s)
 
   LIST_REMOVE(s, ths_global_link);
 
-  if(s->ths_channel != NULL) {
+  if (s->ths_channel != NULL) {
     LIST_REMOVE(s, ths_channel_link);
     snprintf(buf, sizeof(buf), "\"%s\" unsubscribing from \"%s\"",
              s->ths_title, channel_get_name(s->ths_channel));
@@ -545,13 +519,13 @@ subscription_unsubscribe(th_subscription_t *s)
   }
   tvhlog(LOG_INFO, "subscription", "%04X: %s", shortid(s), buf);
 
-  if(t)
+  if (t) {
     service_remove_subscriber(t, s, SM_CODE_OK);
-
 #if ENABLE_MPEGTS
-  if(s->ths_mmi)
-    mpegts_mux_remove_subscriber(s->ths_mmi->mmi_mux, s, SM_CODE_OK);
+    if (t->s_type == STYPE_RAW)
+      LIST_REMOVE(s, ths_mux_link);
 #endif
+  }
 
   streaming_msg_free(s->ths_start_message);
 
@@ -640,26 +614,22 @@ subscription_create
  */
 static th_subscription_t *
 subscription_create_from_channel_or_service(profile_chain_t *prch,
+                                            tvh_input_t *ti,
                                             unsigned int weight,
                                             const char *name,
                                             int flags,
                                             const char *hostname,
                                             const char *username,
                                             const char *client,
-                                            int service)
+                                            service_t *service)
 {
   th_subscription_t *s;
   channel_t *ch = NULL;
-  service_t *t  = NULL;
 
   assert(prch);
   assert(prch->prch_id);
-  assert(prch->prch_st);
 
-
-  if (service)
-    t  = prch->prch_id;
-  else
+  if (!service)
     ch = prch->prch_id;
 
   s = subscription_create(prch, weight, name, flags, subscription_input,
@@ -671,26 +641,36 @@ subscription_create_from_channel_or_service(profile_chain_t *prch,
              shortid(s), channel_get_name(ch), weight, pro_name);
   else
     tvhtrace("subscription", "%04X: creating subscription for service %s weight %d sing profile %s",
-             shortid(s), t->s_nicename, weight, pro_name);
+             shortid(s), service->s_nicename, weight, pro_name);
 #endif
   s->ths_channel = ch;
-  s->ths_service = t;
+  s->ths_service = service;
+  s->ths_source  = ti;
   if (ch)
     LIST_INSERT_HEAD(&ch->ch_subscriptions, s, ths_channel_link);
 
+#if ENABLE_MPEGTS
+  if (service && service->s_type == STYPE_RAW) {
+    mpegts_mux_t *mm = prch->prch_id;
+    LIST_INSERT_HEAD(&mm->mm_raw_subs, s, ths_mux_link);
+  }
+#endif
+
   subscription_reschedule();
   return s;
 }
 
 th_subscription_t *
 subscription_create_from_channel(profile_chain_t *prch,
+                                 tvh_input_t *ti,
                                  unsigned int weight,
                                 const char *name,
                                 int flags, const char *hostname,
                                 const char *username, const char *client)
 {
+  assert(prch->prch_st);
   return subscription_create_from_channel_or_service
-           (prch, weight, name, flags, hostname, username, client, 0);
+           (prch, ti, weight, name, flags, hostname, username, client, NULL);
 }
 
 /**
@@ -698,65 +678,24 @@ subscription_create_from_channel(profile_chain_t *prch,
  */
 th_subscription_t *
 subscription_create_from_service(profile_chain_t *prch,
+                                 tvh_input_t *ti,
                                  unsigned int weight,
                                  const char *name,
                                 int flags,
                                 const char *hostname, const char *username, 
                                 const char *client)
 {
+  assert(prch->prch_st);
   return subscription_create_from_channel_or_service
-           (prch, weight, name, flags, hostname, username, client, 1);
+           (prch, ti, weight, name, flags, hostname, username, client,
+            prch->prch_id);
 }
 
-/**
- *
- */
 /**
  *
  */
 #if ENABLE_MPEGTS
-// TODO: move this
-static void
-mpegts_mux_setsourceinfo ( mpegts_mux_t *mm, source_info_t *si )
-{
-  char buf[256];
-
-  /* Validate */
-  lock_assert(&global_lock);
-
-  /* Update */
-  if(mm->mm_network->mn_network_name != NULL)
-    si->si_network = strdup(mm->mm_network->mn_network_name);
-
-  mm->mm_display_name(mm, buf, sizeof(buf));
-  si->si_mux = strdup(buf);
-
-  if(mm->mm_active && mm->mm_active->mmi_input) {
-    mpegts_input_t *mi = mm->mm_active->mmi_input;
-    mi->mi_display_name(mi, buf, sizeof(buf));
-    si->si_adapter = strdup(buf);
-  }
-}
-
-static void
-mux_data_timeout ( void *aux )
-{
-  th_subscription_t *s = aux;
-
-  if (!s->ths_mmi)
-    return;
-
-  if (!s->ths_live) {
-    tvhwarn("subscription", "%04X: mux data timeout for %s", shortid(s), s->ths_title);
-    mpegts_mux_remove_subscriber(s->ths_mmi->mmi_mux, s, SM_CODE_NO_INPUT);
-    return;
-  }
-  s->ths_live = 0;
-
-  if (s->ths_timeout > 0)
-    gtimer_arm(&s->ths_receive_timer, mux_data_timeout, s, s->ths_timeout);
-}
-
+#include "input/mpegts.h"
 th_subscription_t *
 subscription_create_from_mux(profile_chain_t *prch,
                              tvh_input_t *ti,
@@ -765,81 +704,17 @@ subscription_create_from_mux(profile_chain_t *prch,
                              int flags,
                              const char *hostname,
                              const char *username,
-                             const char *client,
-                             int *err)
+                             const char *client)
 {
   mpegts_mux_t *mm = prch->prch_id;
-  th_subscription_t *s;
-  streaming_message_t *sm;
-  streaming_start_t *ss;
-  mpegts_input_t *mi;
-  int r;
-
-  /* Tune */
-  r = mm->mm_start(mm, (mpegts_input_t *)ti, name, weight, flags);
-  if (r) {
-    if (err) *err = r;
-    return NULL;
-  }
-
-  /* Create subscription */
-  if (!prch->prch_st)
-    flags |= SUBSCRIPTION_NONE;
-  s = subscription_create(prch, weight, name, flags, NULL,
-                          hostname, username, client);
-  s->ths_mmi = mm->mm_active;
+  mpegts_service_t *s = mpegts_service_create_raw(mm);
 
-  /* Install full mux handler */
-  mi = s->ths_mmi->mmi_input;
-  assert(mi);
-
-  pthread_mutex_lock(&mi->mi_output_lock);
-
-  if (s->ths_flags & SUBSCRIPTION_FULLMUX)
-    mi->mi_open_pid(mi, mm, MPEGTS_FULLMUX_PID, MPS_NONE, s);
-
-  /* Store */
-  LIST_INSERT_HEAD(&mm->mm_active->mmi_subs, s, ths_mmi_link);
-
-  /* Connect */
-  if (prch->prch_st)
-    streaming_target_connect(&s->ths_mmi->mmi_streaming_pad, &s->ths_input);
-
-  /* Deliver a start message */
-  ss = calloc(1, sizeof(streaming_start_t));
-  ss->ss_num_components = 0;
-  ss->ss_refcount       = 1;
-    
-  mpegts_mux_setsourceinfo(mm, &ss->ss_si);
-  ss->ss_si.si_service = strdup("rawmux");
-
-  tvhinfo("subscription", 
-         "%04X: \"%s\" subscribing to mux, weight: %d, adapter: \"%s\", "
-         "network: \"%s\", mux: \"%s\", hostname: \"%s\", username: \"%s\", "
-         "client: \"%s\"",
-         shortid(s),
-         s->ths_title,
-          s->ths_weight,
-         ss->ss_si.si_adapter  ?: "<N/A>",
-         ss->ss_si.si_network  ?: "<N/A>",
-         ss->ss_si.si_mux      ?: "<N/A>",
-         hostname              ?: "<N/A>",
-         username              ?: "<N/A>",
-         client                ?: "<N/A>");
-
-  sm = streaming_msg_create_data(SMT_START, ss);
-  streaming_target_deliver(s->ths_output, sm);
-
-  r = (mi->mi_get_grace ? mi->mi_get_grace(mi, mm) : 0) + 20;
-  sm = streaming_msg_create_code(SMT_GRACE, r);
-  streaming_target_deliver(s->ths_output, sm);
-
-  pthread_mutex_unlock(&mi->mi_output_lock);
-
-  if (r > 0)
-    gtimer_arm(&s->ths_receive_timer, mux_data_timeout, s, r);
+  if (!s)
+    return NULL;
 
-  return s;
+  return subscription_create_from_channel_or_service
+    (prch, ti, weight, name, flags, hostname, username, client,
+     (service_t *)s);
 }
 #endif
 
@@ -903,13 +778,6 @@ subscription_create_msg(th_subscription_t *s)
   else if(s->ths_dvrfile != NULL)
     htsmsg_add_str(m, "service", s->ths_dvrfile ?: "");
 
-  else if (s->ths_mmi != NULL && s->ths_mmi->mmi_mux != NULL) {
-    char buf[512];
-    mpegts_mux_t *mm = s->ths_mmi->mmi_mux;
-    mpegts_mux_nice_name(mm, buf, sizeof(buf));
-    htsmsg_add_str(m, "service", buf);
-  }
-  
   return m;
 }
 
@@ -961,7 +829,11 @@ subscription_init(void)
 void
 subscription_done(void)
 {
+  th_subscription_t *s;
+
   pthread_mutex_lock(&global_lock);
+  LIST_FOREACH(s, &subscriptions, ths_global_link)
+    subscription_unsubscribe(s);
   /* clear remaining subscriptions */
   subscription_reschedule();
   pthread_mutex_unlock(&global_lock);
@@ -1100,7 +972,7 @@ subscription_dummy_join(const char *id, int first)
   st = calloc(1, sizeof(*st));
   streaming_target_init(st, dummy_callback, NULL, 0);
   prch->prch_st = st;
-  s = subscription_create_from_service(prch, 1, "dummy", 0, NULL, NULL, "dummy");
+  s = subscription_create_from_service(prch, NULL, 1, "dummy", 0, NULL, NULL, "dummy");
 
   tvhlog(LOG_NOTICE, "subscription",
          "%04X: Dummy join %s ok", shortid(s), id);
index 7ef65dcf84b1bfd3a57acd7b45092fa7e46d500c..229bcc9b823291ad4eee0b6c2f238f51c51034fd 100644 (file)
@@ -27,13 +27,12 @@ extern struct th_subscription_list subscriptions;
 
 #define SUBSCRIPTION_RAW_MPEGTS 0x001
 #define SUBSCRIPTION_NONE       0x002
-#define SUBSCRIPTION_FULLMUX    0x004
-#define SUBSCRIPTION_STREAMING  0x008
-#define SUBSCRIPTION_RESTART    0x010
-#define SUBSCRIPTION_INITSCAN   0x020 ///< for mux subscriptions
-#define SUBSCRIPTION_IDLESCAN   0x040 ///< for mux subscriptions
-#define SUBSCRIPTION_USERSCAN   0x080 ///< for mux subscriptions
-#define SUBSCRIPTION_EPG        0x100 ///< for mux subscriptions
+#define SUBSCRIPTION_STREAMING  0x004
+#define SUBSCRIPTION_RESTART    0x008
+#define SUBSCRIPTION_INITSCAN   0x010 ///< for mux subscriptions
+#define SUBSCRIPTION_IDLESCAN   0x020 ///< for mux subscriptions
+#define SUBSCRIPTION_USERSCAN   0x040 ///< for mux subscriptions
+#define SUBSCRIPTION_EPG        0x080 ///< for mux subscriptions
 
 /* Some internal priorities */
 #define SUBSCRIPTION_PRIO_KEEP        1 ///< Keep input rolling
@@ -75,6 +74,8 @@ typedef struct th_subscription {
   struct service *ths_service;   /* if NULL, ths_service_link
                                           is not linked */
 
+  struct tvh_input *ths_source;  /* if NULL, all sources are allowed */
+
   char *ths_title; /* display title */
   time_t ths_start;  /* time when subscription started */
   int ths_total_err; /* total errors during entire subscription */
@@ -110,15 +111,10 @@ typedef struct th_subscription {
   int    ths_postpone;
   time_t ths_postpone_end;
 
-#if ENABLE_MPEGTS
-  // Note: its a bit ugly linking MPEG-TS code directly here, but to do
-  //       otherwise would probably require adding lots of additional
-  //       (repeated) logic elsewhere
-  LIST_ENTRY(th_subscription) ths_mmi_link;
-  struct mpegts_mux_instance *ths_mmi;
-  gtimer_t ths_receive_timer;
-  uint8_t ths_live;
-#endif
+  /*
+   * MPEG-TS mux chain
+   */
+  LIST_ENTRY(th_subscription) ths_mux_link;
 
 } th_subscription_t;
 
@@ -138,6 +134,7 @@ void subscription_reschedule(void);
 
 th_subscription_t *
 subscription_create_from_channel(struct profile_chain *prch,
+                                 struct tvh_input *ti,
                                 unsigned int weight,
                                 const char *name,
                                 int flags,
@@ -148,6 +145,7 @@ subscription_create_from_channel(struct profile_chain *prch,
 
 th_subscription_t *
 subscription_create_from_service(struct profile_chain *prch,
+                                 struct tvh_input *ti,
                                  unsigned int weight,
                                 const char *name,
                                 int flags,
@@ -165,7 +163,7 @@ subscription_create_from_mux(struct profile_chain *prch,
                              int flags,
                              const char *hostname,
                              const char *username,
-                             const char *client, int *err);
+                             const char *client);
 #endif
 
 th_subscription_t *subscription_create(struct profile_chain *prch,
@@ -188,8 +186,6 @@ void subscription_stop(th_subscription_t *s);
 
 void subscription_unlink_service(th_subscription_t *s, int reason);
 
-void subscription_unlink_mux(th_subscription_t *s, int reason);
-
 void subscription_dummy_join(const char *id, int first);
 
 
index 5c77108fd1accf3a140dfd4cd8cf25c5c525e5d9..04a18c4afed802b0beb212c75a2f99b07beb1a8b 100644 (file)
@@ -215,7 +215,8 @@ int get_device_connection(const char *dev);
 typedef enum {
   SCT_NONE = -1,
   SCT_UNKNOWN = 0,
-  SCT_MPEG2VIDEO = 1,
+  SCT_RAW = 1,
+  SCT_MPEG2VIDEO,
   SCT_MPEG2AUDIO,
   SCT_H264,
   SCT_AC3,
index d04cd821790c43b4a42a9dab4da16d35623632c9..66494e9cf30bb9a0a79fc224f7b826ad3a84ca70 100644 (file)
@@ -259,7 +259,7 @@ http_stream_run(http_connection_t *hc, profile_chain_t *prch,
   while(!hc->hc_shutdown && run && tvheadend_running) {
     pthread_mutex_lock(&sq->sq_mutex);
     sm = TAILQ_FIRST(&sq->sq_queue);
-    if(sm == NULL) {      
+    if(sm == NULL) {
       gettimeofday(&tp, NULL);
       ts.tv_sec  = tp.tv_sec + 1;
       ts.tv_nsec = tp.tv_usec * 1000;
@@ -769,7 +769,7 @@ http_stream_service(http_connection_t *hc, service_t *service, int weight)
 
     tcp_get_ip_str((struct sockaddr*)hc->hc_peer, addrbuf, 50);
 
-    s = subscription_create_from_service(&prch, weight ?: 100, "HTTP",
+    s = subscription_create_from_service(&prch, NULL, weight ?: 100, "HTTP",
                                          prch.prch_flags | SUBSCRIPTION_STREAMING,
                                          addrbuf,
                                         hc->hc_username,
@@ -801,11 +801,12 @@ http_stream_mux(http_connection_t *hc, mpegts_mux_t *mm, int weight)
   th_subscription_t *s;
   profile_chain_t prch;
   size_t qsize;
-  const char *name;
+  const char *name, *str;
   char addrbuf[50];
   void *tcp_id;
-  const char *str;
-  int res = HTTP_STATUS_SERVICE;
+  char *p, *saveptr;
+  mpegts_apids_t pids;
+  int res = HTTP_STATUS_SERVICE, i;
 
   if(http_access_verify(hc, ACCESS_ADVANCED_STREAMING))
     return HTTP_STATUS_UNAUTHORIZED;
@@ -818,21 +819,46 @@ http_stream_mux(http_connection_t *hc, mpegts_mux_t *mm, int weight)
   else
     qsize = 10000000;
 
+  mpegts_pid_init(&pids, NULL, 0);
+  if ((str = http_arg_get(&hc->hc_req_args, "pids"))) {
+    p = tvh_strdupa(str);
+    p = strtok_r(p, ",", &saveptr);
+    while (p) {
+      if (strcmp(p, "all") == 0) {
+        pids.all = 1;
+      } else {
+        i = atoi(p);
+        if (i < 0 || i > 8192)
+          return HTTP_STATUS_BAD_REQUEST;
+        if (i == 8192)
+          pids.all = 1;
+        else
+          mpegts_pid_add(&pids, i);
+      }
+      p = strtok_r(NULL, ",", &saveptr);
+    }
+    if (!pids.all && pids.count <= 0)
+      return HTTP_STATUS_BAD_REQUEST;
+  } else {
+    pids.all = 1;
+  }
+
   if (!profile_chain_raw_open(&prch, mm, qsize)) {
 
     tcp_get_ip_str((struct sockaddr*)hc->hc_peer, addrbuf, 50);
 
     s = subscription_create_from_mux(&prch, NULL, weight ?: 10, "HTTP",
                                      prch.prch_flags |
-                                     SUBSCRIPTION_FULLMUX |
                                      SUBSCRIPTION_STREAMING,
                                      addrbuf, hc->hc_username,
-                                     http_arg_get(&hc->hc_args, "User-Agent"), NULL);
+                                     http_arg_get(&hc->hc_args, "User-Agent"));
     if (s) {
       name = tvh_strdupa(s->ths_title);
-      pthread_mutex_unlock(&global_lock);
-      http_stream_run(hc, &prch, name, s);
-      pthread_mutex_lock(&global_lock);
+      if (s->ths_service->s_update_pids(s->ths_service, &pids) == 0) {
+        pthread_mutex_unlock(&global_lock);
+        http_stream_run(hc, &prch, name, s);
+        pthread_mutex_lock(&global_lock);
+      }
       subscription_unsubscribe(s);
       res = 0;
     }
@@ -882,7 +908,8 @@ http_stream_channel(http_connection_t *hc, channel_t *ch, int weight)
 
     tcp_get_ip_str((struct sockaddr*)hc->hc_peer, addrbuf, 50);
 
-    s = subscription_create_from_channel(&prch, weight ?: 100, "HTTP",
+    s = subscription_create_from_channel(&prch,
+                 NULL, weight ?: 100, "HTTP",
                  prch.prch_flags | SUBSCRIPTION_STREAMING,
                  addrbuf, hc->hc_username,
                  http_arg_get(&hc->hc_args, "User-Agent"));