]> git.ipfire.org Git - thirdparty/tvheadend.git/commitdiff
streaming target: add streaming_ops_t/st_info to show the message chain
authorJaroslav Kysela <perex@perex.cz>
Wed, 24 Aug 2016 13:29:35 +0000 (15:29 +0200)
committerJaroslav Kysela <perex@perex.cz>
Wed, 24 Aug 2016 13:29:35 +0000 (15:29 +0200)
12 files changed:
src/htsp_server.c
src/input/mpegts/mpegts_mux_sched.c
src/plumbing/globalheaders.c
src/plumbing/transcoding.c
src/plumbing/tsfix.c
src/profile.c
src/streaming.c
src/streaming.h
src/subscriptions.c
src/subscriptions.h
src/timeshift.c
src/tvheadend.h

index 8cec35c15cfdd82830ec071cb30d84c58d7d5fb2..f1177c283611d83915bf69d655a908b3334cb872 100644 (file)
@@ -94,9 +94,15 @@ static struct htsp_connection_list htsp_async_connections;
 static struct htsp_connection_list htsp_connections;
 
 static void htsp_streaming_input(void *opaque, streaming_message_t *sm);
+static htsmsg_t *htsp_streaming_input_info(void *opaque, htsmsg_t *list);
 const char * _htsp_get_subscription_status(int smcode);
 static void htsp_epg_send_waiting(struct htsp_connection *, int64_t mintime);
 
+static streaming_ops_t htsp_streaming_input_ops = {
+  .st_cb   = htsp_streaming_input,
+  .st_info = htsp_streaming_input_info
+};
+
 /**
  *
  */
@@ -2378,7 +2384,7 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in)
   htsp_init_queue(&hs->hs_q, 0);
 
   hs->hs_sid = sid;
-  streaming_target_init(&hs->hs_input, htsp_streaming_input, hs, 0);
+  streaming_target_init(&hs->hs_input, &htsp_streaming_input_ops, hs, 0);
 
 #if ENABLE_TIMESHIFT
   if (timeshiftPeriod != 0) {
@@ -4278,3 +4284,13 @@ htsp_streaming_input(void *opaque, streaming_message_t *sm)
   }
   streaming_msg_free(sm);
 }
+
+static htsmsg_t *
+htsp_streaming_input_info(void *opaque, htsmsg_t *list)
+{
+  char buf[512];
+  htsp_subscription_t *hs = opaque;
+  snprintf(buf, sizeof(buf), "htsp input: %s", hs->hs_htsp->htsp_logname);
+  htsmsg_add_str(list, NULL, buf);
+  return list;
+}
index 83e594e55e88cd78fc58b7d19aa6f6bf4a28250d..640a078bc858eccc7581b98f38a8ec5bd6f76c99 100644 (file)
@@ -25,7 +25,6 @@
 #include "profile.h"
 
 static void mpegts_mux_sched_timer ( void *p );
-static void mpegts_mux_sched_input ( void *p, streaming_message_t *sm );
 
 mpegts_mux_sched_list_t mpegts_mux_sched_all;
 
@@ -190,6 +189,18 @@ mpegts_mux_sched_input ( void *p, streaming_message_t *sm )
   streaming_msg_free(sm);
 }
 
+static htsmsg_t *
+mpegts_mux_sched_input_info ( void *p, htsmsg_t *list )
+{
+  htsmsg_add_str(list, NULL, "mux sched input");
+  return list;
+}
+
+static streaming_ops_t mpegts_mux_sched_input_ops = {
+  .st_cb   = mpegts_mux_sched_input,
+  .st_info = mpegts_mux_sched_input_info
+};
+
 /******************************************************************************
  * Timer
  *****************************************************************************/
@@ -293,7 +304,7 @@ mpegts_mux_sched_create ( const char *uuid, htsmsg_t *conf )
   LIST_INSERT_HEAD(&mpegts_mux_sched_all, mms, mms_link);
 
   /* Initialise */
-  streaming_target_init(&mms->mms_input, mpegts_mux_sched_input, mms, 0);
+  streaming_target_init(&mms->mms_input, &mpegts_mux_sched_input_ops, mms, 0);
 
   /* Load conf */
   if (conf)
index c5b8c6d19e980ea9c470390bed16c8f704e06274..7ccd67cd7e8c6b0877cd1f4f9e887eeb96c1983f 100644 (file)
@@ -413,6 +413,20 @@ globalheaders_input(void *opaque, streaming_message_t *sm)
     gh_hold(gh, sm);
 }
 
