pthread_mutex_t mi_input_lock;
tvh_cond_t mi_input_cond;
TAILQ_HEAD(,mpegts_packet) mi_input_queue;
+ uint64_t mi_input_queue_size;
+ tvhlog_limit_t mi_input_queue_loglimit;
/* Data processing/output */
// Note: this lock (mi_output_lock) protects all the remaining
pthread_t mi_table_tid;
tvh_cond_t mi_table_cond;
mpegts_table_feed_queue_t mi_table_queue;
+ uint64_t mi_table_queue_size;
+ tvhlog_limit_t mi_table_queue_loglimit;
/* DBus */
#if ENABLE_DBUS_1
pthread_mutex_lock(&mi->mi_input_lock);
if (mmi->mmi_mux->mm_active == mmi) {
- memoryinfo_alloc(&mpegts_input_queue_memoryinfo, sizeof(mpegts_packet_t) + len2);
- TAILQ_INSERT_TAIL(&mi->mi_input_queue, mp, mp_link);
- tvh_cond_signal(&mi->mi_input_cond, 0);
+ if (mi->mi_input_queue_size < 50*1024*1024) {
+ mi->mi_input_queue_size += len2;
+ memoryinfo_alloc(&mpegts_input_queue_memoryinfo, sizeof(mpegts_packet_t) + len2);
+ TAILQ_INSERT_TAIL(&mi->mi_input_queue, mp, mp_link);
+ tvh_cond_signal(&mi->mi_input_cond, 0);
+ } else {
+ if (tvhlog_limit(&mi->mi_input_queue_loglimit, 10))
+ tvhwarn("mpegts", "too much queued input data (over 50MB), discarding new");
+ free(mp);
+ }
} else {
free(mp);
}
if (type & MPS_FTABLE)
mpegts_input_table_dispatch(mm, muxname, tsb, llen);
if (type & MPS_TABLE) {
- mpegts_table_feed_t *mtf = malloc(sizeof(mpegts_table_feed_t)+llen);
- mtf->mtf_len = llen;
- memcpy(mtf->mtf_tsb, tsb, llen);
- mtf->mtf_mux = mm;
- TAILQ_INSERT_TAIL(&mi->mi_table_queue, mtf, mtf_link);
- table_wakeup = 1;
+ if (mi->mi_table_queue_size >= 2*1024*1024) {
+ if (tvhlog_limit(&mi->mi_input_queue_loglimit, 10))
+ tvhwarn("mpegts", "too much queued table input data (over 2MB), discarding new");
+ } else {
+ mpegts_table_feed_t *mtf = malloc(sizeof(mpegts_table_feed_t)+llen);
+ mtf->mtf_len = llen;
+ memcpy(mtf->mtf_tsb, tsb, llen);
+ mtf->mtf_mux = mm;
+ mi->mi_table_queue_size += llen;
+ TAILQ_INSERT_TAIL(&mi->mi_table_queue, mtf, mtf_link);
+ table_wakeup = 1;
+ }
}
} else {
//tvhdebug("tsdemux", "%s - SI packet had errors", name);
tvh_cond_wait(&mi->mi_input_cond, &mi->mi_input_lock);
continue;
}
+ mi->mi_input_queue_size -= mp->mp_len;
memoryinfo_free(&mpegts_input_queue_memoryinfo, sizeof(mpegts_packet_t) + mp->mp_len);
TAILQ_REMOVE(&mi->mi_input_queue, mp, mp_link);
pthread_mutex_unlock(&mi->mi_input_lock);
TAILQ_REMOVE(&mi->mi_input_queue, mp, mp_link);
free(mp);
}
+ mi->mi_input_queue_size = 0;
pthread_mutex_unlock(&mi->mi_input_lock);
return NULL;
tvh_cond_wait(&mi->mi_table_cond, &mi->mi_output_lock);
continue;
}
+ mi->mi_table_queue_size -= mtf->mtf_len;
TAILQ_REMOVE(&mi->mi_table_queue, mtf, mtf_link);
pthread_mutex_unlock(&mi->mi_output_lock);
TAILQ_REMOVE(&mi->mi_table_queue, mtf, mtf_link);
free(mtf);
}
+ mi->mi_table_queue_size = 0;
pthread_mutex_unlock(&mi->mi_output_lock);
return NULL;