#define DTIO_RECONNECT_TIMEOUT_MAX 1000
/** the msec to wait for reconnect slow, to stop busy spinning on reconnect */
#define DTIO_RECONNECT_TIMEOUT_SLOW 1000
+/** number of messages before wakeup of thread */
+#define DTIO_MSG_FOR_WAKEUP 32
/** maximum length of received frame */
#define DTIO_RECV_FRAME_MAX_LEN 1000
#endif
struct dt_msg_queue*
-dt_msg_queue_create(void)
+dt_msg_queue_create(struct comm_base* base)
{
struct dt_msg_queue* mq = calloc(1, sizeof(*mq));
if(!mq) return NULL;
mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker,
about 1 M should contain 64K messages with some overhead,
or a whole bunch smaller ones */
+ mq->wakeup_timer = comm_timer_create(base, mq_wakeup_cb, mq);
+ if(!mq->wakeup_timer) {
+ free(mq);
+ return NULL;
+ }
lock_basic_init(&mq->lock);
lock_protect(&mq->lock, mq, sizeof(*mq));
return mq;
mq->first = NULL;
mq->last = NULL;
mq->cursize = 0;
+ mq->msgcount = 0;
}
void
if(!mq) return;
lock_basic_destroy(&mq->lock);
dt_msg_queue_clear(mq);
+ comm_timer_delete(mq->wakeup_timer);
free(mq);
}
}
}
+void
+mq_wakeup_cb(void* arg)
+{
+ struct dt_msg_queue* mq = (struct dt_msg_queue*)arg;
+ /* even if the dtio is already active, because perhaps much
+ * traffic suddenly, we leave the timer running to save on
+ * managing it, the once a second timer is less work then
+ * starting and stopping the timer frequently */
+ lock_basic_lock(&mq->dtio->wakeup_timer_lock);
+ mq->dtio->wakeup_timer_enabled = 0;
+ lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
+ dtio_wakeup(mq->dtio);
+}
+
+/** start timer to wakeup dtio because there is content in the queue */
+static void
+dt_msg_queue_start_timer(struct dt_msg_queue* mq)
+{
+ struct timeval tv;
+ /* Start a timer to process messages to be logged.
+ * If we woke up the dtio thread for every message, the wakeup
+ * messages take up too much processing power. If the queue
+ * fills up the wakeup happens immediately. The timer wakes it up
+ * if there are infrequent messages to log. */
+
+ /* we cannot start a timer in dtio thread, because it is a different
+ * thread and its event base is in use by the other thread, it would
+ * give race conditions if we tried to modify its event base,
+ * and locks would wait until it woke up, and this is what we do. */
+
+ /* do not start the timer if a timer already exists, perhaps
+ * in another worker. So this variable is protected by a lock in
+ * dtio */
+ lock_basic_lock(&mq->dtio->wakeup_timer_lock);
+ if(mq->dtio->wakeup_timer_enabled) {
+ lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
+ return;
+ }
+ mq->dtio->wakeup_timer_enabled = 1; /* we are going to start one */
+ lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
+
+ /* start the timer, in mq, in the event base of our worker */
+ tv.tv_sec = 1;
+ tv.tv_usec = 0;
+ comm_timer_set(mq->wakeup_timer, &tv);
+}
+
void
dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
{
- int wakeup = 0;
+ int wakeupnow = 0, wakeupstarttimer = 0;
struct dt_msg_entry* entry;
/* check conditions */
/* aqcuire lock */
lock_basic_lock(&mq->lock);
- /* list was empty, wakeup dtio */
+ /* if list was empty, start timer for (eventual) wakeup */
if(mq->first == NULL)
- wakeup = 1;
+ wakeupstarttimer = 1;
+ /* if list contains more than wakeupnum elements, wakeup now,
+ * or if list is (going to be) almost full */
+ if(mq->msgcount+1 > DTIO_MSG_FOR_WAKEUP ||
+ mq->cursize+len >= mq->maxsize * 9 / 10)
+ wakeupnow = 1;
/* see if it is going to fit */
if(mq->cursize + len > mq->maxsize) {
/* buffer full, or congested. */
return;
}
mq->cursize += len;
+ mq->msgcount ++;
/* append to list */
if(mq->last) {
mq->last->next = entry;
/* release lock */
lock_basic_unlock(&mq->lock);
- if(wakeup)
+ if(wakeupnow) {
dtio_wakeup(mq->dtio);
+ } else if(wakeupstarttimer) {
+ dt_msg_queue_start_timer(mq);
+ }
}
struct dt_io_thread* dt_io_thread_create(void)
{
struct dt_io_thread* dtio = calloc(1, sizeof(*dtio));
+ lock_basic_init(&dtio->wakeup_timer_lock);
+ lock_protect(&dtio->wakeup_timer_lock, &dtio->wakeup_timer_enabled,
+ sizeof(dtio->wakeup_timer_enabled));
return dtio;
}
{
struct dt_io_list_item* item, *nextitem;
if(!dtio) return;
+ lock_basic_destroy(&dtio->wakeup_timer_lock);
item=dtio->io_list;
while(item) {
nextitem = item->next;
mq->first = entry->next;
if(!entry->next) mq->last = NULL;
mq->cursize -= entry->len;
+ mq->msgcount --;
lock_basic_unlock(&mq->lock);
*buf = entry->buf;
struct dt_io_list_item;
struct dt_io_thread;
struct config_file;
+struct comm_base;
/**
* A message buffer with dnstap messages queued up. It is per-worker.
/** current size of the buffer, in bytes. data bytes of messages.
* If a new message make it more than maxsize, the buffer is full */
size_t cursize;
+ /** number of messages in the queue */
+ int msgcount;
/** list of messages. The messages are added to the back and taken
* out from the front. */
struct dt_msg_entry* first, *last;
/** reference to the io thread to wakeup */
struct dt_io_thread* dtio;
+ /** the wakeup timer for dtio, on worker event base */
+ struct comm_timer* wakeup_timer;
};
/**
* for the current message length that precedes the frame */
size_t cur_msg_len_done;
+ /** lock on wakeup_timer_enabled */
+ lock_basic_type wakeup_timer_lock;
+ /** if wakeup timer is enabled in some thread */
+ int wakeup_timer_enabled;
/** command pipe that stops the pipe if closed. Used to quit
* the program. [0] is read, [1] is written to. */
int commandpipe[2];
/**
* Create new (empty) worker message queue. Limit set to default on max.
+ * @param base: event base for wakeup timer.
* @return NULL on malloc failure or a new queue (not locked).
*/
-struct dt_msg_queue* dt_msg_queue_create(void);
+struct dt_msg_queue* dt_msg_queue_create(struct comm_base* base);
/**
* Delete a worker message queue. It has to be unlinked from access,
*/
void dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len);
+/** timer callback to wakeup dtio thread to process messages */
+void mq_wakeup_cb(void* arg);
+
/**
* Create IO thread.
* @return new io thread object. not yet started. or NULL malloc failure.