]> git.ipfire.org Git - thirdparty/tvheadend.git/commitdiff
mpegts input: add queue size checks for raw mpegts data for slow machines
authorJaroslav Kysela <perex@perex.cz>
Fri, 13 May 2016 07:18:08 +0000 (09:18 +0200)
committerJaroslav Kysela <perex@perex.cz>
Fri, 13 May 2016 07:18:08 +0000 (09:18 +0200)
src/input/mpegts.h
src/input/mpegts/mpegts_input.c

index ca9b64bc7683ffd9e088aa28ed575f2aac23b884..c8b260d0d6f378c57eb919c7beeb5613218bd363 100644 (file)
@@ -683,6 +683,8 @@ struct mpegts_input
   pthread_mutex_t                 mi_input_lock;
   tvh_cond_t                      mi_input_cond;
   TAILQ_HEAD(,mpegts_packet)      mi_input_queue;
+  uint64_t                        mi_input_queue_size;
+  tvhlog_limit_t                  mi_input_queue_loglimit;
 
   /* Data processing/output */
   // Note: this lock (mi_output_lock) protects all the remaining
@@ -696,6 +698,8 @@ struct mpegts_input
   pthread_t                       mi_table_tid;
   tvh_cond_t                      mi_table_cond;
   mpegts_table_feed_queue_t       mi_table_queue;
+  uint64_t                        mi_table_queue_size;
+  tvhlog_limit_t                  mi_table_queue_loglimit;
 
   /* DBus */
 #if ENABLE_DBUS_1
index 762776ee550a2404573321fff6e1707ced0dabf9..d95586e9f89707ba6c30d9f235bd96215a56dd3a 100644 (file)
@@ -1123,9 +1123,16 @@ retry:
 
     pthread_mutex_lock(&mi->mi_input_lock);
     if (mmi->mmi_mux->mm_active == mmi) {
-      memoryinfo_alloc(&mpegts_input_queue_memoryinfo, sizeof(mpegts_packet_t) + len2);
-      TAILQ_INSERT_TAIL(&mi->mi_input_queue, mp, mp_link);
-      tvh_cond_signal(&mi->mi_input_cond, 0);
+      if (mi->mi_input_queue_size < 50*1024*1024) {
+        mi->mi_input_queue_size += len2;
+        memoryinfo_alloc(&mpegts_input_queue_memoryinfo, sizeof(mpegts_packet_t) + len2);
+        TAILQ_INSERT_TAIL(&mi->mi_input_queue, mp, mp_link);
+        tvh_cond_signal(&mi->mi_input_cond, 0);
+      } else {
+        if (tvhlog_limit(&mi->mi_input_queue_loglimit, 10))
+          tvhwarn("mpegts", "too much queued input data (over 50MB), discarding new");
+        free(mp);
+      }
     } else {
       free(mp);
     }
@@ -1373,12 +1380,18 @@ mpegts_input_process
           if (type & MPS_FTABLE)
             mpegts_input_table_dispatch(mm, muxname, tsb, llen);
           if (type & MPS_TABLE) {
-            mpegts_table_feed_t *mtf = malloc(sizeof(mpegts_table_feed_t)+llen);
-            mtf->mtf_len = llen;
-            memcpy(mtf->mtf_tsb, tsb, llen);
-            mtf->mtf_mux = mm;
-            TAILQ_INSERT_TAIL(&mi->mi_table_queue, mtf, mtf_link);
-            table_wakeup = 1;
+            if (mi->mi_table_queue_size >= 2*1024*1024) {
+              if (tvhlog_limit(&mi->mi_input_queue_loglimit, 10))
+                tvhwarn("mpegts", "too much queued table input data (over 2MB), discarding new");
+            } else {
+              mpegts_table_feed_t *mtf = malloc(sizeof(mpegts_table_feed_t)+llen);
+              mtf->mtf_len = llen;
+              memcpy(mtf->mtf_tsb, tsb, llen);
+              mtf->mtf_mux = mm;
+              mi->mi_table_queue_size += llen;
+              TAILQ_INSERT_TAIL(&mi->mi_table_queue, mtf, mtf_link);
+              table_wakeup = 1;
+            }
           }
         } else {
           //tvhdebug("tsdemux", "%s - SI packet had errors", name);
@@ -1473,6 +1486,7 @@ mpegts_input_thread ( void * p )
       tvh_cond_wait(&mi->mi_input_cond, &mi->mi_input_lock);
       continue;
     }
+    mi->mi_input_queue_size -= mp->mp_len;
     memoryinfo_free(&mpegts_input_queue_memoryinfo, sizeof(mpegts_packet_t) + mp->mp_len);
     TAILQ_REMOVE(&mi->mi_input_queue, mp, mp_link);
     pthread_mutex_unlock(&mi->mi_input_lock);
@@ -1516,6 +1530,7 @@ mpegts_input_thread ( void * p )
     TAILQ_REMOVE(&mi->mi_input_queue, mp, mp_link);
     free(mp);
   }
+  mi->mi_input_queue_size = 0;
   pthread_mutex_unlock(&mi->mi_input_lock);
 
   return NULL;
@@ -1537,6 +1552,7 @@ mpegts_input_table_thread ( void *aux )
       tvh_cond_wait(&mi->mi_table_cond, &mi->mi_output_lock);
       continue;
     }
+    mi->mi_table_queue_size -= mtf->mtf_len;
     TAILQ_REMOVE(&mi->mi_table_queue, mtf, mtf_link);
     pthread_mutex_unlock(&mi->mi_output_lock);
     
@@ -1563,6 +1579,7 @@ mpegts_input_table_thread ( void *aux )
     TAILQ_REMOVE(&mi->mi_table_queue, mtf, mtf_link);
     free(mtf);
   }
+  mi->mi_table_queue_size = 0;
   pthread_mutex_unlock(&mi->mi_output_lock);
 
   return NULL;