]> git.ipfire.org Git - thirdparty/tvheadend.git/commitdiff
[PR-171] Update stream queue size protection to be flexible.
authorAdam Sutton <dev@adamsutton.me.uk>
Tue, 23 Oct 2012 12:42:06 +0000 (13:42 +0100)
committerAdam Sutton <dev@adamsutton.me.uk>
Tue, 23 Oct 2012 13:03:48 +0000 (14:03 +0100)
The queue size limits are now configurable in the queue init function.

For now only HTTP queues are bounded, the others should not be necessary.

src/streaming.c
src/streaming.h
src/tvheadend.h
src/webui/webui.c

index 2c3c0dadd99e34722bd081c46f26cc5944346a49..bf1614f8827485803ec7881222973743bfc9e1a2 100755 (executable)
@@ -54,8 +54,10 @@ streaming_queue_deliver(void *opauqe, streaming_message_t *sm)
   pthread_mutex_lock(&sq->sq_mutex);
 
   /* queue size protection */
-  int queue_size = streaming_queue_size(&sq->sq_queue);
-  if (queue_size > 1500000)
+  // TODO: would be better to update size as we go, but this would
+  //       require updates elsewhere to ensure all removals from the queue
+  //       are covered (new function)
+  if (sq->sq_maxsize && streaming_queue_size(&sq->sq_queue) >= sq->sq_maxsize)
     streaming_msg_free(sm);
   else
     TAILQ_INSERT_TAIL(&sq->sq_queue, sm, sm_link);
@@ -69,13 +71,24 @@ streaming_queue_deliver(void *opauqe, streaming_message_t *sm)
  *
  */
 void
-streaming_queue_init(streaming_queue_t *sq, int reject_filter)
+streaming_queue_init2(streaming_queue_t *sq, int reject_filter, size_t maxsize)
 {
   streaming_target_init(&sq->sq_st, streaming_queue_deliver, sq, reject_filter);
 
   pthread_mutex_init(&sq->sq_mutex, NULL);
   pthread_cond_init(&sq->sq_cond, NULL);
   TAILQ_INIT(&sq->sq_queue);
+
+  sq->sq_maxsize = maxsize;
+}
+
+/**
+ *
+ */
+void
+streaming_queue_init(streaming_queue_t *sq, int reject_filter)
+{
+  streaming_queue_init2(sq, reject_filter, 0); // 0 = unlimited
 }
 
 
@@ -341,7 +354,7 @@ streaming_queue_clear(struct streaming_message_queue *q)
 /**
  *
  */
-int streaming_queue_size(struct streaming_message_queue *q)
+size_t streaming_queue_size(struct streaming_message_queue *q)
 {
   streaming_message_t *sm;
   int size = 0;
index cee24cf8bea0e6b12601c3a242369d395d2e2e15..5dcda7db88fe579513d5a6520a6cd05c1e5d4355 100644 (file)
@@ -71,9 +71,12 @@ void streaming_target_init(streaming_target_t *st,
 
 void streaming_queue_init(streaming_queue_t *sq, int reject_filter);
 
+void streaming_queue_init2
+  (streaming_queue_t *sq, int reject_filter, size_t maxsize);
+
 void streaming_queue_clear(struct streaming_message_queue *q);
 
-int streaming_queue_size(struct streaming_message_queue *q);
+size_t streaming_queue_size(struct streaming_message_queue *q);
 
 void streaming_queue_deinit(streaming_queue_t *sq);
 
index f7bb0f7e1434e0cc9ef7f842e332d0e2acc8e384..ae785555acbc8a9a87f037484a08595dcc4118e1 100644 (file)
@@ -332,9 +332,10 @@ typedef struct streaming_queue {
   
   streaming_target_t sq_st;
 
-  pthread_mutex_t sq_mutex;              /* Protects sp_queue */
-  pthread_cond_t  sq_cond;               /* Condvar for signalling new
-                                           packets */
+  pthread_mutex_t sq_mutex;    /* Protects sp_queue */
+  pthread_cond_t  sq_cond;     /* Condvar for signalling new packets */
+
+  size_t          sq_maxsize;  /* Max queue size (bytes) */
   
   struct streaming_message_queue sq_queue;
 
index fb22dbd0f7ff8d6bc8497150525f14cbe52eb5bd..e404d858862fe096dcf7393239ec1b8c612833ec 100644 (file)
@@ -541,6 +541,8 @@ http_stream_service(http_connection_t *hc, service_t *service)
   dvr_config_t *cfg;
   muxer_container_type_t mc;
   int flags;
+  const char *str;
+  size_t qsize ;
 
   mc = muxer_container_txt2type(http_arg_get(&hc->hc_req_args, "mux"));
   if(mc == MC_UNKNOWN) {
@@ -548,14 +550,19 @@ http_stream_service(http_connection_t *hc, service_t *service)
     mc = cfg->dvr_mc;
   }
 
+  if ((str = http_arg_get(&hc->hc_req_args, "qsize")))
+    qsize = atoll(str);
+  else
+    qsize = 1500000;
+
   if(mc == MC_PASS) {
-    streaming_queue_init(&sq, SMT_PACKET);
+    streaming_queue_init2(&sq, SMT_PACKET, qsize);
     gh = NULL;
     tsfix = NULL;
     st = &sq.sq_st;
     flags = SUBSCRIPTION_RAW_MPEGTS;
   } else {
-    streaming_queue_init(&sq, 0);
+    streaming_queue_init2(&sq, 0, qsize);
     gh = globalheaders_create(&sq.sq_st);
     tsfix = tsfix_create(gh);
     st = tsfix;
@@ -600,6 +607,8 @@ http_stream_channel(http_connection_t *hc, channel_t *ch)
   int priority = 100;
   int flags;
   muxer_container_type_t mc;
+  char *str;
+  size_t qsize;
 
   mc = muxer_container_txt2type(http_arg_get(&hc->hc_req_args, "mux"));
   if(mc == MC_UNKNOWN) {
@@ -607,14 +616,19 @@ http_stream_channel(http_connection_t *hc, channel_t *ch)
     mc = cfg->dvr_mc;
   }
 
+  if ((str = http_arg_get(&hc->hc_req_args, "qsize")))
+    qsize = atoll(str);
+  else
+    qsize = 1500000;
+
   if(mc == MC_PASS) {
-    streaming_queue_init(&sq, SMT_PACKET);
+    streaming_queue_init2(&sq, SMT_PACKET, qsize);
     gh = NULL;
     tsfix = NULL;
     st = &sq.sq_st;
     flags = SUBSCRIPTION_RAW_MPEGTS;
   } else {
-    streaming_queue_init(&sq, 0);
+    streaming_queue_init2(&sq, 0, qsize);
     gh = globalheaders_create(&sq.sq_st);
     tsfix = tsfix_create(gh);
     st = tsfix;