From: W.C.A. Wijngaards Date: Tue, 21 Jan 2020 13:50:37 +0000 (+0100) Subject: the framestream queue. X-Git-Tag: 1.11.0rc1~120^2~107 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=57ad1696051874348cbd85bbd8b8a768c0abadb8;p=thirdparty%2Funbound.git the framestream queue. --- diff --git a/daemon/worker.c b/daemon/worker.c index aa16650ec..382bbd384 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -1924,6 +1924,9 @@ worker_delete(struct worker* worker) #endif /* UB_ON_WINDOWS */ } comm_base_delete(worker->base); +#ifdef USE_DNSTAP + dt_deinit(&worker->dtenv); +#endif ub_randfree(worker->rndstate); alloc_clear(&worker->alloc); regional_destroy(worker->env.scratch); diff --git a/dnstap/dnstap.c b/dnstap/dnstap.c index aabf8eec9..fccf4a721 100644 --- a/dnstap/dnstap.c +++ b/dnstap/dnstap.c @@ -53,6 +53,7 @@ #include #include "dnstap/dnstap.h" +#include "dnstap/dtstream.h" #include "dnstap/dnstap.pb-c.h" #define DNSTAP_CONTENT_TYPE "protobuf:dnstap.Dnstap" @@ -90,13 +91,7 @@ dt_pack(const Dnstap__Dnstap *d, void **buf, size_t *sz) static void dt_send(const struct dt_env *env, void *buf, size_t len_buf) { - fstrm_res res; - if (!buf) - return; - res = fstrm_iothr_submit(env->iothr, env->ioq, buf, len_buf, - fstrm_free_wrapper, NULL); - if (res != fstrm_res_success) - free(buf); + dt_msg_queue_submit(env->msgqueue, buf, len_buf); } static void @@ -275,9 +270,20 @@ dt_init(struct dt_env *env) env->ioq = fstrm_iothr_get_input_queue(env->iothr); if (env->ioq == NULL) return 0; + env->msgqueue = dt_msg_queue_create(); + if(!env->msgqueue) { + log_err("malloc failure"); + return 0; + } return 1; } +void +dt_deinit(struct dt_env* env) +{ + dt_msg_queue_delete(env->msgqueue); +} + void dt_delete(struct dt_env *env) { diff --git a/dnstap/dnstap.h b/dnstap/dnstap.h index 0103c1c0e..6183300b6 100644 --- a/dnstap/dnstap.h +++ b/dnstap/dnstap.h @@ -43,6 +43,7 @@ struct config_file; struct fstrm_io; struct fstrm_queue; struct sldns_buffer; +struct dt_msg_queue; struct dt_env { /** dnstap I/O thread */ @@ -50,6 +51,9 @@ struct dt_env { /** dnstap I/O thread input queue */ struct fstrm_iothr_queue *ioq; + /** valid in worker struct, not in daemon struct, the per-worker + * message list */ + struct dt_msg_queue* msgqueue; /** dnstap "identity" field, NULL if disabled */ char *identity; @@ -107,6 +111,11 @@ dt_apply_cfg(struct dt_env *env, struct config_file *cfg); int dt_init(struct dt_env *env); +/** + * Deletes the per-worker state created by dt_init + */ +void dt_deinit(struct dt_env *env); + /** * Delete dnstap environment object. Closes dnstap I/O socket and deletes all * per-worker I/O queues. diff --git a/dnstap/dtstream.c b/dnstap/dtstream.c index cad4d1f48..6d28f4c67 100644 --- a/dnstap/dtstream.c +++ b/dnstap/dtstream.c @@ -44,3 +44,94 @@ #include "config.h" #include "dnstap/dtstream.h" +struct dt_msg_queue* +dt_msg_queue_create(void) +{ + struct dt_msg_queue* mq = calloc(1, sizeof(*mq)); + if(!mq) return NULL; + mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker, + about 1 M should contain 64K messages with some overhead, + or a whole bunch smaller ones */ + lock_basic_init(&mq->lock); + lock_protect(&mq->lock, mq, sizeof(*mq)); + return mq; +} + +/** clear the message list, caller must hold the lock */ +static void +dt_msg_queue_clear(struct dt_msg_queue* mq) +{ + struct dt_msg_entry* e = mq->first, *next=NULL; + while(e) { + next = e->next; + free(e->buf); + free(e); + e = next; + } + mq->first = NULL; + mq->last = NULL; + mq->cursize = 0; +} + +void +dt_msg_queue_delete(struct dt_msg_queue* mq) +{ + if(!mq) return; + lock_basic_destroy(&mq->lock); + dt_msg_queue_clear(mq); + free(mq); +} + +void +dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len) +{ + struct dt_msg_entry* entry; + + /* check conditions */ + if(!buf) return; + if(len == 0) { + /* it is not possible to log entries with zero length, + * because the framestream protocol does not carry it. + * However the protobuf serialization does not create zero + * length datagrams for dnstap, so this should not happen. */ + free(buf); + return; + } + if(!mq) { + free(buf); + return; + } + + /* allocate memory for queue entry */ + entry = malloc(sizeof(*entry)); + if(!entry) { + log_err("out of memory logging dnstap"); + free(buf); + return; + } + entry->next = NULL; + entry->buf = buf; + entry->len = len; + + /* aqcuire lock */ + lock_basic_lock(&mq->lock); + /* see if it is going to fit */ + if(mq->cursize + len > mq->maxsize) { + /* buffer full, or congested. */ + /* drop */ + lock_basic_unlock(&mq->lock); + free(buf); + return; + } + mq->cursize += len; + /* append to list */ + if(mq->last) { + mq->last->next = entry; + } else { + mq->first = entry; + } + mq->last = entry; + /* release lock */ + lock_basic_unlock(&mq->lock); +} + diff --git a/dnstap/dtstream.h b/dnstap/dtstream.h index e76264f50..6030b86d6 100644 --- a/dnstap/dtstream.h +++ b/dnstap/dtstream.h @@ -209,5 +209,31 @@ struct dt_io_list_item { /* routine to send a frame. */ /* routine to send STOP message. */ +/** + * Create new (empty) worker message queue. Limit set to default on max. + * @return NULL on malloc failure or a new queue (not locked). + */ +struct dt_msg_queue* dt_msg_queue_create(void); + +/** + * Delete a worker message queue. It has to be unlinked from access, + * so it can be deleted without lock worries. The queue is emptied (deleted). + * @param mq: message queue. + */ +void dt_msg_queue_delete(struct dt_msg_queue* mq); + +/** + * Submit a message to the queue. The queue is locked by the routine, + * the message is inserted, and then the queue is unlocked so the + * message can be picked up by the writer thread. + * @param mq: message queue. + * @param buf: buffer with message (dnstap contents). + * The buffer must have been malloced by caller. It is linked in + * the queue, and is free()d after use. If the routine fails + * the buffer is freed as well (and nothing happens, the item + * could not be logged). + * @param len: length of buffer. + */ +void dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len); #endif /* DTSTREAM_H */