streaming_queue_t *sq = opauqe;
pthread_mutex_lock(&sq->sq_mutex);
- TAILQ_INSERT_TAIL(&sq->sq_queue, sm, sm_link);
+
+ /* queue size protection */
+ int queue_size = streaming_queue_size(&sq->sq_queue);
+ if (queue_size > 1500000)
+ streaming_msg_free(sm);
+ else
+ TAILQ_INSERT_TAIL(&sq->sq_queue, sm, sm_link);
+
pthread_cond_signal(&sq->sq_cond);
pthread_mutex_unlock(&sq->sq_mutex);
}
}
+/**
+ *
+ */
+int streaming_queue_size(struct streaming_message_queue *q)
+{
+ streaming_message_t *sm;
+ int size = 0;
+
+ TAILQ_FOREACH(sm, q, sm_link) {
+ if (sm->sm_type == SMT_PACKET)
+ {
+ th_pkt_t *pkt = sm->sm_data;
+ if (pkt && pkt->pkt_payload)
+ {
+ size += pkt->pkt_payload->pb_size;
+ }
+ }
+ else if (sm->sm_type == SMT_MPEGTS)
+ {
+ pktbuf_t *pkt_payload = sm->sm_data;
+ if (pkt_payload)
+ {
+ size += pkt_payload->pb_size;
+ }
+ }
+ }
+ return size;
+}
+
+
/**
*
*/
void streaming_queue_clear(struct streaming_message_queue *q);
+int streaming_queue_size(struct streaming_message_queue *q);
+
void streaming_queue_deinit(streaming_queue_t *sq);
void streaming_target_connect(streaming_pad_t *sp, streaming_target_t *st);