+static htsmsg_t *
+globalheaders_input_info(void *opaque, htsmsg_t *list)
+{
+  globalheaders_t *gh = opaque;
+  streaming_target_t *st = gh->gh_output;
+  htsmsg_add_str(list, NULL, "globalheaders input");
+  return st->st_ops.st_info(st->st_opaque, list);
+}
+
+static streaming_ops_t globalheaders_input_ops = {
+  .st_cb   = globalheaders_input,
+  .st_info = globalheaders_input_info
+};
+
 
 /**
  *
@@ -425,7 +439,7 @@ globalheaders_create(streaming_target_t *output)
   TAILQ_INIT(&gh->gh_holdq);
 
   gh->gh_output = output;
-  streaming_target_init(&gh->gh_input, globalheaders_input, gh, 0);
+  streaming_target_init(&gh->gh_input, &globalheaders_input_ops, gh, 0);
   return &gh->gh_input;
 }
 
index 5ae96213ea45d943f8f14780bc21b31862a02793..630bb62b8b832d228eb758ddacdf848f6e55b772 100644 (file)
@@ -2022,11 +2022,9 @@ transcoder_stop(transcoder_t *t)
 static void
 transcoder_input(void *opaque, streaming_message_t *sm)
 {
-  transcoder_t *t;
+  transcoder_t *t = opaque;
   streaming_start_t *ss;
 
-  t = opaque;
-
   switch (sm->sm_type) {
   case SMT_PACKET:
     transcoder_packet(t, sm->sm_data);
@@ -2062,6 +2060,21 @@ transcoder_input(void *opaque, streaming_message_t *sm)
   }
 }
 
+static htsmsg_t *
+transcoder_input_info(void *opaque, htsmsg_t *list)
+{
+  transcoder_t *t = opaque;
+  streaming_target_t *st = t->t_output;
+  htsmsg_add_str(list, NULL, "transcoder input");
+  return st->st_ops.st_info(st->st_opaque, list);;
+}
+
+static streaming_ops_t transcoder_input_ops = {
+  .st_cb   = transcoder_input,
+  .st_info = transcoder_input_info
+};
+
+
 
 /**
  *
@@ -2076,7 +2089,7 @@ transcoder_create(streaming_target_t *output)
   if (!t->t_id) t->t_id = ++transcoder_id;
   t->t_output = output;
 
-  streaming_target_init(&t->t_input, transcoder_input, t, 0);
+  streaming_target_init(&t->t_input, &transcoder_input_ops, t, 0);
 
   return &t->t_input;
 }
index 8909ec7f51a02aad837188bfef0d3d8634772ad5..cf1a465edef75008951caf5b007d2c4f4418b3c9 100644 (file)
@@ -593,6 +593,20 @@ tsfix_input(void *opaque, streaming_message_t *sm)
   streaming_target_deliver2(tf->tf_output, sm);
 }
 
+static htsmsg_t *
+tsfix_input_info(void *opaque, htsmsg_t *list)
+{
+  tsfix_t *tf = opaque;
+  streaming_target_t *st = tf->tf_output;
+  htsmsg_add_str(list, NULL, "tsfix input");
+  return st->st_ops.st_info(st->st_opaque, list);
+}
+
+static streaming_ops_t tsfix_input_ops = {
+  .st_cb   = tsfix_input,
+  .st_info = tsfix_input_info
+};
+
 
 /**
  *
@@ -607,7 +621,7 @@ tsfix_create(streaming_target_t *output)
   tf->tf_output = output;
   tf->tf_start_time = mclk();
 
-  streaming_target_init(&tf->tf_input, tsfix_input, tf, 0);
+  streaming_target_init(&tf->tf_input, &tsfix_input_ops, tf, 0);
   return &tf->tf_input;
 }
 
index 985603b5a6d86a308d6296e777533bf877a391a7..1f7f7343335a4a28f12be1f9026a3bd6f8465d64 100644 (file)
@@ -658,6 +658,22 @@ direct:
   profile_deliver(prch, sm);
 }
 
+static htsmsg_t *
+profile_input_info(void *opaque, htsmsg_t *list)
+{
+  profile_chain_t *prch = opaque;
+  streaming_target_t *st = prch->prch_share;
+  htsmsg_add_str(list, NULL, "profile input");
+  st->st_ops.st_info(st->st_opaque, list);
+  st = prch->prch_post_share;
+  return st->st_ops.st_info(st->st_opaque, list);
+}
+
+static streaming_ops_t profile_input_ops = {
+  .st_cb   = profile_input,
+  .st_info = profile_input_info
+};
+
 /*
  *
  */
