#endif /* UB_ON_WINDOWS */
}
comm_base_delete(worker->base);
+#ifdef USE_DNSTAP
+ dt_deinit(&worker->dtenv);
+#endif
ub_randfree(worker->rndstate);
alloc_clear(&worker->alloc);
regional_destroy(worker->env.scratch);
#include <protobuf-c/protobuf-c.h>
#include "dnstap/dnstap.h"
+#include "dnstap/dtstream.h"
#include "dnstap/dnstap.pb-c.h"
#define DNSTAP_CONTENT_TYPE "protobuf:dnstap.Dnstap"
static void
dt_send(const struct dt_env *env, void *buf, size_t len_buf)
{
- fstrm_res res;
- if (!buf)
- return;
- res = fstrm_iothr_submit(env->iothr, env->ioq, buf, len_buf,
- fstrm_free_wrapper, NULL);
- if (res != fstrm_res_success)
- free(buf);
+ dt_msg_queue_submit(env->msgqueue, buf, len_buf);
}
static void
env->ioq = fstrm_iothr_get_input_queue(env->iothr);
if (env->ioq == NULL)
return 0;
+ env->msgqueue = dt_msg_queue_create();
+ if(!env->msgqueue) {
+ log_err("malloc failure");
+ return 0;
+ }
return 1;
}
+void
+dt_deinit(struct dt_env* env)
+{
+ dt_msg_queue_delete(env->msgqueue);
+}
+
void
dt_delete(struct dt_env *env)
{
struct fstrm_io;
struct fstrm_queue;
struct sldns_buffer;
+struct dt_msg_queue;
struct dt_env {
/** dnstap I/O thread */
/** dnstap I/O thread input queue */
struct fstrm_iothr_queue *ioq;
+ /** valid in worker struct, not in daemon struct, the per-worker
+ * message list */
+ struct dt_msg_queue* msgqueue;
/** dnstap "identity" field, NULL if disabled */
char *identity;
int
dt_init(struct dt_env *env);
+/**
+ * Deletes the per-worker state created by dt_init
+ */
+void dt_deinit(struct dt_env *env);
+
/**
* Delete dnstap environment object. Closes dnstap I/O socket and deletes all
* per-worker I/O queues.
#include "config.h"
#include "dnstap/dtstream.h"
+struct dt_msg_queue*
+dt_msg_queue_create(void)
+{
+ struct dt_msg_queue* mq = calloc(1, sizeof(*mq));
+ if(!mq) return NULL;
+ mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker,
+ about 1 M should contain 64K messages with some overhead,
+ or a whole bunch smaller ones */
+ lock_basic_init(&mq->lock);
+ lock_protect(&mq->lock, mq, sizeof(*mq));
+ return mq;
+}
+
+/** clear the message list, caller must hold the lock */
+static void
+dt_msg_queue_clear(struct dt_msg_queue* mq)
+{
+ struct dt_msg_entry* e = mq->first, *next=NULL;
+ while(e) {
+ next = e->next;
+ free(e->buf);
+ free(e);
+ e = next;
+ }
+ mq->first = NULL;
+ mq->last = NULL;
+ mq->cursize = 0;
+}
+
+void
+dt_msg_queue_delete(struct dt_msg_queue* mq)
+{
+ if(!mq) return;
+ lock_basic_destroy(&mq->lock);
+ dt_msg_queue_clear(mq);
+ free(mq);
+}
+
+void
+dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
+{
+ struct dt_msg_entry* entry;
+
+ /* check conditions */
+ if(!buf) return;
+ if(len == 0) {
+ /* it is not possible to log entries with zero length,
+ * because the framestream protocol does not carry it.
+ * However the protobuf serialization does not create zero
+ * length datagrams for dnstap, so this should not happen. */
+ free(buf);
+ return;
+ }
+ if(!mq) {
+ free(buf);
+ return;
+ }
+
+ /* allocate memory for queue entry */
+ entry = malloc(sizeof(*entry));
+ if(!entry) {
+ log_err("out of memory logging dnstap");
+ free(buf);
+ return;
+ }
+ entry->next = NULL;
+ entry->buf = buf;
+ entry->len = len;
+
+ /* aqcuire lock */
+ lock_basic_lock(&mq->lock);
+ /* see if it is going to fit */
+ if(mq->cursize + len > mq->maxsize) {
+ /* buffer full, or congested. */
+ /* drop */
+ lock_basic_unlock(&mq->lock);
+ free(buf);
+ return;
+ }
+ mq->cursize += len;
+ /* append to list */
+ if(mq->last) {
+ mq->last->next = entry;
+ } else {
+ mq->first = entry;
+ }
+ mq->last = entry;
+ /* release lock */
+ lock_basic_unlock(&mq->lock);
+}
+
/* routine to send a frame. */
/* routine to send STOP message. */
+/**
+ * Create new (empty) worker message queue. Limit set to default on max.
+ * @return NULL on malloc failure or a new queue (not locked).
+ */
+struct dt_msg_queue* dt_msg_queue_create(void);
+
+/**
+ * Delete a worker message queue. It has to be unlinked from access,
+ * so it can be deleted without lock worries. The queue is emptied (deleted).
+ * @param mq: message queue.
+ */
+void dt_msg_queue_delete(struct dt_msg_queue* mq);
+
+/**
+ * Submit a message to the queue. The queue is locked by the routine,
+ * the message is inserted, and then the queue is unlocked so the
+ * message can be picked up by the writer thread.
+ * @param mq: message queue.
+ * @param buf: buffer with message (dnstap contents).
+ * The buffer must have been malloced by caller. It is linked in
+ * the queue, and is free()d after use. If the routine fails
+ * the buffer is freed as well (and nothing happens, the item
+ * could not be logged).
+ * @param len: length of buffer.
+ */
+void dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len);
#endif /* DTSTREAM_H */