From 8c47d16e292177688cd92e294f233493f482b53a Mon Sep 17 00:00:00 2001 From: "W.C.A. Wijngaards" Date: Thu, 23 Jan 2020 13:27:21 +0100 Subject: [PATCH] dnstap io, sleeps thread when there is no traffic. --- dnstap/dtstream.c | 143 +++++++++++++++++++++++++++++++++++++++------- dnstap/dtstream.h | 5 ++ 2 files changed, 126 insertions(+), 22 deletions(-) diff --git a/dnstap/dtstream.c b/dnstap/dtstream.c index c0fbb3999..a81a7abd5 100644 --- a/dnstap/dtstream.c +++ b/dnstap/dtstream.c @@ -52,6 +52,13 @@ /** number of messages to process in one output callback */ #define DTIO_MESSAGES_PER_CALLBACK 100 +/** DTIO command channel commands */ +enum { + /** DTIO command channel stop */ + DTIO_COMMAND_STOP = 0, + /** DTIO command channel wakeup */ + DTIO_COMMAND_WAKEUP = 1 +} dtio_channel_command; void* fstrm_create_control_frame_start(char* contenttype, size_t* len) { @@ -137,9 +144,38 @@ dt_msg_queue_delete(struct dt_msg_queue* mq) free(mq); } +/** make the dtio wake up by sending a wakeup command */ +static void dtio_wakeup(struct dt_io_thread* dtio) +{ + uint8_t cmd = DTIO_COMMAND_WAKEUP; + if(!dtio) return; + if(!dtio->event_base) return; /* not started */ + + while(1) { + ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd)); + if(r == -1) { +#ifndef USE_WINSOCK + if(errno == EINTR || errno == EAGAIN) + continue; + log_err("dnstap io wakeup: write: %s", strerror(errno)); +#else + if(WSAGetLastError() == WSAEINPROGRESS) + continue; + if(WSAGetLastError() == WSAEWOULDBLOCK) + continue; + log_err("dnstap io stop: write: %s", + wsa_strerror(WSAGetLastError())); +#endif + break; + } + break; + } +} + void dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len) { + int wakeup = 0; struct dt_msg_entry* entry; /* check conditions */ @@ -170,6 +206,9 @@ dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len) /* aqcuire lock */ lock_basic_lock(&mq->lock); + /* list was empty, wakeup dtio */ + if(mq->first == NULL) + wakeup = 1; /* see if it is going to fit */ if(mq->cursize + len > mq->maxsize) { /* buffer full, or congested. */ @@ -188,6 +227,9 @@ dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len) mq->last = entry; /* release lock */ lock_basic_unlock(&mq->lock); + + if(wakeup) + dtio_wakeup(mq->dtio); } struct dt_io_thread* dt_io_thread_create(void) @@ -221,6 +263,7 @@ int dt_io_thread_register_queue(struct dt_io_thread* dtio, { struct dt_io_list_item* item = malloc(sizeof(*item)); if(!item) return 0; + mq->dtio = dtio; item->queue = mq; item->next = dtio->io_list; dtio->io_list = item; @@ -238,6 +281,7 @@ void dt_io_thread_unregister_queue(struct dt_io_thread* dtio, if(prev) prev->next = item->next; else dtio->io_list = item->next; /* the queue itself only registered, not deleted */ + item->queue->dtio = NULL; free(item); dtio->io_list_iter = NULL; return; @@ -288,16 +332,25 @@ static int dtio_find_in_queue(struct dt_io_thread* dtio, /** find a new message to write, search message queues, false if none */ static int dtio_find_msg(struct dt_io_thread* dtio) { - struct dt_io_list_item* item; + struct dt_io_list_item *spot, *item; - if(dtio->io_list_iter) - item = dtio->io_list_iter; - else item = dtio->io_list; + spot = dtio->io_list_iter; /* use the next queue for the next message lookup, * if we hit the end(NULL) the NULL restarts the iter at start. */ - if(item) - dtio->io_list_iter = item->next; + if(spot) + dtio->io_list_iter = spot->next; + else if(dtio->io_list) + dtio->io_list_iter = dtio->io_list->next; + /* scan from spot to end-of-io_list */ + item = spot; + while(item) { + if(dtio_find_in_queue(dtio, item->queue)) + return 1; + item = item->next; + } + /* scan starting at the start-of-list (to wrap around the end) */ + item = dtio->io_list; while(item) { if(dtio_find_in_queue(dtio, item->queue)) return 1; @@ -323,6 +376,7 @@ static void dtio_del_output_event(struct dt_io_thread* dtio) return; ub_event_del(dtio->event); dtio->event_added = 0; + dtio->event_added_is_write = 0; } /** close and stop the output file descriptor event */ @@ -607,6 +661,52 @@ static int dtio_check_close(struct dt_io_thread* dtio) return 0; } +/** add the output file descriptor event for listening, read only */ +static void dtio_add_output_event_read(struct dt_io_thread* dtio) +{ + if(!dtio->event) + return; + if(dtio->event_added && !dtio->event_added_is_write) + return; + /* we have to (re-)register the event */ + if(dtio->event_added) + ub_event_del(dtio->event); + ub_event_del_bits(dtio->event, UB_EV_WRITE); + if(ub_event_add(dtio->event, NULL) != 0) { + log_err("dnstap io: out of memory (adding event)"); + return; + } + dtio->event_added = 1; + dtio->event_added_is_write = 0; +} + +/** add the output file descriptor event for listening, read and write */ +static void dtio_add_output_event_write(struct dt_io_thread* dtio) +{ + if(!dtio->event) + return; + if(dtio->event_added && dtio->event_added_is_write) + return; + /* we have to (re-)register the event */ + if(dtio->event_added) + ub_event_del(dtio->event); + ub_event_add_bits(dtio->event, UB_EV_WRITE); + if(ub_event_add(dtio->event, NULL) != 0) { + log_err("dnstap io: out of memory (adding event)"); + return; + } + dtio->event_added = 1; + dtio->event_added_is_write = 1; +} + +/** put the dtio thread to sleep */ +static void dtio_sleep(struct dt_io_thread* dtio) +{ + /* unregister the event polling for write, because there is + * nothing to be written */ + dtio_add_output_event_read(dtio); +} + /** callback for the dnstap events, to write to the output */ static void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg) { @@ -628,8 +728,14 @@ static void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg) for(i=0; icur_msg) { - if(!dtio_find_msg(dtio)) + if(!dtio_find_msg(dtio)) { + if(i == 0) { + /* no messages on the first iteration, + * the queues are all empty */ + dtio_sleep(dtio); + } return; /* nothing to do */ + } } /* write it */ @@ -668,8 +774,13 @@ static void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg) /* and then fall through to quit the thread */ } else if(r == 0) { verbose(VERB_ALGO, "dnstap io: cmd channel closed"); - } else if(r == 1 && cmd == 0) { + } else if(r == 1 && cmd == DTIO_COMMAND_STOP) { verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit"); + } else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) { + verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup"); + /* reregister event */ + dtio_add_output_event_write(dtio); + return; } else if(r == 1) { verbose(VERB_ALGO, "dnstap io: cmd channel unknown command"); } @@ -1016,18 +1127,6 @@ static void dtio_open_output(struct dt_io_thread* dtio) } } -/** add the output file descriptor event for listening */ -static void dtio_add_output_event(struct dt_io_thread* dtio) -{ - if(!dtio->event) - return; - if(ub_event_add(dtio->event, NULL) != 0) { - log_err("dnstap io: out of memory (adding event)"); - return; - } - dtio->event_added = 1; -} - /** the IO thread function for the DNSTAP IO */ static void* dnstap_io(void* arg) { @@ -1039,7 +1138,7 @@ static void* dnstap_io(void* arg) dtio_setup_base(dtio, &secs, &now); dtio_setup_cmd(dtio); dtio_open_output(dtio); - dtio_add_output_event(dtio); + dtio_add_output_event_write(dtio); verbose(VERB_ALGO, "start dnstap io thread"); /* run */ @@ -1077,7 +1176,7 @@ int dt_io_thread_start(struct dt_io_thread* dtio) void dt_io_thread_stop(struct dt_io_thread* dtio) { - uint8_t cmd = 0; + uint8_t cmd = DTIO_COMMAND_STOP; if(!dtio) return; if(!dtio->event_base) return; /* not started */ diff --git a/dnstap/dtstream.h b/dnstap/dtstream.h index 3c5bd9a4e..77bda6ae0 100644 --- a/dnstap/dtstream.h +++ b/dnstap/dtstream.h @@ -47,6 +47,7 @@ #include "util/locks.h" struct dt_msg_entry; struct dt_io_list_item; +struct dt_io_thread; struct config_file; /** @@ -70,6 +71,8 @@ struct dt_msg_queue { /** list of messages. The messages are added to the back and taken * out from the front. */ struct dt_msg_entry* first, *last; + /** reference to the io thread to wakeup */ + struct dt_io_thread* dtio; }; /** @@ -105,6 +108,8 @@ struct dt_io_thread { void* event; /** the event is added */ int event_added; + /** event added is a write event */ + int event_added_is_write; /** check for nonblocking connect errors on fd */ int check_nb_connect; /** the buffer that currently getting written, or NULL if no -- 2.47.3