]> git.ipfire.org Git - thirdparty/unbound.git/commitdiff
- Fix #305: dnstap logging significantly affects unbound performance
authorW.C.A. Wijngaards <wouter@nlnetlabs.nl>
Wed, 23 Sep 2020 09:13:52 +0000 (11:13 +0200)
committerW.C.A. Wijngaards <wouter@nlnetlabs.nl>
Wed, 23 Sep 2020 09:13:52 +0000 (11:13 +0200)
  (regression in 1.11).

daemon/worker.c
dnstap/dnstap.c
dnstap/dnstap.h
dnstap/dtstream.c
dnstap/dtstream.h
doc/Changelog
util/fptr_wlist.c

index 695a4cfe564489cf05a4b10a38159481a3dc1f36..5ad8ce4e4094a0fb940b4bc2b021984f292f2a17 100644 (file)
@@ -1717,14 +1717,6 @@ worker_create(struct daemon* daemon, int id, int* ports, int n)
                return NULL;
        }
        explicit_bzero(&seed, sizeof(seed));
-#ifdef USE_DNSTAP
-       if(daemon->cfg->dnstap) {
-               log_assert(daemon->dtenv != NULL);
-               memcpy(&worker->dtenv, daemon->dtenv, sizeof(struct dt_env));
-               if(!dt_init(&worker->dtenv))
-                       fatal_exit("dt_init failed");
-       }
-#endif
        return worker;
 }
 
@@ -1783,6 +1775,14 @@ worker_init(struct worker* worker, struct config_file *cfg,
        } else { /* !do_sigs */
                worker->comsig = NULL;
        }
+#ifdef USE_DNSTAP
+       if(cfg->dnstap) {
+               log_assert(worker->daemon->dtenv != NULL);
+               memcpy(&worker->dtenv, worker->daemon->dtenv, sizeof(struct dt_env));
+               if(!dt_init(&worker->dtenv, worker->base))
+                       fatal_exit("dt_init failed");
+       }
+#endif
        worker->front = listen_create(worker->base, ports,
                cfg->msg_buffer_size, (int)cfg->incoming_num_tcp,
                cfg->do_tcp_keepalive
index bda837876dd360864a3c12d077a567e7d0203ac1..0c8c6c4d462acdecb3e83583d733c49e01bef755 100644 (file)
@@ -246,9 +246,9 @@ dt_apply_cfg(struct dt_env *env, struct config_file *cfg)
 }
 
 int
-dt_init(struct dt_env *env)
+dt_init(struct dt_env *env, struct comm_base* base)
 {
-       env->msgqueue = dt_msg_queue_create();
+       env->msgqueue = dt_msg_queue_create(base);
        if(!env->msgqueue) {
                log_err("malloc failure");
                return 0;
index cfef6fc420b99e76f1c2f8f2ee2a6000cbbf62b2..783b8c51430a59bf12b34bac90754e4c9d5e0f33 100644 (file)
@@ -101,10 +101,11 @@ dt_apply_cfg(struct dt_env *env, struct config_file *cfg);
 /**
  * Initialize per-worker state in dnstap environment object.
  * @param env: dnstap environment object to initialize, created with dt_create().
+ * @param base: event base for wakeup timer.
  * @return: true on success, false on failure.
  */
 int
-dt_init(struct dt_env *env);
+dt_init(struct dt_env *env, struct comm_base* base);
 
 /**
  * Deletes the per-worker state created by dt_init
index 6a9e9b8903b5848e52fd017f8d6205f8f2806cff..b0586d1ba8dc223e2a927aff915fcefb8fe74ab6 100644 (file)
@@ -68,6 +68,8 @@
 #define DTIO_RECONNECT_TIMEOUT_MAX 1000
 /** the msec to wait for reconnect slow, to stop busy spinning on reconnect */
 #define DTIO_RECONNECT_TIMEOUT_SLOW 1000
+/** number of messages before wakeup of thread */
+#define DTIO_MSG_FOR_WAKEUP 32
 
 /** maximum length of received frame */
 #define DTIO_RECV_FRAME_MAX_LEN 1000
@@ -99,13 +101,18 @@ static int dtio_enable_brief_write(struct dt_io_thread* dtio);
 #endif
 
 struct dt_msg_queue*
-dt_msg_queue_create(void)
+dt_msg_queue_create(struct comm_base* base)
 {
        struct dt_msg_queue* mq = calloc(1, sizeof(*mq));
        if(!mq) return NULL;
        mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker,
                about 1 M should contain 64K messages with some overhead,
                or a whole bunch smaller ones */
+       mq->wakeup_timer = comm_timer_create(base, mq_wakeup_cb, mq);
+       if(!mq->wakeup_timer) {
+               free(mq);
+               return NULL;
+       }
        lock_basic_init(&mq->lock);
        lock_protect(&mq->lock, mq, sizeof(*mq));
        return mq;
@@ -125,6 +132,7 @@ dt_msg_queue_clear(struct dt_msg_queue* mq)
        mq->first = NULL;
        mq->last = NULL;
        mq->cursize = 0;
+       mq->msgcount = 0;
 }
 
 void
@@ -133,6 +141,7 @@ dt_msg_queue_delete(struct dt_msg_queue* mq)
        if(!mq) return;
        lock_basic_destroy(&mq->lock);
        dt_msg_queue_clear(mq);
+       comm_timer_delete(mq->wakeup_timer);
        free(mq);
 }
 
@@ -163,10 +172,57 @@ static void dtio_wakeup(struct dt_io_thread* dtio)
        }
 }
 
