]> git.ipfire.org Git - thirdparty/tvheadend.git/commitdiff
iptv: split input processing among multiple threads, issue #4925
authorJaroslav Kysela <perex@perex.cz>
Tue, 20 Feb 2018 14:12:05 +0000 (15:12 +0100)
committerJaroslav Kysela <perex@perex.cz>
Tue, 20 Feb 2018 14:51:54 +0000 (15:51 +0100)
19 files changed:
src/config.c
src/config.h
src/input/mpegts.h
src/input/mpegts/iptv/iptv.c
src/input/mpegts/iptv/iptv_file.c
src/input/mpegts/iptv/iptv_http.c
src/input/mpegts/iptv/iptv_libav.c
src/input/mpegts/iptv/iptv_mux.c
src/input/mpegts/iptv/iptv_pipe.c
src/input/mpegts/iptv/iptv_private.h
src/input/mpegts/iptv/iptv_rtsp.c
src/input/mpegts/iptv/iptv_udp.c
src/input/mpegts/linuxdvb/linuxdvb_frontend.c
src/input/mpegts/mpegts_input.c
src/input/mpegts/mpegts_mux.c
src/input/mpegts/mpegts_mux_dvb.c
src/input/mpegts/satip/satip_frontend.c
src/input/mpegts/tsfile/tsfile_input.c
src/input/mpegts/tvhdhomerun/tvhdhomerun_frontend.c

index b66f5a329a4c5c0b78ca4ac70591110b6b858e8c..1105d80b6b860d9e8564b58d202c73a24ae06607 100644 (file)
@@ -1701,6 +1701,7 @@ config_boot ( const char *path, gid_t gid, uid_t uid )
   config_scanfile_ok = 0;
   config.theme_ui = strdup("blue");
   config.chname_num = 1;
+  config.iptv_tpool_count = 2;
 
   idclass_register(&config_class);
 
@@ -2417,6 +2418,15 @@ const idclass_t config_class = {
       .opts   = PO_EXPERT,
       .group  = 6,
     },
+    {
+      .type   = PT_BOOL,
+      .id     = "iptv_tpool",
+      .name   = N_("IPTV threads"),
+      .desc   = N_("Set the number of threads for IPTV to split load "
+                   "across more CPUs."),
+      .off    = offsetof(config_t, iptv_tpool_count),
+      .group  = 7
+    },
     {
       .type   = PT_INT,
       .id     = "dscp",
index b67f92a64ff2503e0b6a8e793ea318cbe35f6bed..02a2346b417ead6ee250c073169d400bede1cccc 100644 (file)
@@ -65,6 +65,7 @@ typedef struct config {
   int epg_compress;
   uint32_t epg_cut_window;
   uint32_t epg_update_window;
+  int iptv_tpool_count;
 } config_t;
 
 extern const idclass_t config_class;
index 573b675133c3b33e4f456c6a199ff7a611ffabc4..9a481f8e57d4681da61d0d61491f4b1ccc406769 100644 (file)
@@ -992,7 +992,7 @@ void mpegts_mux_update_pids ( mpegts_mux_t *mm );
 int mpegts_mux_compare ( mpegts_mux_t *a, mpegts_mux_t *b );
 
 void mpegts_input_recv_packets
-  (mpegts_input_t *mi, mpegts_mux_instance_t *mmi, sbuf_t *sb,
+  (mpegts_mux_instance_t *mmi, sbuf_t *sb,
    int flags, mpegts_pcr_t *pcr);
 
 void mpegts_input_postdemux
index 97c82b3293dd4f155518dc20414ba44a8836520b..5d7666c201e51788c08e12fec085faa13cd59d1c 100644 (file)
@@ -24,6 +24,7 @@
 #include "htsstr.h"
 #include "channels.h"
 #include "packet.h"
+#include "config.h"
 
 #include <sys/socket.h>
 #include <sys/types.h>
  * IPTV state
  * *************************************************************************/
 
-iptv_input_t   *iptv_input;
-tvhpoll_t      *iptv_poll;
-pthread_t       iptv_thread;
 pthread_mutex_t iptv_lock;
 
+typedef struct iptv_thread_pool {
+  TAILQ_ENTRY(iptv_thread_pool) link;
+  pthread_t thread;
+  iptv_input_t *input;
+  tvhpoll_t *poll;
+  uint32_t streams;
+} iptv_thread_pool_t;
+
+TAILQ_HEAD(, iptv_thread_pool) iptv_tpool;
+int iptv_tpool_count = 0;
+iptv_thread_pool_t *iptv_tpool_last = NULL;
+gtimer_t iptv_tpool_manage_timer;
+
+static void iptv_input_thread_manage(int count);
+
+static inline int iptv_tpool_safe_count(void)
+{
+  return MINMAX(config.iptv_tpool_count, 1, 128);
+}
+
 /* **************************************************************************
  * IPTV handlers
  * *************************************************************************/
@@ -151,6 +169,20 @@ iptv_input_is_free ( mpegts_input_t *mi, mpegts_mux_t *mm,
   return NULL;
 }
 
+static int
+iptv_input_thread_balance(iptv_input_t *mi)
+{
+  iptv_thread_pool_t *pool, *apool = mi->mi_tpool;
+
+  /*
+   * select input with the smallest count of active threads
+   */
+  TAILQ_FOREACH(pool, &iptv_tpool, link)
+    if (pool->streams < apool->streams)
+      return 1;
+  return 0;
+}
+
 static int
 iptv_input_is_enabled
   ( mpegts_input_t *mi, mpegts_mux_t *mm, int flags, int weight )
@@ -164,6 +196,10 @@ iptv_input_is_enabled
     tvhtrace(LS_IPTV_SUB, "enabled[%p]: generic %d", mm, r);
     return r;
   }
+  if (iptv_input_thread_balance((iptv_input_t *)mi)) {
+    tvhtrace(LS_IPTV_SUB, "enabled[%p]: balance", mm);
+    return MI_IS_ENABLED_RETRY;
+  }
   mmi = iptv_input_is_free(mi, mm, &conf, weight, NULL);
   tvhtrace(LS_IPTV_SUB, "enabled[%p]: free %p", mm, mmi);
   return mmi == NULL ? MI_IS_ENABLED_OK : MI_IS_ENABLED_RETRY;
@@ -279,6 +315,7 @@ iptv_input_start_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi, int weigh
   int ret = SM_CODE_TUNING_FAILED;
   iptv_mux_t *im = (iptv_mux_t*)mmi->mmi_mux;
   iptv_handler_t *ih;
+  iptv_thread_pool_t *pool = ((iptv_input_t *)mi)->mi_tpool;
   char buf[256], rawbuf[512], *raw = im->mm_iptv_url, *s;
   const char *scheme;
   url_t url;
@@ -342,35 +379,30 @@ iptv_input_start_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi, int weigh
   if (im->mm_iptv_url_raw) {
     im->mm_active = mmi; // Note: must set here else mux_started call
                          // will not realise we're ready to accept pid open calls
-    ret = ih->start(im, im->mm_iptv_url_raw, &url);
-    if (!ret)
+    ret = ih->start((iptv_input_t *)mi, im, im->mm_iptv_url_raw, &url);
+    if (!ret) {
       im->im_handler = ih;
-    else
+      pool->streams++;
+    } else {
       im->mm_active  = NULL;
+    }
   }
   pthread_mutex_unlock(&iptv_lock);
 
   urlreset(&url);
   free(s);
+
   return ret;
 }
 
