int hmq_strict_prio; /* Serve this queue 'til it's empty */
int hmq_length;
int hmq_payload; /* Bytes of streaming payload that's enqueued */
+ int hmq_dead;
} htsp_msg_q_t;
/**
htsp_msg_q_t htsp_hmq_qstatus;
struct htsp_subscription_list htsp_subscriptions;
+ struct htsp_subscription_list htsp_dead_subscriptions;
struct htsp_file_list htsp_files;
int htsp_file_id;
*
*/
static void
-htsp_flush_queue(htsp_connection_t *htsp, htsp_msg_q_t *hmq)
+htsp_flush_queue(htsp_connection_t *htsp, htsp_msg_q_t *hmq, int dead)
{
htsp_msg_t *hm;
// reset
hmq->hmq_length = 0;
hmq->hmq_payload = 0;
+ hmq->hmq_dead = dead;
pthread_mutex_unlock(&htsp->htsp_out_mutex);
}
htsp_subscription_destroy(htsp_connection_t *htsp, htsp_subscription_t *hs)
{
LIST_REMOVE(hs, hs_link);
+ LIST_INSERT_HEAD(&htsp->htsp_dead_subscriptions, hs, hs_link);
+
subscription_unsubscribe(hs->hs_s);
if(hs->hs_tsfix != NULL)
transcoder_destroy(hs->hs_transcoder);
#endif
- htsp_flush_queue(htsp, &hs->hs_q);
+ htsp_flush_queue(htsp, &hs->hs_q, 1);
#if ENABLE_TIMESHIFT
if(hs->hs_tshift)
timeshift_destroy(hs->hs_tshift);
#endif
+}
-
+/**
+ *
+ */
+static void
+htsp_subscription_free(htsp_connection_t *htsp, htsp_subscription_t *hs)
+{
+ LIST_REMOVE(hs, hs_link);
+ htsp_flush_queue(htsp, &hs->hs_q, 1);
free(hs);
}
pthread_mutex_lock(&htsp->htsp_out_mutex);
+ assert(!hmq->hmq_dead);
+
TAILQ_INSERT_TAIL(&hmq->hmq_q, hm, hm_link);
if(hmq->hmq_length == 0) {
/* Beware! Closing subscriptions will invoke a lot of callbacks
down in the streaming code. So we do this as early as possible
to avoid any weird lockups */
- while((s = LIST_FIRST(&htsp.htsp_subscriptions)) != NULL) {
+ while((s = LIST_FIRST(&htsp.htsp_subscriptions)) != NULL)
htsp_subscription_destroy(&htsp, s);
- }
pthread_mutex_unlock(&global_lock);
pthread_join(htsp.htsp_writer_thread, NULL);
+ while((s = LIST_FIRST(&htsp.htsp_dead_subscriptions)) != NULL)
+ htsp_subscription_free(&htsp, s);
+
htsp_msg_q_t *hmq;
TAILQ_FOREACH(hmq, &htsp.htsp_active_output_queues, hmq_link) {
/* Flush pkt buffers */
if (skip->type != SMT_SKIP_ERROR)
- htsp_flush_queue(hs->hs_htsp, &hs->hs_q);
+ htsp_flush_queue(hs->hs_htsp, &hs->hs_q, 0);
if (skip->type == SMT_SKIP_ABS_TIME || skip->type == SMT_SKIP_ABS_SIZE)
htsmsg_add_u32(m, "absolute", 1);