/** start timer to wakeup dtio because there is content in the queue */
static void
-dt_msg_queue_start_timer(struct dt_msg_queue* mq)
+dt_msg_queue_start_timer(struct dt_msg_queue* mq, int wakeupnow)
{
- struct timeval tv;
+ struct timeval tv = {0};
/* Start a timer to process messages to be logged.
* If we woke up the dtio thread for every message, the wakeup
* messages take up too much processing power. If the queue
/* do not start the timer if a timer already exists, perhaps
* in another worker. So this variable is protected by a lock in
- * dtio */
+ * dtio. */
+
+ /* If we need to wakeupnow, 0 the timer to force the callback. */
lock_basic_lock(&mq->dtio->wakeup_timer_lock);
if(mq->dtio->wakeup_timer_enabled) {
+ if(wakeupnow) {
+ comm_timer_set(mq->wakeup_timer, &tv);
+ }
lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
return;
}
mq->dtio->wakeup_timer_enabled = 1; /* we are going to start one */
- lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
/* start the timer, in mq, in the event base of our worker */
- tv.tv_sec = 1;
- tv.tv_usec = 0;
+ if(!wakeupnow) {
+ tv.tv_sec = 1;
+ tv.tv_usec = 0;
+ }
comm_timer_set(mq->wakeup_timer, &tv);
+ lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
}
void
/* release lock */
lock_basic_unlock(&mq->lock);
- if(wakeupnow) {
- dtio_wakeup(mq->dtio);
- } else if(wakeupstarttimer) {
- dt_msg_queue_start_timer(mq);
+ if(wakeupnow || wakeupstarttimer) {
+ dt_msg_queue_start_timer(mq, wakeupnow);
}
}