-static void
-iptv_input_stop_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi )
+void
+iptv_input_close_fds ( iptv_input_t *mi, iptv_mux_t *im )
 {
-  iptv_mux_t *im = (iptv_mux_t*)mmi->mmi_mux;
-
-  pthread_mutex_lock(&iptv_lock);
-
-  mtimer_disarm(&im->im_pause_timer);
-
-  /* Stop */
-  if (im->im_handler->stop)
-    im->im_handler->stop(im);
+  iptv_thread_pool_t *pool = mi->mi_tpool;
 
   /* Close file */
   if (im->mm_iptv_fd > 0) {
-    tvhpoll_rem1(iptv_poll, im->mm_iptv_fd);
+    tvhpoll_rem1(pool->poll, im->mm_iptv_fd);
     udp_close(im->mm_iptv_connection);
     im->mm_iptv_connection = NULL;
     im->mm_iptv_fd = -1;
@@ -378,11 +410,29 @@ iptv_input_stop_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi )
 
   /* Close file2 */
   if (im->mm_iptv_fd2 > 0) {
-    tvhpoll_rem1(iptv_poll, im->mm_iptv_fd2);
+    tvhpoll_rem1(pool->poll, im->mm_iptv_fd2);
     udp_close(im->mm_iptv_connection2);
     im->mm_iptv_connection2 = NULL;
     im->mm_iptv_fd2 = -1;
   }
+}
+
+static void
+iptv_input_stop_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi )
+{
+  iptv_mux_t *im = (iptv_mux_t*)mmi->mmi_mux;
+  iptv_thread_pool_t *pool = ((iptv_input_t *)mi)->mi_tpool;
+  uint32_t u32;
+
+  pthread_mutex_lock(&iptv_lock);
+
+  mtimer_disarm(&im->im_pause_timer);
+
+  /* Stop */
+  if (im->im_handler->stop)
+    im->im_handler->stop((iptv_input_t *)mi, im);
+
+  iptv_input_close_fds((iptv_input_t *)mi, im);
 
   /* Free memory */
   sbuf_free(&im->mm_iptv_buffer);
@@ -390,7 +440,12 @@ iptv_input_stop_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi )
   /* Clear bw limit */
   ((iptv_network_t *)im->mm_network)->in_bw_limited = 0;
 
+  u32 = --pool->streams;
+
   pthread_mutex_unlock(&iptv_lock);
+
+  if (u32 == 0)
+    iptv_input_thread_manage(iptv_tpool_safe_count());
 }
 
 static void