@@ -734,6 +750,18 @@ profile_sharer_input(void *opaque, streaming_message_t *sm)
     streaming_msg_free(sm);
 }
 
+static htsmsg_t *
+profile_sharer_input_info(void *opaque, htsmsg_t *list)
+{
+  htsmsg_add_str(list, NULL, "profile sharer input");
+  return list;
+}
+
+static streaming_ops_t profile_sharer_input_ops = {
+  .st_cb   = profile_sharer_input,
+  .st_info = profile_sharer_input_info
+};
+
 /*
  *
  */
@@ -755,7 +783,7 @@ profile_sharer_find(profile_chain_t *prch)
   }
   if (!prsh) {
     prsh = calloc(1, sizeof(*prsh));
-    streaming_target_init(&prsh->prsh_input, profile_sharer_input, prsh, 0);
+    streaming_target_init(&prsh->prsh_input, &profile_sharer_input_ops, prsh, 0);
     LIST_INIT(&prsh->prsh_chains);
   }
   return prsh;
@@ -1008,7 +1036,7 @@ profile_htsp_work(profile_chain_t *prch,
 
   prch->prch_share = prsh->prsh_tsfix;
   prch->prch_flags = SUBSCRIPTION_PACKET;
-  streaming_target_init(&prch->prch_input, profile_input, prch, 0);
+  streaming_target_init(&prch->prch_input, &profile_input_ops, prch, 0);
   prch->prch_st = &prch->prch_input;
   return 0;
 
@@ -1923,7 +1951,7 @@ profile_transcode_work(profile_chain_t *prch,
     prsh->prsh_tsfix = tsfix_create(dst);
   }
   prch->prch_share = prsh->prsh_tsfix;
-  streaming_target_init(&prch->prch_input, profile_input, prch, 0);
+  streaming_target_init(&prch->prch_input, &profile_input_ops, prch, 0);
   prch->prch_st = &prch->prch_input;
   return 0;
 fail:
index f6e846b9b934e0b7520e4a97067a49a8c51fd5cd..26ddb98dfcbacf3d4ea752ca3eb2bfecb7e3a6ec 100644 (file)
@@ -38,10 +38,10 @@ streaming_pad_init(streaming_pad_t *sp)
  *
  */
 void
-streaming_target_init(streaming_target_t *st, st_callback_t *cb, void *opaque,
-                     int reject_filter)
+streaming_target_init(streaming_target_t *st, streaming_ops_t *ops,
+                      void *opaque, int reject_filter)
 {
-  st->st_cb = cb;
+  st->st_ops = *ops;
   st->st_opaque = opaque;
   st->st_reject_filter = reject_filter;
 }
@@ -86,6 +86,19 @@ streaming_queue_deliver(void *opauqe, streaming_message_t *sm)
   pthread_mutex_unlock(&sq->sq_mutex);
 }
 
+/**
+ *
+ */
+static htsmsg_t *
+streaming_queue_info(void *opaque, htsmsg_t *list)
+{
+  streaming_queue_t *sq = opaque;
+  char buf[256];
+  snprintf(buf, sizeof(buf), "streaming queue %p size %zd", sq, sq->sq_size);
+  htsmsg_add_str(list, NULL, buf);
+  return list;
+}
+
 /**
  *
  */
