]> git.ipfire.org Git - thirdparty/unbound.git/commitdiff
dnstap io, sleeps thread when there is no traffic.
authorW.C.A. Wijngaards <wouter@nlnetlabs.nl>
Thu, 23 Jan 2020 12:27:21 +0000 (13:27 +0100)
committerW.C.A. Wijngaards <wouter@nlnetlabs.nl>
Thu, 23 Jan 2020 12:27:21 +0000 (13:27 +0100)
dnstap/dtstream.c
dnstap/dtstream.h

index c0fbb3999243c96f8a2c7cb204e2f66689d4ca6f..a81a7abd581e333784fbff77c801a7541dfc4161 100644 (file)
 
 /** number of messages to process in one output callback */
 #define DTIO_MESSAGES_PER_CALLBACK 100
+/** DTIO command channel commands */
+enum {
+       /** DTIO command channel stop */
+       DTIO_COMMAND_STOP = 0,
+       /** DTIO command channel wakeup */
+       DTIO_COMMAND_WAKEUP = 1
+} dtio_channel_command;
 
 void* fstrm_create_control_frame_start(char* contenttype, size_t* len)
 {
@@ -137,9 +144,38 @@ dt_msg_queue_delete(struct dt_msg_queue* mq)
        free(mq);
 }
 
+/** make the dtio wake up by sending a wakeup command */
+static void dtio_wakeup(struct dt_io_thread* dtio)
+{
+       uint8_t cmd = DTIO_COMMAND_WAKEUP;
+       if(!dtio) return;
+       if(!dtio->event_base) return; /* not started */
+
+       while(1) {
+               ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
+               if(r == -1) {
+#ifndef USE_WINSOCK
+                       if(errno == EINTR || errno == EAGAIN)
+                               continue;
+                       log_err("dnstap io wakeup: write: %s", strerror(errno));
+#else
+                       if(WSAGetLastError() == WSAEINPROGRESS)
+                               continue;
+                       if(WSAGetLastError() == WSAEWOULDBLOCK)
+                               continue;
+                       log_err("dnstap io stop: write: %s",
+                               wsa_strerror(WSAGetLastError()));
+#endif
+                       break;
+               }
+               break;
+       }
+}
+
 void
 dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
 {
+       int wakeup = 0;
        struct dt_msg_entry* entry;
 
        /* check conditions */
@@ -170,6 +206,9 @@ dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
 
        /* aqcuire lock */
        lock_basic_lock(&mq->lock);
+       /* list was empty, wakeup dtio */
+       if(mq->first == NULL)
+               wakeup = 1;
        /* see if it is going to fit */
        if(mq->cursize + len > mq->maxsize) {
                /* buffer full, or congested. */
@@ -188,6 +227,9 @@ dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
        mq->last = entry;
        /* release lock */
        lock_basic_unlock(&mq->lock);
+
+       if(wakeup)
+               dtio_wakeup(mq->dtio);
 }
 
 struct dt_io_thread* dt_io_thread_create(void)
@@ -221,6 +263,7 @@ int dt_io_thread_register_queue(struct dt_io_thread* dtio,
 {
        struct dt_io_list_item* item = malloc(sizeof(*item));
        if(!item) return 0;
+       mq->dtio = dtio;
        item->queue = mq;
        item->next = dtio->io_list;
        dtio->io_list = item;
@@ -238,6 +281,7 @@ void dt_io_thread_unregister_queue(struct dt_io_thread* dtio,
                        if(prev) prev->next = item->next;
                        else dtio->io_list = item->next;
                        /* the queue itself only registered, not deleted */
+                       item->queue->dtio = NULL;
                        free(item);
                        dtio->io_list_iter = NULL;
                        return;
@@ -288,16 +332,25 @@ static int dtio_find_in_queue(struct dt_io_thread* dtio,
 /** find a new message to write, search message queues, false if none */
 static int dtio_find_msg(struct dt_io_thread* dtio)
 {
-       struct dt_io_list_itemitem;
+       struct dt_io_list_item *spot, *item;
 
-       if(dtio->io_list_iter)
-               item = dtio->io_list_iter;
-       else    item = dtio->io_list;
+       spot = dtio->io_list_iter;
        /* use the next queue for the next message lookup,
         * if we hit the end(NULL) the NULL restarts the iter at start. */
-       if(item)
-               dtio->io_list_iter = item->next;
+       if(spot)
+               dtio->io_list_iter = spot->next;
+       else if(dtio->io_list)
+               dtio->io_list_iter = dtio->io_list->next;
 
+       /* scan from spot to end-of-io_list */
+       item = spot;
+       while(item) {
+               if(dtio_find_in_queue(dtio, item->queue))
+                       return 1;
+               item = item->next;
+       }
+       /* scan starting at the start-of-list (to wrap around the end) */
+       item = dtio->io_list;
        while(item) {
                if(dtio_find_in_queue(dtio, item->queue))
                        return 1;
@@ -323,6 +376,7 @@ static void dtio_del_output_event(struct dt_io_thread* dtio)
                return;
        ub_event_del(dtio->event);
        dtio->event_added = 0;
+       dtio->event_added_is_write = 0;
 }
 
 /** close and stop the output file descriptor event */
@@ -607,6 +661,52 @@ static int dtio_check_close(struct dt_io_thread* dtio)
        return 0;
 }
 
+/** add the output file descriptor event for listening, read only */
+static void dtio_add_output_event_read(struct dt_io_thread* dtio)
+{
+       if(!dtio->event)
+               return;
+       if(dtio->event_added && !dtio->event_added_is_write)
+               return;
+       /* we have to (re-)register the event */
+       if(dtio->event_added)
+               ub_event_del(dtio->event);
+       ub_event_del_bits(dtio->event, UB_EV_WRITE);
+       if(ub_event_add(dtio->event, NULL) != 0) {
+               log_err("dnstap io: out of memory (adding event)");
+               return;
+       }
+       dtio->event_added = 1;
+       dtio->event_added_is_write = 0;
+}
+
+/** add the output file descriptor event for listening, read and write */
+static void dtio_add_output_event_write(struct dt_io_thread* dtio)
+{
+       if(!dtio->event)
+               return;
+       if(dtio->event_added && dtio->event_added_is_write)
+               return;
+       /* we have to (re-)register the event */
+       if(dtio->event_added)
+               ub_event_del(dtio->event);
+       ub_event_add_bits(dtio->event, UB_EV_WRITE);
+       if(ub_event_add(dtio->event, NULL) != 0) {
+               log_err("dnstap io: out of memory (adding event)");
+               return;
+       }
+       dtio->event_added = 1;
+       dtio->event_added_is_write = 1;
+}
+
+/** put the dtio thread to sleep */
+static void dtio_sleep(struct dt_io_thread* dtio)
+{
+       /* unregister the event polling for write, because there is
+        * nothing to be written */
+       dtio_add_output_event_read(dtio);
+}
+
 /** callback for the dnstap events, to write to the output */
 static void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg)
 {
@@ -628,8 +728,14 @@ static void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg)
        for(i=0; i<DTIO_MESSAGES_PER_CALLBACK; i++) {
                /* see if there are messages that need writing */
                if(!dtio->cur_msg) {
-                       if(!dtio_find_msg(dtio))
+                       if(!dtio_find_msg(dtio)) {
+                               if(i == 0) {
+                                       /* no messages on the first iteration,
+                                        * the queues are all empty */
+                                       dtio_sleep(dtio);
+                               }
                                return; /* nothing to do */
+                       }
                }
 
                /* write it */
@@ -668,8 +774,13 @@ static void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg)
                /* and then fall through to quit the thread */
        } else if(r == 0) {
                verbose(VERB_ALGO, "dnstap io: cmd channel closed");
-       } else if(r == 1 && cmd == 0) {
+       } else if(r == 1 && cmd == DTIO_COMMAND_STOP) {
                verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit");
+       } else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) {
+               verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup");
+               /* reregister event */
+               dtio_add_output_event_write(dtio);
+               return;
        } else if(r == 1) {
                verbose(VERB_ALGO, "dnstap io: cmd channel unknown command");
        }
@@ -1016,18 +1127,6 @@ static void dtio_open_output(struct dt_io_thread* dtio)
        }
 }
 
