From: Jaroslav Kysela Date: Fri, 13 May 2016 07:18:08 +0000 (+0200) Subject: mpegts input: add queue size checks for raw mpegts data for slow machines X-Git-Tag: v4.2.1~513 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=1936395398007a6985cd42c134c8bef521736dce;p=thirdparty%2Ftvheadend.git mpegts input: add queue size checks for raw mpegts data for slow machines --- diff --git a/src/input/mpegts.h b/src/input/mpegts.h index ca9b64bc7..c8b260d0d 100644 --- a/src/input/mpegts.h +++ b/src/input/mpegts.h @@ -683,6 +683,8 @@ struct mpegts_input pthread_mutex_t mi_input_lock; tvh_cond_t mi_input_cond; TAILQ_HEAD(,mpegts_packet) mi_input_queue; + uint64_t mi_input_queue_size; + tvhlog_limit_t mi_input_queue_loglimit; /* Data processing/output */ // Note: this lock (mi_output_lock) protects all the remaining @@ -696,6 +698,8 @@ struct mpegts_input pthread_t mi_table_tid; tvh_cond_t mi_table_cond; mpegts_table_feed_queue_t mi_table_queue; + uint64_t mi_table_queue_size; + tvhlog_limit_t mi_table_queue_loglimit; /* DBus */ #if ENABLE_DBUS_1 diff --git a/src/input/mpegts/mpegts_input.c b/src/input/mpegts/mpegts_input.c index 762776ee5..d95586e9f 100644 --- a/src/input/mpegts/mpegts_input.c +++ b/src/input/mpegts/mpegts_input.c @@ -1123,9 +1123,16 @@ retry: pthread_mutex_lock(&mi->mi_input_lock); if (mmi->mmi_mux->mm_active == mmi) { - memoryinfo_alloc(&mpegts_input_queue_memoryinfo, sizeof(mpegts_packet_t) + len2); - TAILQ_INSERT_TAIL(&mi->mi_input_queue, mp, mp_link); - tvh_cond_signal(&mi->mi_input_cond, 0); + if (mi->mi_input_queue_size < 50*1024*1024) { + mi->mi_input_queue_size += len2; + memoryinfo_alloc(&mpegts_input_queue_memoryinfo, sizeof(mpegts_packet_t) + len2); + TAILQ_INSERT_TAIL(&mi->mi_input_queue, mp, mp_link); + tvh_cond_signal(&mi->mi_input_cond, 0); + } else { + if (tvhlog_limit(&mi->mi_input_queue_loglimit, 10)) + tvhwarn("mpegts", "too much queued input data (over 50MB), discarding new"); + free(mp); + } } else { free(mp); } @@ -1373,12 +1380,18 @@ mpegts_input_process if (type & MPS_FTABLE) mpegts_input_table_dispatch(mm, muxname, tsb, llen); if (type & MPS_TABLE) { - mpegts_table_feed_t *mtf = malloc(sizeof(mpegts_table_feed_t)+llen); - mtf->mtf_len = llen; - memcpy(mtf->mtf_tsb, tsb, llen); - mtf->mtf_mux = mm; - TAILQ_INSERT_TAIL(&mi->mi_table_queue, mtf, mtf_link); - table_wakeup = 1; + if (mi->mi_table_queue_size >= 2*1024*1024) { + if (tvhlog_limit(&mi->mi_input_queue_loglimit, 10)) + tvhwarn("mpegts", "too much queued table input data (over 2MB), discarding new"); + } else { + mpegts_table_feed_t *mtf = malloc(sizeof(mpegts_table_feed_t)+llen); + mtf->mtf_len = llen; + memcpy(mtf->mtf_tsb, tsb, llen); + mtf->mtf_mux = mm; + mi->mi_table_queue_size += llen; + TAILQ_INSERT_TAIL(&mi->mi_table_queue, mtf, mtf_link); + table_wakeup = 1; + } } } else { //tvhdebug("tsdemux", "%s - SI packet had errors", name); @@ -1473,6 +1486,7 @@ mpegts_input_thread ( void * p ) tvh_cond_wait(&mi->mi_input_cond, &mi->mi_input_lock); continue; } + mi->mi_input_queue_size -= mp->mp_len; memoryinfo_free(&mpegts_input_queue_memoryinfo, sizeof(mpegts_packet_t) + mp->mp_len); TAILQ_REMOVE(&mi->mi_input_queue, mp, mp_link); pthread_mutex_unlock(&mi->mi_input_lock); @@ -1516,6 +1530,7 @@ mpegts_input_thread ( void * p ) TAILQ_REMOVE(&mi->mi_input_queue, mp, mp_link); free(mp); } + mi->mi_input_queue_size = 0; pthread_mutex_unlock(&mi->mi_input_lock); return NULL; @@ -1537,6 +1552,7 @@ mpegts_input_table_thread ( void *aux ) tvh_cond_wait(&mi->mi_table_cond, &mi->mi_output_lock); continue; } + mi->mi_table_queue_size -= mtf->mtf_len; TAILQ_REMOVE(&mi->mi_table_queue, mtf, mtf_link); pthread_mutex_unlock(&mi->mi_output_lock); @@ -1563,6 +1579,7 @@ mpegts_input_table_thread ( void *aux ) TAILQ_REMOVE(&mi->mi_table_queue, mtf, mtf_link); free(mtf); } + mi->mi_table_queue_size = 0; pthread_mutex_unlock(&mi->mi_output_lock); return NULL;