@@ -102,7 +115,12 @@ streaming_queue_remove(streaming_queue_t *sq, streaming_message_t *sm)
 void
 streaming_queue_init(streaming_queue_t *sq, int reject_filter, size_t maxsize)
 {
-  streaming_target_init(&sq->sq_st, streaming_queue_deliver, sq, reject_filter);
+  static streaming_ops_t ops = {
+    .st_cb   = streaming_queue_deliver,
+    .st_info = streaming_queue_info
+  };
+
+  streaming_target_init(&sq->sq_st, &ops, sq, reject_filter);
 
   pthread_mutex_init(&sq->sq_mutex, NULL);
   tvh_cond_init(&sq->sq_cond);
index 07fa3537eee18bf72d85fbf733e9893a2c6a8aaf..4e7e4ca798f723f897392e7b0d711b3e9a374932 100644 (file)
@@ -72,7 +72,7 @@ typedef struct streaming_start {
 void streaming_pad_init(streaming_pad_t *sp);
 
 void streaming_target_init(streaming_target_t *st,
-                          st_callback_t *cb, void *opaque,
+                          streaming_ops_t *ops, void *opaque,
                           int reject_filter);
 
 void streaming_queue_init
@@ -106,7 +106,7 @@ streaming_message_t *streaming_msg_create_pkt(th_pkt_t *pkt);
 
 static inline void
 streaming_target_deliver(streaming_target_t *st, streaming_message_t *sm)
-  { st->st_cb(st->st_opaque, sm); }
+  { st->st_ops.st_cb(st->st_opaque, sm); }
 
 void streaming_target_deliver2(streaming_target_t *st, streaming_message_t *sm);
 
index 5a994367217cf421ccc3f77103c374c84ce29542..cc5bc2162787ca50b2bb92c7b34cd78bfb8d992a 100644 (file)
@@ -267,6 +267,19 @@ subscription_show_info(th_subscription_t *s)
 
   tvhinfo(LS_SUBSCRIPTION, "%04X: %s", shortid(s), buf);
   service_source_info_free(&si);
+
+  if (tvhtrace_enabled()) {
+    htsmsg_t *list = htsmsg_create_list();
+    htsmsg_field_t *f;
+    const char *x;
+    int i = 1;
+    s->ths_input.st_ops.st_info(s->ths_input.st_opaque, list);
+    HTSMSG_FOREACH(f, list)
+      if ((x = htsmsg_field_get_str(f)) != NULL) {
+        tvhtrace(LS_SUBSCRIPTION, "%04X:  chain %02d: %s", shortid(s), i++, x);
+      }
+    htsmsg_destroy(list);
+  }
 }
 
 /**
@@ -472,6 +485,18 @@ subscription_input_null(void *opaque, streaming_message_t *sm)
   streaming_msg_free(sm);
 }
 
+static htsmsg_t *
+subscription_input_null_info(void *opaque, htsmsg_t *list)
+{
+  htsmsg_add_str(list, NULL, "null input");
+  return list;
+}
+
+static streaming_ops_t subscription_input_null_ops = {
+  .st_cb   = subscription_input_null,
+  .st_info = subscription_input_null_info
+};
+
 /**
  *
  */
@@ -496,6 +521,18 @@ subscription_input_direct(void *opauqe, streaming_message_t *sm)
   streaming_target_deliver(s->ths_output, sm);
 }
 
+static htsmsg_t *
+subscription_input_direct_info(void *opaque, htsmsg_t *list)
+{
+  htsmsg_add_str(list, NULL, "direct input");
+  return list;
+}
+
+static streaming_ops_t subscription_input_direct_ops = {
+  .st_cb   = subscription_input_direct,
+  .st_info = subscription_input_direct_info
+};
+
 /**
  * This callback is invoked when we receive data and status updates from
  * the currently bound service
@@ -567,6 +604,21 @@ subscription_input(void *opauqe, streaming_message_t *sm)
   subscription_input_direct(s, sm);
 }
 
+static htsmsg_t *
+subscription_input_info(void *opaque, htsmsg_t *list)
+{
+  th_subscription_t *s = opaque;
+  streaming_target_t *st = s->ths_output;
+  htsmsg_add_str(list, NULL, "input");
+  return st->st_ops.st_info(st->st_opaque, list);
+}
+
+static streaming_ops_t subscription_input_ops = {
+  .st_cb   = subscription_input,
+  .st_info = subscription_input_info
+};
+
+
 /* **************************************************************************
  * Destroy subscriptions
  * *************************************************************************/