+void
+mq_wakeup_cb(void* arg)
+{
+       struct dt_msg_queue* mq = (struct dt_msg_queue*)arg;
+       /* even if the dtio is already active, because perhaps much
+        * traffic suddenly, we leave the timer running to save on
+        * managing it, the once a second timer is less work then
+        * starting and stopping the timer frequently */
+       lock_basic_lock(&mq->dtio->wakeup_timer_lock);
+       mq->dtio->wakeup_timer_enabled = 0;
+       lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
+       dtio_wakeup(mq->dtio);
+}
+
+/** start timer to wakeup dtio because there is content in the queue */
+static void
+dt_msg_queue_start_timer(struct dt_msg_queue* mq)
+{
+       struct timeval tv;
+       /* Start a timer to process messages to be logged.
+        * If we woke up the dtio thread for every message, the wakeup
+        * messages take up too much processing power.  If the queue
+        * fills up the wakeup happens immediately.  The timer wakes it up
+        * if there are infrequent messages to log. */
+
+       /* we cannot start a timer in dtio thread, because it is a different
+        * thread and its event base is in use by the other thread, it would
+        * give race conditions if we tried to modify its event base,
+        * and locks would wait until it woke up, and this is what we do. */
+
+       /* do not start the timer if a timer already exists, perhaps
+        * in another worker.  So this variable is protected by a lock in
+        * dtio */
+       lock_basic_lock(&mq->dtio->wakeup_timer_lock);
+       if(mq->dtio->wakeup_timer_enabled) {
+               lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
+               return;
+       }
+       mq->dtio->wakeup_timer_enabled = 1; /* we are going to start one */
+       lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
+
+       /* start the timer, in mq, in the event base of our worker */
+       tv.tv_sec = 1;
+       tv.tv_usec = 0;
+       comm_timer_set(mq->wakeup_timer, &tv);
+}
+
 void
 dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
 {
-       int wakeup = 0;
+       int wakeupnow = 0, wakeupstarttimer = 0;
        struct dt_msg_entry* entry;
 
        /* check conditions */
@@ -197,9 +253,14 @@ dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
 
        /* aqcuire lock */
        lock_basic_lock(&mq->lock);
-       /* list was empty, wakeup dtio */
+       /* if list was empty, start timer for (eventual) wakeup */
        if(mq->first == NULL)
-               wakeup = 1;
+               wakeupstarttimer = 1;
+       /* if list contains more than wakeupnum elements, wakeup now,
+        * or if list is (going to be) almost full */
+       if(mq->msgcount+1 > DTIO_MSG_FOR_WAKEUP ||
+               mq->cursize+len >= mq->maxsize * 9 / 10)
+               wakeupnow = 1;
        /* see if it is going to fit */
        if(mq->cursize + len > mq->maxsize) {
                /* buffer full, or congested. */
@@ -210,6 +271,7 @@ dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
                return;
        }
        mq->cursize += len;
+       mq->msgcount ++;
        /* append to list */
        if(mq->last) {
                mq->last->next = entry;
@@ -220,13 +282,19 @@ dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
        /* release lock */
        lock_basic_unlock(&mq->lock);
 
-       if(wakeup)
+       if(wakeupnow) {
                dtio_wakeup(mq->dtio);
+       } else if(wakeupstarttimer) {
+               dt_msg_queue_start_timer(mq);
+       }
 }
 
 struct dt_io_thread* dt_io_thread_create(void)
 {
        struct dt_io_thread* dtio = calloc(1, sizeof(*dtio));
+       lock_basic_init(&dtio->wakeup_timer_lock);
+       lock_protect(&dtio->wakeup_timer_lock, &dtio->wakeup_timer_enabled,
+               sizeof(dtio->wakeup_timer_enabled));
        return dtio;
 }
 
@@ -234,6 +302,7 @@ void dt_io_thread_delete(struct dt_io_thread* dtio)
 {
        struct dt_io_list_item* item, *nextitem;
        if(!dtio) return;
+       lock_basic_destroy(&dtio->wakeup_timer_lock);
        item=dtio->io_list;
        while(item) {
                nextitem = item->next;
@@ -416,6 +485,7 @@ static int dt_msg_queue_pop(struct dt_msg_queue* mq, void** buf,
                mq->first = entry->next;
                if(!entry->next) mq->last = NULL;
                mq->cursize -= entry->len;
+               mq->msgcount --;
                lock_basic_unlock(&mq->lock);
 
                *buf = entry->buf;
index ede491f30d3ef4f4ff11a4d4db67062f068cde83..f87d6dc8d386c6cbf94789c369795a9f0022d01d 100644 (file)
@@ -49,6 +49,7 @@ struct dt_msg_entry;
 struct dt_io_list_item;
 struct dt_io_thread;
 struct config_file;
+struct comm_base;
 
 /**
  * A message buffer with dnstap messages queued up.  It is per-worker.
@@ -68,11 +69,15 @@ struct dt_msg_queue {
        /** current size of the buffer, in bytes.  data bytes of messages.
         * If a new message make it more than maxsize, the buffer is full */
        size_t cursize;
+       /** number of messages in the queue */
+       int msgcount;
        /** list of messages.  The messages are added to the back and taken
         * out from the front. */
        struct dt_msg_entry* first, *last;
        /** reference to the io thread to wakeup */
        struct dt_io_thread* dtio;
+       /** the wakeup timer for dtio, on worker event base */
+       struct comm_timer* wakeup_timer;
 };
 
 /**
@@ -166,6 +171,10 @@ struct dt_io_thread {
         * for the current message length that precedes the frame */
        size_t cur_msg_len_done;
 
+       /** lock on wakeup_timer_enabled */
+       lock_basic_type wakeup_timer_lock;
+       /** if wakeup timer is enabled in some thread */
+       int wakeup_timer_enabled;
        /** command pipe that stops the pipe if closed.  Used to quit
         * the program. [0] is read, [1] is written to. */
        int commandpipe[2];
@@ -233,9 +242,10 @@ struct dt_io_list_item {
 
 /**
  * Create new (empty) worker message queue. Limit set to default on max.
+ * @param base: event base for wakeup timer.
  * @return NULL on malloc failure or a new queue (not locked).
  */
-struct dt_msg_queue* dt_msg_queue_create(void);
+struct dt_msg_queue* dt_msg_queue_create(struct comm_base* base);
 
 /**
  * Delete a worker message queue.  It has to be unlinked from access,
@@ -258,6 +268,9 @@ void dt_msg_queue_delete(struct dt_msg_queue* mq);
  */
 void dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len);
 
+/** timer callback to wakeup dtio thread to process messages */
+void mq_wakeup_cb(void* arg);
+
 /**
  * Create IO thread.
  * @return new io thread object. not yet started. or NULL malloc failure.
index 7e6455da80154d2ef609e5ad532c297f23ac5569..9188da015a0dfe8399e4afb7e4966334c7fce373 100644 (file)
@@ -1,3 +1,7 @@
+23 September 2020: Wouter
+       - Fix #305: dnstap logging significantly affects unbound performance
+         (regression in 1.11).
+
 21 September 2020: Ralph
        - Fix #304: dnstap logging not recovering after dnstap process restarts
 
index aa275ed534b7b05bd951d846679f8fd3ca66c8ab..241d9e5407b5a3997844a1aadf60d8bb9780b737 100644 (file)
@@ -138,6 +138,7 @@ fptr_whitelist_comm_timer(void (*fptr)(void*))
        else if(fptr == &auth_xfer_probe_timer_callback) return 1;
        else if(fptr == &auth_xfer_transfer_timer_callback) return 1;
        else if(fptr == &mesh_serve_expired_callback) return 1;
+       else if(fptr == &mq_wakeup_cb) return 1;
        return 0;
 }