-/** add the output file descriptor event for listening */
-static void dtio_add_output_event(struct dt_io_thread* dtio)
-{
-       if(!dtio->event)
-               return;
-       if(ub_event_add(dtio->event, NULL) != 0) {
-               log_err("dnstap io: out of memory (adding event)");
-               return;
-       }
-       dtio->event_added = 1;
-}
-
 /** the IO thread function for the DNSTAP IO */
 static void* dnstap_io(void* arg)
 {
@@ -1039,7 +1138,7 @@ static void* dnstap_io(void* arg)
        dtio_setup_base(dtio, &secs, &now);
        dtio_setup_cmd(dtio);
        dtio_open_output(dtio);
-       dtio_add_output_event(dtio);
+       dtio_add_output_event_write(dtio);
        verbose(VERB_ALGO, "start dnstap io thread");
 
        /* run */
@@ -1077,7 +1176,7 @@ int dt_io_thread_start(struct dt_io_thread* dtio)
 
 void dt_io_thread_stop(struct dt_io_thread* dtio)
 {
-       uint8_t cmd = 0;
+       uint8_t cmd = DTIO_COMMAND_STOP;
        if(!dtio) return;
        if(!dtio->event_base) return; /* not started */
 
index 3c5bd9a4e8abc4e5b18368485a4d7b70ce82a003..77bda6ae0bf3cb14cf761d93870998a48255b691 100644 (file)
@@ -47,6 +47,7 @@
 #include "util/locks.h"
 struct dt_msg_entry;
 struct dt_io_list_item;
+struct dt_io_thread;
 struct config_file;
 
 /**
@@ -70,6 +71,8 @@ struct dt_msg_queue {
        /** list of messages.  The messages are added to the back and taken
         * out from the front. */
        struct dt_msg_entry* first, *last;
+       /** reference to the io thread to wakeup */
+       struct dt_io_thread* dtio;
 };
 
 /**
@@ -105,6 +108,8 @@ struct dt_io_thread {
        void* event;
        /** the event is added */
        int event_added;
+       /** event added is a write event */
+       int event_added_is_write;
        /** check for nonblocking connect errors on fd */
        int check_nb_connect;
        /** the buffer that currently getting written, or NULL if no