@@ -585,7 +637,7 @@ subscription_destroy(th_subscription_t *s)
 {
   streaming_msg_free(s->ths_start_message);
 
-  if(s->ths_output->st_cb == subscription_input_null)
+  if(s->ths_output->st_ops.st_cb == subscription_input_null)
    free(s->ths_output);
 
   free(s->ths_title);
@@ -675,7 +727,7 @@ subscription_unsubscribe(th_subscription_t *s, int flags)
 th_subscription_t *
 subscription_create
   (profile_chain_t *prch, int weight, const char *name,
-   int flags, st_callback_t *cb, const char *hostname,
+   int flags, streaming_ops_t *ops, const char *hostname,
    const char *username, const char *client)
 {
   th_subscription_t *s = calloc(1, sizeof(th_subscription_t));
@@ -700,13 +752,13 @@ subscription_create
     abort();
   }
 
-  if (!cb) cb = subscription_input_direct;
+  if (!ops) ops = &subscription_input_direct_ops;
   if (!st) {
     st = calloc(1, sizeof(streaming_target_t));
-    streaming_target_init(st, subscription_input_null, s, 0);
+    streaming_target_init(st, &subscription_input_null_ops, s, 0);
   }
 
-  streaming_target_init(&s->ths_input, cb, s, reject);
+  streaming_target_init(&s->ths_input, ops, s, reject);
 
   s->ths_prch              = prch && prch->prch_st ? prch : NULL;
   s->ths_title             = strdup(name);
@@ -775,7 +827,7 @@ subscription_create_from_channel_or_service(profile_chain_t *prch,
   if (!service)
     ch = prch->prch_id;
 
-  s = subscription_create(prch, weight, name, flags, subscription_input,
+  s = subscription_create(prch, weight, name, flags, &subscription_input_ops,
                           hostname, username, client);
   if (tvhtrace_enabled()) {
     const char *pro_name = prch->prch_pro ? profile_get_name(prch->prch_pro) : "<none>";
@@ -1139,6 +1191,19 @@ dummy_callback(void *opauqe, streaming_message_t *sm)
   streaming_msg_free(sm);
 }
 
+static htsmsg_t *
+dummy_info(void *opaque, htsmsg_t *list)
+{
+  htsmsg_add_str(list, NULL, "null input");
+  return list;
+}
+
+static streaming_ops_t dummy_ops = {
+  .st_cb   = dummy_callback,
+  .st_info = dummy_info
+};
+
+
 static mtimer_t dummy_sub_timer;
 /**
  *
@@ -1177,7 +1242,7 @@ subscription_dummy_join(const char *id, int first)
   prch = calloc(1, sizeof(*prch));
   prch->prch_id = t;
   st = calloc(1, sizeof(*st));
-  streaming_target_init(st, dummy_callback, NULL, 0);
+  streaming_target_init(st, &dummy_ops, NULL, 0);
   prch->prch_st = st;
   s = subscription_create_from_service(prch, NULL, 1, "dummy", 0, NULL, NULL, "dummy", NULL);
 
index e7e76ed1e1b6592ce9b67cf5d4cf300cb33123c9..86cf81cd13b4fef85e12deddecde3e2761a543d9 100644 (file)
@@ -196,7 +196,7 @@ subscription_create_from_mux(struct profile_chain *prch,
 
 th_subscription_t *subscription_create(struct profile_chain *prch,
                                        int weight, const char *name,
-                                      int flags, st_callback_t *cb,
+                                      int flags, streaming_ops_t *ops,
                                       const char *hostname,
                                       const char *username,
                                       const char *client);
index 5212d93158e2dc2d7312fe502e7b1b072d3e7604..60121c24dbcc22e2f738f4b7dd5f12c2b8da6d2a 100644 (file)
@@ -357,6 +357,19 @@ _exit:
   }
 }
 
+static htsmsg_t *
+timeshift_input_info(void *opaque, htsmsg_t *list)
+{
+  htsmsg_add_str(list, NULL, "wtimeshift input");
+  return list;
+}
+
+static streaming_ops_t timeshift_input_ops = {
+  .st_cb   = timeshift_input,
+  .st_info = timeshift_input_info
+};
+
+
 /**
  *
  */
@@ -442,7 +455,7 @@ streaming_target_t *timeshift_create
 
   /* Initialise input */
   streaming_queue_init(&ts->wr_queue, 0, 0);
-  streaming_target_init(&ts->input, timeshift_input, ts, 0);
+  streaming_target_init(&ts->input, &timeshift_input_ops, ts, 0);
   tvhthread_create(&ts->wr_thread, NULL, timeshift_writer, ts, "tshift-wr");
   tvhthread_create(&ts->rd_thread, NULL, timeshift_reader, ts, "tshift-rd");
 
index b3422087452b223db47fa350ede6337180faffba..534c017bfe94efab8905b3719bc03b52b9f03f48 100644 (file)
@@ -580,13 +580,18 @@ typedef struct streaming_message {
  * A streaming target receives data.
  */
 
-typedef void (st_callback_t)(void *opauqe, streaming_message_t *sm);
+typedef void (st_callback_t)(void *opaque, streaming_message_t *sm);
+
+typedef struct {
+  st_callback_t *st_cb;
+  htsmsg_t *(*st_info)(void *opaque, htsmsg_t *list);
+} streaming_ops_t;
 
 typedef struct streaming_target {
   LIST_ENTRY(streaming_target) st_link;
   streaming_pad_t *st_pad;               /* Source we are linked to */
 
 st_callback_t *st_cb;
streaming_ops_t st_ops;
   void *st_opaque;
   int st_reject_filter;
 } streaming_target_t;