]> git.ipfire.org Git - thirdparty/tvheadend.git/commitdiff
SAT>IP Server: Finish service descrambling
authorJaroslav Kysela <perex@perex.cz>
Fri, 13 Mar 2015 18:18:46 +0000 (19:18 +0100)
committerJaroslav Kysela <perex@perex.cz>
Fri, 13 Mar 2015 18:26:07 +0000 (19:26 +0100)
src/descrambler/descrambler.c
src/idnode.c
src/idnode.h
src/input/mpegts.h
src/input/mpegts/mpegts_service.c
src/input/mpegts/tsdemux.c
src/queue.h
src/satip/rtsp.c

index 850b0af1e997a7a9edd8c2220dcecad5aa9d5936..8918fd6bfa2672a8482296e575457e6bbe52ba60 100644 (file)
@@ -21,6 +21,7 @@
 #include "caclient.h"
 #include "ffdecsa/FFdecsa.h"
 #include "input.h"
+#include "input/mpegts/tsdemux.h"
 
 struct caid_tab {
   const char *name;
@@ -375,8 +376,19 @@ descrambler_descramble ( service_t *t,
 
   lock_assert(&t->s_stream_mutex);
 
-  if (dr == NULL)
+  if (dr == NULL) {
+    if ((tsb[3] & 0x80) == 0) {
+      ts_recv_packet2((mpegts_service_t *)t, tsb);
+      return 1;
+    }
     return -1;
+  }
+
+  if (dr->dr_csa.csa_type == DESCRAMBLER_NONE && dr->dr_buf.sb_ptr == 0)
+    if ((tsb[3] & 0x80) == 0) {
+      ts_recv_packet2((mpegts_service_t *)t, tsb);
+      return 1;
+    }
 
   count = failed = resolved = 0;
   LIST_FOREACH(td, &t->s_descramblers, td_service_link) {
index f417be4fe20346d65d11a20aa5ecfb764c48949b..fc91ff6776d25c387073495d0ef2aee62e26f95a 100644 (file)
@@ -1011,7 +1011,7 @@ idnode_set_find_index
   return -1;
 }
 
-void
+int
 idnode_set_remove
   ( idnode_set_t *is, idnode_t *in )
 {
@@ -1020,7 +1020,9 @@ idnode_set_remove
     memmove(&is->is_array[i], &is->is_array[i+1],
             (is->is_count - i - 1) * sizeof(idnode_t *));
     is->is_count--;
+    return 1;
   }
+  return 0;
 }
 
 void
index e59366fae2e20bb0958707eb2dd5083b51f2875b..9cc8935be2509f9f6912c3fa438e7d884619ebe1 100644 (file)
@@ -203,7 +203,7 @@ static inline idnode_set_t * idnode_set_create(int sorted)
     is->is_sorted = sorted; return is; }
 void idnode_set_add
   ( idnode_set_t *is, idnode_t *in, idnode_filter_t *filt );
-void idnode_set_remove ( idnode_set_t *is, idnode_t *in );
+int idnode_set_remove ( idnode_set_t *is, idnode_t *in );
 ssize_t idnode_set_find_index( idnode_set_t *is, idnode_t *in );
 static inline int idnode_set_exists ( idnode_set_t *is, idnode_t *in )
   { return idnode_set_find_index(is, in) >= 0; }
index 78c16cf2ebba2bf5c229b301407e4665fd6252d8..8246d4350c2cd04d1956fd4991b0c1d628c291a1 100644 (file)
@@ -977,6 +977,9 @@ mpegts_service_t *mpegts_service_create_raw(mpegts_mux_t *mm);
 mpegts_service_t *mpegts_service_find 
   ( mpegts_mux_t *mm, uint16_t sid, uint16_t pmt_pid, int create, int *save );
 
+mpegts_service_t *
+mpegts_service_find_by_pid ( mpegts_mux_t *mm, int pid );
+
 static inline mpegts_service_t *mpegts_service_find_by_uuid(const char *uuid)
   { return idnode_find(uuid, &mpegts_service_class, NULL); }
 
index f003d3f79d078940fe5279661c2e21654783bd77..29dd0dd9426f7f5aae91b44a88dd6bf3e5e734d1 100644 (file)
@@ -633,6 +633,31 @@ mpegts_service_find
   return s;
 }
 
