static struct htsp_connection_list htsp_connections;
static void htsp_streaming_input(void *opaque, streaming_message_t *sm);
+static htsmsg_t *htsp_streaming_input_info(void *opaque, htsmsg_t *list);
const char * _htsp_get_subscription_status(int smcode);
static void htsp_epg_send_waiting(struct htsp_connection *, int64_t mintime);
+static streaming_ops_t htsp_streaming_input_ops = {
+ .st_cb = htsp_streaming_input,
+ .st_info = htsp_streaming_input_info
+};
+
/**
*
*/
htsp_init_queue(&hs->hs_q, 0);
hs->hs_sid = sid;
- streaming_target_init(&hs->hs_input, htsp_streaming_input, hs, 0);
+ streaming_target_init(&hs->hs_input, &htsp_streaming_input_ops, hs, 0);
#if ENABLE_TIMESHIFT
if (timeshiftPeriod != 0) {
}
streaming_msg_free(sm);
}
+
+static htsmsg_t *
+htsp_streaming_input_info(void *opaque, htsmsg_t *list)
+{
+ char buf[512];
+ htsp_subscription_t *hs = opaque;
+ snprintf(buf, sizeof(buf), "htsp input: %s", hs->hs_htsp->htsp_logname);
+ htsmsg_add_str(list, NULL, buf);
+ return list;
+}
#include "profile.h"
static void mpegts_mux_sched_timer ( void *p );
-static void mpegts_mux_sched_input ( void *p, streaming_message_t *sm );
mpegts_mux_sched_list_t mpegts_mux_sched_all;
streaming_msg_free(sm);
}
+static htsmsg_t *
+mpegts_mux_sched_input_info ( void *p, htsmsg_t *list )
+{
+ htsmsg_add_str(list, NULL, "mux sched input");
+ return list;
+}
+
+static streaming_ops_t mpegts_mux_sched_input_ops = {
+ .st_cb = mpegts_mux_sched_input,
+ .st_info = mpegts_mux_sched_input_info
+};
+
/******************************************************************************
* Timer
*****************************************************************************/
LIST_INSERT_HEAD(&mpegts_mux_sched_all, mms, mms_link);
/* Initialise */
- streaming_target_init(&mms->mms_input, mpegts_mux_sched_input, mms, 0);
+ streaming_target_init(&mms->mms_input, &mpegts_mux_sched_input_ops, mms, 0);
/* Load conf */
if (conf)
gh_hold(gh, sm);
}
+static htsmsg_t *
+globalheaders_input_info(void *opaque, htsmsg_t *list)
+{
+ globalheaders_t *gh = opaque;
+ streaming_target_t *st = gh->gh_output;
+ htsmsg_add_str(list, NULL, "globalheaders input");
+ return st->st_ops.st_info(st->st_opaque, list);
+}
+
+static streaming_ops_t globalheaders_input_ops = {
+ .st_cb = globalheaders_input,
+ .st_info = globalheaders_input_info
+};
+
/**
*
TAILQ_INIT(&gh->gh_holdq);
gh->gh_output = output;
- streaming_target_init(&gh->gh_input, globalheaders_input, gh, 0);
+ streaming_target_init(&gh->gh_input, &globalheaders_input_ops, gh, 0);
return &gh->gh_input;
}
static void
transcoder_input(void *opaque, streaming_message_t *sm)
{
- transcoder_t *t;
+ transcoder_t *t = opaque;
streaming_start_t *ss;
- t = opaque;
-
switch (sm->sm_type) {
case SMT_PACKET:
transcoder_packet(t, sm->sm_data);
}
}
+static htsmsg_t *
+transcoder_input_info(void *opaque, htsmsg_t *list)
+{
+ transcoder_t *t = opaque;
+ streaming_target_t *st = t->t_output;
+ htsmsg_add_str(list, NULL, "transcoder input");
+ return st->st_ops.st_info(st->st_opaque, list);;
+}
+
+static streaming_ops_t transcoder_input_ops = {
+ .st_cb = transcoder_input,
+ .st_info = transcoder_input_info
+};
+
+
/**
*
if (!t->t_id) t->t_id = ++transcoder_id;
t->t_output = output;
- streaming_target_init(&t->t_input, transcoder_input, t, 0);
+ streaming_target_init(&t->t_input, &transcoder_input_ops, t, 0);
return &t->t_input;
}
streaming_target_deliver2(tf->tf_output, sm);
}
+static htsmsg_t *
+tsfix_input_info(void *opaque, htsmsg_t *list)
+{
+ tsfix_t *tf = opaque;
+ streaming_target_t *st = tf->tf_output;
+ htsmsg_add_str(list, NULL, "tsfix input");
+ return st->st_ops.st_info(st->st_opaque, list);
+}
+
+static streaming_ops_t tsfix_input_ops = {
+ .st_cb = tsfix_input,
+ .st_info = tsfix_input_info
+};
+
/**
*
tf->tf_output = output;
tf->tf_start_time = mclk();
- streaming_target_init(&tf->tf_input, tsfix_input, tf, 0);
+ streaming_target_init(&tf->tf_input, &tsfix_input_ops, tf, 0);
return &tf->tf_input;
}
profile_deliver(prch, sm);
}
+static htsmsg_t *
+profile_input_info(void *opaque, htsmsg_t *list)
+{
+ profile_chain_t *prch = opaque;
+ streaming_target_t *st = prch->prch_share;
+ htsmsg_add_str(list, NULL, "profile input");
+ st->st_ops.st_info(st->st_opaque, list);
+ st = prch->prch_post_share;
+ return st->st_ops.st_info(st->st_opaque, list);
+}
+
+static streaming_ops_t profile_input_ops = {
+ .st_cb = profile_input,
+ .st_info = profile_input_info
+};
+
/*
*
*/
streaming_msg_free(sm);
}
+static htsmsg_t *
+profile_sharer_input_info(void *opaque, htsmsg_t *list)
+{
+ htsmsg_add_str(list, NULL, "profile sharer input");
+ return list;
+}
+
+static streaming_ops_t profile_sharer_input_ops = {
+ .st_cb = profile_sharer_input,
+ .st_info = profile_sharer_input_info
+};
+
/*
*
*/
}
if (!prsh) {
prsh = calloc(1, sizeof(*prsh));
- streaming_target_init(&prsh->prsh_input, profile_sharer_input, prsh, 0);
+ streaming_target_init(&prsh->prsh_input, &profile_sharer_input_ops, prsh, 0);
LIST_INIT(&prsh->prsh_chains);
}
return prsh;
prch->prch_share = prsh->prsh_tsfix;
prch->prch_flags = SUBSCRIPTION_PACKET;
- streaming_target_init(&prch->prch_input, profile_input, prch, 0);
+ streaming_target_init(&prch->prch_input, &profile_input_ops, prch, 0);
prch->prch_st = &prch->prch_input;
return 0;
prsh->prsh_tsfix = tsfix_create(dst);
}
prch->prch_share = prsh->prsh_tsfix;
- streaming_target_init(&prch->prch_input, profile_input, prch, 0);
+ streaming_target_init(&prch->prch_input, &profile_input_ops, prch, 0);
prch->prch_st = &prch->prch_input;
return 0;
fail:
*
*/
void
-streaming_target_init(streaming_target_t *st, st_callback_t *cb, void *opaque,
- int reject_filter)
+streaming_target_init(streaming_target_t *st, streaming_ops_t *ops,
+ void *opaque, int reject_filter)
{
- st->st_cb = cb;
+ st->st_ops = *ops;
st->st_opaque = opaque;
st->st_reject_filter = reject_filter;
}
pthread_mutex_unlock(&sq->sq_mutex);
}
+/**
+ *
+ */
+static htsmsg_t *
+streaming_queue_info(void *opaque, htsmsg_t *list)
+{
+ streaming_queue_t *sq = opaque;
+ char buf[256];
+ snprintf(buf, sizeof(buf), "streaming queue %p size %zd", sq, sq->sq_size);
+ htsmsg_add_str(list, NULL, buf);
+ return list;
+}
+
/**
*
*/
void
streaming_queue_init(streaming_queue_t *sq, int reject_filter, size_t maxsize)
{
- streaming_target_init(&sq->sq_st, streaming_queue_deliver, sq, reject_filter);
+ static streaming_ops_t ops = {
+ .st_cb = streaming_queue_deliver,
+ .st_info = streaming_queue_info
+ };
+
+ streaming_target_init(&sq->sq_st, &ops, sq, reject_filter);
pthread_mutex_init(&sq->sq_mutex, NULL);
tvh_cond_init(&sq->sq_cond);
void streaming_pad_init(streaming_pad_t *sp);
void streaming_target_init(streaming_target_t *st,
- st_callback_t *cb, void *opaque,
+ streaming_ops_t *ops, void *opaque,
int reject_filter);
void streaming_queue_init
static inline void
streaming_target_deliver(streaming_target_t *st, streaming_message_t *sm)
- { st->st_cb(st->st_opaque, sm); }
+ { st->st_ops.st_cb(st->st_opaque, sm); }
void streaming_target_deliver2(streaming_target_t *st, streaming_message_t *sm);
tvhinfo(LS_SUBSCRIPTION, "%04X: %s", shortid(s), buf);
service_source_info_free(&si);
+
+ if (tvhtrace_enabled()) {
+ htsmsg_t *list = htsmsg_create_list();
+ htsmsg_field_t *f;
+ const char *x;
+ int i = 1;
+ s->ths_input.st_ops.st_info(s->ths_input.st_opaque, list);
+ HTSMSG_FOREACH(f, list)
+ if ((x = htsmsg_field_get_str(f)) != NULL) {
+ tvhtrace(LS_SUBSCRIPTION, "%04X: chain %02d: %s", shortid(s), i++, x);
+ }
+ htsmsg_destroy(list);
+ }
}
/**
streaming_msg_free(sm);
}
+static htsmsg_t *
+subscription_input_null_info(void *opaque, htsmsg_t *list)
+{
+ htsmsg_add_str(list, NULL, "null input");
+ return list;
+}
+
+static streaming_ops_t subscription_input_null_ops = {
+ .st_cb = subscription_input_null,
+ .st_info = subscription_input_null_info
+};
+
/**
*
*/
streaming_target_deliver(s->ths_output, sm);
}
+static htsmsg_t *
+subscription_input_direct_info(void *opaque, htsmsg_t *list)
+{
+ htsmsg_add_str(list, NULL, "direct input");
+ return list;
+}
+
+static streaming_ops_t subscription_input_direct_ops = {
+ .st_cb = subscription_input_direct,
+ .st_info = subscription_input_direct_info
+};
+
/**
* This callback is invoked when we receive data and status updates from
* the currently bound service
subscription_input_direct(s, sm);
}
+static htsmsg_t *
+subscription_input_info(void *opaque, htsmsg_t *list)
+{
+ th_subscription_t *s = opaque;
+ streaming_target_t *st = s->ths_output;
+ htsmsg_add_str(list, NULL, "input");
+ return st->st_ops.st_info(st->st_opaque, list);
+}
+
+static streaming_ops_t subscription_input_ops = {
+ .st_cb = subscription_input,
+ .st_info = subscription_input_info
+};
+
+
/* **************************************************************************
* Destroy subscriptions
* *************************************************************************/
{
streaming_msg_free(s->ths_start_message);
- if(s->ths_output->st_cb == subscription_input_null)
+ if(s->ths_output->st_ops.st_cb == subscription_input_null)
free(s->ths_output);
free(s->ths_title);
th_subscription_t *
subscription_create
(profile_chain_t *prch, int weight, const char *name,
- int flags, st_callback_t *cb, const char *hostname,
+ int flags, streaming_ops_t *ops, const char *hostname,
const char *username, const char *client)
{
th_subscription_t *s = calloc(1, sizeof(th_subscription_t));
abort();
}
- if (!cb) cb = subscription_input_direct;
+ if (!ops) ops = &subscription_input_direct_ops;
if (!st) {
st = calloc(1, sizeof(streaming_target_t));
- streaming_target_init(st, subscription_input_null, s, 0);
+ streaming_target_init(st, &subscription_input_null_ops, s, 0);
}
- streaming_target_init(&s->ths_input, cb, s, reject);
+ streaming_target_init(&s->ths_input, ops, s, reject);
s->ths_prch = prch && prch->prch_st ? prch : NULL;
s->ths_title = strdup(name);
if (!service)
ch = prch->prch_id;
- s = subscription_create(prch, weight, name, flags, subscription_input,
+ s = subscription_create(prch, weight, name, flags, &subscription_input_ops,
hostname, username, client);
if (tvhtrace_enabled()) {
const char *pro_name = prch->prch_pro ? profile_get_name(prch->prch_pro) : "<none>";
streaming_msg_free(sm);
}
+static htsmsg_t *
+dummy_info(void *opaque, htsmsg_t *list)
+{
+ htsmsg_add_str(list, NULL, "null input");
+ return list;
+}
+
+static streaming_ops_t dummy_ops = {
+ .st_cb = dummy_callback,
+ .st_info = dummy_info
+};
+
+
static mtimer_t dummy_sub_timer;
/**
*
prch = calloc(1, sizeof(*prch));
prch->prch_id = t;
st = calloc(1, sizeof(*st));
- streaming_target_init(st, dummy_callback, NULL, 0);
+ streaming_target_init(st, &dummy_ops, NULL, 0);
prch->prch_st = st;
s = subscription_create_from_service(prch, NULL, 1, "dummy", 0, NULL, NULL, "dummy", NULL);
th_subscription_t *subscription_create(struct profile_chain *prch,
int weight, const char *name,
- int flags, st_callback_t *cb,
+ int flags, streaming_ops_t *ops,
const char *hostname,
const char *username,
const char *client);
}
}
+static htsmsg_t *
+timeshift_input_info(void *opaque, htsmsg_t *list)
+{
+ htsmsg_add_str(list, NULL, "wtimeshift input");
+ return list;
+}
+
+static streaming_ops_t timeshift_input_ops = {
+ .st_cb = timeshift_input,
+ .st_info = timeshift_input_info
+};
+
+
/**
*
*/
/* Initialise input */
streaming_queue_init(&ts->wr_queue, 0, 0);
- streaming_target_init(&ts->input, timeshift_input, ts, 0);
+ streaming_target_init(&ts->input, ×hift_input_ops, ts, 0);
tvhthread_create(&ts->wr_thread, NULL, timeshift_writer, ts, "tshift-wr");
tvhthread_create(&ts->rd_thread, NULL, timeshift_reader, ts, "tshift-rd");
* A streaming target receives data.
*/
-typedef void (st_callback_t)(void *opauqe, streaming_message_t *sm);
+typedef void (st_callback_t)(void *opaque, streaming_message_t *sm);
+
+typedef struct {
+ st_callback_t *st_cb;
+ htsmsg_t *(*st_info)(void *opaque, htsmsg_t *list);
+} streaming_ops_t;
typedef struct streaming_target {
LIST_ENTRY(streaming_target) st_link;
streaming_pad_t *st_pad; /* Source we are linked to */
- st_callback_t *st_cb;
+ streaming_ops_t st_ops;
void *st_opaque;
int st_reject_filter;
} streaming_target_t;