From: Jaroslav Kysela Date: Wed, 24 Aug 2016 13:29:35 +0000 (+0200) Subject: streaming target: add streaming_ops_t/st_info to show the message chain X-Git-Tag: v4.2.1~351 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=54715ffd4f87a56b070aa655427acbda83e3872a;p=thirdparty%2Ftvheadend.git streaming target: add streaming_ops_t/st_info to show the message chain --- diff --git a/src/htsp_server.c b/src/htsp_server.c index 8cec35c15..f1177c283 100644 --- a/src/htsp_server.c +++ b/src/htsp_server.c @@ -94,9 +94,15 @@ static struct htsp_connection_list htsp_async_connections; static struct htsp_connection_list htsp_connections; static void htsp_streaming_input(void *opaque, streaming_message_t *sm); +static htsmsg_t *htsp_streaming_input_info(void *opaque, htsmsg_t *list); const char * _htsp_get_subscription_status(int smcode); static void htsp_epg_send_waiting(struct htsp_connection *, int64_t mintime); +static streaming_ops_t htsp_streaming_input_ops = { + .st_cb = htsp_streaming_input, + .st_info = htsp_streaming_input_info +}; + /** * */ @@ -2378,7 +2384,7 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in) htsp_init_queue(&hs->hs_q, 0); hs->hs_sid = sid; - streaming_target_init(&hs->hs_input, htsp_streaming_input, hs, 0); + streaming_target_init(&hs->hs_input, &htsp_streaming_input_ops, hs, 0); #if ENABLE_TIMESHIFT if (timeshiftPeriod != 0) { @@ -4278,3 +4284,13 @@ htsp_streaming_input(void *opaque, streaming_message_t *sm) } streaming_msg_free(sm); } + +static htsmsg_t * +htsp_streaming_input_info(void *opaque, htsmsg_t *list) +{ + char buf[512]; + htsp_subscription_t *hs = opaque; + snprintf(buf, sizeof(buf), "htsp input: %s", hs->hs_htsp->htsp_logname); + htsmsg_add_str(list, NULL, buf); + return list; +} diff --git a/src/input/mpegts/mpegts_mux_sched.c b/src/input/mpegts/mpegts_mux_sched.c index 83e594e55..640a078bc 100644 --- a/src/input/mpegts/mpegts_mux_sched.c +++ b/src/input/mpegts/mpegts_mux_sched.c @@ -25,7 +25,6 @@ #include "profile.h" static void mpegts_mux_sched_timer ( void *p ); -static void mpegts_mux_sched_input ( void *p, streaming_message_t *sm ); mpegts_mux_sched_list_t mpegts_mux_sched_all; @@ -190,6 +189,18 @@ mpegts_mux_sched_input ( void *p, streaming_message_t *sm ) streaming_msg_free(sm); } +static htsmsg_t * +mpegts_mux_sched_input_info ( void *p, htsmsg_t *list ) +{ + htsmsg_add_str(list, NULL, "mux sched input"); + return list; +} + +static streaming_ops_t mpegts_mux_sched_input_ops = { + .st_cb = mpegts_mux_sched_input, + .st_info = mpegts_mux_sched_input_info +}; + /****************************************************************************** * Timer *****************************************************************************/ @@ -293,7 +304,7 @@ mpegts_mux_sched_create ( const char *uuid, htsmsg_t *conf ) LIST_INSERT_HEAD(&mpegts_mux_sched_all, mms, mms_link); /* Initialise */ - streaming_target_init(&mms->mms_input, mpegts_mux_sched_input, mms, 0); + streaming_target_init(&mms->mms_input, &mpegts_mux_sched_input_ops, mms, 0); /* Load conf */ if (conf) diff --git a/src/plumbing/globalheaders.c b/src/plumbing/globalheaders.c index c5b8c6d19..7ccd67cd7 100644 --- a/src/plumbing/globalheaders.c +++ b/src/plumbing/globalheaders.c @@ -413,6 +413,20 @@ globalheaders_input(void *opaque, streaming_message_t *sm) gh_hold(gh, sm); } +static htsmsg_t * +globalheaders_input_info(void *opaque, htsmsg_t *list) +{ + globalheaders_t *gh = opaque; + streaming_target_t *st = gh->gh_output; + htsmsg_add_str(list, NULL, "globalheaders input"); + return st->st_ops.st_info(st->st_opaque, list); +} + +static streaming_ops_t globalheaders_input_ops = { + .st_cb = globalheaders_input, + .st_info = globalheaders_input_info +}; + /** * @@ -425,7 +439,7 @@ globalheaders_create(streaming_target_t *output) TAILQ_INIT(&gh->gh_holdq); gh->gh_output = output; - streaming_target_init(&gh->gh_input, globalheaders_input, gh, 0); + streaming_target_init(&gh->gh_input, &globalheaders_input_ops, gh, 0); return &gh->gh_input; } diff --git a/src/plumbing/transcoding.c b/src/plumbing/transcoding.c index 5ae96213e..630bb62b8 100644 --- a/src/plumbing/transcoding.c +++ b/src/plumbing/transcoding.c @@ -2022,11 +2022,9 @@ transcoder_stop(transcoder_t *t) static void transcoder_input(void *opaque, streaming_message_t *sm) { - transcoder_t *t; + transcoder_t *t = opaque; streaming_start_t *ss; - t = opaque; - switch (sm->sm_type) { case SMT_PACKET: transcoder_packet(t, sm->sm_data); @@ -2062,6 +2060,21 @@ transcoder_input(void *opaque, streaming_message_t *sm) } } +static htsmsg_t * +transcoder_input_info(void *opaque, htsmsg_t *list) +{ + transcoder_t *t = opaque; + streaming_target_t *st = t->t_output; + htsmsg_add_str(list, NULL, "transcoder input"); + return st->st_ops.st_info(st->st_opaque, list);; +} + +static streaming_ops_t transcoder_input_ops = { + .st_cb = transcoder_input, + .st_info = transcoder_input_info +}; + + /** * @@ -2076,7 +2089,7 @@ transcoder_create(streaming_target_t *output) if (!t->t_id) t->t_id = ++transcoder_id; t->t_output = output; - streaming_target_init(&t->t_input, transcoder_input, t, 0); + streaming_target_init(&t->t_input, &transcoder_input_ops, t, 0); return &t->t_input; } diff --git a/src/plumbing/tsfix.c b/src/plumbing/tsfix.c index 8909ec7f5..cf1a465ed 100644 --- a/src/plumbing/tsfix.c +++ b/src/plumbing/tsfix.c @@ -593,6 +593,20 @@ tsfix_input(void *opaque, streaming_message_t *sm) streaming_target_deliver2(tf->tf_output, sm); } +static htsmsg_t * +tsfix_input_info(void *opaque, htsmsg_t *list) +{ + tsfix_t *tf = opaque; + streaming_target_t *st = tf->tf_output; + htsmsg_add_str(list, NULL, "tsfix input"); + return st->st_ops.st_info(st->st_opaque, list); +} + +static streaming_ops_t tsfix_input_ops = { + .st_cb = tsfix_input, + .st_info = tsfix_input_info +}; + /** * @@ -607,7 +621,7 @@ tsfix_create(streaming_target_t *output) tf->tf_output = output; tf->tf_start_time = mclk(); - streaming_target_init(&tf->tf_input, tsfix_input, tf, 0); + streaming_target_init(&tf->tf_input, &tsfix_input_ops, tf, 0); return &tf->tf_input; } diff --git a/src/profile.c b/src/profile.c index 985603b5a..1f7f73433 100644 --- a/src/profile.c +++ b/src/profile.c @@ -658,6 +658,22 @@ direct: profile_deliver(prch, sm); } +static htsmsg_t * +profile_input_info(void *opaque, htsmsg_t *list) +{ + profile_chain_t *prch = opaque; + streaming_target_t *st = prch->prch_share; + htsmsg_add_str(list, NULL, "profile input"); + st->st_ops.st_info(st->st_opaque, list); + st = prch->prch_post_share; + return st->st_ops.st_info(st->st_opaque, list); +} + +static streaming_ops_t profile_input_ops = { + .st_cb = profile_input, + .st_info = profile_input_info +}; + /* * */ @@ -734,6 +750,18 @@ profile_sharer_input(void *opaque, streaming_message_t *sm) streaming_msg_free(sm); } +static htsmsg_t * +profile_sharer_input_info(void *opaque, htsmsg_t *list) +{ + htsmsg_add_str(list, NULL, "profile sharer input"); + return list; +} + +static streaming_ops_t profile_sharer_input_ops = { + .st_cb = profile_sharer_input, + .st_info = profile_sharer_input_info +}; + /* * */ @@ -755,7 +783,7 @@ profile_sharer_find(profile_chain_t *prch) } if (!prsh) { prsh = calloc(1, sizeof(*prsh)); - streaming_target_init(&prsh->prsh_input, profile_sharer_input, prsh, 0); + streaming_target_init(&prsh->prsh_input, &profile_sharer_input_ops, prsh, 0); LIST_INIT(&prsh->prsh_chains); } return prsh; @@ -1008,7 +1036,7 @@ profile_htsp_work(profile_chain_t *prch, prch->prch_share = prsh->prsh_tsfix; prch->prch_flags = SUBSCRIPTION_PACKET; - streaming_target_init(&prch->prch_input, profile_input, prch, 0); + streaming_target_init(&prch->prch_input, &profile_input_ops, prch, 0); prch->prch_st = &prch->prch_input; return 0; @@ -1923,7 +1951,7 @@ profile_transcode_work(profile_chain_t *prch, prsh->prsh_tsfix = tsfix_create(dst); } prch->prch_share = prsh->prsh_tsfix; - streaming_target_init(&prch->prch_input, profile_input, prch, 0); + streaming_target_init(&prch->prch_input, &profile_input_ops, prch, 0); prch->prch_st = &prch->prch_input; return 0; fail: diff --git a/src/streaming.c b/src/streaming.c index f6e846b9b..26ddb98df 100644 --- a/src/streaming.c +++ b/src/streaming.c @@ -38,10 +38,10 @@ streaming_pad_init(streaming_pad_t *sp) * */ void -streaming_target_init(streaming_target_t *st, st_callback_t *cb, void *opaque, - int reject_filter) +streaming_target_init(streaming_target_t *st, streaming_ops_t *ops, + void *opaque, int reject_filter) { - st->st_cb = cb; + st->st_ops = *ops; st->st_opaque = opaque; st->st_reject_filter = reject_filter; } @@ -86,6 +86,19 @@ streaming_queue_deliver(void *opauqe, streaming_message_t *sm) pthread_mutex_unlock(&sq->sq_mutex); } +/** + * + */ +static htsmsg_t * +streaming_queue_info(void *opaque, htsmsg_t *list) +{ + streaming_queue_t *sq = opaque; + char buf[256]; + snprintf(buf, sizeof(buf), "streaming queue %p size %zd", sq, sq->sq_size); + htsmsg_add_str(list, NULL, buf); + return list; +} + /** * */ @@ -102,7 +115,12 @@ streaming_queue_remove(streaming_queue_t *sq, streaming_message_t *sm) void streaming_queue_init(streaming_queue_t *sq, int reject_filter, size_t maxsize) { - streaming_target_init(&sq->sq_st, streaming_queue_deliver, sq, reject_filter); + static streaming_ops_t ops = { + .st_cb = streaming_queue_deliver, + .st_info = streaming_queue_info + }; + + streaming_target_init(&sq->sq_st, &ops, sq, reject_filter); pthread_mutex_init(&sq->sq_mutex, NULL); tvh_cond_init(&sq->sq_cond); diff --git a/src/streaming.h b/src/streaming.h index 07fa3537e..4e7e4ca79 100644 --- a/src/streaming.h +++ b/src/streaming.h @@ -72,7 +72,7 @@ typedef struct streaming_start { void streaming_pad_init(streaming_pad_t *sp); void streaming_target_init(streaming_target_t *st, - st_callback_t *cb, void *opaque, + streaming_ops_t *ops, void *opaque, int reject_filter); void streaming_queue_init @@ -106,7 +106,7 @@ streaming_message_t *streaming_msg_create_pkt(th_pkt_t *pkt); static inline void streaming_target_deliver(streaming_target_t *st, streaming_message_t *sm) - { st->st_cb(st->st_opaque, sm); } + { st->st_ops.st_cb(st->st_opaque, sm); } void streaming_target_deliver2(streaming_target_t *st, streaming_message_t *sm); diff --git a/src/subscriptions.c b/src/subscriptions.c index 5a9943672..cc5bc2162 100644 --- a/src/subscriptions.c +++ b/src/subscriptions.c @@ -267,6 +267,19 @@ subscription_show_info(th_subscription_t *s) tvhinfo(LS_SUBSCRIPTION, "%04X: %s", shortid(s), buf); service_source_info_free(&si); + + if (tvhtrace_enabled()) { + htsmsg_t *list = htsmsg_create_list(); + htsmsg_field_t *f; + const char *x; + int i = 1; + s->ths_input.st_ops.st_info(s->ths_input.st_opaque, list); + HTSMSG_FOREACH(f, list) + if ((x = htsmsg_field_get_str(f)) != NULL) { + tvhtrace(LS_SUBSCRIPTION, "%04X: chain %02d: %s", shortid(s), i++, x); + } + htsmsg_destroy(list); + } } /** @@ -472,6 +485,18 @@ subscription_input_null(void *opaque, streaming_message_t *sm) streaming_msg_free(sm); } +static htsmsg_t * +subscription_input_null_info(void *opaque, htsmsg_t *list) +{ + htsmsg_add_str(list, NULL, "null input"); + return list; +} + +static streaming_ops_t subscription_input_null_ops = { + .st_cb = subscription_input_null, + .st_info = subscription_input_null_info +}; + /** * */ @@ -496,6 +521,18 @@ subscription_input_direct(void *opauqe, streaming_message_t *sm) streaming_target_deliver(s->ths_output, sm); } +static htsmsg_t * +subscription_input_direct_info(void *opaque, htsmsg_t *list) +{ + htsmsg_add_str(list, NULL, "direct input"); + return list; +} + +static streaming_ops_t subscription_input_direct_ops = { + .st_cb = subscription_input_direct, + .st_info = subscription_input_direct_info +}; + /** * This callback is invoked when we receive data and status updates from * the currently bound service @@ -567,6 +604,21 @@ subscription_input(void *opauqe, streaming_message_t *sm) subscription_input_direct(s, sm); } +static htsmsg_t * +subscription_input_info(void *opaque, htsmsg_t *list) +{ + th_subscription_t *s = opaque; + streaming_target_t *st = s->ths_output; + htsmsg_add_str(list, NULL, "input"); + return st->st_ops.st_info(st->st_opaque, list); +} + +static streaming_ops_t subscription_input_ops = { + .st_cb = subscription_input, + .st_info = subscription_input_info +}; + + /* ************************************************************************** * Destroy subscriptions * *************************************************************************/ @@ -585,7 +637,7 @@ subscription_destroy(th_subscription_t *s) { streaming_msg_free(s->ths_start_message); - if(s->ths_output->st_cb == subscription_input_null) + if(s->ths_output->st_ops.st_cb == subscription_input_null) free(s->ths_output); free(s->ths_title); @@ -675,7 +727,7 @@ subscription_unsubscribe(th_subscription_t *s, int flags) th_subscription_t * subscription_create (profile_chain_t *prch, int weight, const char *name, - int flags, st_callback_t *cb, const char *hostname, + int flags, streaming_ops_t *ops, const char *hostname, const char *username, const char *client) { th_subscription_t *s = calloc(1, sizeof(th_subscription_t)); @@ -700,13 +752,13 @@ subscription_create abort(); } - if (!cb) cb = subscription_input_direct; + if (!ops) ops = &subscription_input_direct_ops; if (!st) { st = calloc(1, sizeof(streaming_target_t)); - streaming_target_init(st, subscription_input_null, s, 0); + streaming_target_init(st, &subscription_input_null_ops, s, 0); } - streaming_target_init(&s->ths_input, cb, s, reject); + streaming_target_init(&s->ths_input, ops, s, reject); s->ths_prch = prch && prch->prch_st ? prch : NULL; s->ths_title = strdup(name); @@ -775,7 +827,7 @@ subscription_create_from_channel_or_service(profile_chain_t *prch, if (!service) ch = prch->prch_id; - s = subscription_create(prch, weight, name, flags, subscription_input, + s = subscription_create(prch, weight, name, flags, &subscription_input_ops, hostname, username, client); if (tvhtrace_enabled()) { const char *pro_name = prch->prch_pro ? profile_get_name(prch->prch_pro) : ""; @@ -1139,6 +1191,19 @@ dummy_callback(void *opauqe, streaming_message_t *sm) streaming_msg_free(sm); } +static htsmsg_t * +dummy_info(void *opaque, htsmsg_t *list) +{ + htsmsg_add_str(list, NULL, "null input"); + return list; +} + +static streaming_ops_t dummy_ops = { + .st_cb = dummy_callback, + .st_info = dummy_info +}; + + static mtimer_t dummy_sub_timer; /** * @@ -1177,7 +1242,7 @@ subscription_dummy_join(const char *id, int first) prch = calloc(1, sizeof(*prch)); prch->prch_id = t; st = calloc(1, sizeof(*st)); - streaming_target_init(st, dummy_callback, NULL, 0); + streaming_target_init(st, &dummy_ops, NULL, 0); prch->prch_st = st; s = subscription_create_from_service(prch, NULL, 1, "dummy", 0, NULL, NULL, "dummy", NULL); diff --git a/src/subscriptions.h b/src/subscriptions.h index e7e76ed1e..86cf81cd1 100644 --- a/src/subscriptions.h +++ b/src/subscriptions.h @@ -196,7 +196,7 @@ subscription_create_from_mux(struct profile_chain *prch, th_subscription_t *subscription_create(struct profile_chain *prch, int weight, const char *name, - int flags, st_callback_t *cb, + int flags, streaming_ops_t *ops, const char *hostname, const char *username, const char *client); diff --git a/src/timeshift.c b/src/timeshift.c index 5212d9315..60121c24d 100644 --- a/src/timeshift.c +++ b/src/timeshift.c @@ -357,6 +357,19 @@ _exit: } } +static htsmsg_t * +timeshift_input_info(void *opaque, htsmsg_t *list) +{ + htsmsg_add_str(list, NULL, "wtimeshift input"); + return list; +} + +static streaming_ops_t timeshift_input_ops = { + .st_cb = timeshift_input, + .st_info = timeshift_input_info +}; + + /** * */ @@ -442,7 +455,7 @@ streaming_target_t *timeshift_create /* Initialise input */ streaming_queue_init(&ts->wr_queue, 0, 0); - streaming_target_init(&ts->input, timeshift_input, ts, 0); + streaming_target_init(&ts->input, ×hift_input_ops, ts, 0); tvhthread_create(&ts->wr_thread, NULL, timeshift_writer, ts, "tshift-wr"); tvhthread_create(&ts->rd_thread, NULL, timeshift_reader, ts, "tshift-rd"); diff --git a/src/tvheadend.h b/src/tvheadend.h index b34220874..534c017bf 100644 --- a/src/tvheadend.h +++ b/src/tvheadend.h @@ -580,13 +580,18 @@ typedef struct streaming_message { * A streaming target receives data. */ -typedef void (st_callback_t)(void *opauqe, streaming_message_t *sm); +typedef void (st_callback_t)(void *opaque, streaming_message_t *sm); + +typedef struct { + st_callback_t *st_cb; + htsmsg_t *(*st_info)(void *opaque, htsmsg_t *list); +} streaming_ops_t; typedef struct streaming_target { LIST_ENTRY(streaming_target) st_link; streaming_pad_t *st_pad; /* Source we are linked to */ - st_callback_t *st_cb; + streaming_ops_t st_ops; void *st_opaque; int st_reject_filter; } streaming_target_t;