pro = de->de_config->dvr_profile;
prch = malloc(sizeof(*prch));
- if (profile_chain_open(pro, prch, de->de_channel, &de->de_config->dvr_muxcnf, 0, 0)) {
+ profile_chain_init(prch, pro, de->de_channel);
+ if (profile_chain_open(prch, &de->de_config->dvr_muxcnf, 0, 0)) {
tvherror("dvr", "unable to create new channel streaming chain for '%s'",
channel_get_name(de->de_channel));
return;
}
- de->de_s = subscription_create_from_channel(de->de_channel, pro, weight,
- buf, prch->prch_st,
- prch->prch_flags,
+ de->de_s = subscription_create_from_channel(prch, weight,
+ buf, prch->prch_flags,
NULL, NULL, NULL);
if (de->de_s == NULL) {
tvherror("dvr", "unable to create new channel subcription for '%s'",
#endif
pro = profile_find_by_list(htsp->htsp_granted_access->aa_profiles, profile_id, "htsp");
- if (!profile_work(pro, &hs->hs_prch, ch, &hs->hs_input, timeshiftPeriod, pflags)) {
+ profile_chain_init(&hs->hs_prch, pro, ch);
+ if (!profile_chain_work(&hs->hs_prch, &hs->hs_input, timeshiftPeriod, pflags)) {
tvhlog(LOG_ERR, "htsp", "unable to create profile chain '%s'", pro->pro_name);
free(hs);
return htsp_error("Stream setup error");
LIST_INSERT_HEAD(&htsp->htsp_subscriptions, hs, hs_link);
tvhdebug("htsp", "%s - subscribe to %s\n", htsp->htsp_logname, ch->ch_name ?: "");
- hs->hs_s = subscription_create_from_channel(ch, pro, weight,
+ hs->hs_s = subscription_create_from_channel(&hs->hs_prch, weight,
htsp->htsp_logname,
- hs->hs_prch.prch_st,
SUBSCRIPTION_STREAMING,
htsp->htsp_peername,
htsp->htsp_username,
#include "subscriptions.h"
#include "channels.h"
#include "access.h"
+#include "profile.h"
#include "dvb_charset.h"
#include <assert.h>
( mpegts_mux_t *mm, const char *name, int weight )
{
int err = 0;
+ profile_chain_t prch;
th_subscription_t *s;
- s = subscription_create_from_mux(mm, NULL, weight, name, NULL,
+ memset(&prch, 0, sizeof(prch));
+ prch.prch_id = mm;
+ s = subscription_create_from_mux(&prch, weight, name,
SUBSCRIPTION_NONE,
NULL, NULL, NULL, &err);
return s ? 0 : err;
#include "input/mpegts/mpegts_mux_sched.h"
#include "streaming.h"
#include "settings.h"
+#include "profile.h"
static void mpegts_mux_sched_timer ( void *p );
static void mpegts_mux_sched_input ( void *p, streaming_message_t *sm );
if (!mms->mms_active) {
assert(mms->mms_sub == NULL);
+ if (!mms->mms_prch)
+ mms->mms_prch = calloc(1, sizeof(mms->mms_prch));
+ mms->mms_prch->prch_id = mm;
+ mms->mms_prch->prch_st = &mms->mms_input;
+
mms->mms_sub
- = subscription_create_from_mux(mm, NULL, mms->mms_weight,
+ = subscription_create_from_mux(mms->mms_prch, mms->mms_weight,
mms->mms_creator ?: "",
- &mms->mms_input,
SUBSCRIPTION_NONE,
NULL, NULL, NULL, NULL);
free(mms->mms_cronstr);
free(mms->mms_mux);
free(mms->mms_creator);
+ free(mms->mms_prch);
free(mms);
}
#include "idnode.h"
#include "subscriptions.h"
+struct profile_chain;
+
typedef LIST_HEAD(,mpegts_mux_sched) mpegts_mux_sched_list_t;
extern mpegts_mux_sched_list_t mpegts_mux_sched_all;
/*
* Subscription
*/
- th_subscription_t *mms_sub; ///< Subscription handler
- streaming_target_t mms_input; ///< Streaming input
+ struct profile_chain *mms_prch; ///< Dummy profile chain
+ th_subscription_t *mms_sub; ///< Subscription handler
+ streaming_target_t mms_input; ///< Streaming input
} mpegts_mux_sched_t;
if (!htsmsg_get_bool(conf, "shield", &b))
pro->pro_shield = !!b;
}
+ pro->pro_refcount = 1;
TAILQ_INSERT_TAIL(&profiles, pro, pro_link);
if (save)
profile_class_save((idnode_t *)pro);
return pro;
}
+void
+profile_release_(profile_t *pro)
+{
+ if (pro->pro_free)
+ pro->pro_free(pro);
+ free(pro->pro_name);
+ free(pro->pro_comment);
+ free(pro);
+}
+
static void
profile_delete(profile_t *pro, int delconf)
{
idnode_unlink(&pro->pro_id);
dvr_config_destroy_by_profile(pro, delconf);
access_destroy_by_profile(pro, delconf);
- if (pro->pro_free)
- pro->pro_free(pro);
- free(pro->pro_name);
- free(pro->pro_comment);
- free(pro);
+ profile_release(pro);
}
static void
}
}
+/*
+ *
+ */
+void
+profile_chain_init(profile_chain_t *prch, profile_t *pro, void *id)
+{
+ memset(prch, 0, sizeof(*prch));
+ if (pro)
+ profile_grab(pro);
+ prch->prch_pro = pro;
+ prch->prch_id = id;
+ streaming_queue_init(&prch->prch_sq, 0, 0);
+}
+
/*
*
*/
int
-profile_chain_raw_open(profile_chain_t *prch, size_t qsize)
+profile_chain_work(profile_chain_t *prch, struct streaming_target *dst,
+ uint32_t timeshift_period, int flags)
+{
+ profile_t *pro = prch->prch_pro;
+ if (pro && pro->pro_work)
+ return pro->pro_work(prch, dst, timeshift_period, flags);
+ return -1;
+}
+
+/*
+ *
+ */
+int
+profile_chain_open(profile_chain_t *prch,
+ muxer_config_t *m_cfg, int flags, size_t qsize)
+{
+ profile_t *pro = prch->prch_pro;
+ if (pro && pro->pro_open)
+ return pro->pro_open(prch, m_cfg, flags, qsize);
+ return -1;
+}
+
+/*
+ *
+ */
+int
+profile_chain_raw_open(profile_chain_t *prch, void *id, size_t qsize)
{
muxer_config_t c;
memset(&c, 0, sizeof(c));
c.m_type = MC_RAW;
memset(prch, 0, sizeof(*prch));
+ prch->prch_id = id;
prch->prch_flags = SUBSCRIPTION_RAW_MPEGTS;
streaming_queue_init(&prch->prch_sq, SMT_PACKET, qsize);
prch->prch_st = &prch->prch_sq.sq_st;
muxer_destroy(prch->prch_muxer);
streaming_queue_deinit(&prch->prch_sq);
prch->prch_st = NULL;
+ if (prch->prch_pro)
+ profile_release(prch->prch_pro);
}
/*
};
static int
-profile_htsp_work(profile_t *_pro, profile_chain_t *prch,
- void *id, streaming_target_t *dst,
+profile_htsp_work(profile_chain_t *prch,
+ streaming_target_t *dst,
uint32_t timeshift_period, int flags)
{
- if (!(flags & PRCH_FLAG_SKIPZEROING))
- memset(prch, 0, sizeof(*prch));
-
if (flags & PRCH_FLAG_TSFIX)
dst = prch->prch_tsfix = tsfix_create(prch->prch_transcoder);
};
static int
-profile_mpegts_pass_open(profile_t *_pro, profile_chain_t *prch, void *id,
+profile_mpegts_pass_open(profile_chain_t *prch,
muxer_config_t *m_cfg, int flags, size_t qsize)
{
- profile_mpegts_t *pro = (profile_mpegts_t *)_pro;
+ profile_mpegts_t *pro = (profile_mpegts_t *)prch->prch_pro;
muxer_config_t c;
if (m_cfg)
c.m_rewrite_pat = pro->pro_rewrite_pat;
c.m_rewrite_pmt = pro->pro_rewrite_pmt;
- memset(prch, 0, sizeof(*prch));
prch->prch_flags = SUBSCRIPTION_RAW_MPEGTS;
streaming_queue_init(&prch->prch_sq, SMT_PACKET, qsize);
prch->prch_muxer = muxer_create(&c);
};
static int
-profile_matroska_open(profile_t *_pro, profile_chain_t *prch, void *id,
+profile_matroska_open(profile_chain_t *prch,
muxer_config_t *m_cfg, int flags, size_t qsize)
{
- profile_matroska_t *pro = (profile_matroska_t *)_pro;
+ profile_matroska_t *pro = (profile_matroska_t *)prch->prch_pro;
muxer_config_t c;
if (m_cfg)
if (pro->pro_webm)
c.m_type = MC_WEBM;
- memset(prch, 0, sizeof(*prch));
streaming_queue_init(&prch->prch_sq, 0, qsize);
prch->prch_gh = globalheaders_create(&prch->prch_sq.sq_st);
prch->prch_tsfix = tsfix_create(prch->prch_gh);
};
static int
-profile_transcode_work(profile_t *_pro, profile_chain_t *prch,
- void *id, streaming_target_t *dst,
+profile_transcode_work(profile_chain_t *prch,
+ streaming_target_t *dst,
uint32_t timeshift_period, int flags)
{
- profile_transcode_t *pro = (profile_transcode_t *)_pro;
+ profile_transcode_t *pro = (profile_transcode_t *)prch->prch_pro;
transcoder_props_t props;
memset(&props, 0, sizeof(props));
props.tp_bandwidth = pro->pro_bandwidth >= 64 ? pro->pro_bandwidth : 64;
strncpy(props.tp_language, pro->pro_language ?: "", 3);
- if (!(flags & PRCH_FLAG_SKIPZEROING))
- memset(prch, 0, sizeof(*prch));
-
#if ENABLE_TIMESHIFT
if (timeshift_period > 0)
dst = prch->prch_timeshift = timeshift_create(dst, timeshift_period);
}
static int
-profile_transcode_open(profile_t *_pro, profile_chain_t *prch, void *id,
+profile_transcode_open(profile_chain_t *prch,
muxer_config_t *m_cfg, int flags, size_t qsize)
{
- profile_transcode_t *pro = (profile_transcode_t *)_pro;
+ profile_transcode_t *pro = (profile_transcode_t *)prch->prch_pro;
muxer_config_t c;
int r;
c.m_type = MC_MATROSKA;
}
- memset(prch, 0, sizeof(*prch));
-
streaming_queue_init(&prch->prch_sq, 0, qsize);
prch->prch_gh = globalheaders_create(&prch->prch_sq.sq_st);
- r = profile_transcode_work(_pro, prch, prch->prch_gh, id, 0,
+ r = profile_transcode_work(prch, prch->prch_gh, 0,
PRCH_FLAG_SKIPZEROING | PRCH_FLAG_TSFIX);
if (r)
return r;
#define PRCH_FLAG_TSFIX (1<<1)
typedef struct profile_chain {
+ struct profile *prch_pro;
+ void *prch_id;
int prch_flags;
struct streaming_queue prch_sq;
struct streaming_target *prch_st;
idnode_t pro_id;
TAILQ_ENTRY(profile) pro_link;
+ int pro_refcount;
+
LIST_HEAD(,dvr_config) pro_dvr_configs;
LIST_HEAD(,access_entry) pro_accesses;
void (*pro_conf_changed)(struct profile *pro);
muxer_container_type_t (*pro_get_mc)(struct profile *pro);
- int (*pro_work)(struct profile *pro, profile_chain_t *prch,
- void *id, struct streaming_target *dst,
+ int (*pro_work)(profile_chain_t *prch, struct streaming_target *dst,
uint32_t timeshift_period, int flags);
- int (*pro_open)(struct profile *pro, profile_chain_t *prch, void *id,
+ int (*pro_open)(profile_chain_t *prch,
muxer_config_t *m_cfg, int flags, size_t qsize);
} profile_t;
profile_t *profile_create
(const char *uuid, htsmsg_t *conf, int save);
-static inline int
-profile_work(profile_t *pro, profile_chain_t *prch,
- void *id, struct streaming_target *dst,
- uint32_t timeshift_period, int flags)
- { return pro && pro->pro_work ? pro->pro_work(pro, prch, id, dst, timeshift_period, flags) : -1; }
-
-static inline int
-profile_chain_open(profile_t *pro, profile_chain_t *prch, void *id,
- muxer_config_t *m_cfg, int flags, size_t qsize)
- { return pro && pro->pro_open ? pro->pro_open(pro, prch, id, m_cfg, flags, qsize) : -1; }
-int profile_chain_raw_open(profile_chain_t *prch, size_t qsize);
+static inline void profile_grab( profile_t *pro )
+ { pro->pro_refcount++; }
+
+void profile_release_( profile_t *pro );
+static inline void profile_release( profile_t *pro )
+ {
+ assert(pro->pro_refcount > 0);
+ if (--pro->pro_refcount == 0) profile_release_(pro);
+ }
+
+int
+profile_chain_work(profile_chain_t *prch, struct streaming_target *dst,
+ uint32_t timeshift_period, int flags);
+int
+profile_chain_open(profile_chain_t *prch,
+ muxer_config_t *m_cfg, int flags, size_t qsize);
+void profile_chain_init(profile_chain_t *prch, profile_t *pro, void *id);
+int profile_chain_raw_open(profile_chain_t *prch, void *id, size_t qsize);
void profile_chain_close(profile_chain_t *prch);
static inline profile_t *profile_find_by_uuid(const char *uuid)
service_mapper_thread ( void *aux )
{
service_t *s;
+ profile_chain_t prch;
th_subscription_t *sub;
int run, working = 0;
- streaming_queue_t sq;
+ streaming_queue_t *sq;
streaming_message_t *sm;
const char *err = NULL;
- streaming_queue_init(&sq, 0, 0);
+ profile_chain_init(&prch, NULL, NULL);
+ sq = &prch.prch_sq;
pthread_mutex_lock(&global_lock);
/* Subscribe */
tvhinfo("service_mapper", "checking %s", s->s_nicename);
- sub = subscription_create_from_service(s, NULL, SUBSCRIPTION_PRIO_MAPPER,
- "service_mapper", &sq.sq_st,
+ prch.prch_id = s;
+ sub = subscription_create_from_service(&prch, SUBSCRIPTION_PRIO_MAPPER,
+ "service_mapper",
0, NULL, NULL, "service_mapper");
/* Failed */
/* Wait */
run = 1;
- pthread_mutex_lock(&sq.sq_mutex);
+ pthread_mutex_lock(&sq->sq_mutex);
while(tvheadend_running && run) {
/* Wait for message */
- while((sm = TAILQ_FIRST(&sq.sq_queue)) == NULL) {
- pthread_cond_wait(&sq.sq_cond, &sq.sq_mutex);
+ while((sm = TAILQ_FIRST(&sq->sq_queue)) == NULL) {
+ pthread_cond_wait(&sq->sq_cond, &sq->sq_mutex);
if (!tvheadend_running)
break;
}
if (!tvheadend_running)
break;
- TAILQ_REMOVE(&sq.sq_queue, sm, sm_link);
- pthread_mutex_unlock(&sq.sq_mutex);
+ TAILQ_REMOVE(&sq->sq_queue, sm, sm_link);
+ pthread_mutex_unlock(&sq->sq_mutex);
if(sm->sm_type == SMT_PACKET) {
run = 0;
}
streaming_msg_free(sm);
- pthread_mutex_lock(&sq.sq_mutex);
+ pthread_mutex_lock(&sq->sq_mutex);
}
if (!tvheadend_running)
break;
- streaming_queue_clear(&sq.sq_queue);
- pthread_mutex_unlock(&sq.sq_mutex);
+ streaming_queue_clear(&sq->sq_queue);
+ pthread_mutex_unlock(&sq->sq_mutex);
pthread_mutex_lock(&global_lock);
subscription_unsubscribe(sub);
}
pthread_mutex_unlock(&global_lock);
+ profile_chain_close(&prch);
return NULL;
}
*/
th_subscription_t *
subscription_create
- (profile_t *pro, int weight, const char *name, streaming_target_t *st,
+ (profile_chain_t *prch, int weight, const char *name,
int flags, st_callback_t *cb, const char *hostname,
const char *username, const char *client)
{
th_subscription_t *s = calloc(1, sizeof(th_subscription_t));
+ profile_t *pro = prch ? prch->prch_pro : NULL;
+ streaming_target_t *st = prch ? prch->prch_st : NULL;
int reject = 0;
static int tally;
+
TAILQ_INIT(&s->ths_instances);
if(flags & SUBSCRIPTION_NONE)
streaming_target_init(&s->ths_input, cb, s, reject);
+ s->ths_prch = prch->prch_st ? prch : NULL;
s->ths_weight = weight;
s->ths_title = strdup(name);
s->ths_hostname = hostname ? strdup(hostname) : NULL;
*
*/
static th_subscription_t *
-subscription_create_from_channel_or_service(channel_t *ch,
- service_t *t,
- profile_t *pro,
+subscription_create_from_channel_or_service(profile_chain_t *prch,
unsigned int weight,
const char *name,
- streaming_target_t *st,
int flags,
const char *hostname,
const char *username,
- const char *client)
+ const char *client,
+ int service)
{
th_subscription_t *s;
- assert(!ch || !t);
- assert(st);
+ channel_t *ch = NULL;
+ service_t *t = NULL;
+
+ assert(prch);
+ assert(prch->prch_id);
+ assert(prch->prch_st);
+
+ if (service)
+ t = prch->prch_id;
+ else
+ ch = prch->prch_id;
- s = subscription_create(pro, weight, name, st, flags, subscription_input,
+ s = subscription_create(prch, weight, name, flags, subscription_input,
hostname, username, client);
if (ch)
tvhtrace("subscription", "%04X: creating subscription for %s weight %d",
}
th_subscription_t *
-subscription_create_from_channel(channel_t *ch, profile_t *pro,
+subscription_create_from_channel(profile_chain_t *prch,
unsigned int weight,
- const char *name, streaming_target_t *st,
+ const char *name,
int flags, const char *hostname,
const char *username, const char *client)
{
return subscription_create_from_channel_or_service
- (ch, NULL, pro, weight, name, st, flags, hostname, username, client);
+ (prch, weight, name, flags, hostname, username, client, 0);
}
/**
*
*/
th_subscription_t *
-subscription_create_from_service(service_t *t, profile_t *pro,
+subscription_create_from_service(profile_chain_t *prch,
unsigned int weight,
const char *name,
- streaming_target_t *st, int flags,
+ int flags,
const char *hostname, const char *username,
const char *client)
{
return subscription_create_from_channel_or_service
- (NULL, t, pro, weight, name, st, flags, hostname, username, client);
+ (prch, weight, name, flags, hostname, username, client, 1);
}
/**
}
th_subscription_t *
-subscription_create_from_mux(mpegts_mux_t *mm, profile_t *pro,
+subscription_create_from_mux(profile_chain_t *prch,
unsigned int weight,
const char *name,
- streaming_target_t *st,
int flags,
const char *hostname,
const char *username,
const char *client,
int *err)
{
+ mpegts_mux_t *mm = prch->prch_id;
th_subscription_t *s;
streaming_message_t *sm;
streaming_start_t *ss;
}
/* Create subscription */
- if (!st)
+ if (!prch->prch_st)
flags |= SUBSCRIPTION_NONE;
- s = subscription_create(pro, weight, name, st, flags, NULL,
+ s = subscription_create(prch, weight, name, flags, NULL,
hostname, username, client);
s->ths_mmi = mm->mm_active;
LIST_INSERT_HEAD(&mm->mm_active->mmi_subs, s, ths_mmi_link);
/* Connect */
- if (st)
+ if (prch->prch_st)
streaming_target_connect(&s->ths_mmi->mmi_streaming_pad, &s->ths_input);
/* Deliver a start message */
subscription_dummy_join(const char *id, int first)
{
service_t *t = service_find_by_identifier(id);
+ profile_chain_t *prch;
streaming_target_t *st;
th_subscription_t *s;
return;
}
- st = calloc(1, sizeof(streaming_target_t));
+ prch = calloc(1, sizeof(*prch));
+ prch->prch_id = t;
+ st = calloc(1, sizeof(*st));
streaming_target_init(st, dummy_callback, NULL, 0);
- s = subscription_create_from_service(t, NULL, 1, "dummy", st, 0, NULL, NULL, "dummy");
+ prch->prch_st = st;
+ s = subscription_create_from_service(prch, 1, "dummy", 0, NULL, NULL, "dummy");
tvhlog(LOG_NOTICE, "subscription",
"%04X: Dummy join %s ok", shortid(s), id);
#include "service.h"
-struct profile;
+struct profile_chain;
extern struct th_subscription_list subscriptions;
LIST_ENTRY(th_subscription) ths_global_link;
LIST_ENTRY(th_subscription) ths_remove_link;
+
+ struct profile_chain *ths_prch;
+
int ths_weight;
enum {
void subscription_reschedule(void);
th_subscription_t *
-subscription_create_from_channel(struct channel *ch,
- struct profile *pro,
+subscription_create_from_channel(struct profile_chain *prch,
unsigned int weight,
const char *name,
- streaming_target_t *st,
int flags,
const char *hostname,
const char *username,
th_subscription_t *
-subscription_create_from_service(struct service *t,
- struct profile *pro,
+subscription_create_from_service(struct profile_chain *prch,
unsigned int weight,
const char *name,
- streaming_target_t *st,
int flags,
const char *hostname,
const char *username,
#if ENABLE_MPEGTS
struct mpegts_mux;
th_subscription_t *
-subscription_create_from_mux(struct mpegts_mux *m,
- struct profile *pro,
+subscription_create_from_mux(struct profile_chain *prch,
unsigned int weight,
const char *name,
- streaming_target_t *st,
int flags,
const char *hostname,
const char *username,
const char *client, int *err);
#endif
-th_subscription_t *subscription_create(struct profile *pro,
+th_subscription_t *subscription_create(struct profile_chain *prch,
int weight, const char *name,
- streaming_target_t *st,
int flags, st_callback_t *cb,
const char *hostname,
const char *username,
else
qsize = 1500000;
- if (!profile_chain_open(pro, &prch, service, NULL, 0, qsize)) {
+ profile_chain_init(&prch, pro, service);
+ if (!profile_chain_open(&prch, NULL, 0, qsize)) {
tcp_get_ip_str((struct sockaddr*)hc->hc_peer, addrbuf, 50);
- s = subscription_create_from_service(service, pro, weight ?: 100, "HTTP",
- prch.prch_st,
+ s = subscription_create_from_service(&prch, weight ?: 100, "HTTP",
prch.prch_flags | SUBSCRIPTION_STREAMING,
addrbuf,
hc->hc_username,
else
qsize = 10000000;
- if (!profile_chain_raw_open(&prch, qsize)) {
+ if (!profile_chain_raw_open(&prch, mm, qsize)) {
tcp_get_ip_str((struct sockaddr*)hc->hc_peer, addrbuf, 50);
- s = subscription_create_from_mux(mm, NULL, weight ?: 10, "HTTP",
- prch.prch_st,
+ s = subscription_create_from_mux(&prch, weight ?: 10, "HTTP",
prch.prch_flags |
SUBSCRIPTION_FULLMUX |
SUBSCRIPTION_STREAMING,
else
qsize = 1500000;
- if (!profile_chain_open(pro, &prch, ch, NULL, 0, qsize)) {
+ profile_chain_init(&prch, pro, ch);
+ if (!profile_chain_open(&prch, NULL, 0, qsize)) {
tcp_get_ip_str((struct sockaddr*)hc->hc_peer, addrbuf, 50);
- s = subscription_create_from_channel(ch, pro, weight ?: 100, "HTTP",
- prch.prch_st, prch.prch_flags | SUBSCRIPTION_STREAMING,
+ s = subscription_create_from_channel(&prch, weight ?: 100, "HTTP",
+ prch.prch_flags | SUBSCRIPTION_STREAMING,
addrbuf, hc->hc_username,
http_arg_get(&hc->hc_args, "User-Agent"));