@@ -428,14 +483,18 @@ void
 iptv_input_unpause ( void *aux )
 {
   iptv_mux_t *im = aux;
+  iptv_input_t *mi;
   int pause;
   pthread_mutex_lock(&iptv_lock);
-  if (iptv_input_pause_check(im)) {
-    pause = 1;
-  } else {
-    tvhtrace(LS_IPTV_PCR, "unpause timer callback");
-    im->im_handler->pause(im, 0);
-    pause = 0;
+  pause = 0;
+  if (im->mm_active) {
+    mi = (iptv_input_t *)im->mm_active->mmi_input;
+    if (iptv_input_pause_check(im)) {
+      pause = 1;
+    } else {
+      tvhtrace(LS_IPTV_PCR, "unpause timer callback");
+      im->im_handler->pause(mi, im, 0);
+    }
   }
   pthread_mutex_unlock(&iptv_lock);
   if (pause)
@@ -445,13 +504,15 @@ iptv_input_unpause ( void *aux )
 static void *
 iptv_input_thread ( void *aux )
 {
+  iptv_thread_pool_t *pool = aux;
   int nfds, r;
   ssize_t n;
   iptv_mux_t *im;
+  iptv_input_t *mi;
   tvhpoll_event_t ev;
 
   while ( tvheadend_is_running() ) {
-    nfds = tvhpoll_wait(iptv_poll, &ev, 1, -1);
+    nfds = tvhpoll_wait(pool->poll, &ev, 1, -1);
     if ( nfds < 0 ) {
       if (tvheadend_is_running() && !ERRNO_AGAIN(errno)) {
         tvherror(LS_IPTV, "poll() error %s, sleeping 1 second",
@@ -469,15 +530,16 @@ iptv_input_thread ( void *aux )
 
     /* Only when active */
     if (im->mm_active) {
+      mi = (iptv_input_t *)im->mm_active->mmi_input;
       /* Get data */
-      if ((n = im->im_handler->read(im)) < 0) {
+      if ((n = im->im_handler->read(mi, im)) < 0) {
         tvherror(LS_IPTV, "read() error %s", strerror(errno));
-        im->im_handler->stop(im);
+        im->im_handler->stop(mi, im);
         break;
       }
       r = iptv_input_recv_packets(im, n);
       if (r == 1)
-        im->im_handler->pause(im, 1);
+        im->im_handler->pause(mi, im, 1);
     }
 
     pthread_mutex_unlock(&iptv_lock);
@@ -493,12 +555,14 @@ iptv_input_thread ( void *aux )
 }
 
 void
-iptv_input_pause_handler ( iptv_mux_t *im, int pause )
+iptv_input_pause_handler ( iptv_input_t *mi, iptv_mux_t *im, int pause )
 {
+  iptv_thread_pool_t *tpool = mi->mi_tpool;
+
   if (pause)
-    tvhpoll_rem1(iptv_poll, im->mm_iptv_fd);
+    tvhpoll_rem1(tpool->poll, im->mm_iptv_fd);
   else
-    tvhpoll_add1(iptv_poll, im->mm_iptv_fd, TVHPOLL_IN, im);
+    tvhpoll_add1(tpool->poll, im->mm_iptv_fd, TVHPOLL_IN, im);
 }
 
 void
@@ -508,9 +572,8 @@ iptv_input_recv_flush ( iptv_mux_t *im )
 
   if (mmi == NULL)
     return;
-  mpegts_input_recv_packets((mpegts_input_t*)iptv_input, mmi,
-                            &im->mm_iptv_buffer, MPEGTS_DATA_CC_RESTART,
-                            NULL);
+  mpegts_input_recv_packets(mmi, &im->mm_iptv_buffer,
+                            MPEGTS_DATA_CC_RESTART, NULL);
 }
 
 int
@@ -547,8 +610,7 @@ iptv_input_recv_packets ( iptv_mux_t *im, ssize_t len )
       tvhtrace(LS_IPTV_PCR, "pcr: paused");
       return 1;
     }
-    mpegts_input_recv_packets((mpegts_input_t*)iptv_input, mmi,
-                              &im->mm_iptv_buffer,
+    mpegts_input_recv_packets(mmi, &im->mm_iptv_buffer,
                               in->in_remove_scrambled_bits ?
                                 MPEGTS_DATA_REMOVE_SCRAMBLED : 0, &pcr);
     if (pcr.pcr_first != PTS_UNSET && pcr.pcr_last != PTS_UNSET) {
@@ -578,14 +640,15 @@ iptv_input_recv_packets ( iptv_mux_t *im, ssize_t len )
   return 0;
 }
 
-
 int
-iptv_input_fd_started ( iptv_mux_t *im )
+iptv_input_fd_started ( iptv_input_t *mi, iptv_mux_t *im )
 {
+  iptv_thread_pool_t *tpool = mi->mi_tpool;
+
   /* Setup poll */
   if (im->mm_iptv_fd > 0) {
     /* Error? */
-    if (tvhpoll_add1(iptv_poll, im->mm_iptv_fd, TVHPOLL_IN, im) < 0) {
+    if (tvhpoll_add1(tpool->poll, im->mm_iptv_fd, TVHPOLL_IN, im) < 0) {
       tvherror(LS_IPTV, "%s - failed to add to poll q", im->mm_nicename);
       close(im->mm_iptv_fd);
       im->mm_iptv_fd = -1;
@@ -596,7 +659,7 @@ iptv_input_fd_started ( iptv_mux_t *im )
   /* Setup poll2 */
   if (im->mm_iptv_fd2 > 0) {
     /* Error? */
-    if (tvhpoll_add1(iptv_poll, im->mm_iptv_fd2, TVHPOLL_IN, im) < 0) {
+    if (tvhpoll_add1(tpool->poll, im->mm_iptv_fd2, TVHPOLL_IN, im) < 0) {
       tvherror(LS_IPTV, "%s - failed to add to poll q (2)", im->mm_nicename);
       close(im->mm_iptv_fd2);
       im->mm_iptv_fd2 = -1;
@@ -607,7 +670,7 @@ iptv_input_fd_started ( iptv_mux_t *im )
 }
 
 void
-iptv_input_mux_started ( iptv_mux_t *im )
+iptv_input_mux_started ( iptv_input_t *mi, iptv_mux_t *im )
 {
   /* Allocate input buffer */
   sbuf_reset_and_alloc(&im->mm_iptv_buffer, IPTV_BUF_SIZE);
@@ -615,13 +678,13 @@ iptv_input_mux_started ( iptv_mux_t *im )
   im->im_pcr = PTS_UNSET;
   im->im_pcr_pid = MPEGTS_PID_NONE;
 
-  if (iptv_input_fd_started(im))
+  if (iptv_input_fd_started(mi, im))
     return;
 
   /* Install table handlers */
   mpegts_mux_t *mm = (mpegts_mux_t*)im;
   if (mm->mm_active)
-    psi_tables_install(mm->mm_active->mmi_input, mm,
+    psi_tables_install((mpegts_input_t *)mi, mm,
                        im->mm_iptv_atsc ? DVB_SYS_ATSC_ALL : DVB_SYS_DVBT);
 }
 
@@ -998,9 +1061,6 @@ iptv_network_create0
     in->mn_skipinitscan = 1;
   }
 
-  /* Link */
-  mpegts_input_add_network((mpegts_input_t*)iptv_input, (mpegts_network_t*)in);
-
   /* Load muxes */
   if ((c = hts_settings_load_r(1, "input/iptv/networks/%s/muxes",
                                 idnode_uuid_as_str(&in->mn_id, ubuf)))) {
@@ -1092,8 +1152,73 @@ iptv_input_wizard_set( tvh_input_t *ti, htsmsg_t *conf, const char *lang )
     mpegts_network_wizard_create(ntype, NULL, lang);
 }
 
+static iptv_input_t *
+iptv_create_input ( void *tpool )
+{
+  iptv_input_t *input = calloc(1, sizeof(iptv_input_t));
+  mpegts_network_t *mn;
+
+  /* Init Input */
+  mpegts_input_create0((mpegts_input_t*)input,
+                       &iptv_input_class, NULL, NULL);
+  input->ti_wizard_get     = iptv_input_wizard_get;
+  input->ti_wizard_set     = iptv_input_wizard_set;
+  input->mi_warm_mux       = iptv_input_warm_mux;
+  input->mi_start_mux      = iptv_input_start_mux;
+  input->mi_stop_mux       = iptv_input_stop_mux;
+  input->mi_is_enabled     = iptv_input_is_enabled;
+  input->mi_get_weight     = iptv_input_get_weight;
+  input->mi_get_grace      = iptv_input_get_grace;
+  input->mi_get_priority   = iptv_input_get_priority;
+  input->mi_display_name   = iptv_input_display_name;
+  input->mi_enabled        = 1;
+
+  input->mi_tpool          = tpool;
+
+  /* Link */
+  LIST_FOREACH(mn, &mpegts_network_all, mn_global_link)
+    if (idnode_is_instance(&mn->mn_id, &iptv_network_class))
+      mpegts_input_add_network((mpegts_input_t *)input, mn);
+
+  return input;
+}
+
+static void
+iptv_input_thread_manage(int count)
+{
+  iptv_thread_pool_t *pool;
+
+  while (iptv_tpool_count < count) {
+    pool = calloc(1, sizeof(*pool));
+    pool->poll = tvhpoll_create(10);
+    pool->input = iptv_create_input(pool);
+    tvhthread_create(&pool->thread, NULL, iptv_input_thread, pool, "iptv");
+    TAILQ_INSERT_TAIL(&iptv_tpool, pool, link);
+    iptv_tpool_count++;
+  }
+  while (iptv_tpool_count > count) {
+    TAILQ_FOREACH(pool, &iptv_tpool, link)
+      if (pool->streams == 0) {
+        pthread_kill(pool->thread, SIGTERM);
+        pthread_join(pool->thread, NULL);
+        TAILQ_REMOVE(&iptv_tpool, pool, link);
+        mpegts_input_stop_all((mpegts_input_t*)pool->input);
+        mpegts_input_delete((mpegts_input_t *)pool->input, 0);
+        tvhpoll_destroy(pool->poll);
+        free(pool);
+        iptv_tpool_count--;
+        break;
+      }
+    if (pool == NULL)
+      break;
+  }
+}
+
 void iptv_init ( void )
 {
+  TAILQ_INIT(&iptv_tpool);
+  pthread_mutex_init(&iptv_lock, NULL);
+
   /* Register handlers */
   iptv_http_init();
   iptv_udp_init();
@@ -1104,44 +1229,23 @@ void iptv_init ( void )
   iptv_libav_init();
 #endif
 
-  iptv_input = calloc(1, sizeof(iptv_input_t));
-
-  /* Init Input */
-  mpegts_input_create0((mpegts_input_t*)iptv_input,
-                       &iptv_input_class, NULL, NULL);
-  iptv_input->ti_wizard_get     = iptv_input_wizard_get;
-  iptv_input->ti_wizard_set     = iptv_input_wizard_set;
-  iptv_input->mi_warm_mux       = iptv_input_warm_mux;
-  iptv_input->mi_start_mux      = iptv_input_start_mux;
-  iptv_input->mi_stop_mux       = iptv_input_stop_mux;
-  iptv_input->mi_is_enabled     = iptv_input_is_enabled;
-  iptv_input->mi_get_weight     = iptv_input_get_weight;
-  iptv_input->mi_get_grace      = iptv_input_get_grace;
-  iptv_input->mi_get_priority   = iptv_input_get_priority;
-  iptv_input->mi_display_name   = iptv_input_display_name;
-  iptv_input->mi_enabled        = 1;
-
   /* Init Network */
   iptv_network_init();
 
-  /* Setup TS thread */
-  iptv_poll = tvhpoll_create(10);
-  pthread_mutex_init(&iptv_lock, NULL);
-  tvhthread_create(&iptv_thread, NULL, iptv_input_thread, NULL, "iptv");
+  /* Threads init */
+  iptv_input_thread_manage(iptv_tpool_safe_count());
+  tvhinfo(LS_IPTV, "Using %d input thread(s)", iptv_tpool_count);
 }
 
 void iptv_done ( void )
 {
-  pthread_kill(iptv_thread, SIGTERM);
-  pthread_join(iptv_thread, NULL);
-  tvhpoll_destroy(iptv_poll);
   pthread_mutex_lock(&global_lock);
+  iptv_input_thread_manage(0);
+  assert(TAILQ_EMPTY(&iptv_tpool));
   mpegts_network_unregister_builder(&iptv_auto_network_class);
   mpegts_network_unregister_builder(&iptv_network_class);
   mpegts_network_class_delete(&iptv_auto_network_class, 0);
   mpegts_network_class_delete(&iptv_network_class, 0);
-  mpegts_input_stop_all((mpegts_input_t*)iptv_input);
-  mpegts_input_delete((mpegts_input_t *)iptv_input, 0);
   pthread_mutex_unlock(&global_lock);
 }
 
index 94a5a4b5e0eb69a0de45e2fcf0c47284405d48b2..1d86ac6d7b68a9288e58fc2d766cc4b38fe928ab 100644 (file)
@@ -93,7 +93,8 @@ iptv_file_thread ( void *aux )
  * Open file
  */
 static int
-iptv_file_start ( iptv_mux_t *im, const char *raw, const url_t *url )
+iptv_file_start
+  ( iptv_input_t *mi, iptv_mux_t *im, const char *raw, const url_t *url )
 {
   file_priv_t *fp;
   int fd = tvh_open(raw + 7, O_RDONLY | O_NONBLOCK, 0);
@@ -107,14 +108,14 @@ iptv_file_start ( iptv_mux_t *im, const char *raw, const url_t *url )
   fp->fd = fd;
   tvh_cond_init(&fp->cond);
   im->im_data = fp;
-  iptv_input_mux_started(im);
+  iptv_input_mux_started(mi, im);
   tvhthread_create(&fp->tid, NULL, iptv_file_thread, im, "iptvfile");
   return 0;
 }
 
 static void
 iptv_file_stop
-  ( iptv_mux_t *im )
+  ( iptv_input_t *mi, iptv_mux_t *im )
 {
   file_priv_t *fp = im->im_data;
   int rd = fp->fd;
index a94a690fd4f169572429cd61934e220faf20f82c..8461ec5d861a12c94a036ad562cfbb7dcf1b6c6f 100644 (file)
@@ -29,6 +29,7 @@
 #endif
 
 typedef struct http_priv {
+  iptv_input_t  *mi;
   iptv_mux_t    *im;
   http_client_t *hc;
   uint8_t        shutdown;
@@ -249,7 +250,7 @@ iptv_http_header ( http_client_t *hc )
   hp->off = 0;
   if (iptv_http_safe_global_lock(hp)) {
     if (!hp->started) {
-      iptv_input_mux_started(hp->im);
+      iptv_input_mux_started(hp->mi, hp->im);
     } else {
       iptv_input_recv_flush(hp->im);
     }
@@ -511,13 +512,14 @@ iptv_http_create_header
  */
 static int
 iptv_http_start
-  ( iptv_mux_t *im, const char *raw, const url_t *u )
+  ( iptv_input_t *mi, iptv_mux_t *im, const char *raw, const url_t *u )
 {
   http_priv_t *hp;
   http_client_t *hc;
   int r;
 
   hp = calloc(1, sizeof(*hp));
+  hp->mi = mi;
   hp->im = im;
   if (!(hc = http_client_connect(hp, HTTP_VERSION_1_1, u->scheme,
                                  u->host, u->port, NULL))) {
@@ -552,7 +554,7 @@ iptv_http_start
  */
 static void
 iptv_http_stop
-  ( iptv_mux_t *im )
+  ( iptv_input_t *mi, iptv_mux_t *im )
 {
   http_priv_t *hp = im->im_data;
 
@@ -579,7 +581,7 @@ iptv_http_stop
  */
 static void
 iptv_http_pause
-  ( iptv_mux_t *im, int pause )
+  ( iptv_input_t *mi, iptv_mux_t *im, int pause )
 {
   http_priv_t *hp = im->im_data;
 
index 35381e89d5c763c3b7c83010921b3aca69402fcd..e7a058a029e9272ed34cea26b08fb5e3a981bb13 100644 (file)
@@ -174,7 +174,8 @@ fail:
  * Start new thread
  */
 static int
-iptv_libav_start ( iptv_mux_t *im, const char *raw, const url_t *url )
+iptv_libav_start
+  ( iptv_input_t *mi, iptv_mux_t *im, const char *raw, const url_t *url )
 {
   iptv_libav_priv_t *la = calloc(1, sizeof(*la));
 
@@ -187,19 +188,19 @@ iptv_libav_start ( iptv_mux_t *im, const char *raw, const url_t *url )
   la->mux = im;
   tvh_pipe(O_NONBLOCK, &la->pipe);
   im->mm_iptv_fd = la->pipe.rd;
-  iptv_input_fd_started(im);
+  iptv_input_fd_started(mi, im);
   atomic_set(&la->running, 1);
   atomic_set(&la->pause, 0);
   sbuf_init(&la->sbuf);
   tvhthread_create(&la->thread, NULL, iptv_libav_thread, la, "libavinput");
   if (raw[0])
-    iptv_input_mux_started(im);
+    iptv_input_mux_started(mi, im);
   return 0;
 }
 
 static void
 iptv_libav_stop
-  ( iptv_mux_t *im )
+  ( iptv_input_t *mi, iptv_mux_t *im )
 {
   iptv_libav_priv_t *la = im->im_opaque;
 
@@ -215,7 +216,7 @@ iptv_libav_stop
 }
 
 static ssize_t
-iptv_libav_read ( iptv_mux_t *im )
+iptv_libav_read ( iptv_input_t *mi, iptv_mux_t *im )
 {
   iptv_libav_priv_t *la = im->im_opaque;
   char buf[8192];
@@ -233,7 +234,7 @@ iptv_libav_read ( iptv_mux_t *im )
 }
 
 static void
-iptv_libav_pause ( iptv_mux_t *im, int pause )
+iptv_libav_pause ( iptv_input_t *mi, iptv_mux_t *im, int pause )
 {
   iptv_libav_priv_t *la = im->im_opaque;
 
index c796a4738c8f5566c14d16098a96f410c19a9cbc..903dff11baa5d8b344166147ccb6af4ca20f955f 100644 (file)
@@ -25,7 +25,6 @@
  * Class
  */
 extern const idclass_t mpegts_mux_class;
-extern const idclass_t mpegts_mux_instance_class;
 
 static inline void
 iptv_url_set0 ( char **url, char **sane_url,
@@ -373,11 +372,6 @@ iptv_mux_create0 ( iptv_network_t *in, const char *uuid, htsmsg_t *conf )
 
   sbuf_init(&im->mm_iptv_buffer);
 
-  /* Create Instance */
-  (void)mpegts_mux_instance_create(mpegts_mux_instance, NULL,
-                                   (mpegts_input_t*)iptv_input,
-                                   (mpegts_mux_t*)im);
-
   /* Services */
   c2 = NULL;
   c = htsmsg_get_map(conf, "services");
index 6f4265dd788fa15139c14e5e64fcbea0b9cc2749..89f3a486c13d0a969208256eaa74dddf50082354 100644 (file)
@@ -32,7 +32,8 @@
  * Spawn task and create pipes
  */
 static int
-iptv_pipe_start ( iptv_mux_t *im, const char *raw, const url_t *url )
+iptv_pipe_start
+  ( iptv_input_t *mi, iptv_mux_t *im, const char *raw, const url_t *url )
 {
   char **argv = NULL, **envp = NULL;
   const char *replace[] = { "${service_name}", im->mm_iptv_svcname ?: "", NULL };
@@ -65,7 +66,7 @@ iptv_pipe_start ( iptv_mux_t *im, const char *raw, const url_t *url )
   im->mm_iptv_respawn_last = mclk();
 
   if (url)
-    iptv_input_mux_started(im);
+    iptv_input_mux_started(mi, im);
   return 0;
 
 err:
@@ -78,20 +79,15 @@ err:
 
 static void
 iptv_pipe_stop
-  ( iptv_mux_t *im )
+  ( iptv_input_t *mi, iptv_mux_t *im )
 {
-  int rd = im->mm_iptv_fd;
   pid_t pid = (intptr_t)im->im_data;
   spawn_kill(pid, tvh_kill_to_sig(im->mm_iptv_kill), im->mm_iptv_kill_timeout);
-  if (rd > 0) {
-    tvhpoll_rem1(iptv_poll, rd);
-    close(rd);
-  }
-  im->mm_iptv_fd = -1;
+  iptv_input_close_fds(mi, im);
 }
 
 static ssize_t
-iptv_pipe_read ( iptv_mux_t *im )
+iptv_pipe_read ( iptv_input_t *mi, iptv_mux_t *im )
 {
   int r, rd = im->mm_iptv_fd;
   ssize_t res = 0;
@@ -107,12 +103,10 @@ iptv_pipe_read ( iptv_mux_t *im )
         continue;
     }
     if (r <= 0) {
-      tvhpoll_rem1(iptv_poll, rd);
-      close(rd);
+      iptv_input_close_fds(mi, im);
       pid = (intptr_t)im->im_data;
-      spawn_kill(pid, tvh_kill_to_sig(im->mm_iptv_kill), im->mm_iptv_kill_timeout);
-      im->mm_iptv_fd = -1;
       im->im_data = NULL;
+      spawn_kill(pid, tvh_kill_to_sig(im->mm_iptv_kill), im->mm_iptv_kill_timeout);
       if (mclk() < im->mm_iptv_respawn_last + sec2mono(2)) {
         tvherror(LS_IPTV, "stdin pipe unexpectedly closed: %s",
                  r < 0 ? strerror(errno) : "No data");
@@ -122,10 +116,10 @@ iptv_pipe_read ( iptv_mux_t *im )
         pthread_mutex_lock(&global_lock);
         pthread_mutex_lock(&iptv_lock);
         if (im->mm_active) {
-          if (iptv_pipe_start(im, im->mm_iptv_url_raw, NULL)) {
+          if (iptv_pipe_start(mi, im, im->mm_iptv_url_raw, NULL)) {
             tvherror(LS_IPTV, "unable to respawn %s", im->mm_iptv_url_raw);
           } else {
-            iptv_input_fd_started(im);
+            iptv_input_fd_started(mi, im);
             im->mm_iptv_respawn_last = mclk();
           }
         }
index e7c000ca21f0ee7d68fb8646d055436409327063..84109eb08de49d6eb1f67ef7949e82238f69f497 100644 (file)
@@ -42,10 +42,10 @@ struct iptv_handler
 
   uint32_t buffer_limit;
 
-  int     (*start) ( iptv_mux_t *im, const char *raw, const url_t *url );
-  void    (*stop)  ( iptv_mux_t *im );
-  ssize_t (*read)  ( iptv_mux_t *im );
-  void    (*pause) ( iptv_mux_t *im, int pause );
+  int     (*start) ( iptv_input_t *mi, iptv_mux_t *im, const char *raw, const url_t *url );
+  void    (*stop)  ( iptv_input_t *mi, iptv_mux_t *im );
+  ssize_t (*read)  ( iptv_input_t *mi, iptv_mux_t *im );
+  void    (*pause) ( iptv_input_t *mi, iptv_mux_t *im, int pause );
   
   RB_ENTRY(iptv_handler) link;
 };
@@ -55,13 +55,16 @@ void iptv_handler_register ( iptv_handler_t *ih, int num );
 struct iptv_input
 {
   mpegts_input_t;
+
+  void *mi_tpool;
 };
 
-int  iptv_input_fd_started ( iptv_mux_t *im );
-void iptv_input_mux_started ( iptv_mux_t *im );
+int  iptv_input_fd_started ( iptv_input_t *mi, iptv_mux_t *im );
+void iptv_input_close_fds ( iptv_input_t *mi, iptv_mux_t *im );
+void iptv_input_mux_started ( iptv_input_t *mi, iptv_mux_t *im );
 int  iptv_input_recv_packets ( iptv_mux_t *im, ssize_t len );
 void iptv_input_recv_flush ( iptv_mux_t *im );
-void iptv_input_pause_handler ( iptv_mux_t *im, int pause );
+void iptv_input_pause_handler ( iptv_input_t *mi, iptv_mux_t *im, int pause );
 
 struct iptv_network
 {
@@ -176,11 +179,9 @@ extern const idclass_t iptv_network_class;
 extern const idclass_t iptv_auto_network_class;
 extern const idclass_t iptv_mux_class;
 
-extern iptv_input_t   *iptv_input;
 extern iptv_network_t *iptv_network;
 
 extern pthread_mutex_t iptv_lock;
-extern tvhpoll_t      *iptv_poll;
 
 int iptv_url_set ( char **url, char **sane_url, const char *str, int allow_file, int allow_pipe );
 
index ac0f550e20b860c37aaad32e81dc46aee98236f7..e661cc0089bab52ce68194bae20c2d690cb23c3b 100644 (file)
@@ -112,7 +112,8 @@ iptv_rtsp_header ( http_client_t *hc )
     }
     hc->hc_cmd = HTTP_CMD_NONE;
     pthread_mutex_lock(&global_lock);
-    iptv_input_mux_started(hc->hc_aux);
+    if (im->mm_active)
+      iptv_input_mux_started((iptv_input_t *)im->mm_active->mmi_input, im);
     mtimer_arm_rel(&rp->alive_timer, iptv_rtsp_alive_cb, im,
                    sec2mono(MAX(1, (hc->hc_rtp_timeout / 2) - 1)));
     pthread_mutex_unlock(&global_lock);
@@ -147,7 +148,7 @@ iptv_rtsp_data
  */
 static int
 iptv_rtsp_start
-  ( iptv_mux_t *im, const char *raw, const url_t *u )
+  ( iptv_input_t *mi, iptv_mux_t *im, const char *raw, const url_t *u )
 {
   rtsp_priv_t *rp;
   http_client_t *hc;
@@ -209,7 +210,7 @@ iptv_rtsp_start
  */
 static void
 iptv_rtsp_stop
-  ( iptv_mux_t *im )
+  ( iptv_input_t *mi, iptv_mux_t *im )
 {
   rtsp_priv_t *rp = im->im_data;
   int play;
@@ -267,7 +268,7 @@ iptv_rtp_header_callback ( iptv_mux_t *im, uint8_t *rtp, int len )
  * Read data
  */
 static ssize_t
-iptv_rtsp_read ( iptv_mux_t *im )
+iptv_rtsp_read ( iptv_input_t *mi, iptv_mux_t *im )
 {
   rtsp_priv_t *rp = im->im_data;
   udp_multirecv_t *um = &rp->um;
index 0d55fbd6e98e735ea3b1fcb751375e9b125c0eba..cde65b6ed693822d6d1233b11044949a6680e77b 100644 (file)
@@ -32,7 +32,8 @@
  * Connect UDP/RTP
  */
 static int
-iptv_udp_start ( iptv_mux_t *im, const char *raw, const url_t *url )
+iptv_udp_start
+  ( iptv_input_t *mi, iptv_mux_t *im, const char *raw, const url_t *url )
 {
   udp_connection_t *conn;
   udp_multirecv_t *um;
@@ -54,13 +55,13 @@ iptv_udp_start ( iptv_mux_t *im, const char *raw, const url_t *url )
   udp_multirecv_init(um, IPTV_PKTS, IPTV_PKT_PAYLOAD);
   im->im_data = um;
 
-  iptv_input_mux_started(im);
+  iptv_input_mux_started(mi, im);
   return 0;
 }
 
 static void
 iptv_udp_stop
-  ( iptv_mux_t *im )
+  ( iptv_input_t *mi, iptv_mux_t *im )
 {
   udp_multirecv_t *um = im->im_data;
 
@@ -72,7 +73,7 @@ iptv_udp_stop
 }
 
 static ssize_t
-iptv_udp_read ( iptv_mux_t *im )
+iptv_udp_read ( iptv_input_t *mi, iptv_mux_t *im )
 {
   int i, n;
   struct iovec *iovec;
@@ -178,7 +179,7 @@ iptv_rtp_read ( iptv_mux_t *im, udp_multirecv_t *um,
 }
 
 static ssize_t
-iptv_udp_rtp_read ( iptv_mux_t *im )
+iptv_udp_rtp_read ( iptv_input_t *mi, iptv_mux_t *im )
 {
   udp_multirecv_t *um = im->im_data;
 
index 566e6497294cf79098459626f811fae3634bfb56..e7b25093f568e2128c409715dfe0b770f6bcf706 100644 (file)
@@ -1454,7 +1454,7 @@ linuxdvb_frontend_input_thread ( void *aux )
     }
     
     /* Process */
-    mpegts_input_recv_packets((mpegts_input_t*)lfe, mmi, &sb, 0, NULL);
+    mpegts_input_recv_packets(mmi, &sb, 0, NULL);
   }
 
   sbuf_free(&sb);
index dd7c236294aa3aa62e136bc51a6406e63a765967..cb7df25b10564b7d3f683525a516465510382210 100644 (file)
@@ -1126,9 +1126,10 @@ ts_sync_count ( const uint8_t *tsb, int len )
 
 void
 mpegts_input_recv_packets
-  ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi, sbuf_t *sb,
+  ( mpegts_mux_instance_t *mmi, sbuf_t *sb,
     int flags, mpegts_pcr_t *pcr )
 {
+  mpegts_input_t *mi = mmi->mmi_input;
   int len, len2, off;
   mpegts_packet_t *mp;
   uint8_t *tsb;
index 64e8f60e9347caff05f756696963245264cc5610..b9b4971154b4545b73e3aff5f5f47d951b1b1d55 100644 (file)
@@ -803,6 +803,12 @@ mpegts_mux_is_epg ( mpegts_mux_t *mm )
 static void
 mpegts_mux_create_instances ( mpegts_mux_t *mm )
 {
+  mpegts_network_link_t *mnl;
+  LIST_FOREACH(mnl, &mm->mm_network->mn_inputs, mnl_mn_link) {
+    mpegts_input_t *mi = mnl->mnl_input;
+    if (mi->mi_is_enabled(mi, mm, 0, -1) != MI_IS_ENABLED_NEVER)
+      mi->mi_create_mux_instance(mi, mm);
+  }
 }
 
 static int
index e4f362fdc3b95ef359313f5e4e4dee1329b45fc9..7f8260658e05d48e71207c1fe1cae3e3a0f17570 100644 (file)
@@ -1071,17 +1071,6 @@ dvb_mux_display_name ( mpegts_mux_t *mm, char *buf, size_t len )
     snprintf(buf, len, "%d%s", freq, extra);
 }
 
-static void
-dvb_mux_create_instances ( mpegts_mux_t *mm )
-{
-  mpegts_network_link_t *mnl;
-  LIST_FOREACH(mnl, &mm->mm_network->mn_inputs, mnl_mn_link) {
-    mpegts_input_t *mi = mnl->mnl_input;
-    if (mi->mi_is_enabled(mi, mm, 0, -1) != MI_IS_ENABLED_NEVER)
-      mi->mi_create_mux_instance(mi, mm);
-  }
-}
-
 static void
 dvb_mux_delete ( mpegts_mux_t *mm, int delconf )
 {
@@ -1177,7 +1166,6 @@ dvb_mux_create0
   lm->mm_delete           = dvb_mux_delete;
   lm->mm_display_name     = dvb_mux_display_name;
   lm->mm_config_save      = dvb_mux_config_save;
-  lm->mm_create_instances = dvb_mux_create_instances;
 
   mpegts_mux_nice_name(mm, buf, sizeof(buf));
   free(mm->mm_nicename);
index 43bda829be587c398500732ac3b37dd8699150ee..706ae0941683f38783763bcd8a1bad631d912213 100644 (file)
@@ -1491,8 +1491,7 @@ satip_frontend_rtp_data_received( http_client_t *hc, void *buf, size_t len )
       if (lfe->sf_req == lfe->sf_req_thread) {
         mmi = lfe->sf_req->sf_mmi;
         atomic_add(&mmi->tii_stats.unc, unc);
-        mpegts_input_recv_packets((mpegts_input_t*)lfe, mmi,
-                                  &lfe->sf_sbuf, 0, NULL);
+        mpegts_input_recv_packets(mmi, &lfe->sf_sbuf, 0, NULL);
       }
       pthread_mutex_unlock(&lfe->sf_dvr_lock);
       lfe->sf_last_data_tstamp = mclk();
@@ -2027,7 +2026,7 @@ new_tune:
     pthread_mutex_lock(&lfe->sf_dvr_lock);
     if (lfe->sf_req == lfe->sf_req_thread) {
       atomic_add(&mmi->tii_stats.unc, unc);
-      mpegts_input_recv_packets((mpegts_input_t*)lfe, mmi, sb, 0, NULL);
+      mpegts_input_recv_packets(mmi, sb, 0, NULL);
     } else
       fatal = 1;
     pthread_mutex_unlock(&lfe->sf_dvr_lock);
index d1b3f03d385d9221220cfe7372238081d7367ed2..5484831e4e7f0ddc57bf720bd14b6247ef5641f6 100644 (file)
@@ -131,7 +131,7 @@ tsfile_input_thread ( void *aux )
       pcr.pcr_first = PTS_UNSET;
       pcr.pcr_last  = PTS_UNSET;
       pcr.pcr_pid   = tmi->mmi_tsfile_pcr_pid;
-      mpegts_input_recv_packets((mpegts_input_t*)mi, mmi, &buf, 0, &pcr);
+      mpegts_input_recv_packets(mmi, &buf, 0, &pcr);
       if (pcr.pcr_pid)
         tmi->mmi_tsfile_pcr_pid = pcr.pcr_pid;
 
index d99be68d1744feca03852ed2826857b01b5279ef..32fda7eb69302f280b3b497c3f39e921cf89acf0 100644 (file)
@@ -201,7 +201,7 @@ tvhdhomerun_frontend_input_thread ( void *aux )
 
     //tvhdebug(LS_TVHDHOMERUN, "got r=%d (thats %d)", r, (r == 7*188));
 
-    mpegts_input_recv_packets((mpegts_input_t*) hfe, mmi, &sb, 0, NULL);
+    mpegts_input_recv_packets(mmi, &sb, 0, NULL);
   }
 
   tvhdebug(LS_TVHDHOMERUN, "setting target to none");