]> git.ipfire.org Git - thirdparty/unbound.git/commitdiff
the framestream queue.
authorW.C.A. Wijngaards <wouter@nlnetlabs.nl>
Tue, 21 Jan 2020 13:50:37 +0000 (14:50 +0100)
committerW.C.A. Wijngaards <wouter@nlnetlabs.nl>
Tue, 21 Jan 2020 13:50:37 +0000 (14:50 +0100)
daemon/worker.c
dnstap/dnstap.c
dnstap/dnstap.h
dnstap/dtstream.c
dnstap/dtstream.h

index aa16650ec7a629d159bc8ba2d4b1ad8784a3498d..382bbd384355d366ab7c2958b647b831a5726f22 100644 (file)
@@ -1924,6 +1924,9 @@ worker_delete(struct worker* worker)
 #endif /* UB_ON_WINDOWS */
        }
        comm_base_delete(worker->base);
+#ifdef USE_DNSTAP
+       dt_deinit(&worker->dtenv);
+#endif
        ub_randfree(worker->rndstate);
        alloc_clear(&worker->alloc);
        regional_destroy(worker->env.scratch);
index aabf8eec907129972df4ad45f42740be5e28c3d5..fccf4a721a9c54bb8ec2adbb5fd90c8909a0ff00 100644 (file)
@@ -53,6 +53,7 @@
 #include <protobuf-c/protobuf-c.h>
 
 #include "dnstap/dnstap.h"
+#include "dnstap/dtstream.h"
 #include "dnstap/dnstap.pb-c.h"
 
 #define DNSTAP_CONTENT_TYPE            "protobuf:dnstap.Dnstap"
@@ -90,13 +91,7 @@ dt_pack(const Dnstap__Dnstap *d, void **buf, size_t *sz)
 static void
 dt_send(const struct dt_env *env, void *buf, size_t len_buf)
 {
-       fstrm_res res;
-       if (!buf)
-               return;
-       res = fstrm_iothr_submit(env->iothr, env->ioq, buf, len_buf,
-                                fstrm_free_wrapper, NULL);
-       if (res != fstrm_res_success)
-               free(buf);
+       dt_msg_queue_submit(env->msgqueue, buf, len_buf);
 }
 
 static void
@@ -275,9 +270,20 @@ dt_init(struct dt_env *env)
        env->ioq = fstrm_iothr_get_input_queue(env->iothr);
        if (env->ioq == NULL)
                return 0;
+       env->msgqueue = dt_msg_queue_create();
+       if(!env->msgqueue) {
+               log_err("malloc failure");
+               return 0;
+       }
        return 1;
 }
 
