/** number of messages to process in one output callback */
#define DTIO_MESSAGES_PER_CALLBACK 100
+/** DTIO command channel commands */
+enum {
+ /** DTIO command channel stop */
+ DTIO_COMMAND_STOP = 0,
+ /** DTIO command channel wakeup */
+ DTIO_COMMAND_WAKEUP = 1
+} dtio_channel_command;
void* fstrm_create_control_frame_start(char* contenttype, size_t* len)
{
free(mq);
}
+/** make the dtio wake up by sending a wakeup command */
+static void dtio_wakeup(struct dt_io_thread* dtio)
+{
+ uint8_t cmd = DTIO_COMMAND_WAKEUP;
+ if(!dtio) return;
+ if(!dtio->event_base) return; /* not started */
+
+ while(1) {
+ ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
+ if(r == -1) {
+#ifndef USE_WINSOCK
+ if(errno == EINTR || errno == EAGAIN)
+ continue;
+ log_err("dnstap io wakeup: write: %s", strerror(errno));
+#else
+ if(WSAGetLastError() == WSAEINPROGRESS)
+ continue;
+ if(WSAGetLastError() == WSAEWOULDBLOCK)
+ continue;
+ log_err("dnstap io stop: write: %s",
+ wsa_strerror(WSAGetLastError()));
+#endif
+ break;
+ }
+ break;
+ }
+}
+
void
dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
{
+ int wakeup = 0;
struct dt_msg_entry* entry;
/* check conditions */
/* aqcuire lock */
lock_basic_lock(&mq->lock);
+ /* list was empty, wakeup dtio */
+ if(mq->first == NULL)
+ wakeup = 1;
/* see if it is going to fit */
if(mq->cursize + len > mq->maxsize) {
/* buffer full, or congested. */
mq->last = entry;
/* release lock */
lock_basic_unlock(&mq->lock);
+
+ if(wakeup)
+ dtio_wakeup(mq->dtio);
}
struct dt_io_thread* dt_io_thread_create(void)
{
struct dt_io_list_item* item = malloc(sizeof(*item));
if(!item) return 0;
+ mq->dtio = dtio;
item->queue = mq;
item->next = dtio->io_list;
dtio->io_list = item;
if(prev) prev->next = item->next;
else dtio->io_list = item->next;
/* the queue itself only registered, not deleted */
+ item->queue->dtio = NULL;
free(item);
dtio->io_list_iter = NULL;
return;
/** find a new message to write, search message queues, false if none */
static int dtio_find_msg(struct dt_io_thread* dtio)
{
- struct dt_io_list_item* item;
+ struct dt_io_list_item *spot, *item;
- if(dtio->io_list_iter)
- item = dtio->io_list_iter;
- else item = dtio->io_list;
+ spot = dtio->io_list_iter;
/* use the next queue for the next message lookup,
* if we hit the end(NULL) the NULL restarts the iter at start. */
- if(item)
- dtio->io_list_iter = item->next;
+ if(spot)
+ dtio->io_list_iter = spot->next;
+ else if(dtio->io_list)
+ dtio->io_list_iter = dtio->io_list->next;
+ /* scan from spot to end-of-io_list */
+ item = spot;
+ while(item) {
+ if(dtio_find_in_queue(dtio, item->queue))
+ return 1;
+ item = item->next;
+ }
+ /* scan starting at the start-of-list (to wrap around the end) */
+ item = dtio->io_list;
while(item) {
if(dtio_find_in_queue(dtio, item->queue))
return 1;
return;
ub_event_del(dtio->event);
dtio->event_added = 0;
+ dtio->event_added_is_write = 0;
}
/** close and stop the output file descriptor event */
return 0;
}
+/** add the output file descriptor event for listening, read only */
+static void dtio_add_output_event_read(struct dt_io_thread* dtio)
+{
+ if(!dtio->event)
+ return;
+ if(dtio->event_added && !dtio->event_added_is_write)
+ return;
+ /* we have to (re-)register the event */
+ if(dtio->event_added)
+ ub_event_del(dtio->event);
+ ub_event_del_bits(dtio->event, UB_EV_WRITE);
+ if(ub_event_add(dtio->event, NULL) != 0) {
+ log_err("dnstap io: out of memory (adding event)");
+ return;
+ }
+ dtio->event_added = 1;
+ dtio->event_added_is_write = 0;
+}
+
+/** add the output file descriptor event for listening, read and write */
+static void dtio_add_output_event_write(struct dt_io_thread* dtio)
+{
+ if(!dtio->event)
+ return;
+ if(dtio->event_added && dtio->event_added_is_write)
+ return;
+ /* we have to (re-)register the event */
+ if(dtio->event_added)
+ ub_event_del(dtio->event);
+ ub_event_add_bits(dtio->event, UB_EV_WRITE);
+ if(ub_event_add(dtio->event, NULL) != 0) {
+ log_err("dnstap io: out of memory (adding event)");
+ return;
+ }
+ dtio->event_added = 1;
+ dtio->event_added_is_write = 1;
+}
+
+/** put the dtio thread to sleep */
+static void dtio_sleep(struct dt_io_thread* dtio)
+{
+ /* unregister the event polling for write, because there is
+ * nothing to be written */
+ dtio_add_output_event_read(dtio);
+}
+
/** callback for the dnstap events, to write to the output */
static void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg)
{
for(i=0; i<DTIO_MESSAGES_PER_CALLBACK; i++) {
/* see if there are messages that need writing */
if(!dtio->cur_msg) {
- if(!dtio_find_msg(dtio))
+ if(!dtio_find_msg(dtio)) {
+ if(i == 0) {
+ /* no messages on the first iteration,
+ * the queues are all empty */
+ dtio_sleep(dtio);
+ }
return; /* nothing to do */
+ }
}
/* write it */
/* and then fall through to quit the thread */
} else if(r == 0) {
verbose(VERB_ALGO, "dnstap io: cmd channel closed");
- } else if(r == 1 && cmd == 0) {
+ } else if(r == 1 && cmd == DTIO_COMMAND_STOP) {
verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit");
+ } else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) {
+ verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup");
+ /* reregister event */
+ dtio_add_output_event_write(dtio);
+ return;
} else if(r == 1) {
verbose(VERB_ALGO, "dnstap io: cmd channel unknown command");
}
}
}
-/** add the output file descriptor event for listening */
-static void dtio_add_output_event(struct dt_io_thread* dtio)
-{
- if(!dtio->event)
- return;
- if(ub_event_add(dtio->event, NULL) != 0) {
- log_err("dnstap io: out of memory (adding event)");
- return;
- }
- dtio->event_added = 1;
-}
-
/** the IO thread function for the DNSTAP IO */
static void* dnstap_io(void* arg)
{
dtio_setup_base(dtio, &secs, &now);
dtio_setup_cmd(dtio);
dtio_open_output(dtio);
- dtio_add_output_event(dtio);
+ dtio_add_output_event_write(dtio);
verbose(VERB_ALGO, "start dnstap io thread");
/* run */
void dt_io_thread_stop(struct dt_io_thread* dtio)
{
- uint8_t cmd = 0;
+ uint8_t cmd = DTIO_COMMAND_STOP;
if(!dtio) return;
if(!dtio->event_base) return; /* not started */