#include "htsstr.h"
#include "channels.h"
#include "packet.h"
+#include "config.h"
#include <sys/socket.h>
#include <sys/types.h>
* IPTV state
* *************************************************************************/
-iptv_input_t *iptv_input;
-tvhpoll_t *iptv_poll;
-pthread_t iptv_thread;
pthread_mutex_t iptv_lock;
+typedef struct iptv_thread_pool {
+ TAILQ_ENTRY(iptv_thread_pool) link;
+ pthread_t thread;
+ iptv_input_t *input;
+ tvhpoll_t *poll;
+ uint32_t streams;
+} iptv_thread_pool_t;
+
+TAILQ_HEAD(, iptv_thread_pool) iptv_tpool;
+int iptv_tpool_count = 0;
+iptv_thread_pool_t *iptv_tpool_last = NULL;
+gtimer_t iptv_tpool_manage_timer;
+
+static void iptv_input_thread_manage(int count);
+
+static inline int iptv_tpool_safe_count(void)
+{
+ return MINMAX(config.iptv_tpool_count, 1, 128);
+}
+
/* **************************************************************************
* IPTV handlers
* *************************************************************************/
return NULL;
}
+static int
+iptv_input_thread_balance(iptv_input_t *mi)
+{
+ iptv_thread_pool_t *pool, *apool = mi->mi_tpool;
+
+ /*
+ * select input with the smallest count of active threads
+ */
+ TAILQ_FOREACH(pool, &iptv_tpool, link)
+ if (pool->streams < apool->streams)
+ return 1;
+ return 0;
+}
+
static int
iptv_input_is_enabled
( mpegts_input_t *mi, mpegts_mux_t *mm, int flags, int weight )
tvhtrace(LS_IPTV_SUB, "enabled[%p]: generic %d", mm, r);
return r;
}
+ if (iptv_input_thread_balance((iptv_input_t *)mi)) {
+ tvhtrace(LS_IPTV_SUB, "enabled[%p]: balance", mm);
+ return MI_IS_ENABLED_RETRY;
+ }
mmi = iptv_input_is_free(mi, mm, &conf, weight, NULL);
tvhtrace(LS_IPTV_SUB, "enabled[%p]: free %p", mm, mmi);
return mmi == NULL ? MI_IS_ENABLED_OK : MI_IS_ENABLED_RETRY;
int ret = SM_CODE_TUNING_FAILED;
iptv_mux_t *im = (iptv_mux_t*)mmi->mmi_mux;
iptv_handler_t *ih;
+ iptv_thread_pool_t *pool = ((iptv_input_t *)mi)->mi_tpool;
char buf[256], rawbuf[512], *raw = im->mm_iptv_url, *s;
const char *scheme;
url_t url;
if (im->mm_iptv_url_raw) {
im->mm_active = mmi; // Note: must set here else mux_started call
// will not realise we're ready to accept pid open calls
- ret = ih->start(im, im->mm_iptv_url_raw, &url);
- if (!ret)
+ ret = ih->start((iptv_input_t *)mi, im, im->mm_iptv_url_raw, &url);
+ if (!ret) {
im->im_handler = ih;
- else
+ pool->streams++;
+ } else {
im->mm_active = NULL;
+ }
}
pthread_mutex_unlock(&iptv_lock);
urlreset(&url);
free(s);
+
return ret;
}
-static void
-iptv_input_stop_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi )
+void
+iptv_input_close_fds ( iptv_input_t *mi, iptv_mux_t *im )
{
- iptv_mux_t *im = (iptv_mux_t*)mmi->mmi_mux;
-
- pthread_mutex_lock(&iptv_lock);
-
- mtimer_disarm(&im->im_pause_timer);
-
- /* Stop */
- if (im->im_handler->stop)
- im->im_handler->stop(im);
+ iptv_thread_pool_t *pool = mi->mi_tpool;
/* Close file */
if (im->mm_iptv_fd > 0) {
- tvhpoll_rem1(iptv_poll, im->mm_iptv_fd);
+ tvhpoll_rem1(pool->poll, im->mm_iptv_fd);
udp_close(im->mm_iptv_connection);
im->mm_iptv_connection = NULL;
im->mm_iptv_fd = -1;
/* Close file2 */
if (im->mm_iptv_fd2 > 0) {
- tvhpoll_rem1(iptv_poll, im->mm_iptv_fd2);
+ tvhpoll_rem1(pool->poll, im->mm_iptv_fd2);
udp_close(im->mm_iptv_connection2);
im->mm_iptv_connection2 = NULL;
im->mm_iptv_fd2 = -1;
}
+}
+
+static void
+iptv_input_stop_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi )
+{
+ iptv_mux_t *im = (iptv_mux_t*)mmi->mmi_mux;
+ iptv_thread_pool_t *pool = ((iptv_input_t *)mi)->mi_tpool;
+ uint32_t u32;
+
+ pthread_mutex_lock(&iptv_lock);
+
+ mtimer_disarm(&im->im_pause_timer);
+
+ /* Stop */
+ if (im->im_handler->stop)
+ im->im_handler->stop((iptv_input_t *)mi, im);
+
+ iptv_input_close_fds((iptv_input_t *)mi, im);
/* Free memory */
sbuf_free(&im->mm_iptv_buffer);
/* Clear bw limit */
((iptv_network_t *)im->mm_network)->in_bw_limited = 0;
+ u32 = --pool->streams;
+
pthread_mutex_unlock(&iptv_lock);
+
+ if (u32 == 0)
+ iptv_input_thread_manage(iptv_tpool_safe_count());
}
static void
iptv_input_unpause ( void *aux )
{
iptv_mux_t *im = aux;
+ iptv_input_t *mi;
int pause;
pthread_mutex_lock(&iptv_lock);
- if (iptv_input_pause_check(im)) {
- pause = 1;
- } else {
- tvhtrace(LS_IPTV_PCR, "unpause timer callback");
- im->im_handler->pause(im, 0);
- pause = 0;
+ pause = 0;
+ if (im->mm_active) {
+ mi = (iptv_input_t *)im->mm_active->mmi_input;
+ if (iptv_input_pause_check(im)) {
+ pause = 1;
+ } else {
+ tvhtrace(LS_IPTV_PCR, "unpause timer callback");
+ im->im_handler->pause(mi, im, 0);
+ }
}
pthread_mutex_unlock(&iptv_lock);
if (pause)
static void *
iptv_input_thread ( void *aux )
{
+ iptv_thread_pool_t *pool = aux;
int nfds, r;
ssize_t n;
iptv_mux_t *im;
+ iptv_input_t *mi;
tvhpoll_event_t ev;
while ( tvheadend_is_running() ) {
- nfds = tvhpoll_wait(iptv_poll, &ev, 1, -1);
+ nfds = tvhpoll_wait(pool->poll, &ev, 1, -1);
if ( nfds < 0 ) {
if (tvheadend_is_running() && !ERRNO_AGAIN(errno)) {
tvherror(LS_IPTV, "poll() error %s, sleeping 1 second",
/* Only when active */
if (im->mm_active) {
+ mi = (iptv_input_t *)im->mm_active->mmi_input;
/* Get data */
- if ((n = im->im_handler->read(im)) < 0) {
+ if ((n = im->im_handler->read(mi, im)) < 0) {
tvherror(LS_IPTV, "read() error %s", strerror(errno));
- im->im_handler->stop(im);
+ im->im_handler->stop(mi, im);
break;
}
r = iptv_input_recv_packets(im, n);
if (r == 1)
- im->im_handler->pause(im, 1);
+ im->im_handler->pause(mi, im, 1);
}
pthread_mutex_unlock(&iptv_lock);
}
void
-iptv_input_pause_handler ( iptv_mux_t *im, int pause )
+iptv_input_pause_handler ( iptv_input_t *mi, iptv_mux_t *im, int pause )
{
+ iptv_thread_pool_t *tpool = mi->mi_tpool;
+
if (pause)
- tvhpoll_rem1(iptv_poll, im->mm_iptv_fd);
+ tvhpoll_rem1(tpool->poll, im->mm_iptv_fd);
else
- tvhpoll_add1(iptv_poll, im->mm_iptv_fd, TVHPOLL_IN, im);
+ tvhpoll_add1(tpool->poll, im->mm_iptv_fd, TVHPOLL_IN, im);
}
void
if (mmi == NULL)
return;
- mpegts_input_recv_packets((mpegts_input_t*)iptv_input, mmi,
- &im->mm_iptv_buffer, MPEGTS_DATA_CC_RESTART,
- NULL);
+ mpegts_input_recv_packets(mmi, &im->mm_iptv_buffer,
+ MPEGTS_DATA_CC_RESTART, NULL);
}
int
tvhtrace(LS_IPTV_PCR, "pcr: paused");
return 1;
}
- mpegts_input_recv_packets((mpegts_input_t*)iptv_input, mmi,
- &im->mm_iptv_buffer,
+ mpegts_input_recv_packets(mmi, &im->mm_iptv_buffer,
in->in_remove_scrambled_bits ?
MPEGTS_DATA_REMOVE_SCRAMBLED : 0, &pcr);
if (pcr.pcr_first != PTS_UNSET && pcr.pcr_last != PTS_UNSET) {
return 0;
}
-
int
-iptv_input_fd_started ( iptv_mux_t *im )
+iptv_input_fd_started ( iptv_input_t *mi, iptv_mux_t *im )
{
+ iptv_thread_pool_t *tpool = mi->mi_tpool;
+
/* Setup poll */
if (im->mm_iptv_fd > 0) {
/* Error? */
- if (tvhpoll_add1(iptv_poll, im->mm_iptv_fd, TVHPOLL_IN, im) < 0) {
+ if (tvhpoll_add1(tpool->poll, im->mm_iptv_fd, TVHPOLL_IN, im) < 0) {
tvherror(LS_IPTV, "%s - failed to add to poll q", im->mm_nicename);
close(im->mm_iptv_fd);
im->mm_iptv_fd = -1;
/* Setup poll2 */
if (im->mm_iptv_fd2 > 0) {
/* Error? */
- if (tvhpoll_add1(iptv_poll, im->mm_iptv_fd2, TVHPOLL_IN, im) < 0) {
+ if (tvhpoll_add1(tpool->poll, im->mm_iptv_fd2, TVHPOLL_IN, im) < 0) {
tvherror(LS_IPTV, "%s - failed to add to poll q (2)", im->mm_nicename);
close(im->mm_iptv_fd2);
im->mm_iptv_fd2 = -1;
}
void
-iptv_input_mux_started ( iptv_mux_t *im )
+iptv_input_mux_started ( iptv_input_t *mi, iptv_mux_t *im )
{
/* Allocate input buffer */
sbuf_reset_and_alloc(&im->mm_iptv_buffer, IPTV_BUF_SIZE);
im->im_pcr = PTS_UNSET;
im->im_pcr_pid = MPEGTS_PID_NONE;
- if (iptv_input_fd_started(im))
+ if (iptv_input_fd_started(mi, im))
return;
/* Install table handlers */
mpegts_mux_t *mm = (mpegts_mux_t*)im;
if (mm->mm_active)
- psi_tables_install(mm->mm_active->mmi_input, mm,
+ psi_tables_install((mpegts_input_t *)mi, mm,
im->mm_iptv_atsc ? DVB_SYS_ATSC_ALL : DVB_SYS_DVBT);
}
in->mn_skipinitscan = 1;
}
- /* Link */
- mpegts_input_add_network((mpegts_input_t*)iptv_input, (mpegts_network_t*)in);
-
/* Load muxes */
if ((c = hts_settings_load_r(1, "input/iptv/networks/%s/muxes",
idnode_uuid_as_str(&in->mn_id, ubuf)))) {
mpegts_network_wizard_create(ntype, NULL, lang);
}
+static iptv_input_t *
+iptv_create_input ( void *tpool )
+{
+ iptv_input_t *input = calloc(1, sizeof(iptv_input_t));
+ mpegts_network_t *mn;
+
+ /* Init Input */
+ mpegts_input_create0((mpegts_input_t*)input,
+ &iptv_input_class, NULL, NULL);
+ input->ti_wizard_get = iptv_input_wizard_get;
+ input->ti_wizard_set = iptv_input_wizard_set;
+ input->mi_warm_mux = iptv_input_warm_mux;
+ input->mi_start_mux = iptv_input_start_mux;
+ input->mi_stop_mux = iptv_input_stop_mux;
+ input->mi_is_enabled = iptv_input_is_enabled;
+ input->mi_get_weight = iptv_input_get_weight;
+ input->mi_get_grace = iptv_input_get_grace;
+ input->mi_get_priority = iptv_input_get_priority;
+ input->mi_display_name = iptv_input_display_name;
+ input->mi_enabled = 1;
+
+ input->mi_tpool = tpool;
+
+ /* Link */
+ LIST_FOREACH(mn, &mpegts_network_all, mn_global_link)
+ if (idnode_is_instance(&mn->mn_id, &iptv_network_class))
+ mpegts_input_add_network((mpegts_input_t *)input, mn);
+
+ return input;
+}
+
+static void
+iptv_input_thread_manage(int count)
+{
+ iptv_thread_pool_t *pool;
+
+ while (iptv_tpool_count < count) {
+ pool = calloc(1, sizeof(*pool));
+ pool->poll = tvhpoll_create(10);
+ pool->input = iptv_create_input(pool);
+ tvhthread_create(&pool->thread, NULL, iptv_input_thread, pool, "iptv");
+ TAILQ_INSERT_TAIL(&iptv_tpool, pool, link);
+ iptv_tpool_count++;
+ }
+ while (iptv_tpool_count > count) {
+ TAILQ_FOREACH(pool, &iptv_tpool, link)
+ if (pool->streams == 0) {
+ pthread_kill(pool->thread, SIGTERM);
+ pthread_join(pool->thread, NULL);
+ TAILQ_REMOVE(&iptv_tpool, pool, link);
+ mpegts_input_stop_all((mpegts_input_t*)pool->input);
+ mpegts_input_delete((mpegts_input_t *)pool->input, 0);
+ tvhpoll_destroy(pool->poll);
+ free(pool);
+ iptv_tpool_count--;
+ break;
+ }
+ if (pool == NULL)
+ break;
+ }
+}
+
void iptv_init ( void )
{
+ TAILQ_INIT(&iptv_tpool);
+ pthread_mutex_init(&iptv_lock, NULL);
+
/* Register handlers */
iptv_http_init();
iptv_udp_init();
iptv_libav_init();
#endif
- iptv_input = calloc(1, sizeof(iptv_input_t));
-
- /* Init Input */
- mpegts_input_create0((mpegts_input_t*)iptv_input,
- &iptv_input_class, NULL, NULL);
- iptv_input->ti_wizard_get = iptv_input_wizard_get;
- iptv_input->ti_wizard_set = iptv_input_wizard_set;
- iptv_input->mi_warm_mux = iptv_input_warm_mux;
- iptv_input->mi_start_mux = iptv_input_start_mux;
- iptv_input->mi_stop_mux = iptv_input_stop_mux;
- iptv_input->mi_is_enabled = iptv_input_is_enabled;
- iptv_input->mi_get_weight = iptv_input_get_weight;
- iptv_input->mi_get_grace = iptv_input_get_grace;
- iptv_input->mi_get_priority = iptv_input_get_priority;
- iptv_input->mi_display_name = iptv_input_display_name;
- iptv_input->mi_enabled = 1;
-
/* Init Network */
iptv_network_init();
- /* Setup TS thread */
- iptv_poll = tvhpoll_create(10);
- pthread_mutex_init(&iptv_lock, NULL);
- tvhthread_create(&iptv_thread, NULL, iptv_input_thread, NULL, "iptv");
+ /* Threads init */
+ iptv_input_thread_manage(iptv_tpool_safe_count());
+ tvhinfo(LS_IPTV, "Using %d input thread(s)", iptv_tpool_count);
}
void iptv_done ( void )
{
- pthread_kill(iptv_thread, SIGTERM);
- pthread_join(iptv_thread, NULL);
- tvhpoll_destroy(iptv_poll);
pthread_mutex_lock(&global_lock);
+ iptv_input_thread_manage(0);
+ assert(TAILQ_EMPTY(&iptv_tpool));
mpegts_network_unregister_builder(&iptv_auto_network_class);
mpegts_network_unregister_builder(&iptv_network_class);
mpegts_network_class_delete(&iptv_auto_network_class, 0);
mpegts_network_class_delete(&iptv_network_class, 0);
- mpegts_input_stop_all((mpegts_input_t*)iptv_input);
- mpegts_input_delete((mpegts_input_t *)iptv_input, 0);
pthread_mutex_unlock(&global_lock);
}