From 48a56751e97655843572244fb6bb6ced3dcfe071 Mon Sep 17 00:00:00 2001 From: "W.C.A. Wijngaards" Date: Wed, 23 Sep 2020 11:13:52 +0200 Subject: [PATCH] - Fix #305: dnstap logging significantly affects unbound performance (regression in 1.11). --- daemon/worker.c | 16 +++++----- dnstap/dnstap.c | 4 +-- dnstap/dnstap.h | 3 +- dnstap/dtstream.c | 80 ++++++++++++++++++++++++++++++++++++++++++++--- dnstap/dtstream.h | 15 ++++++++- doc/Changelog | 4 +++ util/fptr_wlist.c | 1 + 7 files changed, 106 insertions(+), 17 deletions(-) diff --git a/daemon/worker.c b/daemon/worker.c index 695a4cfe5..5ad8ce4e4 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -1717,14 +1717,6 @@ worker_create(struct daemon* daemon, int id, int* ports, int n) return NULL; } explicit_bzero(&seed, sizeof(seed)); -#ifdef USE_DNSTAP - if(daemon->cfg->dnstap) { - log_assert(daemon->dtenv != NULL); - memcpy(&worker->dtenv, daemon->dtenv, sizeof(struct dt_env)); - if(!dt_init(&worker->dtenv)) - fatal_exit("dt_init failed"); - } -#endif return worker; } @@ -1783,6 +1775,14 @@ worker_init(struct worker* worker, struct config_file *cfg, } else { /* !do_sigs */ worker->comsig = NULL; } +#ifdef USE_DNSTAP + if(cfg->dnstap) { + log_assert(worker->daemon->dtenv != NULL); + memcpy(&worker->dtenv, worker->daemon->dtenv, sizeof(struct dt_env)); + if(!dt_init(&worker->dtenv, worker->base)) + fatal_exit("dt_init failed"); + } +#endif worker->front = listen_create(worker->base, ports, cfg->msg_buffer_size, (int)cfg->incoming_num_tcp, cfg->do_tcp_keepalive diff --git a/dnstap/dnstap.c b/dnstap/dnstap.c index bda837876..0c8c6c4d4 100644 --- a/dnstap/dnstap.c +++ b/dnstap/dnstap.c @@ -246,9 +246,9 @@ dt_apply_cfg(struct dt_env *env, struct config_file *cfg) } int -dt_init(struct dt_env *env) +dt_init(struct dt_env *env, struct comm_base* base) { - env->msgqueue = dt_msg_queue_create(); + env->msgqueue = dt_msg_queue_create(base); if(!env->msgqueue) { log_err("malloc failure"); return 0; diff --git a/dnstap/dnstap.h b/dnstap/dnstap.h index cfef6fc42..783b8c514 100644 --- a/dnstap/dnstap.h +++ b/dnstap/dnstap.h @@ -101,10 +101,11 @@ dt_apply_cfg(struct dt_env *env, struct config_file *cfg); /** * Initialize per-worker state in dnstap environment object. * @param env: dnstap environment object to initialize, created with dt_create(). + * @param base: event base for wakeup timer. * @return: true on success, false on failure. */ int -dt_init(struct dt_env *env); +dt_init(struct dt_env *env, struct comm_base* base); /** * Deletes the per-worker state created by dt_init diff --git a/dnstap/dtstream.c b/dnstap/dtstream.c index 6a9e9b890..b0586d1ba 100644 --- a/dnstap/dtstream.c +++ b/dnstap/dtstream.c @@ -68,6 +68,8 @@ #define DTIO_RECONNECT_TIMEOUT_MAX 1000 /** the msec to wait for reconnect slow, to stop busy spinning on reconnect */ #define DTIO_RECONNECT_TIMEOUT_SLOW 1000 +/** number of messages before wakeup of thread */ +#define DTIO_MSG_FOR_WAKEUP 32 /** maximum length of received frame */ #define DTIO_RECV_FRAME_MAX_LEN 1000 @@ -99,13 +101,18 @@ static int dtio_enable_brief_write(struct dt_io_thread* dtio); #endif struct dt_msg_queue* -dt_msg_queue_create(void) +dt_msg_queue_create(struct comm_base* base) { struct dt_msg_queue* mq = calloc(1, sizeof(*mq)); if(!mq) return NULL; mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker, about 1 M should contain 64K messages with some overhead, or a whole bunch smaller ones */ + mq->wakeup_timer = comm_timer_create(base, mq_wakeup_cb, mq); + if(!mq->wakeup_timer) { + free(mq); + return NULL; + } lock_basic_init(&mq->lock); lock_protect(&mq->lock, mq, sizeof(*mq)); return mq; @@ -125,6 +132,7 @@ dt_msg_queue_clear(struct dt_msg_queue* mq) mq->first = NULL; mq->last = NULL; mq->cursize = 0; + mq->msgcount = 0; } void @@ -133,6 +141,7 @@ dt_msg_queue_delete(struct dt_msg_queue* mq) if(!mq) return; lock_basic_destroy(&mq->lock); dt_msg_queue_clear(mq); + comm_timer_delete(mq->wakeup_timer); free(mq); } @@ -163,10 +172,57 @@ static void dtio_wakeup(struct dt_io_thread* dtio) } } +void +mq_wakeup_cb(void* arg) +{ + struct dt_msg_queue* mq = (struct dt_msg_queue*)arg; + /* even if the dtio is already active, because perhaps much + * traffic suddenly, we leave the timer running to save on + * managing it, the once a second timer is less work then + * starting and stopping the timer frequently */ + lock_basic_lock(&mq->dtio->wakeup_timer_lock); + mq->dtio->wakeup_timer_enabled = 0; + lock_basic_unlock(&mq->dtio->wakeup_timer_lock); + dtio_wakeup(mq->dtio); +} + +/** start timer to wakeup dtio because there is content in the queue */ +static void +dt_msg_queue_start_timer(struct dt_msg_queue* mq) +{ + struct timeval tv; + /* Start a timer to process messages to be logged. + * If we woke up the dtio thread for every message, the wakeup + * messages take up too much processing power. If the queue + * fills up the wakeup happens immediately. The timer wakes it up + * if there are infrequent messages to log. */ + + /* we cannot start a timer in dtio thread, because it is a different + * thread and its event base is in use by the other thread, it would + * give race conditions if we tried to modify its event base, + * and locks would wait until it woke up, and this is what we do. */ + + /* do not start the timer if a timer already exists, perhaps + * in another worker. So this variable is protected by a lock in + * dtio */ + lock_basic_lock(&mq->dtio->wakeup_timer_lock); + if(mq->dtio->wakeup_timer_enabled) { + lock_basic_unlock(&mq->dtio->wakeup_timer_lock); + return; + } + mq->dtio->wakeup_timer_enabled = 1; /* we are going to start one */ + lock_basic_unlock(&mq->dtio->wakeup_timer_lock); + + /* start the timer, in mq, in the event base of our worker */ + tv.tv_sec = 1; + tv.tv_usec = 0; + comm_timer_set(mq->wakeup_timer, &tv); +} + void dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len) { - int wakeup = 0; + int wakeupnow = 0, wakeupstarttimer = 0; struct dt_msg_entry* entry; /* check conditions */ @@ -197,9 +253,14 @@ dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len) /* aqcuire lock */ lock_basic_lock(&mq->lock); - /* list was empty, wakeup dtio */ + /* if list was empty, start timer for (eventual) wakeup */ if(mq->first == NULL) - wakeup = 1; + wakeupstarttimer = 1; + /* if list contains more than wakeupnum elements, wakeup now, + * or if list is (going to be) almost full */ + if(mq->msgcount+1 > DTIO_MSG_FOR_WAKEUP || + mq->cursize+len >= mq->maxsize * 9 / 10) + wakeupnow = 1; /* see if it is going to fit */ if(mq->cursize + len > mq->maxsize) { /* buffer full, or congested. */ @@ -210,6 +271,7 @@ dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len) return; } mq->cursize += len; + mq->msgcount ++; /* append to list */ if(mq->last) { mq->last->next = entry; @@ -220,13 +282,19 @@ dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len) /* release lock */ lock_basic_unlock(&mq->lock); - if(wakeup) + if(wakeupnow) { dtio_wakeup(mq->dtio); + } else if(wakeupstarttimer) { + dt_msg_queue_start_timer(mq); + } } struct dt_io_thread* dt_io_thread_create(void) { struct dt_io_thread* dtio = calloc(1, sizeof(*dtio)); + lock_basic_init(&dtio->wakeup_timer_lock); + lock_protect(&dtio->wakeup_timer_lock, &dtio->wakeup_timer_enabled, + sizeof(dtio->wakeup_timer_enabled)); return dtio; } @@ -234,6 +302,7 @@ void dt_io_thread_delete(struct dt_io_thread* dtio) { struct dt_io_list_item* item, *nextitem; if(!dtio) return; + lock_basic_destroy(&dtio->wakeup_timer_lock); item=dtio->io_list; while(item) { nextitem = item->next; @@ -416,6 +485,7 @@ static int dt_msg_queue_pop(struct dt_msg_queue* mq, void** buf, mq->first = entry->next; if(!entry->next) mq->last = NULL; mq->cursize -= entry->len; + mq->msgcount --; lock_basic_unlock(&mq->lock); *buf = entry->buf; diff --git a/dnstap/dtstream.h b/dnstap/dtstream.h index ede491f30..f87d6dc8d 100644 --- a/dnstap/dtstream.h +++ b/dnstap/dtstream.h @@ -49,6 +49,7 @@ struct dt_msg_entry; struct dt_io_list_item; struct dt_io_thread; struct config_file; +struct comm_base; /** * A message buffer with dnstap messages queued up. It is per-worker. @@ -68,11 +69,15 @@ struct dt_msg_queue { /** current size of the buffer, in bytes. data bytes of messages. * If a new message make it more than maxsize, the buffer is full */ size_t cursize; + /** number of messages in the queue */ + int msgcount; /** list of messages. The messages are added to the back and taken * out from the front. */ struct dt_msg_entry* first, *last; /** reference to the io thread to wakeup */ struct dt_io_thread* dtio; + /** the wakeup timer for dtio, on worker event base */ + struct comm_timer* wakeup_timer; }; /** @@ -166,6 +171,10 @@ struct dt_io_thread { * for the current message length that precedes the frame */ size_t cur_msg_len_done; + /** lock on wakeup_timer_enabled */ + lock_basic_type wakeup_timer_lock; + /** if wakeup timer is enabled in some thread */ + int wakeup_timer_enabled; /** command pipe that stops the pipe if closed. Used to quit * the program. [0] is read, [1] is written to. */ int commandpipe[2]; @@ -233,9 +242,10 @@ struct dt_io_list_item { /** * Create new (empty) worker message queue. Limit set to default on max. + * @param base: event base for wakeup timer. * @return NULL on malloc failure or a new queue (not locked). */ -struct dt_msg_queue* dt_msg_queue_create(void); +struct dt_msg_queue* dt_msg_queue_create(struct comm_base* base); /** * Delete a worker message queue. It has to be unlinked from access, @@ -258,6 +268,9 @@ void dt_msg_queue_delete(struct dt_msg_queue* mq); */ void dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len); +/** timer callback to wakeup dtio thread to process messages */ +void mq_wakeup_cb(void* arg); + /** * Create IO thread. * @return new io thread object. not yet started. or NULL malloc failure. diff --git a/doc/Changelog b/doc/Changelog index 7e6455da8..9188da015 100644 --- a/doc/Changelog +++ b/doc/Changelog @@ -1,3 +1,7 @@ +23 September 2020: Wouter + - Fix #305: dnstap logging significantly affects unbound performance + (regression in 1.11). + 21 September 2020: Ralph - Fix #304: dnstap logging not recovering after dnstap process restarts diff --git a/util/fptr_wlist.c b/util/fptr_wlist.c index aa275ed53..241d9e540 100644 --- a/util/fptr_wlist.c +++ b/util/fptr_wlist.c @@ -138,6 +138,7 @@ fptr_whitelist_comm_timer(void (*fptr)(void*)) else if(fptr == &auth_xfer_probe_timer_callback) return 1; else if(fptr == &auth_xfer_transfer_timer_callback) return 1; else if(fptr == &mesh_serve_expired_callback) return 1; + else if(fptr == &mq_wakeup_cb) return 1; return 0; } -- 2.47.3