From: Jaroslav Kysela Date: Tue, 20 Feb 2018 14:12:05 +0000 (+0100) Subject: iptv: split input processing among multiple threads, issue #4925 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=1824ca6ed3cdbffc2b5f98e9c5e59dd28b7036f6;p=thirdparty%2Ftvheadend.git iptv: split input processing among multiple threads, issue #4925 --- diff --git a/src/config.c b/src/config.c index b66f5a329..1105d80b6 100644 --- a/src/config.c +++ b/src/config.c @@ -1701,6 +1701,7 @@ config_boot ( const char *path, gid_t gid, uid_t uid ) config_scanfile_ok = 0; config.theme_ui = strdup("blue"); config.chname_num = 1; + config.iptv_tpool_count = 2; idclass_register(&config_class); @@ -2417,6 +2418,15 @@ const idclass_t config_class = { .opts = PO_EXPERT, .group = 6, }, + { + .type = PT_BOOL, + .id = "iptv_tpool", + .name = N_("IPTV threads"), + .desc = N_("Set the number of threads for IPTV to split load " + "across more CPUs."), + .off = offsetof(config_t, iptv_tpool_count), + .group = 7 + }, { .type = PT_INT, .id = "dscp", diff --git a/src/config.h b/src/config.h index b67f92a64..02a2346b4 100644 --- a/src/config.h +++ b/src/config.h @@ -65,6 +65,7 @@ typedef struct config { int epg_compress; uint32_t epg_cut_window; uint32_t epg_update_window; + int iptv_tpool_count; } config_t; extern const idclass_t config_class; diff --git a/src/input/mpegts.h b/src/input/mpegts.h index 573b67513..9a481f8e5 100644 --- a/src/input/mpegts.h +++ b/src/input/mpegts.h @@ -992,7 +992,7 @@ void mpegts_mux_update_pids ( mpegts_mux_t *mm ); int mpegts_mux_compare ( mpegts_mux_t *a, mpegts_mux_t *b ); void mpegts_input_recv_packets - (mpegts_input_t *mi, mpegts_mux_instance_t *mmi, sbuf_t *sb, + (mpegts_mux_instance_t *mmi, sbuf_t *sb, int flags, mpegts_pcr_t *pcr); void mpegts_input_postdemux diff --git a/src/input/mpegts/iptv/iptv.c b/src/input/mpegts/iptv/iptv.c index 97c82b329..5d7666c20 100644 --- a/src/input/mpegts/iptv/iptv.c +++ b/src/input/mpegts/iptv/iptv.c @@ -24,6 +24,7 @@ #include "htsstr.h" #include "channels.h" #include "packet.h" +#include "config.h" #include #include @@ -40,11 +41,28 @@ * IPTV state * *************************************************************************/ -iptv_input_t *iptv_input; -tvhpoll_t *iptv_poll; -pthread_t iptv_thread; pthread_mutex_t iptv_lock; +typedef struct iptv_thread_pool { + TAILQ_ENTRY(iptv_thread_pool) link; + pthread_t thread; + iptv_input_t *input; + tvhpoll_t *poll; + uint32_t streams; +} iptv_thread_pool_t; + +TAILQ_HEAD(, iptv_thread_pool) iptv_tpool; +int iptv_tpool_count = 0; +iptv_thread_pool_t *iptv_tpool_last = NULL; +gtimer_t iptv_tpool_manage_timer; + +static void iptv_input_thread_manage(int count); + +static inline int iptv_tpool_safe_count(void) +{ + return MINMAX(config.iptv_tpool_count, 1, 128); +} + /* ************************************************************************** * IPTV handlers * *************************************************************************/ @@ -151,6 +169,20 @@ iptv_input_is_free ( mpegts_input_t *mi, mpegts_mux_t *mm, return NULL; } +static int +iptv_input_thread_balance(iptv_input_t *mi) +{ + iptv_thread_pool_t *pool, *apool = mi->mi_tpool; + + /* + * select input with the smallest count of active threads + */ + TAILQ_FOREACH(pool, &iptv_tpool, link) + if (pool->streams < apool->streams) + return 1; + return 0; +} + static int iptv_input_is_enabled ( mpegts_input_t *mi, mpegts_mux_t *mm, int flags, int weight ) @@ -164,6 +196,10 @@ iptv_input_is_enabled tvhtrace(LS_IPTV_SUB, "enabled[%p]: generic %d", mm, r); return r; } + if (iptv_input_thread_balance((iptv_input_t *)mi)) { + tvhtrace(LS_IPTV_SUB, "enabled[%p]: balance", mm); + return MI_IS_ENABLED_RETRY; + } mmi = iptv_input_is_free(mi, mm, &conf, weight, NULL); tvhtrace(LS_IPTV_SUB, "enabled[%p]: free %p", mm, mmi); return mmi == NULL ? MI_IS_ENABLED_OK : MI_IS_ENABLED_RETRY; @@ -279,6 +315,7 @@ iptv_input_start_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi, int weigh int ret = SM_CODE_TUNING_FAILED; iptv_mux_t *im = (iptv_mux_t*)mmi->mmi_mux; iptv_handler_t *ih; + iptv_thread_pool_t *pool = ((iptv_input_t *)mi)->mi_tpool; char buf[256], rawbuf[512], *raw = im->mm_iptv_url, *s; const char *scheme; url_t url; @@ -342,35 +379,30 @@ iptv_input_start_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi, int weigh if (im->mm_iptv_url_raw) { im->mm_active = mmi; // Note: must set here else mux_started call // will not realise we're ready to accept pid open calls - ret = ih->start(im, im->mm_iptv_url_raw, &url); - if (!ret) + ret = ih->start((iptv_input_t *)mi, im, im->mm_iptv_url_raw, &url); + if (!ret) { im->im_handler = ih; - else + pool->streams++; + } else { im->mm_active = NULL; + } } pthread_mutex_unlock(&iptv_lock); urlreset(&url); free(s); + return ret; } -static void -iptv_input_stop_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi ) +void +iptv_input_close_fds ( iptv_input_t *mi, iptv_mux_t *im ) { - iptv_mux_t *im = (iptv_mux_t*)mmi->mmi_mux; - - pthread_mutex_lock(&iptv_lock); - - mtimer_disarm(&im->im_pause_timer); - - /* Stop */ - if (im->im_handler->stop) - im->im_handler->stop(im); + iptv_thread_pool_t *pool = mi->mi_tpool; /* Close file */ if (im->mm_iptv_fd > 0) { - tvhpoll_rem1(iptv_poll, im->mm_iptv_fd); + tvhpoll_rem1(pool->poll, im->mm_iptv_fd); udp_close(im->mm_iptv_connection); im->mm_iptv_connection = NULL; im->mm_iptv_fd = -1; @@ -378,11 +410,29 @@ iptv_input_stop_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi ) /* Close file2 */ if (im->mm_iptv_fd2 > 0) { - tvhpoll_rem1(iptv_poll, im->mm_iptv_fd2); + tvhpoll_rem1(pool->poll, im->mm_iptv_fd2); udp_close(im->mm_iptv_connection2); im->mm_iptv_connection2 = NULL; im->mm_iptv_fd2 = -1; } +} + +static void +iptv_input_stop_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi ) +{ + iptv_mux_t *im = (iptv_mux_t*)mmi->mmi_mux; + iptv_thread_pool_t *pool = ((iptv_input_t *)mi)->mi_tpool; + uint32_t u32; + + pthread_mutex_lock(&iptv_lock); + + mtimer_disarm(&im->im_pause_timer); + + /* Stop */ + if (im->im_handler->stop) + im->im_handler->stop((iptv_input_t *)mi, im); + + iptv_input_close_fds((iptv_input_t *)mi, im); /* Free memory */ sbuf_free(&im->mm_iptv_buffer); @@ -390,7 +440,12 @@ iptv_input_stop_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi ) /* Clear bw limit */ ((iptv_network_t *)im->mm_network)->in_bw_limited = 0; + u32 = --pool->streams; + pthread_mutex_unlock(&iptv_lock); + + if (u32 == 0) + iptv_input_thread_manage(iptv_tpool_safe_count()); } static void @@ -428,14 +483,18 @@ void iptv_input_unpause ( void *aux ) { iptv_mux_t *im = aux; + iptv_input_t *mi; int pause; pthread_mutex_lock(&iptv_lock); - if (iptv_input_pause_check(im)) { - pause = 1; - } else { - tvhtrace(LS_IPTV_PCR, "unpause timer callback"); - im->im_handler->pause(im, 0); - pause = 0; + pause = 0; + if (im->mm_active) { + mi = (iptv_input_t *)im->mm_active->mmi_input; + if (iptv_input_pause_check(im)) { + pause = 1; + } else { + tvhtrace(LS_IPTV_PCR, "unpause timer callback"); + im->im_handler->pause(mi, im, 0); + } } pthread_mutex_unlock(&iptv_lock); if (pause) @@ -445,13 +504,15 @@ iptv_input_unpause ( void *aux ) static void * iptv_input_thread ( void *aux ) { + iptv_thread_pool_t *pool = aux; int nfds, r; ssize_t n; iptv_mux_t *im; + iptv_input_t *mi; tvhpoll_event_t ev; while ( tvheadend_is_running() ) { - nfds = tvhpoll_wait(iptv_poll, &ev, 1, -1); + nfds = tvhpoll_wait(pool->poll, &ev, 1, -1); if ( nfds < 0 ) { if (tvheadend_is_running() && !ERRNO_AGAIN(errno)) { tvherror(LS_IPTV, "poll() error %s, sleeping 1 second", @@ -469,15 +530,16 @@ iptv_input_thread ( void *aux ) /* Only when active */ if (im->mm_active) { + mi = (iptv_input_t *)im->mm_active->mmi_input; /* Get data */ - if ((n = im->im_handler->read(im)) < 0) { + if ((n = im->im_handler->read(mi, im)) < 0) { tvherror(LS_IPTV, "read() error %s", strerror(errno)); - im->im_handler->stop(im); + im->im_handler->stop(mi, im); break; } r = iptv_input_recv_packets(im, n); if (r == 1) - im->im_handler->pause(im, 1); + im->im_handler->pause(mi, im, 1); } pthread_mutex_unlock(&iptv_lock); @@ -493,12 +555,14 @@ iptv_input_thread ( void *aux ) } void -iptv_input_pause_handler ( iptv_mux_t *im, int pause ) +iptv_input_pause_handler ( iptv_input_t *mi, iptv_mux_t *im, int pause ) { + iptv_thread_pool_t *tpool = mi->mi_tpool; + if (pause) - tvhpoll_rem1(iptv_poll, im->mm_iptv_fd); + tvhpoll_rem1(tpool->poll, im->mm_iptv_fd); else - tvhpoll_add1(iptv_poll, im->mm_iptv_fd, TVHPOLL_IN, im); + tvhpoll_add1(tpool->poll, im->mm_iptv_fd, TVHPOLL_IN, im); } void @@ -508,9 +572,8 @@ iptv_input_recv_flush ( iptv_mux_t *im ) if (mmi == NULL) return; - mpegts_input_recv_packets((mpegts_input_t*)iptv_input, mmi, - &im->mm_iptv_buffer, MPEGTS_DATA_CC_RESTART, - NULL); + mpegts_input_recv_packets(mmi, &im->mm_iptv_buffer, + MPEGTS_DATA_CC_RESTART, NULL); } int @@ -547,8 +610,7 @@ iptv_input_recv_packets ( iptv_mux_t *im, ssize_t len ) tvhtrace(LS_IPTV_PCR, "pcr: paused"); return 1; } - mpegts_input_recv_packets((mpegts_input_t*)iptv_input, mmi, - &im->mm_iptv_buffer, + mpegts_input_recv_packets(mmi, &im->mm_iptv_buffer, in->in_remove_scrambled_bits ? MPEGTS_DATA_REMOVE_SCRAMBLED : 0, &pcr); if (pcr.pcr_first != PTS_UNSET && pcr.pcr_last != PTS_UNSET) { @@ -578,14 +640,15 @@ iptv_input_recv_packets ( iptv_mux_t *im, ssize_t len ) return 0; } - int -iptv_input_fd_started ( iptv_mux_t *im ) +iptv_input_fd_started ( iptv_input_t *mi, iptv_mux_t *im ) { + iptv_thread_pool_t *tpool = mi->mi_tpool; + /* Setup poll */ if (im->mm_iptv_fd > 0) { /* Error? */ - if (tvhpoll_add1(iptv_poll, im->mm_iptv_fd, TVHPOLL_IN, im) < 0) { + if (tvhpoll_add1(tpool->poll, im->mm_iptv_fd, TVHPOLL_IN, im) < 0) { tvherror(LS_IPTV, "%s - failed to add to poll q", im->mm_nicename); close(im->mm_iptv_fd); im->mm_iptv_fd = -1; @@ -596,7 +659,7 @@ iptv_input_fd_started ( iptv_mux_t *im ) /* Setup poll2 */ if (im->mm_iptv_fd2 > 0) { /* Error? */ - if (tvhpoll_add1(iptv_poll, im->mm_iptv_fd2, TVHPOLL_IN, im) < 0) { + if (tvhpoll_add1(tpool->poll, im->mm_iptv_fd2, TVHPOLL_IN, im) < 0) { tvherror(LS_IPTV, "%s - failed to add to poll q (2)", im->mm_nicename); close(im->mm_iptv_fd2); im->mm_iptv_fd2 = -1; @@ -607,7 +670,7 @@ iptv_input_fd_started ( iptv_mux_t *im ) } void -iptv_input_mux_started ( iptv_mux_t *im ) +iptv_input_mux_started ( iptv_input_t *mi, iptv_mux_t *im ) { /* Allocate input buffer */ sbuf_reset_and_alloc(&im->mm_iptv_buffer, IPTV_BUF_SIZE); @@ -615,13 +678,13 @@ iptv_input_mux_started ( iptv_mux_t *im ) im->im_pcr = PTS_UNSET; im->im_pcr_pid = MPEGTS_PID_NONE; - if (iptv_input_fd_started(im)) + if (iptv_input_fd_started(mi, im)) return; /* Install table handlers */ mpegts_mux_t *mm = (mpegts_mux_t*)im; if (mm->mm_active) - psi_tables_install(mm->mm_active->mmi_input, mm, + psi_tables_install((mpegts_input_t *)mi, mm, im->mm_iptv_atsc ? DVB_SYS_ATSC_ALL : DVB_SYS_DVBT); } @@ -998,9 +1061,6 @@ iptv_network_create0 in->mn_skipinitscan = 1; } - /* Link */ - mpegts_input_add_network((mpegts_input_t*)iptv_input, (mpegts_network_t*)in); - /* Load muxes */ if ((c = hts_settings_load_r(1, "input/iptv/networks/%s/muxes", idnode_uuid_as_str(&in->mn_id, ubuf)))) { @@ -1092,8 +1152,73 @@ iptv_input_wizard_set( tvh_input_t *ti, htsmsg_t *conf, const char *lang ) mpegts_network_wizard_create(ntype, NULL, lang); } +static iptv_input_t * +iptv_create_input ( void *tpool ) +{ + iptv_input_t *input = calloc(1, sizeof(iptv_input_t)); + mpegts_network_t *mn; + + /* Init Input */ + mpegts_input_create0((mpegts_input_t*)input, + &iptv_input_class, NULL, NULL); + input->ti_wizard_get = iptv_input_wizard_get; + input->ti_wizard_set = iptv_input_wizard_set; + input->mi_warm_mux = iptv_input_warm_mux; + input->mi_start_mux = iptv_input_start_mux; + input->mi_stop_mux = iptv_input_stop_mux; + input->mi_is_enabled = iptv_input_is_enabled; + input->mi_get_weight = iptv_input_get_weight; + input->mi_get_grace = iptv_input_get_grace; + input->mi_get_priority = iptv_input_get_priority; + input->mi_display_name = iptv_input_display_name; + input->mi_enabled = 1; + + input->mi_tpool = tpool; + + /* Link */ + LIST_FOREACH(mn, &mpegts_network_all, mn_global_link) + if (idnode_is_instance(&mn->mn_id, &iptv_network_class)) + mpegts_input_add_network((mpegts_input_t *)input, mn); + + return input; +} + +static void +iptv_input_thread_manage(int count) +{ + iptv_thread_pool_t *pool; + + while (iptv_tpool_count < count) { + pool = calloc(1, sizeof(*pool)); + pool->poll = tvhpoll_create(10); + pool->input = iptv_create_input(pool); + tvhthread_create(&pool->thread, NULL, iptv_input_thread, pool, "iptv"); + TAILQ_INSERT_TAIL(&iptv_tpool, pool, link); + iptv_tpool_count++; + } + while (iptv_tpool_count > count) { + TAILQ_FOREACH(pool, &iptv_tpool, link) + if (pool->streams == 0) { + pthread_kill(pool->thread, SIGTERM); + pthread_join(pool->thread, NULL); + TAILQ_REMOVE(&iptv_tpool, pool, link); + mpegts_input_stop_all((mpegts_input_t*)pool->input); + mpegts_input_delete((mpegts_input_t *)pool->input, 0); + tvhpoll_destroy(pool->poll); + free(pool); + iptv_tpool_count--; + break; + } + if (pool == NULL) + break; + } +} + void iptv_init ( void ) { + TAILQ_INIT(&iptv_tpool); + pthread_mutex_init(&iptv_lock, NULL); + /* Register handlers */ iptv_http_init(); iptv_udp_init(); @@ -1104,44 +1229,23 @@ void iptv_init ( void ) iptv_libav_init(); #endif - iptv_input = calloc(1, sizeof(iptv_input_t)); - - /* Init Input */ - mpegts_input_create0((mpegts_input_t*)iptv_input, - &iptv_input_class, NULL, NULL); - iptv_input->ti_wizard_get = iptv_input_wizard_get; - iptv_input->ti_wizard_set = iptv_input_wizard_set; - iptv_input->mi_warm_mux = iptv_input_warm_mux; - iptv_input->mi_start_mux = iptv_input_start_mux; - iptv_input->mi_stop_mux = iptv_input_stop_mux; - iptv_input->mi_is_enabled = iptv_input_is_enabled; - iptv_input->mi_get_weight = iptv_input_get_weight; - iptv_input->mi_get_grace = iptv_input_get_grace; - iptv_input->mi_get_priority = iptv_input_get_priority; - iptv_input->mi_display_name = iptv_input_display_name; - iptv_input->mi_enabled = 1; - /* Init Network */ iptv_network_init(); - /* Setup TS thread */ - iptv_poll = tvhpoll_create(10); - pthread_mutex_init(&iptv_lock, NULL); - tvhthread_create(&iptv_thread, NULL, iptv_input_thread, NULL, "iptv"); + /* Threads init */ + iptv_input_thread_manage(iptv_tpool_safe_count()); + tvhinfo(LS_IPTV, "Using %d input thread(s)", iptv_tpool_count); } void iptv_done ( void ) { - pthread_kill(iptv_thread, SIGTERM); - pthread_join(iptv_thread, NULL); - tvhpoll_destroy(iptv_poll); pthread_mutex_lock(&global_lock); + iptv_input_thread_manage(0); + assert(TAILQ_EMPTY(&iptv_tpool)); mpegts_network_unregister_builder(&iptv_auto_network_class); mpegts_network_unregister_builder(&iptv_network_class); mpegts_network_class_delete(&iptv_auto_network_class, 0); mpegts_network_class_delete(&iptv_network_class, 0); - mpegts_input_stop_all((mpegts_input_t*)iptv_input); - mpegts_input_delete((mpegts_input_t *)iptv_input, 0); pthread_mutex_unlock(&global_lock); } diff --git a/src/input/mpegts/iptv/iptv_file.c b/src/input/mpegts/iptv/iptv_file.c index 94a5a4b5e..1d86ac6d7 100644 --- a/src/input/mpegts/iptv/iptv_file.c +++ b/src/input/mpegts/iptv/iptv_file.c @@ -93,7 +93,8 @@ iptv_file_thread ( void *aux ) * Open file */ static int -iptv_file_start ( iptv_mux_t *im, const char *raw, const url_t *url ) +iptv_file_start + ( iptv_input_t *mi, iptv_mux_t *im, const char *raw, const url_t *url ) { file_priv_t *fp; int fd = tvh_open(raw + 7, O_RDONLY | O_NONBLOCK, 0); @@ -107,14 +108,14 @@ iptv_file_start ( iptv_mux_t *im, const char *raw, const url_t *url ) fp->fd = fd; tvh_cond_init(&fp->cond); im->im_data = fp; - iptv_input_mux_started(im); + iptv_input_mux_started(mi, im); tvhthread_create(&fp->tid, NULL, iptv_file_thread, im, "iptvfile"); return 0; } static void iptv_file_stop - ( iptv_mux_t *im ) + ( iptv_input_t *mi, iptv_mux_t *im ) { file_priv_t *fp = im->im_data; int rd = fp->fd; diff --git a/src/input/mpegts/iptv/iptv_http.c b/src/input/mpegts/iptv/iptv_http.c index a94a690fd..8461ec5d8 100644 --- a/src/input/mpegts/iptv/iptv_http.c +++ b/src/input/mpegts/iptv/iptv_http.c @@ -29,6 +29,7 @@ #endif typedef struct http_priv { + iptv_input_t *mi; iptv_mux_t *im; http_client_t *hc; uint8_t shutdown; @@ -249,7 +250,7 @@ iptv_http_header ( http_client_t *hc ) hp->off = 0; if (iptv_http_safe_global_lock(hp)) { if (!hp->started) { - iptv_input_mux_started(hp->im); + iptv_input_mux_started(hp->mi, hp->im); } else { iptv_input_recv_flush(hp->im); } @@ -511,13 +512,14 @@ iptv_http_create_header */ static int iptv_http_start - ( iptv_mux_t *im, const char *raw, const url_t *u ) + ( iptv_input_t *mi, iptv_mux_t *im, const char *raw, const url_t *u ) { http_priv_t *hp; http_client_t *hc; int r; hp = calloc(1, sizeof(*hp)); + hp->mi = mi; hp->im = im; if (!(hc = http_client_connect(hp, HTTP_VERSION_1_1, u->scheme, u->host, u->port, NULL))) { @@ -552,7 +554,7 @@ iptv_http_start */ static void iptv_http_stop - ( iptv_mux_t *im ) + ( iptv_input_t *mi, iptv_mux_t *im ) { http_priv_t *hp = im->im_data; @@ -579,7 +581,7 @@ iptv_http_stop */ static void iptv_http_pause - ( iptv_mux_t *im, int pause ) + ( iptv_input_t *mi, iptv_mux_t *im, int pause ) { http_priv_t *hp = im->im_data; diff --git a/src/input/mpegts/iptv/iptv_libav.c b/src/input/mpegts/iptv/iptv_libav.c index 35381e89d..e7a058a02 100644 --- a/src/input/mpegts/iptv/iptv_libav.c +++ b/src/input/mpegts/iptv/iptv_libav.c @@ -174,7 +174,8 @@ fail: * Start new thread */ static int -iptv_libav_start ( iptv_mux_t *im, const char *raw, const url_t *url ) +iptv_libav_start + ( iptv_input_t *mi, iptv_mux_t *im, const char *raw, const url_t *url ) { iptv_libav_priv_t *la = calloc(1, sizeof(*la)); @@ -187,19 +188,19 @@ iptv_libav_start ( iptv_mux_t *im, const char *raw, const url_t *url ) la->mux = im; tvh_pipe(O_NONBLOCK, &la->pipe); im->mm_iptv_fd = la->pipe.rd; - iptv_input_fd_started(im); + iptv_input_fd_started(mi, im); atomic_set(&la->running, 1); atomic_set(&la->pause, 0); sbuf_init(&la->sbuf); tvhthread_create(&la->thread, NULL, iptv_libav_thread, la, "libavinput"); if (raw[0]) - iptv_input_mux_started(im); + iptv_input_mux_started(mi, im); return 0; } static void iptv_libav_stop - ( iptv_mux_t *im ) + ( iptv_input_t *mi, iptv_mux_t *im ) { iptv_libav_priv_t *la = im->im_opaque; @@ -215,7 +216,7 @@ iptv_libav_stop } static ssize_t -iptv_libav_read ( iptv_mux_t *im ) +iptv_libav_read ( iptv_input_t *mi, iptv_mux_t *im ) { iptv_libav_priv_t *la = im->im_opaque; char buf[8192]; @@ -233,7 +234,7 @@ iptv_libav_read ( iptv_mux_t *im ) } static void -iptv_libav_pause ( iptv_mux_t *im, int pause ) +iptv_libav_pause ( iptv_input_t *mi, iptv_mux_t *im, int pause ) { iptv_libav_priv_t *la = im->im_opaque; diff --git a/src/input/mpegts/iptv/iptv_mux.c b/src/input/mpegts/iptv/iptv_mux.c index c796a4738..903dff11b 100644 --- a/src/input/mpegts/iptv/iptv_mux.c +++ b/src/input/mpegts/iptv/iptv_mux.c @@ -25,7 +25,6 @@ * Class */ extern const idclass_t mpegts_mux_class; -extern const idclass_t mpegts_mux_instance_class; static inline void iptv_url_set0 ( char **url, char **sane_url, @@ -373,11 +372,6 @@ iptv_mux_create0 ( iptv_network_t *in, const char *uuid, htsmsg_t *conf ) sbuf_init(&im->mm_iptv_buffer); - /* Create Instance */ - (void)mpegts_mux_instance_create(mpegts_mux_instance, NULL, - (mpegts_input_t*)iptv_input, - (mpegts_mux_t*)im); - /* Services */ c2 = NULL; c = htsmsg_get_map(conf, "services"); diff --git a/src/input/mpegts/iptv/iptv_pipe.c b/src/input/mpegts/iptv/iptv_pipe.c index 6f4265dd7..89f3a486c 100644 --- a/src/input/mpegts/iptv/iptv_pipe.c +++ b/src/input/mpegts/iptv/iptv_pipe.c @@ -32,7 +32,8 @@ * Spawn task and create pipes */ static int -iptv_pipe_start ( iptv_mux_t *im, const char *raw, const url_t *url ) +iptv_pipe_start + ( iptv_input_t *mi, iptv_mux_t *im, const char *raw, const url_t *url ) { char **argv = NULL, **envp = NULL; const char *replace[] = { "${service_name}", im->mm_iptv_svcname ?: "", NULL }; @@ -65,7 +66,7 @@ iptv_pipe_start ( iptv_mux_t *im, const char *raw, const url_t *url ) im->mm_iptv_respawn_last = mclk(); if (url) - iptv_input_mux_started(im); + iptv_input_mux_started(mi, im); return 0; err: @@ -78,20 +79,15 @@ err: static void iptv_pipe_stop - ( iptv_mux_t *im ) + ( iptv_input_t *mi, iptv_mux_t *im ) { - int rd = im->mm_iptv_fd; pid_t pid = (intptr_t)im->im_data; spawn_kill(pid, tvh_kill_to_sig(im->mm_iptv_kill), im->mm_iptv_kill_timeout); - if (rd > 0) { - tvhpoll_rem1(iptv_poll, rd); - close(rd); - } - im->mm_iptv_fd = -1; + iptv_input_close_fds(mi, im); } static ssize_t -iptv_pipe_read ( iptv_mux_t *im ) +iptv_pipe_read ( iptv_input_t *mi, iptv_mux_t *im ) { int r, rd = im->mm_iptv_fd; ssize_t res = 0; @@ -107,12 +103,10 @@ iptv_pipe_read ( iptv_mux_t *im ) continue; } if (r <= 0) { - tvhpoll_rem1(iptv_poll, rd); - close(rd); + iptv_input_close_fds(mi, im); pid = (intptr_t)im->im_data; - spawn_kill(pid, tvh_kill_to_sig(im->mm_iptv_kill), im->mm_iptv_kill_timeout); - im->mm_iptv_fd = -1; im->im_data = NULL; + spawn_kill(pid, tvh_kill_to_sig(im->mm_iptv_kill), im->mm_iptv_kill_timeout); if (mclk() < im->mm_iptv_respawn_last + sec2mono(2)) { tvherror(LS_IPTV, "stdin pipe unexpectedly closed: %s", r < 0 ? strerror(errno) : "No data"); @@ -122,10 +116,10 @@ iptv_pipe_read ( iptv_mux_t *im ) pthread_mutex_lock(&global_lock); pthread_mutex_lock(&iptv_lock); if (im->mm_active) { - if (iptv_pipe_start(im, im->mm_iptv_url_raw, NULL)) { + if (iptv_pipe_start(mi, im, im->mm_iptv_url_raw, NULL)) { tvherror(LS_IPTV, "unable to respawn %s", im->mm_iptv_url_raw); } else { - iptv_input_fd_started(im); + iptv_input_fd_started(mi, im); im->mm_iptv_respawn_last = mclk(); } } diff --git a/src/input/mpegts/iptv/iptv_private.h b/src/input/mpegts/iptv/iptv_private.h index e7c000ca2..84109eb08 100644 --- a/src/input/mpegts/iptv/iptv_private.h +++ b/src/input/mpegts/iptv/iptv_private.h @@ -42,10 +42,10 @@ struct iptv_handler uint32_t buffer_limit; - int (*start) ( iptv_mux_t *im, const char *raw, const url_t *url ); - void (*stop) ( iptv_mux_t *im ); - ssize_t (*read) ( iptv_mux_t *im ); - void (*pause) ( iptv_mux_t *im, int pause ); + int (*start) ( iptv_input_t *mi, iptv_mux_t *im, const char *raw, const url_t *url ); + void (*stop) ( iptv_input_t *mi, iptv_mux_t *im ); + ssize_t (*read) ( iptv_input_t *mi, iptv_mux_t *im ); + void (*pause) ( iptv_input_t *mi, iptv_mux_t *im, int pause ); RB_ENTRY(iptv_handler) link; }; @@ -55,13 +55,16 @@ void iptv_handler_register ( iptv_handler_t *ih, int num ); struct iptv_input { mpegts_input_t; + + void *mi_tpool; }; -int iptv_input_fd_started ( iptv_mux_t *im ); -void iptv_input_mux_started ( iptv_mux_t *im ); +int iptv_input_fd_started ( iptv_input_t *mi, iptv_mux_t *im ); +void iptv_input_close_fds ( iptv_input_t *mi, iptv_mux_t *im ); +void iptv_input_mux_started ( iptv_input_t *mi, iptv_mux_t *im ); int iptv_input_recv_packets ( iptv_mux_t *im, ssize_t len ); void iptv_input_recv_flush ( iptv_mux_t *im ); -void iptv_input_pause_handler ( iptv_mux_t *im, int pause ); +void iptv_input_pause_handler ( iptv_input_t *mi, iptv_mux_t *im, int pause ); struct iptv_network { @@ -176,11 +179,9 @@ extern const idclass_t iptv_network_class; extern const idclass_t iptv_auto_network_class; extern const idclass_t iptv_mux_class; -extern iptv_input_t *iptv_input; extern iptv_network_t *iptv_network; extern pthread_mutex_t iptv_lock; -extern tvhpoll_t *iptv_poll; int iptv_url_set ( char **url, char **sane_url, const char *str, int allow_file, int allow_pipe ); diff --git a/src/input/mpegts/iptv/iptv_rtsp.c b/src/input/mpegts/iptv/iptv_rtsp.c index ac0f550e2..e661cc008 100644 --- a/src/input/mpegts/iptv/iptv_rtsp.c +++ b/src/input/mpegts/iptv/iptv_rtsp.c @@ -112,7 +112,8 @@ iptv_rtsp_header ( http_client_t *hc ) } hc->hc_cmd = HTTP_CMD_NONE; pthread_mutex_lock(&global_lock); - iptv_input_mux_started(hc->hc_aux); + if (im->mm_active) + iptv_input_mux_started((iptv_input_t *)im->mm_active->mmi_input, im); mtimer_arm_rel(&rp->alive_timer, iptv_rtsp_alive_cb, im, sec2mono(MAX(1, (hc->hc_rtp_timeout / 2) - 1))); pthread_mutex_unlock(&global_lock); @@ -147,7 +148,7 @@ iptv_rtsp_data */ static int iptv_rtsp_start - ( iptv_mux_t *im, const char *raw, const url_t *u ) + ( iptv_input_t *mi, iptv_mux_t *im, const char *raw, const url_t *u ) { rtsp_priv_t *rp; http_client_t *hc; @@ -209,7 +210,7 @@ iptv_rtsp_start */ static void iptv_rtsp_stop - ( iptv_mux_t *im ) + ( iptv_input_t *mi, iptv_mux_t *im ) { rtsp_priv_t *rp = im->im_data; int play; @@ -267,7 +268,7 @@ iptv_rtp_header_callback ( iptv_mux_t *im, uint8_t *rtp, int len ) * Read data */ static ssize_t -iptv_rtsp_read ( iptv_mux_t *im ) +iptv_rtsp_read ( iptv_input_t *mi, iptv_mux_t *im ) { rtsp_priv_t *rp = im->im_data; udp_multirecv_t *um = &rp->um; diff --git a/src/input/mpegts/iptv/iptv_udp.c b/src/input/mpegts/iptv/iptv_udp.c index 0d55fbd6e..cde65b6ed 100644 --- a/src/input/mpegts/iptv/iptv_udp.c +++ b/src/input/mpegts/iptv/iptv_udp.c @@ -32,7 +32,8 @@ * Connect UDP/RTP */ static int -iptv_udp_start ( iptv_mux_t *im, const char *raw, const url_t *url ) +iptv_udp_start + ( iptv_input_t *mi, iptv_mux_t *im, const char *raw, const url_t *url ) { udp_connection_t *conn; udp_multirecv_t *um; @@ -54,13 +55,13 @@ iptv_udp_start ( iptv_mux_t *im, const char *raw, const url_t *url ) udp_multirecv_init(um, IPTV_PKTS, IPTV_PKT_PAYLOAD); im->im_data = um; - iptv_input_mux_started(im); + iptv_input_mux_started(mi, im); return 0; } static void iptv_udp_stop - ( iptv_mux_t *im ) + ( iptv_input_t *mi, iptv_mux_t *im ) { udp_multirecv_t *um = im->im_data; @@ -72,7 +73,7 @@ iptv_udp_stop } static ssize_t -iptv_udp_read ( iptv_mux_t *im ) +iptv_udp_read ( iptv_input_t *mi, iptv_mux_t *im ) { int i, n; struct iovec *iovec; @@ -178,7 +179,7 @@ iptv_rtp_read ( iptv_mux_t *im, udp_multirecv_t *um, } static ssize_t -iptv_udp_rtp_read ( iptv_mux_t *im ) +iptv_udp_rtp_read ( iptv_input_t *mi, iptv_mux_t *im ) { udp_multirecv_t *um = im->im_data; diff --git a/src/input/mpegts/linuxdvb/linuxdvb_frontend.c b/src/input/mpegts/linuxdvb/linuxdvb_frontend.c index 566e64972..e7b25093f 100644 --- a/src/input/mpegts/linuxdvb/linuxdvb_frontend.c +++ b/src/input/mpegts/linuxdvb/linuxdvb_frontend.c @@ -1454,7 +1454,7 @@ linuxdvb_frontend_input_thread ( void *aux ) } /* Process */ - mpegts_input_recv_packets((mpegts_input_t*)lfe, mmi, &sb, 0, NULL); + mpegts_input_recv_packets(mmi, &sb, 0, NULL); } sbuf_free(&sb); diff --git a/src/input/mpegts/mpegts_input.c b/src/input/mpegts/mpegts_input.c index dd7c23629..cb7df25b1 100644 --- a/src/input/mpegts/mpegts_input.c +++ b/src/input/mpegts/mpegts_input.c @@ -1126,9 +1126,10 @@ ts_sync_count ( const uint8_t *tsb, int len ) void mpegts_input_recv_packets - ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi, sbuf_t *sb, + ( mpegts_mux_instance_t *mmi, sbuf_t *sb, int flags, mpegts_pcr_t *pcr ) { + mpegts_input_t *mi = mmi->mmi_input; int len, len2, off; mpegts_packet_t *mp; uint8_t *tsb; diff --git a/src/input/mpegts/mpegts_mux.c b/src/input/mpegts/mpegts_mux.c index 64e8f60e9..b9b497115 100644 --- a/src/input/mpegts/mpegts_mux.c +++ b/src/input/mpegts/mpegts_mux.c @@ -803,6 +803,12 @@ mpegts_mux_is_epg ( mpegts_mux_t *mm ) static void mpegts_mux_create_instances ( mpegts_mux_t *mm ) { + mpegts_network_link_t *mnl; + LIST_FOREACH(mnl, &mm->mm_network->mn_inputs, mnl_mn_link) { + mpegts_input_t *mi = mnl->mnl_input; + if (mi->mi_is_enabled(mi, mm, 0, -1) != MI_IS_ENABLED_NEVER) + mi->mi_create_mux_instance(mi, mm); + } } static int diff --git a/src/input/mpegts/mpegts_mux_dvb.c b/src/input/mpegts/mpegts_mux_dvb.c index e4f362fdc..7f8260658 100644 --- a/src/input/mpegts/mpegts_mux_dvb.c +++ b/src/input/mpegts/mpegts_mux_dvb.c @@ -1071,17 +1071,6 @@ dvb_mux_display_name ( mpegts_mux_t *mm, char *buf, size_t len ) snprintf(buf, len, "%d%s", freq, extra); } -static void -dvb_mux_create_instances ( mpegts_mux_t *mm ) -{ - mpegts_network_link_t *mnl; - LIST_FOREACH(mnl, &mm->mm_network->mn_inputs, mnl_mn_link) { - mpegts_input_t *mi = mnl->mnl_input; - if (mi->mi_is_enabled(mi, mm, 0, -1) != MI_IS_ENABLED_NEVER) - mi->mi_create_mux_instance(mi, mm); - } -} - static void dvb_mux_delete ( mpegts_mux_t *mm, int delconf ) { @@ -1177,7 +1166,6 @@ dvb_mux_create0 lm->mm_delete = dvb_mux_delete; lm->mm_display_name = dvb_mux_display_name; lm->mm_config_save = dvb_mux_config_save; - lm->mm_create_instances = dvb_mux_create_instances; mpegts_mux_nice_name(mm, buf, sizeof(buf)); free(mm->mm_nicename); diff --git a/src/input/mpegts/satip/satip_frontend.c b/src/input/mpegts/satip/satip_frontend.c index 43bda829b..706ae0941 100644 --- a/src/input/mpegts/satip/satip_frontend.c +++ b/src/input/mpegts/satip/satip_frontend.c @@ -1491,8 +1491,7 @@ satip_frontend_rtp_data_received( http_client_t *hc, void *buf, size_t len ) if (lfe->sf_req == lfe->sf_req_thread) { mmi = lfe->sf_req->sf_mmi; atomic_add(&mmi->tii_stats.unc, unc); - mpegts_input_recv_packets((mpegts_input_t*)lfe, mmi, - &lfe->sf_sbuf, 0, NULL); + mpegts_input_recv_packets(mmi, &lfe->sf_sbuf, 0, NULL); } pthread_mutex_unlock(&lfe->sf_dvr_lock); lfe->sf_last_data_tstamp = mclk(); @@ -2027,7 +2026,7 @@ new_tune: pthread_mutex_lock(&lfe->sf_dvr_lock); if (lfe->sf_req == lfe->sf_req_thread) { atomic_add(&mmi->tii_stats.unc, unc); - mpegts_input_recv_packets((mpegts_input_t*)lfe, mmi, sb, 0, NULL); + mpegts_input_recv_packets(mmi, sb, 0, NULL); } else fatal = 1; pthread_mutex_unlock(&lfe->sf_dvr_lock); diff --git a/src/input/mpegts/tsfile/tsfile_input.c b/src/input/mpegts/tsfile/tsfile_input.c index d1b3f03d3..5484831e4 100644 --- a/src/input/mpegts/tsfile/tsfile_input.c +++ b/src/input/mpegts/tsfile/tsfile_input.c @@ -131,7 +131,7 @@ tsfile_input_thread ( void *aux ) pcr.pcr_first = PTS_UNSET; pcr.pcr_last = PTS_UNSET; pcr.pcr_pid = tmi->mmi_tsfile_pcr_pid; - mpegts_input_recv_packets((mpegts_input_t*)mi, mmi, &buf, 0, &pcr); + mpegts_input_recv_packets(mmi, &buf, 0, &pcr); if (pcr.pcr_pid) tmi->mmi_tsfile_pcr_pid = pcr.pcr_pid; diff --git a/src/input/mpegts/tvhdhomerun/tvhdhomerun_frontend.c b/src/input/mpegts/tvhdhomerun/tvhdhomerun_frontend.c index d99be68d1..32fda7eb6 100644 --- a/src/input/mpegts/tvhdhomerun/tvhdhomerun_frontend.c +++ b/src/input/mpegts/tvhdhomerun/tvhdhomerun_frontend.c @@ -201,7 +201,7 @@ tvhdhomerun_frontend_input_thread ( void *aux ) //tvhdebug(LS_TVHDHOMERUN, "got r=%d (thats %d)", r, (r == 7*188)); - mpegts_input_recv_packets((mpegts_input_t*) hfe, mmi, &sb, 0, NULL); + mpegts_input_recv_packets(mmi, &sb, 0, NULL); } tvhdebug(LS_TVHDHOMERUN, "setting target to none");