+/*
+ * Find PID
+ */
+mpegts_service_t *
+mpegts_service_find_by_pid ( mpegts_mux_t *mm, int pid )
+{
+  mpegts_service_t *s;
+
+  lock_assert(&global_lock);
+
+  /* Find existing service */
+  LIST_FOREACH(s, &mm->mm_services, s_dvb_mux_link) {
+    pthread_mutex_lock(&s->s_stream_mutex);
+    if (pid == s->s_pmt_pid || pid == s->s_pcr_pid)
+      goto ok;
+    if (service_stream_find((service_t *)s, pid))
+      goto ok;
+    pthread_mutex_unlock(&s->s_stream_mutex);
+  }
+  return NULL;
+ok:
+  pthread_mutex_unlock(&s->s_stream_mutex);
+  return s;
+}
+
 /*
  * Raw MPEGTS Service
  */
@@ -713,7 +738,6 @@ static int
 mpegts_service_link ( mpegts_service_t *master, mpegts_service_t *slave )
 {
   pthread_mutex_lock(&master->s_stream_mutex);
-  assert(slave->s_status == SERVICE_IDLE);
   LIST_INSERT_HEAD(&slave->s_masters, master, s_masters_link);
   LIST_INSERT_HEAD(&master->s_slaves, slave, s_slaves_link);
   pthread_mutex_unlock(&master->s_stream_mutex);
@@ -724,7 +748,6 @@ static int
 mpegts_service_unlink ( mpegts_service_t *master, mpegts_service_t *slave )
 {
   pthread_mutex_lock(&master->s_stream_mutex);
-  assert(slave->s_status == SERVICE_IDLE);
   LIST_SAFE_REMOVE(master, s_masters_link);
   LIST_SAFE_REMOVE(slave, s_slaves_link);
   pthread_mutex_unlock(&master->s_stream_mutex);
index 21511bd2243b51e05b84b5f9cc52109f47a0ed74..2db7720f3f672a02b1125e5900fc826683b1a58c 100644 (file)
@@ -86,32 +86,25 @@ ts_recv_packet0
     ts_remux(t, tsb, error);
 
   LIST_FOREACH(m, &t->s_masters, s_masters_link) {
-    pthread_mutex_lock(&t->s_stream_mutex);
-    if(streaming_pad_probe_type(&t->s_streaming_pad, SMT_MPEGTS))
-      ts_remux(t, tsb, error);
-    pthread_mutex_unlock(&t->s_stream_mutex);
+    pthread_mutex_lock(&m->s_stream_mutex);
+    if(streaming_pad_probe_type(&m->s_streaming_pad, SMT_MPEGTS))
+      ts_remux(m, tsb, error);
+    pthread_mutex_unlock(&m->s_stream_mutex);
   }
 
   off = tsb[3] & 0x20 ? tsb[4] + 5 : 4;
 
-  switch(st->es_type) {
-
-  case SCT_CA:
-    break;
+  if (st->es_type == SCT_CA)
+    return;
 
-  default:
-    if(!streaming_pad_probe_type(&t->s_streaming_pad, SMT_PACKET))
-      break;
+  if(!streaming_pad_probe_type(&t->s_streaming_pad, SMT_PACKET))
+    return;
 
-    if(st->es_type == SCT_TELETEXT)
-      teletext_input(t, st, tsb);
-    if(off > 188)
-      break;
+  if(st->es_type == SCT_TELETEXT)
+    teletext_input(t, st, tsb);
 
-    if(t->s_status == SERVICE_RUNNING)
-      parse_mpeg_ts((service_t*)t, st, tsb + off, 188 - off, pusi, error);
-    break;
-  }
+  if(off <= 188 && t->s_status == SERVICE_RUNNING)
+    parse_mpeg_ts((service_t*)t, st, tsb + off, 188 - off, pusi, error);
 }
 
 /**
index 966836b7f0217872e2e3dfe4966ec0484994e355..5a918f2c07a715bf3a133d3187ab8676b9375123 100644 (file)
  * Complete missing LIST-ops
  */
 
+#ifndef LIST_ENTRY_INIT
+#define LIST_ENTRY_INIT(elm, field) \
+        (elm)->field.le_next = NULL, (elm)->field.le_prev = NULL
+#endif
+
 #ifndef LIST_FOREACH
 #define        LIST_FOREACH(var, head, field)                                  \
        for ((var) = ((head)->lh_first);                                \
 
 #ifndef LIST_SAFE_REMOVE
 #define LIST_SAFE_REMOVE(elm, field) \
-        if ((elm)->field.le_next != NULL || (elm)->field.le_prev != NULL) \
-                LIST_REMOVE(elm, field)
+        if ((elm)->field.le_next != NULL || (elm)->field.le_prev != NULL) { \
+                LIST_REMOVE(elm, field); \
+                LIST_ENTRY_INIT(elm, field); \
+        }
 #endif
 
 #ifndef LIST_INSERT_BEFORE
index 05c4bdb84d6ef481850aaa75d5799b8f782c62b5..390b0773de315770de54fef7a4d23b3a034ce826 100644 (file)
 #define RTP_BUFSIZE  (256*1024)
 #define RTCP_BUFSIZE (16*1024)
 
+typedef struct slave_subscription {
+  LIST_ENTRY(slave_subscription) link;
+  mpegts_service_t *service;
+  th_subscription_t *ths;
+  profile_chain_t prch;
+} slave_subscription_t;
+
 typedef struct session {
   TAILQ_ENTRY(session) link;
   int delsys;
@@ -49,6 +56,7 @@ typedef struct session {
   int rtp_peer_port;
   udp_connection_t *udp_rtp;
   udp_connection_t *udp_rtcp;
+  LIST_HEAD(, slave_subscription) slaves;
 } session_t;
 
 static uint32_t session_number;
@@ -110,7 +118,7 @@ rtsp_new_session(int delsys, uint32_t nsession, int session)
 
   if (rs == NULL)
     return NULL;
-  rs->delsys = delsys;
+
   rs->nsession = nsession ?: session_number;
   snprintf(rs->session, sizeof(rs->session), "%08X", session_number);
   if (nsession) {
@@ -220,13 +228,82 @@ rtsp_parse_args(http_connection_t *hc, char *u)
   return stream;
 }
 
+/*
+ *
+ */
+static void
+rtsp_slave_add
+  (session_t *rs, mpegts_service_t *master, mpegts_service_t *slave)
+{
+  char buf[128];
+  slave_subscription_t *sub = calloc(1, sizeof(*sub));
+
+  pthread_mutex_lock(&master->s_stream_mutex);
+  if (master->s_slaves_pids == NULL)
+    master->s_slaves_pids = mpegts_pid_alloc();
+  pthread_mutex_unlock(&master->s_stream_mutex);
+  master->s_link(master, slave);
+  sub->service = slave;
+  profile_chain_init(&sub->prch, NULL, NULL);
+  sub->prch.prch_st = &sub->prch.prch_sq.sq_st;
+  sub->prch.prch_id = slave;
+  snprintf(buf, sizeof(buf), "SAT>IP Slave/%s", slave->s_nicename);
+  sub->ths = subscription_create_from_service(&sub->prch, NULL,
+                                              SUBSCRIPTION_NONE,
+                                              buf, 0, NULL, NULL,
+                                              buf, NULL);
+  if (sub->ths == NULL) {
+    tvherror("satips", "%i/%s/%i: unable to subscribe service %s\n",
+             rs->frontend, rs->session, rs->stream, slave->s_nicename);
+    profile_chain_close(&sub->prch);
+    free(sub);
+    master->s_unlink(master, slave);
+  } else {
+    LIST_INSERT_HEAD(&rs->slaves, sub, link);
+    tvhdebug("satips", "%i/%s/%i: slave service %s subscribed",
+             rs->frontend, rs->session, rs->stream, slave->s_nicename);
+  }
+}
+
+/*
+ *
+ */
+static void
+rtsp_slave_remove
+  (session_t *rs, mpegts_service_t *master, mpegts_service_t *slave)
+{
+  slave_subscription_t *sub;
+
+  if (master == NULL)
+    return;
+  LIST_FOREACH(sub, &rs->slaves, link)
+    if (sub->service == slave)
+      break;
+  if (sub == NULL)
+    return;
+  tvhdebug("satips", "%i/%s/%i: slave service %s unsubscribed",
+          rs->frontend, rs->session, rs->stream, slave->s_nicename);
+  master->s_unlink(master, slave);
+  if (sub->ths)
+    subscription_unsubscribe(sub->ths, 0);
+  if (sub->prch.prch_id)
+    profile_chain_close(&sub->prch);
+  LIST_REMOVE(sub, link);
+  free(sub);
+}
+
 /*
  *
  */
 static void
 rtsp_clean(session_t *rs)
 {
+  slave_subscription_t *sub;
+
   if (rs->subs) {
+    while ((sub = LIST_FIRST(&rs->slaves)) != NULL)
+      rtsp_slave_remove(rs, (mpegts_service_t *)rs->subs->ths_service,
+                        sub->service);
     subscription_unsubscribe(rs->subs, 0);
     rs->subs = NULL;
   }
@@ -238,6 +315,84 @@ rtsp_clean(session_t *rs)
   rs->mux_created = 0;
 }
 
+/*
+ *
+ */
+static int
+rtsp_validate_service(mpegts_service_t *s)
+{
+  elementary_stream_t *st;
+
+  pthread_mutex_lock(&s->s_stream_mutex);
+  if (s->s_pmt_pid <= 0 || s->s_pcr_pid <= 0) {
+    pthread_mutex_unlock(&s->s_stream_mutex);
+    return 0;
+  }
+  TAILQ_FOREACH(st, &s->s_components, es_link)
+    if (st->es_pid > 0 &&
+        (SCT_ISVIDEO(st->es_type) || SCT_ISAUDIO(st->es_type)))
+      break;
+  pthread_mutex_unlock(&s->s_stream_mutex);
+  return st != NULL;
+}
+
+/*
+ *
+ */
+static void
+rtsp_manage_descramble(session_t *rs)
+{
+  idnode_set_t *found;
+  mpegts_service_t *s, *snext;
+  mpegts_service_t *master = (mpegts_service_t *)rs->subs->ths_service;
+  size_t si;
+  int i, used = 0;
+
+  if (rtsp_descramble <= 0)
+    return;
+
+  found = idnode_set_create(1);
+
+  if (rs->mux == NULL || rs->subs == NULL)
+    goto end;
+
+  if (rs->pids.all) {
+    LIST_FOREACH(s, &rs->mux->mm_services, s_dvb_mux_link)
+      if (rtsp_validate_service(s))
+        idnode_set_add(found, &s->s_id, NULL);
+  } else {
+    for (i = 0; i < rs->pids.count; i++) {
+      s = mpegts_service_find_by_pid((mpegts_mux_t *)rs->mux, rs->pids.pids[i]);
+      if (s != NULL && rtsp_validate_service(s))
+        if (!idnode_set_exists(found, &s->s_id))
+          idnode_set_add(found, &s->s_id, NULL);
+    }
+  }
+
+  /* Remove already used or no-longer required services */
+  for (s = LIST_FIRST(&master->s_slaves); s; s = snext) {
+    snext = LIST_NEXT(s, s_slaves_link);
+    if (idnode_set_remove(found, &s->s_id))
+      used++;
+    else if (!idnode_set_exists(found, &s->s_id))
+      rtsp_slave_remove(rs, master, s);
+  }
+
+  /* Add new ones */
+  for (si = 0; used < rtsp_descramble && si < found->is_count; si++, used++) {
+    s = (mpegts_service_t *)found->is_array[si];
+    rtsp_slave_add(rs, master, s);
+    idnode_set_remove(found, &s->s_id);
+  }
+  if (si < found->is_count)
+    tvhwarn("satips", "%i/%s/%i: limit for descrambled services reached (wanted %zd allowed %d)",
+            rs->frontend, rs->session, rs->stream,
+            (used - si) + found->is_count, rtsp_descramble);
+  
+end:
+  idnode_set_free(found);
+}
+
 /*
  *
  */
@@ -322,6 +477,7 @@ pids:
     svc->s_update_pids(svc, &rs->pids);
     rs->run = 1;
   }
+  rtsp_manage_descramble(rs);
   pthread_mutex_unlock(&global_lock);
   return 0;
 
@@ -334,7 +490,6 @@ endclean:
   return res;
 }
 
-
 /*
  *
  */
@@ -957,7 +1112,8 @@ play:
   }
 
   if (setup)
-    tvhdebug("satips", "setup from %s:%d, RTP: %d, RTCP: %d, pids ",
+    tvhdebug("satips", "%i/%s/%d: setup from %s:%d, RTP: %d, RTCP: %d, pids ",
+             rs->frontend, rs->session, rs->stream,
              addrbuf, IP_PORT(*hc->hc_peer),
              rs->rtp_peer_port, rs->rtp_peer_port + 1);