+void
+dt_deinit(struct dt_env* env)
+{
+       dt_msg_queue_delete(env->msgqueue);
+}
+
 void
 dt_delete(struct dt_env *env)
 {
index 0103c1c0e20133d158c427e8daafe19fd7b925a9..6183300b6744e314e015d0306ed90b9e1e0bdb29 100644 (file)
@@ -43,6 +43,7 @@ struct config_file;
 struct fstrm_io;
 struct fstrm_queue;
 struct sldns_buffer;
+struct dt_msg_queue;
 
 struct dt_env {
        /** dnstap I/O thread */
@@ -50,6 +51,9 @@ struct dt_env {
 
        /** dnstap I/O thread input queue */
        struct fstrm_iothr_queue *ioq;
+       /** valid in worker struct, not in daemon struct, the per-worker
+        * message list */
+       struct dt_msg_queue* msgqueue;
 
        /** dnstap "identity" field, NULL if disabled */
        char *identity;
@@ -107,6 +111,11 @@ dt_apply_cfg(struct dt_env *env, struct config_file *cfg);
 int
 dt_init(struct dt_env *env);
 
+/**
+ * Deletes the per-worker state created by dt_init
+ */
+void dt_deinit(struct dt_env *env);
+
 /**
  * Delete dnstap environment object. Closes dnstap I/O socket and deletes all
  * per-worker I/O queues.
index cad4d1f483a51790ff78fb25b085df5a83fbf660..6d28f4c673b4c551bed3e2440dee8fe32b8fdada 100644 (file)
 #include "config.h"
 #include "dnstap/dtstream.h"
 
+struct dt_msg_queue*
+dt_msg_queue_create(void)
+{
+       struct dt_msg_queue* mq = calloc(1, sizeof(*mq));
+       if(!mq) return NULL;
+       mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker,
+               about 1 M should contain 64K messages with some overhead,
+               or a whole bunch smaller ones */
+       lock_basic_init(&mq->lock);
+       lock_protect(&mq->lock, mq, sizeof(*mq));
+       return mq;
+}
+
+/** clear the message list, caller must hold the lock */
+static void
+dt_msg_queue_clear(struct dt_msg_queue* mq)
+{
+       struct dt_msg_entry* e = mq->first, *next=NULL;
+       while(e) {
+               next = e->next;
+               free(e->buf);
+               free(e);
+               e = next;
+       }
+       mq->first = NULL;
+       mq->last = NULL;
+       mq->cursize = 0;
+}
+
+void
+dt_msg_queue_delete(struct dt_msg_queue* mq)
+{
+       if(!mq) return;
+       lock_basic_destroy(&mq->lock);
+       dt_msg_queue_clear(mq);
+       free(mq);
+}
+
+void
+dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
+{
+       struct dt_msg_entry* entry;
+
+       /* check conditions */
+       if(!buf) return;
+       if(len == 0) {
+               /* it is not possible to log entries with zero length,
+                * because the framestream protocol does not carry it.
+                * However the protobuf serialization does not create zero
+                * length datagrams for dnstap, so this should not happen. */
+               free(buf);
+               return;
+       }
+       if(!mq) {
+               free(buf);
+               return;
+       }
+
+       /* allocate memory for queue entry */
+       entry = malloc(sizeof(*entry));
+       if(!entry) {
+               log_err("out of memory logging dnstap");
+               free(buf);
+               return;
+       }
+       entry->next = NULL;
+       entry->buf = buf;
+       entry->len = len;
+
+       /* aqcuire lock */
+       lock_basic_lock(&mq->lock);
+       /* see if it is going to fit */
+       if(mq->cursize + len > mq->maxsize) {
+               /* buffer full, or congested. */
+               /* drop */
+               lock_basic_unlock(&mq->lock);
+               free(buf);
+               return;
+       }
+       mq->cursize += len;
+       /* append to list */
+       if(mq->last) {
+               mq->last->next = entry;
+       } else {
+               mq->first = entry;
+       }
+       mq->last = entry;
+       /* release lock */
+       lock_basic_unlock(&mq->lock);
+}
+
index e76264f50f02a2237d1b996b752591d714feb35a..6030b86d6814074b150537a7b84a347a4c330e88 100644 (file)
@@ -209,5 +209,31 @@ struct dt_io_list_item {
 /* routine to send a frame. */
 /* routine to send STOP message. */
 
+/**
+ * Create new (empty) worker message queue. Limit set to default on max.
+ * @return NULL on malloc failure or a new queue (not locked).
+ */
+struct dt_msg_queue* dt_msg_queue_create(void);
+
+/**
+ * Delete a worker message queue.  It has to be unlinked from access,
+ * so it can be deleted without lock worries.  The queue is emptied (deleted).
+ * @param mq: message queue.
+ */
+void dt_msg_queue_delete(struct dt_msg_queue* mq);
+
+/**
+ * Submit a message to the queue.  The queue is locked by the routine,
+ * the message is inserted, and then the queue is unlocked so the
+ * message can be picked up by the writer thread.
+ * @param mq: message queue.
+ * @param buf: buffer with message (dnstap contents).
+ *     The buffer must have been malloced by caller.  It is linked in
+ *     the queue, and is free()d after use.  If the routine fails
+ *     the buffer is freed as well (and nothing happens, the item
+ *     could not be logged).
+ * @param len: length of buffer.
+ */
+void dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len);
 
 #endif /* DTSTREAM_H */