The queue size limits are now configurable in the queue init function.
For now only HTTP queues are bounded, the others should not be necessary.
pthread_mutex_lock(&sq->sq_mutex);
/* queue size protection */
- int queue_size = streaming_queue_size(&sq->sq_queue);
- if (queue_size > 1500000)
+ // TODO: would be better to update size as we go, but this would
+ // require updates elsewhere to ensure all removals from the queue
+ // are covered (new function)
+ if (sq->sq_maxsize && streaming_queue_size(&sq->sq_queue) >= sq->sq_maxsize)
streaming_msg_free(sm);
else
TAILQ_INSERT_TAIL(&sq->sq_queue, sm, sm_link);
*
*/
void
-streaming_queue_init(streaming_queue_t *sq, int reject_filter)
+streaming_queue_init2(streaming_queue_t *sq, int reject_filter, size_t maxsize)
{
streaming_target_init(&sq->sq_st, streaming_queue_deliver, sq, reject_filter);
pthread_mutex_init(&sq->sq_mutex, NULL);
pthread_cond_init(&sq->sq_cond, NULL);
TAILQ_INIT(&sq->sq_queue);
+
+ sq->sq_maxsize = maxsize;
+}
+
+/**
+ *
+ */
+void
+streaming_queue_init(streaming_queue_t *sq, int reject_filter)
+{
+ streaming_queue_init2(sq, reject_filter, 0); // 0 = unlimited
}
/**
*
*/
-int streaming_queue_size(struct streaming_message_queue *q)
+size_t streaming_queue_size(struct streaming_message_queue *q)
{
streaming_message_t *sm;
int size = 0;
void streaming_queue_init(streaming_queue_t *sq, int reject_filter);
+void streaming_queue_init2
+ (streaming_queue_t *sq, int reject_filter, size_t maxsize);
+
void streaming_queue_clear(struct streaming_message_queue *q);
-int streaming_queue_size(struct streaming_message_queue *q);
+size_t streaming_queue_size(struct streaming_message_queue *q);
void streaming_queue_deinit(streaming_queue_t *sq);
streaming_target_t sq_st;
- pthread_mutex_t sq_mutex; /* Protects sp_queue */
- pthread_cond_t sq_cond; /* Condvar for signalling new
- packets */
+ pthread_mutex_t sq_mutex; /* Protects sp_queue */
+ pthread_cond_t sq_cond; /* Condvar for signalling new packets */
+
+ size_t sq_maxsize; /* Max queue size (bytes) */
struct streaming_message_queue sq_queue;
dvr_config_t *cfg;
muxer_container_type_t mc;
int flags;
+ const char *str;
+ size_t qsize ;
mc = muxer_container_txt2type(http_arg_get(&hc->hc_req_args, "mux"));
if(mc == MC_UNKNOWN) {
mc = cfg->dvr_mc;
}
+ if ((str = http_arg_get(&hc->hc_req_args, "qsize")))
+ qsize = atoll(str);
+ else
+ qsize = 1500000;
+
if(mc == MC_PASS) {
- streaming_queue_init(&sq, SMT_PACKET);
+ streaming_queue_init2(&sq, SMT_PACKET, qsize);
gh = NULL;
tsfix = NULL;
st = &sq.sq_st;
flags = SUBSCRIPTION_RAW_MPEGTS;
} else {
- streaming_queue_init(&sq, 0);
+ streaming_queue_init2(&sq, 0, qsize);
gh = globalheaders_create(&sq.sq_st);
tsfix = tsfix_create(gh);
st = tsfix;
int priority = 100;
int flags;
muxer_container_type_t mc;
+ char *str;
+ size_t qsize;
mc = muxer_container_txt2type(http_arg_get(&hc->hc_req_args, "mux"));
if(mc == MC_UNKNOWN) {
mc = cfg->dvr_mc;
}
+ if ((str = http_arg_get(&hc->hc_req_args, "qsize")))
+ qsize = atoll(str);
+ else
+ qsize = 1500000;
+
if(mc == MC_PASS) {
- streaming_queue_init(&sq, SMT_PACKET);
+ streaming_queue_init2(&sq, SMT_PACKET, qsize);
gh = NULL;
tsfix = NULL;
st = &sq.sq_st;
flags = SUBSCRIPTION_RAW_MPEGTS;
} else {
- streaming_queue_init(&sq, 0);
+ streaming_queue_init2(&sq, 0, qsize);
gh = globalheaders_create(&sq.sq_st);
tsfix = tsfix_create(gh);
st = tsfix;