return;
}
- de->de_s = subscription_create_from_channel(prch, weight,
+ de->de_s = subscription_create_from_channel(prch, NULL, weight,
buf, prch->prch_flags,
NULL, NULL, NULL);
if (de->de_s == NULL) {
tvhdebug("htsp", "%s - subscribe to %s using profile %s\n",
htsp->htsp_logname, channel_get_name(ch), pro->pro_name ?: "");
- hs->hs_s = subscription_create_from_channel(&hs->hs_prch, weight,
+ hs->hs_s = subscription_create_from_channel(&hs->hs_prch, NULL, weight,
htsp->htsp_logname,
SUBSCRIPTION_STREAMING,
htsp->htsp_peername,
typedef struct mpegts_pid_sub
{
RB_ENTRY(mpegts_pid_sub) mps_link;
- LIST_ENTRY(mpegts_pid_sub) mps_svc_link;
-#define MPS_NONE 0x0
-#define MPS_STREAM 0x1
-#define MPS_TABLE 0x2
-#define MPS_FTABLE 0x4
-#define MPS_SERVICE 0x8
+ LIST_ENTRY(mpegts_pid_sub) mps_svcraw_link;
+#define MPS_NONE 0x00
+#define MPS_ALL 0x01
+#define MPS_RAW 0x02
+#define MPS_STREAM 0x04
+#define MPS_SERVICE 0x08
+#define MPS_TABLE 0x10
+#define MPS_FTABLE 0x20
int mps_type;
void *mps_owner;
} mpegts_pid_sub_t;
LIST_HEAD(, mpegts_mux_instance) mm_instances;
mpegts_mux_instance_t *mm_active;
+ /*
+ * Raw subscriptions
+ */
+
+ LIST_HEAD(, th_subscription) mm_raw_subs;
+
/*
* Data processing
*/
RB_HEAD(, mpegts_pid) mm_pids;
+ LIST_HEAD(, mpegts_pid_sub) mm_all_subs;
int mm_last_pid;
mpegts_pid_t *mm_last_mp;
mpegts_mux_t *mmi_mux;
mpegts_input_t *mmi_input;
- LIST_HEAD(,th_subscription) mmi_subs;
-
tvh_input_stream_stats_t mmi_stats;
int mmi_tune_failed;
htsmsg_t *mpegts_input_class_network_enum ( void *o );
char *mpegts_input_class_network_rend ( void *o );
+int mpegts_mps_cmp( mpegts_pid_sub_t *a, mpegts_pid_sub_t *b );
+
void mpegts_network_register_builder
( const idclass_t *idc,
mpegts_network_t *(*build)(const idclass_t *idc, htsmsg_t *conf) );
void mpegts_input_close_pid
( mpegts_input_t *mi, mpegts_mux_t *mm, int pid, int type, void *owner );
+void mpegts_input_close_pids
+ ( mpegts_input_t *mi, mpegts_mux_t *mm, int type, void *owner );
+
static inline void
tsdebug_write(mpegts_mux_t *mm, uint8_t *buf, size_t len)
{
mpegts_service_create0(calloc(1, sizeof(mpegts_service_t)),\
&mpegts_service_class, u, m, s, p, c)
+mpegts_service_t *mpegts_service_create_raw(mpegts_mux_t *mm);
+
mpegts_service_t *mpegts_service_find
( mpegts_mux_t *mm, uint16_t sid, uint16_t pmt_pid, int create, int *save );
if (!iptv_input_is_free(mi)) {
w = 1000000;
- /* Direct subs */
- LIST_FOREACH(mmi, &mi->mi_mux_active, mmi_active_link) {
- LIST_FOREACH(ths, &mmi->mmi_subs, ths_mmi_link) {
- w = MIN(w, ths->ths_weight);
- }
- }
-
/* Service subs */
pthread_mutex_lock(&mi->mi_output_lock);
LIST_FOREACH(s, &mi->mi_transports, s_active_link) {
int
mpegts_input_get_weight ( mpegts_input_t *mi, int flags )
{
- const mpegts_mux_instance_t *mmi;
const service_t *s;
const th_subscription_t *ths;
int w = 0, count = 0;
- /* Direct subs */
- LIST_FOREACH(mmi, &mi->mi_mux_active, mmi_active_link) {
- LIST_FOREACH(ths, &mmi->mmi_subs, ths_mmi_link) {
- w = MAX(w, ths->ths_weight);
- count++;
- }
- }
-
/* Service subs */
pthread_mutex_lock(&mi->mi_output_lock);
LIST_FOREACH(s, &mi->mi_transports, s_active_link) {
{
}
-static int
-mps_cmp ( mpegts_pid_sub_t *a, mpegts_pid_sub_t *b )
+int
+mpegts_mps_cmp ( mpegts_pid_sub_t *a, mpegts_pid_sub_t *b )
{
if (a->mps_type != b->mps_type) {
if (a->mps_type & MPS_SERVICE)
return 0;
}
+void
+mpegts_input_close_pids
+ ( mpegts_input_t *mi, mpegts_mux_t *mm, int type, void *owner )
+{
+ mpegts_pid_t *mp, *mp_next;
+ mpegts_pid_sub_t *mps, *mps_next;
+
+ for (mp = RB_FIRST(&mm->mm_pids); mp; mp = mp_next) {
+ mp_next = RB_NEXT(mp, mp_link);
+ if ((mp->mp_type & MPS_RAW) == 0) continue;
+ for (mps = RB_FIRST(&mp->mp_subs); mps; mps = mps_next) {
+ mps_next = RB_NEXT(mps, mps_link);
+ if (mps->mps_owner != owner) continue;
+ LIST_REMOVE(mps, mps_svcraw_link);
+ RB_REMOVE(&mp->mp_subs, mps, mps_link);
+ free(mps);
+ if (!RB_FIRST(&mp->mp_subs)) {
+ RB_REMOVE(&mm->mm_pids, mp, mp_link);
+ free(mp);
+ }
+ }
+ }
+}
+
mpegts_pid_t *
mpegts_input_open_pid
( mpegts_input_t *mi, mpegts_mux_t *mm, int pid, int type, void *owner )
{
char buf[512];
mpegts_pid_t *mp;
- mpegts_pid_sub_t *mps;
+ mpegts_pid_sub_t *mps, *mps2;
assert(owner != NULL);
- assert((type & (MPS_STREAM|MPS_SERVICE)) == 0 ||
- ((type & MPS_STREAM) ? 1 : 0) != ((type & MPS_SERVICE) ? 1 : 0));
+ assert((type & (MPS_STREAM|MPS_SERVICE|MPS_RAW)) == 0 ||
+ (((type & MPS_STREAM) ? 1 : 0) +
+ ((type & MPS_SERVICE) ? 1 : 0) +
+ ((type & MPS_RAW) ? 1 : 0)) == 1);
lock_assert(&mi->mi_output_lock);
+
+ if (pid == MPEGTS_FULLMUX_PID)
+ mpegts_input_close_pids(mi, mm, MPS_RAW, owner);
+
if ((mp = mpegts_mux_find_pid(mm, pid, 1))) {
mps = calloc(1, sizeof(*mps));
mps->mps_type = type;
mps->mps_owner = owner;
- if (!RB_INSERT_SORTED(&mp->mp_subs, mps, mps_link, mps_cmp)) {
+ if (pid == MPEGTS_FULLMUX_PID) {
+ mp->mp_type |= type | MPS_ALL;
+ LIST_FOREACH(mps2, &mm->mm_all_subs, mps_svcraw_link)
+ if (mps2->mps_owner == owner) break;
+ if (mps2 == NULL) {
+ LIST_INSERT_HEAD(&mm->mm_all_subs, mps, mps_svcraw_link);
+ mpegts_mux_nice_name(mm, buf, sizeof(buf));
+ tvhdebug("mpegts", "%s - open PID fullmux subscription [%d/%p]",
+ buf, type, owner);
+ }
+ } else if (!RB_INSERT_SORTED(&mp->mp_subs, mps, mps_link, mpegts_mps_cmp)) {
mp->mp_type |= type;
- if (type & MPS_SERVICE)
- LIST_INSERT_HEAD(&mp->mp_svc_subs, mps, mps_svc_link);
+ if (type & (MPS_SERVICE|MPS_RAW))
+ LIST_INSERT_HEAD(&mp->mp_svc_subs, mps, mps_svcraw_link);
mpegts_mux_nice_name(mm, buf, sizeof(buf));
tvhdebug("mpegts", "%s - open PID %04X (%d) [%d/%p]",
buf, mp->mp_pid, mp->mp_pid, type, owner);
lock_assert(&mi->mi_output_lock);
if (!(mp = mpegts_mux_find_pid(mm, pid, 0)))
return;
- skel.mps_type = type;
- skel.mps_owner = owner;
- mps = RB_FIND(&mp->mp_subs, &skel, mps_link, mps_cmp);
- if (pid == mm->mm_last_pid) {
- mm->mm_last_pid = -1;
- mm->mm_last_mp = NULL;
- }
- if (mps) {
+ if (pid == MPEGTS_FULLMUX_PID) {
mpegts_mux_nice_name(mm, buf, sizeof(buf));
- tvhdebug("mpegts", "%s - close PID %04X (%d) [%d/%p]",
- buf, mp->mp_pid, mp->mp_pid, type, owner);
- if (type & MPS_SERVICE)
- LIST_REMOVE(mps, mps_svc_link);
- RB_REMOVE(&mp->mp_subs, mps, mps_link);
+ tvhdebug("mpegts", "%s - close PID fullmux subscription [%d/%p]",
+ buf, type, owner);
+ mpegts_input_close_pids(mi, mm, MPS_RAW, owner);
+ LIST_FOREACH(mps, &mm->mm_all_subs, mps_svcraw_link)
+ if (mps->mps_owner == owner) break;
+ if (mps == NULL) return;
+ LIST_REMOVE(mps, mps_svcraw_link);
free(mps);
- if (!RB_FIRST(&mp->mp_subs)) {
- RB_REMOVE(&mm->mm_pids, mp, mp_link);
- if (mp->mp_fd != -1)
- linuxdvb_filter_close(mp->mp_fd);
- free(mp);
- } else {
- type = 0;
- RB_FOREACH(mps, &mp->mp_subs, mps_link)
- type |= mps->mps_type;
- mp->mp_type = type;
+ } else {
+ skel.mps_type = type;
+ skel.mps_owner = owner;
+ mps = RB_FIND(&mp->mp_subs, &skel, mps_link, mpegts_mps_cmp);
+ if (pid == mm->mm_last_pid) {
+ mm->mm_last_pid = -1;
+ mm->mm_last_mp = NULL;
+ }
+ if (mps) {
+ mpegts_mux_nice_name(mm, buf, sizeof(buf));
+ tvhdebug("mpegts", "%s - close PID %04X (%d) [%d/%p]",
+ buf, mp->mp_pid, mp->mp_pid, type, owner);
+ if (type & (MPS_SERVICE|MPS_RAW))
+ LIST_REMOVE(mps, mps_svcraw_link);
+ RB_REMOVE(&mp->mp_subs, mps, mps_link);
+ free(mps);
}
}
+ if (!RB_FIRST(&mp->mp_subs)) {
+ RB_REMOVE(&mm->mm_pids, mp, mp_link);
+ if (mp->mp_fd != -1)
+ linuxdvb_filter_close(mp->mp_fd);
+ free(mp);
+ } else {
+ type = 0;
+ RB_FOREACH(mps, &mp->mp_subs, mps_link)
+ type |= mps->mps_type;
+ mp->mp_type = type;
+ }
}
void
mpegts_input_open_service ( mpegts_input_t *mi, mpegts_service_t *s, int init )
{
elementary_stream_t *st;
+ mpegts_apids_t *pids;
+ int i;
/* Add to list */
pthread_mutex_lock(&mi->mi_output_lock);
/* Register PIDs */
pthread_mutex_lock(&s->s_stream_mutex);
- mi->mi_open_pid(mi, s->s_dvb_mux, s->s_pmt_pid, MPS_SERVICE, s);
- mi->mi_open_pid(mi, s->s_dvb_mux, s->s_pcr_pid, MPS_SERVICE, s);
- /* Open only filtered components here */
- TAILQ_FOREACH(st, &s->s_filt_components, es_filt_link) {
- if (st->es_type != SCT_CA) {
- st->es_pid_opened = 1;
- mi->mi_open_pid(mi, s->s_dvb_mux, st->es_pid, MPS_SERVICE, s);
+ if (s->s_type == STYPE_STD) {
+ mi->mi_open_pid(mi, s->s_dvb_mux, s->s_pmt_pid, MPS_SERVICE, s);
+ mi->mi_open_pid(mi, s->s_dvb_mux, s->s_pcr_pid, MPS_SERVICE, s);
+ /* Open only filtered components here */
+ TAILQ_FOREACH(st, &s->s_filt_components, es_filt_link) {
+ if (st->es_type != SCT_CA) {
+ st->es_pid_opened = 1;
+ mi->mi_open_pid(mi, s->s_dvb_mux, st->es_pid, MPS_SERVICE, s);
+ }
+ }
+ } else {
+ if ((pids = s->s_pids) != NULL) {
+ if (pids->all) {
+ mi->mi_open_pid(mi, s->s_dvb_mux, MPEGTS_FULLMUX_PID, MPS_RAW, s);
+ } else {
+ for (i = 0; i < pids->count; i++)
+ mi->mi_open_pid(mi, s->s_dvb_mux, pids->pids[i], MPS_RAW, s);
+ }
}
}
pthread_mutex_unlock(&s->s_stream_mutex);
pthread_mutex_unlock(&mi->mi_output_lock);
- /* Add PMT monitor */
- s->s_pmt_mon =
- mpegts_table_add(s->s_dvb_mux, DVB_PMT_BASE, DVB_PMT_MASK,
- dvb_pmt_callback, s, "pmt",
- MT_CRC, s->s_pmt_pid);
+ /* Add PMT monitor */
+ if(s->s_type == STYPE_STD) {
+ s->s_pmt_mon =
+ mpegts_table_add(s->s_dvb_mux, DVB_PMT_BASE, DVB_PMT_MASK,
+ dvb_pmt_callback, s, "pmt",
+ MT_CRC, s->s_pmt_pid);
+ }
}
void
elementary_stream_t *st;
/* Close PMT table */
- if (s->s_pmt_mon)
+ if (s->s_type == STYPE_STD && s->s_pmt_mon)
mpegts_table_destroy(s->s_pmt_mon);
s->s_pmt_mon = NULL;
/* Close PID */
pthread_mutex_lock(&s->s_stream_mutex);
- mi->mi_close_pid(mi, s->s_dvb_mux, s->s_pmt_pid, MPS_SERVICE, s);
- mi->mi_close_pid(mi, s->s_dvb_mux, s->s_pcr_pid, MPS_SERVICE, s);
- /* Close all opened PIDs (the component filter may be changed at runtime) */
- TAILQ_FOREACH(st, &s->s_components, es_link) {
- if (st->es_pid_opened) {
- st->es_pid_opened = 0;
- mi->mi_close_pid(mi, s->s_dvb_mux, st->es_pid, MPS_SERVICE, s);
+ if (s->s_type == STYPE_STD) {
+ mi->mi_close_pid(mi, s->s_dvb_mux, s->s_pmt_pid, MPS_SERVICE, s);
+ mi->mi_close_pid(mi, s->s_dvb_mux, s->s_pcr_pid, MPS_SERVICE, s);
+ /* Close all opened PIDs (the component filter may be changed at runtime) */
+ TAILQ_FOREACH(st, &s->s_components, es_link) {
+ if (st->es_pid_opened) {
+ st->es_pid_opened = 0;
+ mi->mi_close_pid(mi, s->s_dvb_mux, st->es_pid, MPS_SERVICE, s);
+ }
}
+ } else {
+ mpegts_input_close_pids(mi, s->s_dvb_mux, MPS_RAW, s);
}
mpegts_input_has_subscription ( mpegts_input_t *mi, mpegts_mux_t *mm )
{
int ret = 0;
- service_t *t;
+ const service_t *t;
+ const th_subscription_t *ths;
pthread_mutex_lock(&mi->mi_output_lock);
LIST_FOREACH(t, &mi->mi_transports, s_active_link) {
if (((mpegts_service_t*)t)->s_dvb_mux == mm) {
+ if (t->s_type == STYPE_RAW) {
+ LIST_FOREACH(ths, &t->s_subscriptions, ths_service_link)
+ if (!strcmp(ths->ths_title, "keep")) break;
+ if (ths) continue;
+ }
ret = 1;
break;
}
uint8_t *end = mpkt->mp_data + len;
mpegts_mux_t *mm = mpkt->mp_mux;
mpegts_mux_instance_t *mmi;
- th_subscription_t *ths;
#if ENABLE_TSDEBUG
off_t tsdebug_pos;
#endif
if (mm == NULL || (mmi = mm->mm_active) == NULL)
return;
- LIST_FOREACH(ths, &mmi->mmi_subs, ths_mmi_link)
- ths->ths_live = 1;
-
assert(mm == mmi->mmi_mux);
#if ENABLE_TSDEBUG
/* Process */
assert((len % 188) == 0);
- while ( tsb < end ) {
+ while (tsb < end) {
pid = (tsb[1] << 8) | tsb[2];
cc = tsb[3];
}
type = mp->mp_type;
+
+ /* Stream raw PIDs */
+ if (type & MPS_RAW) {
+ LIST_FOREACH(mps, &mm->mm_all_subs, mps_svcraw_link)
+ ts_recv_raw((mpegts_service_t *)mps->mps_owner, tsb);
+ LIST_FOREACH(mps, &mp->mp_svc_subs, mps_svcraw_link)
+ ts_recv_raw((mpegts_service_t *)mps->mps_owner, tsb);
+ }
/* Stream service data */
if (type & MPS_SERVICE) {
- LIST_FOREACH(mps, &mp->mp_svc_subs, mps_svc_link) {
+ LIST_FOREACH(mps, &mp->mp_svc_subs, mps_svcraw_link) {
s = mps->mps_owner;
f = (type & (MPS_TABLE|MPS_FTABLE)) ||
(pid == s->s_pmt_pid) || (pid == s->s_pcr_pid);
/* Stream table data */
if (type & MPS_STREAM) {
LIST_FOREACH(s, &mi->mi_transports, s_active_link) {
+ if (s->s_type != STYPE_STD) continue;
f = (type & (MPS_TABLE|MPS_FTABLE)) ||
(pid == s->s_pmt_pid) || (pid == s->s_pcr_pid);
ts_recv_packet1((mpegts_service_t*)s, tsb, NULL, f);
//tvhdebug("tsdemux", "%s - SI packet had errors", name);
}
}
+
+ } else {
+
+ /* Stream to all fullmux subscribers */
+ LIST_FOREACH(mps, &mm->mm_all_subs, mps_svcraw_link)
+ ts_recv_raw((mpegts_service_t *)mps->mps_owner, tsb);
+
}
done:
{
int s = 0, w = 0;
char buf[512];
- th_subscription_t *sub;
+ th_subscription_t *ths;
+ const service_t *t;
mpegts_mux_t *mm = mmi->mmi_mux;
mpegts_input_t *mi = mmi->mmi_input;
- /* Get number of subs */
- // Note: this is a bit of a mess
- LIST_FOREACH(sub, &mmi->mmi_subs, ths_mmi_link) {
- s++;
- w = MAX(w, sub->ths_weight);
- }
- // Note: due to satconf acting as proxy for input we can't always
- // use mi_transports, so we can in via a convoluted route
- LIST_FOREACH(sub, &subscriptions, ths_global_link) {
- if (!sub->ths_service) continue;
- if (!idnode_is_instance(&sub->ths_service->s_id,
- &mpegts_service_class)) continue;
- mpegts_service_t *ms = (mpegts_service_t*)sub->ths_service;
- if (ms->s_dvb_mux == mm) {
- s++;
- w = MAX(w, sub->ths_weight);
- }
- }
+ LIST_FOREACH(t, &mi->mi_transports, s_active_link)
+ if (((mpegts_service_t *)t)->s_dvb_mux == mm)
+ LIST_FOREACH(ths, &t->s_subscriptions, ths_service_link) {
+ s++;
+ w = MAX(w, ths->ths_weight);
+ }
st->uuid = strdup(idnode_uuid_as_str(&mmi->mmi_id));
mi->mi_display_name(mi, buf, sizeof(buf));
mpegts_mux_keep_exists
( mpegts_input_t *mi )
{
- mpegts_mux_instance_t *mmi;
- th_subscription_t *s;
+ const mpegts_mux_instance_t *mmi;
+ const service_t *s;
+ const th_subscription_t *ths;
+ int ret;
+
+ lock_assert(&global_lock);
if (!mi)
return 0;
- mmi = LIST_FIRST(&mi->mi_mux_active);
- if (mmi)
- LIST_FOREACH(mmi, &mi->mi_mux_active, mmi_active_link)
- LIST_FOREACH(s, &mmi->mmi_subs, ths_mmi_link)
- if (!strcmp(s->ths_title, "keep"))
- return 1;
-
- return 0;
+ ret = 0;
+ LIST_FOREACH(mmi, &mi->mi_mux_active, mmi_active_link)
+ LIST_FOREACH(ths, &mmi->mmi_mux->mm_raw_subs, ths_mux_link) {
+ s = ths->ths_service;
+ if (s && s->s_type == STYPE_RAW && !strcmp(ths->ths_title, "keep")) {
+ ret = 1;
+ break;
+ }
+ }
+ return ret;
}
static int
mpegts_input_t *mi = mmi->mmi_input;
lock_assert(&mi->mi_output_lock);
- /* Direct subs */
- LIST_FOREACH(ths, &mmi->mmi_subs, ths_mmi_link) {
- w = MAX(w, ths->ths_weight);
- }
-
/* Service subs */
LIST_FOREACH(s, &mi->mi_transports, s_active_link) {
mpegts_service_t *ms = (mpegts_service_t*)s;
mpegts_mux_has_subscribers ( mpegts_mux_t *mm, const char *name )
{
mpegts_mux_instance_t *mmi = mm->mm_active;
- th_subscription_t *sub;
if (mmi) {
- if ((sub = LIST_FIRST(&mmi->mmi_subs)) != NULL)
- if (strcmp(sub->ths_title, "keep") || LIST_NEXT(sub, ths_mmi_link)) {
- tvhtrace("mpegts", "%s - keeping mux (direct subscription)", name);
- return 1;
- }
if (mmi->mmi_input->mi_has_subscription(mmi->mmi_input, mm)) {
- tvhtrace("mpegts", "%s - keeping mux (service)", name);
+ tvhtrace("mpegts", "%s - keeping mux", name);
return 1;
}
}
char buf[256], buf2[256], *s;
mpegts_mux_instance_t *mmi = mm->mm_active, *mmi2;
mpegts_input_t *mi = NULL, *mi2;
- th_subscription_t *sub;
mpegts_pid_t *mp;
mpegts_pid_sub_t *mps;
}
mi->mi_stopping_mux(mi, mmi);
- LIST_FOREACH(sub, &mmi->mmi_subs, ths_mmi_link)
- subscription_unlink_mux(sub, SM_CODE_SUBSCRIPTION_OVERRIDDEN);
mi->mi_stop_mux(mi, mmi);
mi->mi_stopped_mux(mi, mmi);
mm->mm_last_mp = NULL;
while ((mp = RB_FIRST(&mm->mm_pids))) {
assert(mi);
- while ((mps = RB_FIRST(&mp->mp_subs))) {
- tvhdebug("mpegts", "%s - close PID %04X (%d) [%d/%p]", buf,
- mp->mp_pid, mp->mp_pid, mps->mps_type, mps->mps_owner);
- RB_REMOVE(&mp->mp_subs, mps, mps_link);
- if (mps->mps_type & MPS_SERVICE)
- LIST_REMOVE(mps, mps_svc_link);
- free(mps);
+ if (mp->mp_pid == MPEGTS_FULLMUX_PID) {
+ while ((mps = LIST_FIRST(&mm->mm_all_subs))) {
+ tvhdebug("mpegts", "%s - close PID fullmux subscription [%d/%p]",
+ buf, mps->mps_type, mps->mps_owner);
+ LIST_REMOVE(mps, mps_svcraw_link);
+ free(mps);
+ }
+ } else {
+ while ((mps = RB_FIRST(&mp->mp_subs))) {
+ tvhdebug("mpegts", "%s - close PID %04X (%d) [%d/%p]", buf,
+ mp->mp_pid, mp->mp_pid, mps->mps_type, mps->mps_owner);
+ RB_REMOVE(&mp->mp_subs, mps, mps_link);
+ if (mps->mps_type & (MPS_SERVICE|MPS_RAW|MPS_ALL))
+ LIST_REMOVE(mps, mps_svcraw_link);
+ free(mps);
+ }
}
RB_REMOVE(&mm->mm_pids, mp, mp_link);
if (mp->mp_fd != -1)
mpegts_mux_nice_name(mm, buf, sizeof(buf));
tvhtrace("mpegts", "%s - remove subscriber (reason %i)", buf, reason);
#endif
- subscription_unlink_mux(s, reason);
mm->mm_stop(mm, 0, reason);
}
( mpegts_mux_t *mm, mpegts_input_t *mi,
const char *name, int weight, int flags )
{
- int err = 0;
profile_chain_t prch;
th_subscription_t *s;
memset(&prch, 0, sizeof(prch));
s = subscription_create_from_mux(&prch, (tvh_input_t *)mi,
weight, name,
SUBSCRIPTION_NONE | flags,
- NULL, NULL, NULL, &err);
- return s ? 0 : err;
+ NULL, NULL, NULL);
+ return s ? 0 : -EIO;
}
void
( mpegts_mux_t *mm, const char *name )
{
mpegts_mux_instance_t *mmi;
+ const service_t *t;
th_subscription_t *s, *n;
LIST_FOREACH(mmi, &mm->mm_instances, mmi_mux_link) {
- s = LIST_FIRST(&mmi->mmi_subs);
+ s = LIST_FIRST(&subscriptions);
while (s) {
- n = LIST_NEXT(s, ths_mmi_link);
- if (!strcmp(s->ths_title, name))
+ n = LIST_NEXT(s, ths_global_link);
+ t = s->ths_service;
+ if (t && t->s_type == STYPE_RAW && !strcmp(s->ths_title, name))
subscription_unsubscribe(s);
s = n;
}
mpegts_mux_tuning_error ( const char *mux_uuid, mpegts_mux_instance_t *mmi_match )
{
mpegts_mux_t *mm;
- th_subscription_t *sub;
mpegts_mux_instance_t *mmi;
- streaming_message_t *sm;
struct timespec timeout;
timeout.tv_sec = 2;
if (!pthread_mutex_timedlock(&global_lock, &timeout)) {
mm = mpegts_mux_find(mux_uuid);
if (mm) {
- if ((mmi = mm->mm_active) != NULL && mmi == mmi_match) {
- LIST_FOREACH(sub, &mmi->mmi_subs, ths_mmi_link) {
- sm = streaming_msg_create_code(SMT_SERVICE_STATUS, TSS_TUNING);
- streaming_target_deliver(sub->ths_output, sm);
- }
+ if ((mmi = mm->mm_active) != NULL && mmi == mmi_match)
if (mmi->mmi_input)
mmi->mmi_input->mi_tuning_error(mmi->mmi_input, mm);
- }
}
pthread_mutex_unlock(&global_lock);
}
skel.mp_pid = pid;
mp = RB_FIND(&mm->mm_pids, &skel, mp_link, mp_cmp);
- if (mp == NULL && create) {
- mp = calloc(1, sizeof(*mp));
- mp->mp_pid = pid;
- if (!RB_INSERT_SORTED(&mm->mm_pids, mp, mp_link, mp_cmp)) {
- mp->mp_fd = -1;
- mp->mp_cc = -1;
- } else {
- free(mp);
- mp = NULL;
+ if (mp == NULL) {
+ if (create) {
+ mp = calloc(1, sizeof(*mp));
+ mp->mp_pid = pid;
+ if (!RB_INSERT_SORTED(&mm->mm_pids, mp, mp_link, mp_cmp)) {
+ mp->mp_fd = -1;
+ mp->mp_cc = -1;
+ } else {
+ free(mp);
+ mp = NULL;
+ }
}
}
if (mp) {
= subscription_create_from_mux(mms->mms_prch, NULL, mms->mms_weight,
mms->mms_creator ?: "",
SUBSCRIPTION_NONE,
- NULL, NULL, NULL, NULL);
+ NULL, NULL, NULL);
/* Failed (try-again soon) */
if (!mms->mms_sub) {
{
int i;
+ assert(dst);
+ assert(add);
+ assert(del);
if (mpegts_pid_init(add, NULL, 0) ||
mpegts_pid_init(del, NULL, 0))
return -1;
+ if (src == NULL) {
+ mpegts_pid_copy(add, dst);
+ return add->count > 0;
+ }
for (i = 0; i < src->count; i++)
if (mpegts_pid_find_index(dst, src->pids[i]) < 0)
mpegts_pid_add(del, src->pids[i]);
* Service instance list
*/
static void
-mpegts_service_enlist(service_t *t, struct service_instance_list *sil, int flags)
+mpegts_service_enlist(service_t *t, tvh_input_t *ti,
+ struct service_instance_list *sil, int flags)
{
int p = 0, w;
mpegts_service_t *s = (mpegts_service_t*)t;
mi = mmi->mmi_input;
+ if (ti && (tvh_input_t *)mi != ti)
+ continue;
+
if (!mi->mi_is_enabled(mi, mmi->mmi_mux, flags)) continue;
/* Set weight to -1 (forced) for already active mux */
mpegts_mux_t *mm = ms->s_dvb_mux;
/* Remove config */
- if (delconf)
+ if (delconf && t->s_type == STYPE_STD)
hts_settings_remove("input/dvb/networks/%s/muxes/%s/services/%s",
idnode_uuid_as_str(&mm->mm_network->mn_id),
idnode_uuid_as_str(&mm->mm_id),
/* defaults for older version */
s->s_dvb_created = dispatch_clock;
- if (service_create0((service_t*)s, class, uuid, S_MPEG_TS, conf) == NULL)
+ if (service_create0((service_t*)s, STYPE_STD, class, uuid,
+ S_MPEG_TS, conf) == NULL)
return NULL;
/* Create */
s->s_delete = mpegts_service_delete;
s->s_is_enabled = mpegts_service_is_enabled;
- s->s_config_save = mpegts_service_config_save;
s->s_enlist = mpegts_service_enlist;
s->s_start_feed = mpegts_service_start;
s->s_stop_feed = mpegts_service_stop;
return s;
}
+/*
+ * Raw MPEGTS Service
+ */
+
+const idclass_t mpegts_service_raw_class =
+{
+ .ic_super = &service_raw_class,
+ .ic_class = "mpegts_raw_service",
+ .ic_caption = "MPEGTS Raw Service",
+ .ic_properties = NULL
+};
+
+static void
+mpegts_service_raw_setsourceinfo(service_t *t, source_info_t *si)
+{
+ mpegts_service_setsourceinfo(t, si);
+
+ free(si->si_service);
+ si->si_service = strdup("Raw Service");
+}
+
+static int
+mpegts_service_raw_update_pids(service_t *t, mpegts_apids_t *pids)
+{
+ mpegts_service_t *ms = (mpegts_service_t *)t;
+ mpegts_input_t *mi = ms->s_dvb_active_input;
+ mpegts_mux_t *mm = ms->s_dvb_mux;
+ mpegts_apids_t *p, *x;
+ mpegts_apids_t add, del;
+ int i;
+
+ lock_assert(&global_lock);
+ if (pids) {
+ p = calloc(1, sizeof(*p));
+ mpegts_pid_init(p, NULL, 0);
+ mpegts_pid_copy(p, pids);
+ } else
+ p = NULL;
+ if (mi && mm) {
+ pthread_mutex_lock(&mi->mi_output_lock);
+ pthread_mutex_lock(&t->s_stream_mutex);
+ x = t->s_pids;
+ t->s_pids = p;
+ if (!pids->all && x && x->all) {
+ mi->mi_close_pid(mi, mm, MPEGTS_FULLMUX_PID, MPS_RAW, t);
+ mpegts_input_close_pids(mi, mm, MPS_RAW, t);
+ for (i = 0; i < x->count; i++)
+ mi->mi_open_pid(mi, mm, x->pids[i], MPS_RAW, t);
+ } else {
+ if (pids->all) {
+ mpegts_input_close_pids(mi, mm, MPS_RAW, t);
+ mi->mi_open_pid(mi, mm, MPEGTS_FULLMUX_PID, MPS_RAW, t);
+ } else {
+ mpegts_pid_compare(p, x, &add, &del);
+ for (i = 0; i < del.count; i++)
+ mi->mi_close_pid(mi, mm, del.pids[i], MPS_RAW, t);
+ for (i = 0; i < add.count; i++)
+ mi->mi_open_pid(mi, mm, add.pids[i], MPS_RAW, t);
+ mpegts_pid_done(&add);
+ mpegts_pid_done(&del);
+ }
+ }
+ pthread_mutex_unlock(&t->s_stream_mutex);
+ pthread_mutex_unlock(&mi->mi_output_lock);
+ } else {
+ pthread_mutex_lock(&t->s_stream_mutex);
+ x = t->s_pids;
+ t->s_pids = p;
+ pthread_mutex_unlock(&t->s_stream_mutex);
+ }
+ if (x) {
+ mpegts_pid_done(x);
+ free(x);
+ }
+ return 0;
+}
+
+mpegts_service_t *
+mpegts_service_create_raw ( mpegts_mux_t *mm )
+{
+ mpegts_service_t *s = calloc(1, sizeof(*s));
+ char buf[256];
+
+ mpegts_mux_nice_name(mm, buf, sizeof(buf));
+
+ if (service_create0((service_t*)s, STYPE_RAW,
+ &mpegts_service_raw_class, NULL,
+ S_MPEG_TS, NULL) == NULL) {
+ free(s);
+ return NULL;
+ }
+
+ sbuf_init(&s->s_tsbuf);
+
+ s->s_dvb_mux = mm;
+
+ s->s_delete = mpegts_service_delete;
+ s->s_is_enabled = mpegts_service_is_enabled;
+ s->s_config_save = mpegts_service_config_save;
+ s->s_enlist = mpegts_service_enlist;
+ s->s_start_feed = mpegts_service_start;
+ s->s_stop_feed = mpegts_service_stop;
+ s->s_refresh_feed = mpegts_service_refresh;
+ s->s_setsourceinfo = mpegts_service_raw_setsourceinfo;
+ s->s_grace_period = mpegts_service_grace_period;
+ s->s_channel_number = mpegts_service_channel_number;
+ s->s_channel_name = mpegts_service_channel_name;
+ s->s_provider_name = mpegts_service_provider_name;
+ s->s_channel_icon = mpegts_service_channel_icon;
+ s->s_mapped = mpegts_service_mapped;
+ s->s_update_pids = mpegts_service_raw_update_pids;
+
+ pthread_mutex_lock(&s->s_stream_mutex);
+ free(s->s_nicename);
+ s->s_nicename = strdup(buf);
+ pthread_mutex_unlock(&s->s_stream_mutex);
+
+ tvhlog(LOG_DEBUG, "mpegts", "%s - add raw service", buf);
+
+ return s;
+}
+
/******************************************************************************
* Editor Configuration
*
ts_recv_packet0(t, st, tsb);
}
+/*
+ *
+ */
+void
+ts_recv_raw(mpegts_service_t *t, const uint8_t *tsb)
+{
+ elementary_stream_t *st = NULL;
+ int pid;
+
+ pthread_mutex_lock(&t->s_stream_mutex);
+ if (t->s_parent) {
+ /* If PID is owned by parent, let parent service to
+ * deliver this PID (decrambling)
+ */
+ pid = (tsb[1] & 0x1f) << 8 | tsb[2];
+ st = service_stream_find(t->s_parent, pid);
+ }
+ if(st == NULL)
+ if (streaming_pad_probe_type(&t->s_streaming_pad, SMT_MPEGTS))
+ ts_remux(t, tsb, 0);
+ pthread_mutex_unlock(&t->s_stream_mutex);
+}
/**
*
void ts_recv_packet2(struct mpegts_service *t, const uint8_t *tsb);
+void ts_recv_raw(struct mpegts_service *t, const uint8_t *tsb);
+
#endif /* TSDEMUX_H */
mpegts_network_t *mn, *mn2;
dvb_network_t *ln;
dvb_mux_t *mux;
+ service_t *svc;
char buf[384];
int res = HTTP_STATUS_SERVICE, qsize = 3000000, created = 0;
config_get_int("satip_weight", 100),
"SAT>IP",
rs->prch.prch_flags |
- SUBSCRIPTION_FULLMUX |
SUBSCRIPTION_STREAMING,
addrbuf, hc->hc_username,
- http_arg_get(&hc->hc_args, "User-Agent"), NULL);
+ http_arg_get(&hc->hc_args, "User-Agent"));
if (!rs->subs)
goto endrtp;
if (rs->run) {
}
} else {
pids:
+ svc = rs->subs->ths_service;
+ svc->s_update_pids(svc, &rs->pids);
satip_rtp_update_pids((void *)(intptr_t)rs->stream, &rs->pids);
}
if (!setup && !rs->run) {
rs->udp_rtp->fd, rs->udp_rtcp->fd,
rs->frontend, rs->findex, &rs->mux->lm_tuning,
&rs->pids);
+ svc = rs->subs->ths_service;
+ svc->s_update_pids(svc, &rs->pids);
rs->run = 1;
}
pthread_mutex_unlock(&global_lock);
static void service_class_save(struct idnode *self);
struct service_queue service_all;
+struct service_queue service_raw_all;
static void
service_class_notify_enabled ( void *obj )
}
};
+const idclass_t service_raw_class = {
+ .ic_class = "service_raw",
+ .ic_caption = "Service Raw",
+ .ic_event = "service_raw",
+ .ic_perm_def = ACCESS_ADMIN,
+ .ic_delete = service_class_delete,
+ .ic_save = NULL,
+ .ic_get_title = service_class_get_title,
+ .ic_properties = NULL
+};
+
/**
*
*/
*/
service_instance_t *
service_find_instance
- (service_t *s, channel_t *ch, service_instance_list_t *sil,
+ (service_t *s, channel_t *ch, tvh_input_t *ti,
+ service_instance_list_t *sil,
int *error, int weight, int flags, int timeout, int postpone)
{
channel_service_mapping_t *csm;
LIST_FOREACH(csm, &ch->ch_services, csm_chn_link) {
s = csm->csm_svc;
if (s->s_is_enabled(s, flags))
- s->s_enlist(s, sil, flags);
+ s->s_enlist(s, ti, sil, flags);
}
} else {
- s->s_enlist(s, sil, flags);
+ s->s_enlist(s, ti, sil, flags);
}
/* Clean */
avgstat_flush(&t->s_rate);
- TAILQ_REMOVE(&service_all, t, s_all_link);
+ if (t->s_type == STYPE_RAW)
+ TAILQ_REMOVE(&service_raw_all, t, s_all_link);
+ else
+ TAILQ_REMOVE(&service_all, t, s_all_link);
service_unref(t);
}
*/
service_t *
service_create0
- ( service_t *t, const idclass_t *class, const char *uuid,
+ ( service_t *t, int service_type,
+ const idclass_t *class, const char *uuid,
int source_type, htsmsg_t *conf )
{
if (idnode_insert(&t->s_id, uuid, class, 0)) {
lock_assert(&global_lock);
- TAILQ_INSERT_TAIL(&service_all, t, s_all_link);
+ if (service_type == STYPE_RAW)
+ TAILQ_INSERT_TAIL(&service_raw_all, t, s_all_link);
+ else
+ TAILQ_INSERT_TAIL(&service_all, t, s_all_link);
pthread_mutex_init(&t->s_stream_mutex, NULL);
pthread_cond_init(&t->s_tss_cond, NULL);
+ t->s_type = service_type;
t->s_source_type = source_type;
t->s_refcount = 1;
t->s_enabled = 1;
{
int had_components;
+ if(t->s_type != STYPE_STD)
+ goto refresh;
+
pthread_mutex_lock(&t->s_stream_mutex);
had_components = TAILQ_FIRST(&t->s_filt_components) != NULL &&
pthread_mutex_unlock(&t->s_stream_mutex);
+refresh:
if(t->s_refresh_feed != NULL)
t->s_refresh_feed(t);
void
service_request_save(service_t *t, int restart)
{
+ if (t->s_type != STYPE_STD && !restart)
+ return;
+
pthread_mutex_lock(&pending_save_mutex);
if(!t->s_ps_onqueue) {
pthread_mutex_unlock(&pending_save_mutex);
pthread_mutex_lock(&global_lock);
- if(t->s_status != SERVICE_ZOMBIE)
+ if(t->s_status != SERVICE_ZOMBIE && t->s_config_save)
t->s_config_save(t);
if(t->s_status == SERVICE_RUNNING && restart)
service_restart(t);
{
TAILQ_INIT(&pending_save_queue);
TAILQ_INIT(&service_all);
+ TAILQ_INIT(&service_raw_all);
pthread_mutex_init(&pending_save_mutex, NULL);
pthread_cond_init(&pending_save_cond, NULL);
tvhthread_create(&service_saver_tid, NULL, service_saver, NULL);
#include "descrambler.h"
extern const idclass_t service_class;
+extern const idclass_t service_raw_class;
extern struct service_queue service_all;
+extern struct service_queue service_raw_all;
struct channel;
+struct tvh_input;
+struct mpegts_apids;
/**
* Stream, one media component for a service.
*/
int s_refcount;
+ /**
+ * Service type, standard or raw (for mux or partial mux streaming)
+ */
+ enum {
+ STYPE_STD,
+ STYPE_RAW
+ } s_type;
+
/**
* Source type is used to determine if an output requesting
* MPEG-TS can shortcut all the parsing and remuxing.
int (*s_is_enabled)(struct service *t, int flags);
- void (*s_enlist)(struct service *s, service_instance_list_t *sil, int flags);
+ void (*s_enlist)(struct service *s, struct tvh_input *ti,
+ service_instance_list_t *sil, int flags);
int (*s_start_feed)(struct service *s, int instance);
void (*s_delete)(struct service *t, int delconf);
+#if ENABLE_MPEGTS
+ int (*s_update_pids)(struct service *t, struct mpegts_apids *pids);
+#endif
+
/**
* Channel info
*/
struct elementary_stream_queue s_filt_components;
int s_last_pid;
elementary_stream_t *s_last_es;
-
+#if ENABLE_MPEGTS
+ struct service *s_parent;
+ struct mpegts_apids *s_pids;
+#endif
/**
* Delivery pad, this is were we finally deliver all streaming output
void service_build_filter(service_t *t);
-service_t *service_create0(service_t *t, const idclass_t *idc, const char *uuid, int source_type, htsmsg_t *conf);
+service_t *service_create0(service_t *t, int service_type, const idclass_t *idc,
+ const char *uuid, int source_type, htsmsg_t *conf);
-#define service_create(t, c, u, s, m)\
- (struct t*)service_create0(calloc(1, sizeof(struct t), &t##_class, c, u, s, m)
+#define service_create(t, y, c, u, s, m)\
+ (struct t*)service_create0(calloc(1, sizeof(struct t), y, &t##_class, c, u, s, m)
void service_unref(service_t *t);
service_instance_t *service_find_instance(struct service *s,
struct channel *ch,
+ struct tvh_input *source,
service_instance_list_t *sil,
int *error, int weight,
int flags, int timeout,
/* Subscribe */
tvhinfo("service_mapper", "checking %s", s->s_nicename);
prch.prch_id = s;
- sub = subscription_create_from_service(&prch, SUBSCRIPTION_PRIO_MAPPER,
+ sub = subscription_create_from_service(&prch, NULL, SUBSCRIPTION_PRIO_MAPPER,
"service_mapper",
0, NULL, NULL, "service_mapper");
s->ths_service = t;
LIST_INSERT_HEAD(&t->s_subscriptions, s, ths_service_link);
- tvhtrace("subscription", "%04X: linking sub %p to svc %p", shortid(s), s, t);
+ tvhtrace("subscription", "%04X: linking sub %p to svc %p type %i",
+ shortid(s), s, t, t->s_type);
pthread_mutex_lock(&t->s_stream_mutex);
- if(TAILQ_FIRST(&t->s_filt_components) != NULL) {
+ if(TAILQ_FIRST(&t->s_filt_components) != NULL ||
+ t->s_type != STYPE_STD) {
streaming_msg_free(s->ths_start_message);
subscription_unlink_service0(s, reason, 1);
}
-/*
- * Called from mpegts code
- */
-void
-subscription_unlink_mux(th_subscription_t *s, int reason)
-{
- streaming_message_t *sm;
- mpegts_mux_instance_t *mmi = s->ths_mmi;
- mpegts_mux_t *mm = mmi->mmi_mux;
- mpegts_input_t *mi = mmi->mmi_input;
-
- gtimer_disarm(&s->ths_receive_timer);
-
- assert(mi);
-
- pthread_mutex_lock(&mi->mi_output_lock);
- s->ths_mmi = NULL;
-
- if (!(s->ths_flags & SUBSCRIPTION_NONE))
- streaming_target_disconnect(&mmi->mmi_streaming_pad, &s->ths_input);
-
- sm = streaming_msg_create_code(SMT_STOP, reason);
- streaming_target_deliver(s->ths_output, sm);
-
- if (s->ths_flags & SUBSCRIPTION_FULLMUX)
- mi->mi_close_pid(mi, mm, MPEGTS_FULLMUX_PID, MPS_NONE, s);
- LIST_REMOVE(s, ths_mmi_link);
-
- pthread_mutex_unlock(&mi->mi_output_lock);
-}
-
/* **************************************************************************
* Scheduling
* *************************************************************************/
size_t buflen;
s->ths_service->s_setsourceinfo(s->ths_service, &si);
- snprintf(buf, sizeof(buf),
+ buflen = snprintf(buf, sizeof(buf),
"\"%s\" subscribing on \"%s\", weight: %d, adapter: \"%s\", "
- "network: \"%s\", mux: \"%s\", provider: \"%s\", "
- "service: \"%s\"",
+ "network: \"%s\", mux: \"%s\", provider: \"%s\", service: \"%s\"",
s->ths_title, ch ? channel_get_name(ch) : "none", s->ths_weight,
si.si_adapter ?: "<N/A>",
si.si_network ?: "<N/A>",
si.si_service ?: "<N/A>");
service_source_info_free(&si);
+ if (s->ths_prch && s->ths_prch->prch_pro)
+ buflen += snprintf(buf + buflen, sizeof(buf) - buflen,
+ ", profile=\"%s\"",
+ s->ths_prch->prch_pro->pro_name ?: "");
+
if (s->ths_hostname) {
- buflen = strlen(buf);
snprintf(buf + buflen, sizeof(buf) - buflen,
", hostname=\"%s\", username=\"%s\", client=\"%s\"",
s->ths_hostname ?: "<N/A>",
lock_assert(&global_lock);
LIST_FOREACH(s, &subscriptions, ths_global_link) {
- if (s->ths_mmi) continue;
if (!s->ths_service && !s->ths_channel) continue;
/* Postpone the tuner decision */
tvhtrace("subscription", "%04X: find instance for %s weight %d",
shortid(s), s->ths_service->s_nicename, s->ths_weight);
si = service_find_instance(s->ths_service, s->ths_channel,
+ s->ths_source,
&s->ths_instances, &error, s->ths_weight,
s->ths_flags, s->ths_timeout,
dispatch_clock > s->ths_postpone_end ?
LIST_REMOVE(s, ths_global_link);
- if(s->ths_channel != NULL) {
+ if (s->ths_channel != NULL) {
LIST_REMOVE(s, ths_channel_link);
snprintf(buf, sizeof(buf), "\"%s\" unsubscribing from \"%s\"",
s->ths_title, channel_get_name(s->ths_channel));
}
tvhlog(LOG_INFO, "subscription", "%04X: %s", shortid(s), buf);
- if(t)
+ if (t) {
service_remove_subscriber(t, s, SM_CODE_OK);
-
#if ENABLE_MPEGTS
- if(s->ths_mmi)
- mpegts_mux_remove_subscriber(s->ths_mmi->mmi_mux, s, SM_CODE_OK);
+ if (t->s_type == STYPE_RAW)
+ LIST_REMOVE(s, ths_mux_link);
#endif
+ }
streaming_msg_free(s->ths_start_message);
*/
static th_subscription_t *
subscription_create_from_channel_or_service(profile_chain_t *prch,
+ tvh_input_t *ti,
unsigned int weight,
const char *name,
int flags,
const char *hostname,
const char *username,
const char *client,
- int service)
+ service_t *service)
{
th_subscription_t *s;
channel_t *ch = NULL;
- service_t *t = NULL;
assert(prch);
assert(prch->prch_id);
- assert(prch->prch_st);
-
- if (service)
- t = prch->prch_id;
- else
+ if (!service)
ch = prch->prch_id;
s = subscription_create(prch, weight, name, flags, subscription_input,
shortid(s), channel_get_name(ch), weight, pro_name);
else
tvhtrace("subscription", "%04X: creating subscription for service %s weight %d sing profile %s",
- shortid(s), t->s_nicename, weight, pro_name);
+ shortid(s), service->s_nicename, weight, pro_name);
#endif
s->ths_channel = ch;
- s->ths_service = t;
+ s->ths_service = service;
+ s->ths_source = ti;
if (ch)
LIST_INSERT_HEAD(&ch->ch_subscriptions, s, ths_channel_link);
+#if ENABLE_MPEGTS
+ if (service && service->s_type == STYPE_RAW) {
+ mpegts_mux_t *mm = prch->prch_id;
+ LIST_INSERT_HEAD(&mm->mm_raw_subs, s, ths_mux_link);
+ }
+#endif
+
subscription_reschedule();
return s;
}
th_subscription_t *
subscription_create_from_channel(profile_chain_t *prch,
+ tvh_input_t *ti,
unsigned int weight,
const char *name,
int flags, const char *hostname,
const char *username, const char *client)
{
+ assert(prch->prch_st);
return subscription_create_from_channel_or_service
- (prch, weight, name, flags, hostname, username, client, 0);
+ (prch, ti, weight, name, flags, hostname, username, client, NULL);
}
/**
*/
th_subscription_t *
subscription_create_from_service(profile_chain_t *prch,
+ tvh_input_t *ti,
unsigned int weight,
const char *name,
int flags,
const char *hostname, const char *username,
const char *client)
{
+ assert(prch->prch_st);
return subscription_create_from_channel_or_service
- (prch, weight, name, flags, hostname, username, client, 1);
+ (prch, ti, weight, name, flags, hostname, username, client,
+ prch->prch_id);
}
-/**
- *
- */
/**
*
*/
#if ENABLE_MPEGTS
-// TODO: move this
-static void
-mpegts_mux_setsourceinfo ( mpegts_mux_t *mm, source_info_t *si )
-{
- char buf[256];
-
- /* Validate */
- lock_assert(&global_lock);
-
- /* Update */
- if(mm->mm_network->mn_network_name != NULL)
- si->si_network = strdup(mm->mm_network->mn_network_name);
-
- mm->mm_display_name(mm, buf, sizeof(buf));
- si->si_mux = strdup(buf);
-
- if(mm->mm_active && mm->mm_active->mmi_input) {
- mpegts_input_t *mi = mm->mm_active->mmi_input;
- mi->mi_display_name(mi, buf, sizeof(buf));
- si->si_adapter = strdup(buf);
- }
-}
-
-static void
-mux_data_timeout ( void *aux )
-{
- th_subscription_t *s = aux;
-
- if (!s->ths_mmi)
- return;
-
- if (!s->ths_live) {
- tvhwarn("subscription", "%04X: mux data timeout for %s", shortid(s), s->ths_title);
- mpegts_mux_remove_subscriber(s->ths_mmi->mmi_mux, s, SM_CODE_NO_INPUT);
- return;
- }
- s->ths_live = 0;
-
- if (s->ths_timeout > 0)
- gtimer_arm(&s->ths_receive_timer, mux_data_timeout, s, s->ths_timeout);
-}
-
+#include "input/mpegts.h"
th_subscription_t *
subscription_create_from_mux(profile_chain_t *prch,
tvh_input_t *ti,
int flags,
const char *hostname,
const char *username,
- const char *client,
- int *err)
+ const char *client)
{
mpegts_mux_t *mm = prch->prch_id;
- th_subscription_t *s;
- streaming_message_t *sm;
- streaming_start_t *ss;
- mpegts_input_t *mi;
- int r;
-
- /* Tune */
- r = mm->mm_start(mm, (mpegts_input_t *)ti, name, weight, flags);
- if (r) {
- if (err) *err = r;
- return NULL;
- }
-
- /* Create subscription */
- if (!prch->prch_st)
- flags |= SUBSCRIPTION_NONE;
- s = subscription_create(prch, weight, name, flags, NULL,
- hostname, username, client);
- s->ths_mmi = mm->mm_active;
+ mpegts_service_t *s = mpegts_service_create_raw(mm);
- /* Install full mux handler */
- mi = s->ths_mmi->mmi_input;
- assert(mi);
-
- pthread_mutex_lock(&mi->mi_output_lock);
-
- if (s->ths_flags & SUBSCRIPTION_FULLMUX)
- mi->mi_open_pid(mi, mm, MPEGTS_FULLMUX_PID, MPS_NONE, s);
-
- /* Store */
- LIST_INSERT_HEAD(&mm->mm_active->mmi_subs, s, ths_mmi_link);
-
- /* Connect */
- if (prch->prch_st)
- streaming_target_connect(&s->ths_mmi->mmi_streaming_pad, &s->ths_input);
-
- /* Deliver a start message */
- ss = calloc(1, sizeof(streaming_start_t));
- ss->ss_num_components = 0;
- ss->ss_refcount = 1;
-
- mpegts_mux_setsourceinfo(mm, &ss->ss_si);
- ss->ss_si.si_service = strdup("rawmux");
-
- tvhinfo("subscription",
- "%04X: \"%s\" subscribing to mux, weight: %d, adapter: \"%s\", "
- "network: \"%s\", mux: \"%s\", hostname: \"%s\", username: \"%s\", "
- "client: \"%s\"",
- shortid(s),
- s->ths_title,
- s->ths_weight,
- ss->ss_si.si_adapter ?: "<N/A>",
- ss->ss_si.si_network ?: "<N/A>",
- ss->ss_si.si_mux ?: "<N/A>",
- hostname ?: "<N/A>",
- username ?: "<N/A>",
- client ?: "<N/A>");
-
- sm = streaming_msg_create_data(SMT_START, ss);
- streaming_target_deliver(s->ths_output, sm);
-
- r = (mi->mi_get_grace ? mi->mi_get_grace(mi, mm) : 0) + 20;
- sm = streaming_msg_create_code(SMT_GRACE, r);
- streaming_target_deliver(s->ths_output, sm);
-
- pthread_mutex_unlock(&mi->mi_output_lock);
-
- if (r > 0)
- gtimer_arm(&s->ths_receive_timer, mux_data_timeout, s, r);
+ if (!s)
+ return NULL;
- return s;
+ return subscription_create_from_channel_or_service
+ (prch, ti, weight, name, flags, hostname, username, client,
+ (service_t *)s);
}
#endif
else if(s->ths_dvrfile != NULL)
htsmsg_add_str(m, "service", s->ths_dvrfile ?: "");
- else if (s->ths_mmi != NULL && s->ths_mmi->mmi_mux != NULL) {
- char buf[512];
- mpegts_mux_t *mm = s->ths_mmi->mmi_mux;
- mpegts_mux_nice_name(mm, buf, sizeof(buf));
- htsmsg_add_str(m, "service", buf);
- }
-
return m;
}
void
subscription_done(void)
{
+ th_subscription_t *s;
+
pthread_mutex_lock(&global_lock);
+ LIST_FOREACH(s, &subscriptions, ths_global_link)
+ subscription_unsubscribe(s);
/* clear remaining subscriptions */
subscription_reschedule();
pthread_mutex_unlock(&global_lock);
st = calloc(1, sizeof(*st));
streaming_target_init(st, dummy_callback, NULL, 0);
prch->prch_st = st;
- s = subscription_create_from_service(prch, 1, "dummy", 0, NULL, NULL, "dummy");
+ s = subscription_create_from_service(prch, NULL, 1, "dummy", 0, NULL, NULL, "dummy");
tvhlog(LOG_NOTICE, "subscription",
"%04X: Dummy join %s ok", shortid(s), id);
#define SUBSCRIPTION_RAW_MPEGTS 0x001
#define SUBSCRIPTION_NONE 0x002
-#define SUBSCRIPTION_FULLMUX 0x004
-#define SUBSCRIPTION_STREAMING 0x008
-#define SUBSCRIPTION_RESTART 0x010
-#define SUBSCRIPTION_INITSCAN 0x020 ///< for mux subscriptions
-#define SUBSCRIPTION_IDLESCAN 0x040 ///< for mux subscriptions
-#define SUBSCRIPTION_USERSCAN 0x080 ///< for mux subscriptions
-#define SUBSCRIPTION_EPG 0x100 ///< for mux subscriptions
+#define SUBSCRIPTION_STREAMING 0x004
+#define SUBSCRIPTION_RESTART 0x008
+#define SUBSCRIPTION_INITSCAN 0x010 ///< for mux subscriptions
+#define SUBSCRIPTION_IDLESCAN 0x020 ///< for mux subscriptions
+#define SUBSCRIPTION_USERSCAN 0x040 ///< for mux subscriptions
+#define SUBSCRIPTION_EPG 0x080 ///< for mux subscriptions
/* Some internal priorities */
#define SUBSCRIPTION_PRIO_KEEP 1 ///< Keep input rolling
struct service *ths_service; /* if NULL, ths_service_link
is not linked */
+ struct tvh_input *ths_source; /* if NULL, all sources are allowed */
+
char *ths_title; /* display title */
time_t ths_start; /* time when subscription started */
int ths_total_err; /* total errors during entire subscription */
int ths_postpone;
time_t ths_postpone_end;
-#if ENABLE_MPEGTS
- // Note: its a bit ugly linking MPEG-TS code directly here, but to do
- // otherwise would probably require adding lots of additional
- // (repeated) logic elsewhere
- LIST_ENTRY(th_subscription) ths_mmi_link;
- struct mpegts_mux_instance *ths_mmi;
- gtimer_t ths_receive_timer;
- uint8_t ths_live;
-#endif
+ /*
+ * MPEG-TS mux chain
+ */
+ LIST_ENTRY(th_subscription) ths_mux_link;
} th_subscription_t;
th_subscription_t *
subscription_create_from_channel(struct profile_chain *prch,
+ struct tvh_input *ti,
unsigned int weight,
const char *name,
int flags,
th_subscription_t *
subscription_create_from_service(struct profile_chain *prch,
+ struct tvh_input *ti,
unsigned int weight,
const char *name,
int flags,
int flags,
const char *hostname,
const char *username,
- const char *client, int *err);
+ const char *client);
#endif
th_subscription_t *subscription_create(struct profile_chain *prch,
void subscription_unlink_service(th_subscription_t *s, int reason);
-void subscription_unlink_mux(th_subscription_t *s, int reason);
-
void subscription_dummy_join(const char *id, int first);
typedef enum {
SCT_NONE = -1,
SCT_UNKNOWN = 0,
- SCT_MPEG2VIDEO = 1,
+ SCT_RAW = 1,
+ SCT_MPEG2VIDEO,
SCT_MPEG2AUDIO,
SCT_H264,
SCT_AC3,
while(!hc->hc_shutdown && run && tvheadend_running) {
pthread_mutex_lock(&sq->sq_mutex);
sm = TAILQ_FIRST(&sq->sq_queue);
- if(sm == NULL) {
+ if(sm == NULL) {
gettimeofday(&tp, NULL);
ts.tv_sec = tp.tv_sec + 1;
ts.tv_nsec = tp.tv_usec * 1000;
tcp_get_ip_str((struct sockaddr*)hc->hc_peer, addrbuf, 50);
- s = subscription_create_from_service(&prch, weight ?: 100, "HTTP",
+ s = subscription_create_from_service(&prch, NULL, weight ?: 100, "HTTP",
prch.prch_flags | SUBSCRIPTION_STREAMING,
addrbuf,
hc->hc_username,
th_subscription_t *s;
profile_chain_t prch;
size_t qsize;
- const char *name;
+ const char *name, *str;
char addrbuf[50];
void *tcp_id;
- const char *str;
- int res = HTTP_STATUS_SERVICE;
+ char *p, *saveptr;
+ mpegts_apids_t pids;
+ int res = HTTP_STATUS_SERVICE, i;
if(http_access_verify(hc, ACCESS_ADVANCED_STREAMING))
return HTTP_STATUS_UNAUTHORIZED;
else
qsize = 10000000;
+ mpegts_pid_init(&pids, NULL, 0);
+ if ((str = http_arg_get(&hc->hc_req_args, "pids"))) {
+ p = tvh_strdupa(str);
+ p = strtok_r(p, ",", &saveptr);
+ while (p) {
+ if (strcmp(p, "all") == 0) {
+ pids.all = 1;
+ } else {
+ i = atoi(p);
+ if (i < 0 || i > 8192)
+ return HTTP_STATUS_BAD_REQUEST;
+ if (i == 8192)
+ pids.all = 1;
+ else
+ mpegts_pid_add(&pids, i);
+ }
+ p = strtok_r(NULL, ",", &saveptr);
+ }
+ if (!pids.all && pids.count <= 0)
+ return HTTP_STATUS_BAD_REQUEST;
+ } else {
+ pids.all = 1;
+ }
+
if (!profile_chain_raw_open(&prch, mm, qsize)) {
tcp_get_ip_str((struct sockaddr*)hc->hc_peer, addrbuf, 50);
s = subscription_create_from_mux(&prch, NULL, weight ?: 10, "HTTP",
prch.prch_flags |
- SUBSCRIPTION_FULLMUX |
SUBSCRIPTION_STREAMING,
addrbuf, hc->hc_username,
- http_arg_get(&hc->hc_args, "User-Agent"), NULL);
+ http_arg_get(&hc->hc_args, "User-Agent"));
if (s) {
name = tvh_strdupa(s->ths_title);
- pthread_mutex_unlock(&global_lock);
- http_stream_run(hc, &prch, name, s);
- pthread_mutex_lock(&global_lock);
+ if (s->ths_service->s_update_pids(s->ths_service, &pids) == 0) {
+ pthread_mutex_unlock(&global_lock);
+ http_stream_run(hc, &prch, name, s);
+ pthread_mutex_lock(&global_lock);
+ }
subscription_unsubscribe(s);
res = 0;
}
tcp_get_ip_str((struct sockaddr*)hc->hc_peer, addrbuf, 50);
- s = subscription_create_from_channel(&prch, weight ?: 100, "HTTP",
+ s = subscription_create_from_channel(&prch,
+ NULL, weight ?: 100, "HTTP",
prch.prch_flags | SUBSCRIPTION_STREAMING,
addrbuf, hc->hc_username,
http_arg_get(&hc->hc_args, "User-Agent"));