return s;
}
+/*
+ * Find PID
+ */
+mpegts_service_t *
+mpegts_service_find_by_pid ( mpegts_mux_t *mm, int pid )
+{
+ mpegts_service_t *s;
+
+ lock_assert(&global_lock);
+
+ /* Find existing service */
+ LIST_FOREACH(s, &mm->mm_services, s_dvb_mux_link) {
+ pthread_mutex_lock(&s->s_stream_mutex);
+ if (pid == s->s_pmt_pid || pid == s->s_pcr_pid)
+ goto ok;
+ if (service_stream_find((service_t *)s, pid))
+ goto ok;
+ pthread_mutex_unlock(&s->s_stream_mutex);
+ }
+ return NULL;
+ok:
+ pthread_mutex_unlock(&s->s_stream_mutex);
+ return s;
+}
+
/*
* Raw MPEGTS Service
*/
mpegts_service_link ( mpegts_service_t *master, mpegts_service_t *slave )
{
pthread_mutex_lock(&master->s_stream_mutex);
- assert(slave->s_status == SERVICE_IDLE);
LIST_INSERT_HEAD(&slave->s_masters, master, s_masters_link);
LIST_INSERT_HEAD(&master->s_slaves, slave, s_slaves_link);
pthread_mutex_unlock(&master->s_stream_mutex);
mpegts_service_unlink ( mpegts_service_t *master, mpegts_service_t *slave )
{
pthread_mutex_lock(&master->s_stream_mutex);
- assert(slave->s_status == SERVICE_IDLE);
LIST_SAFE_REMOVE(master, s_masters_link);
LIST_SAFE_REMOVE(slave, s_slaves_link);
pthread_mutex_unlock(&master->s_stream_mutex);
ts_remux(t, tsb, error);
LIST_FOREACH(m, &t->s_masters, s_masters_link) {
- pthread_mutex_lock(&t->s_stream_mutex);
- if(streaming_pad_probe_type(&t->s_streaming_pad, SMT_MPEGTS))
- ts_remux(t, tsb, error);
- pthread_mutex_unlock(&t->s_stream_mutex);
+ pthread_mutex_lock(&m->s_stream_mutex);
+ if(streaming_pad_probe_type(&m->s_streaming_pad, SMT_MPEGTS))
+ ts_remux(m, tsb, error);
+ pthread_mutex_unlock(&m->s_stream_mutex);
}
off = tsb[3] & 0x20 ? tsb[4] + 5 : 4;
- switch(st->es_type) {
-
- case SCT_CA:
- break;
+ if (st->es_type == SCT_CA)
+ return;
- default:
- if(!streaming_pad_probe_type(&t->s_streaming_pad, SMT_PACKET))
- break;
+ if(!streaming_pad_probe_type(&t->s_streaming_pad, SMT_PACKET))
+ return;
- if(st->es_type == SCT_TELETEXT)
- teletext_input(t, st, tsb);
- if(off > 188)
- break;
+ if(st->es_type == SCT_TELETEXT)
+ teletext_input(t, st, tsb);
- if(t->s_status == SERVICE_RUNNING)
- parse_mpeg_ts((service_t*)t, st, tsb + off, 188 - off, pusi, error);
- break;
- }
+ if(off <= 188 && t->s_status == SERVICE_RUNNING)
+ parse_mpeg_ts((service_t*)t, st, tsb + off, 188 - off, pusi, error);
}
/**
#define RTP_BUFSIZE (256*1024)
#define RTCP_BUFSIZE (16*1024)
+typedef struct slave_subscription {
+ LIST_ENTRY(slave_subscription) link;
+ mpegts_service_t *service;
+ th_subscription_t *ths;
+ profile_chain_t prch;
+} slave_subscription_t;
+
typedef struct session {
TAILQ_ENTRY(session) link;
int delsys;
int rtp_peer_port;
udp_connection_t *udp_rtp;
udp_connection_t *udp_rtcp;
+ LIST_HEAD(, slave_subscription) slaves;
} session_t;
static uint32_t session_number;
if (rs == NULL)
return NULL;
- rs->delsys = delsys;
+
rs->nsession = nsession ?: session_number;
snprintf(rs->session, sizeof(rs->session), "%08X", session_number);
if (nsession) {
return stream;
}
+/*
+ *
+ */
+static void
+rtsp_slave_add
+ (session_t *rs, mpegts_service_t *master, mpegts_service_t *slave)
+{
+ char buf[128];
+ slave_subscription_t *sub = calloc(1, sizeof(*sub));
+
+ pthread_mutex_lock(&master->s_stream_mutex);
+ if (master->s_slaves_pids == NULL)
+ master->s_slaves_pids = mpegts_pid_alloc();
+ pthread_mutex_unlock(&master->s_stream_mutex);
+ master->s_link(master, slave);
+ sub->service = slave;
+ profile_chain_init(&sub->prch, NULL, NULL);
+ sub->prch.prch_st = &sub->prch.prch_sq.sq_st;
+ sub->prch.prch_id = slave;
+ snprintf(buf, sizeof(buf), "SAT>IP Slave/%s", slave->s_nicename);
+ sub->ths = subscription_create_from_service(&sub->prch, NULL,
+ SUBSCRIPTION_NONE,
+ buf, 0, NULL, NULL,
+ buf, NULL);
+ if (sub->ths == NULL) {
+ tvherror("satips", "%i/%s/%i: unable to subscribe service %s\n",
+ rs->frontend, rs->session, rs->stream, slave->s_nicename);
+ profile_chain_close(&sub->prch);
+ free(sub);
+ master->s_unlink(master, slave);
+ } else {
+ LIST_INSERT_HEAD(&rs->slaves, sub, link);
+ tvhdebug("satips", "%i/%s/%i: slave service %s subscribed",
+ rs->frontend, rs->session, rs->stream, slave->s_nicename);
+ }
+}
+
+/*
+ *
+ */
+static void
+rtsp_slave_remove
+ (session_t *rs, mpegts_service_t *master, mpegts_service_t *slave)
+{
+ slave_subscription_t *sub;
+
+ if (master == NULL)
+ return;
+ LIST_FOREACH(sub, &rs->slaves, link)
+ if (sub->service == slave)
+ break;
+ if (sub == NULL)
+ return;
+ tvhdebug("satips", "%i/%s/%i: slave service %s unsubscribed",
+ rs->frontend, rs->session, rs->stream, slave->s_nicename);
+ master->s_unlink(master, slave);
+ if (sub->ths)
+ subscription_unsubscribe(sub->ths, 0);
+ if (sub->prch.prch_id)
+ profile_chain_close(&sub->prch);
+ LIST_REMOVE(sub, link);
+ free(sub);
+}
+
/*
*
*/
static void
rtsp_clean(session_t *rs)
{
+ slave_subscription_t *sub;
+
if (rs->subs) {
+ while ((sub = LIST_FIRST(&rs->slaves)) != NULL)
+ rtsp_slave_remove(rs, (mpegts_service_t *)rs->subs->ths_service,
+ sub->service);
subscription_unsubscribe(rs->subs, 0);
rs->subs = NULL;
}
rs->mux_created = 0;
}
+/*
+ *
+ */
+static int
+rtsp_validate_service(mpegts_service_t *s)
+{
+ elementary_stream_t *st;
+
+ pthread_mutex_lock(&s->s_stream_mutex);
+ if (s->s_pmt_pid <= 0 || s->s_pcr_pid <= 0) {
+ pthread_mutex_unlock(&s->s_stream_mutex);
+ return 0;
+ }
+ TAILQ_FOREACH(st, &s->s_components, es_link)
+ if (st->es_pid > 0 &&
+ (SCT_ISVIDEO(st->es_type) || SCT_ISAUDIO(st->es_type)))
+ break;
+ pthread_mutex_unlock(&s->s_stream_mutex);
+ return st != NULL;
+}
+
+/*
+ *
+ */
+static void
+rtsp_manage_descramble(session_t *rs)
+{
+ idnode_set_t *found;
+ mpegts_service_t *s, *snext;
+ mpegts_service_t *master = (mpegts_service_t *)rs->subs->ths_service;
+ size_t si;
+ int i, used = 0;
+
+ if (rtsp_descramble <= 0)
+ return;
+
+ found = idnode_set_create(1);
+
+ if (rs->mux == NULL || rs->subs == NULL)
+ goto end;
+
+ if (rs->pids.all) {
+ LIST_FOREACH(s, &rs->mux->mm_services, s_dvb_mux_link)
+ if (rtsp_validate_service(s))
+ idnode_set_add(found, &s->s_id, NULL);
+ } else {
+ for (i = 0; i < rs->pids.count; i++) {
+ s = mpegts_service_find_by_pid((mpegts_mux_t *)rs->mux, rs->pids.pids[i]);
+ if (s != NULL && rtsp_validate_service(s))
+ if (!idnode_set_exists(found, &s->s_id))
+ idnode_set_add(found, &s->s_id, NULL);
+ }
+ }
+
+ /* Remove already used or no-longer required services */
+ for (s = LIST_FIRST(&master->s_slaves); s; s = snext) {
+ snext = LIST_NEXT(s, s_slaves_link);
+ if (idnode_set_remove(found, &s->s_id))
+ used++;
+ else if (!idnode_set_exists(found, &s->s_id))
+ rtsp_slave_remove(rs, master, s);
+ }
+
+ /* Add new ones */
+ for (si = 0; used < rtsp_descramble && si < found->is_count; si++, used++) {
+ s = (mpegts_service_t *)found->is_array[si];
+ rtsp_slave_add(rs, master, s);
+ idnode_set_remove(found, &s->s_id);
+ }
+ if (si < found->is_count)
+ tvhwarn("satips", "%i/%s/%i: limit for descrambled services reached (wanted %zd allowed %d)",
+ rs->frontend, rs->session, rs->stream,
+ (used - si) + found->is_count, rtsp_descramble);
+
+end:
+ idnode_set_free(found);
+}
+
/*
*
*/
svc->s_update_pids(svc, &rs->pids);
rs->run = 1;
}
+ rtsp_manage_descramble(rs);
pthread_mutex_unlock(&global_lock);
return 0;
return res;
}
-
/*
*
*/
}
if (setup)
- tvhdebug("satips", "setup from %s:%d, RTP: %d, RTCP: %d, pids ",
+ tvhdebug("satips", "%i/%s/%d: setup from %s:%d, RTP: %d, RTCP: %d, pids ",
+ rs->frontend, rs->session, rs->stream,
addrbuf, IP_PORT(*hc->hc_peer),
rs->rtp_peer_port, rs->rtp_peer_port + 1);