mq_wakeup_cb(void* arg)
{
struct dt_msg_queue* mq = (struct dt_msg_queue*)arg;
- /* even if the dtio is already active, because perhaps much
- * traffic suddenly, we leave the timer running to save on
- * managing it, the once a second timer is less work then
- * starting and stopping the timer frequently */
+
lock_basic_lock(&mq->dtio->wakeup_timer_lock);
mq->dtio->wakeup_timer_enabled = 0;
lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
lock_basic_lock(&mq->dtio->wakeup_timer_lock);
if(mq->dtio->wakeup_timer_enabled) {
if(wakeupnow) {
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
comm_timer_set(mq->wakeup_timer, &tv);
}
lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
if(!wakeupnow) {
tv.tv_sec = 1;
tv.tv_usec = 0;
+ /* If it is already set, keep it running. */
+ if(!comm_timer_is_set(mq->wakeup_timer))
+ comm_timer_set(mq->wakeup_timer, &tv);
+ } else {
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
+ comm_timer_set(mq->wakeup_timer, &tv);
}
- comm_timer_set(mq->wakeup_timer, &tv);
lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
}
/* acquire lock */
lock_basic_lock(&mq->lock);
- /* if list was empty, start timer for (eventual) wakeup */
- if(mq->first == NULL)
+ /* if list was empty, start timer for (eventual) wakeup,
+ * or if dtio is not writing now an eventual wakeup is needed. */
+ if(mq->first == NULL || !mq->dtio->event_added_is_write)
wakeupstarttimer = 1;
/* if list contains more than wakeupnum elements, wakeup now,
* or if list is (going to be) almost full */
/* unregister the event polling for write, because there is
* nothing to be written */
(void)dtio_add_output_event_read(dtio);
+
+ /* Set wakeuptimer enabled off; so that the next worker thread that
+ * wants to log starts a timer if needed, since the writer thread
+ * has gone to sleep. */
+ lock_basic_lock(&dtio->wakeup_timer_lock);
+ dtio->wakeup_timer_enabled = 0;
+ lock_basic_unlock(&dtio->wakeup_timer_lock);
}
#ifdef HAVE_SSL
/* no messages on the first iteration,
* the queues are all empty */
dtio_sleep(dtio);
+ /* After putting to sleep, see if
+ * a message is in a message queue,
+ * if so, resume service. Stops a
+ * race condition where a thread could
+ * have one message but the dtio
+ * also just went to sleep. With the
+ * message queued between the
+ * dtio_find_msg and dtio_sleep
+ * calls. */
+ if(dtio_find_msg(dtio)) {
+ if(!dtio_add_output_event_write(dtio))
+ return;
+ }
}
- return; /* nothing to do */
+ if(!dtio->cur_msg)
+ return; /* nothing to do */
}
}