]> git.ipfire.org Git - thirdparty/unbound.git/commitdiff
iothread work.
authorW.C.A. Wijngaards <wouter@nlnetlabs.nl>
Tue, 21 Jan 2020 16:01:25 +0000 (17:01 +0100)
committerW.C.A. Wijngaards <wouter@nlnetlabs.nl>
Tue, 21 Jan 2020 16:01:25 +0000 (17:01 +0100)
daemon/daemon.c
daemon/worker.c
dnstap/.dtstream.h.swp [new file with mode: 0644]
dnstap/dnstap.c
dnstap/dnstap.h
dnstap/dtstream.c
dnstap/dtstream.h

index 0b1200a2e00acd2dd1d9fcb6f85397ee2497a013..65d51e182eab8c90aa201bae7686fc249bb488c8 100644 (file)
@@ -452,10 +452,9 @@ daemon_create_workers(struct daemon* daemon)
        if(daemon->cfg->dnstap) {
 #ifdef USE_DNSTAP
                daemon->dtenv = dt_create(daemon->cfg->dnstap_socket_path,
-                       (unsigned int)daemon->num);
+                       (unsigned int)daemon->num, daemon->cfg);
                if (!daemon->dtenv)
                        fatal_exit("dt_create failed");
-               dt_apply_cfg(daemon->dtenv, daemon->cfg);
 #else
                fatal_exit("dnstap enabled in config but not built with dnstap support");
 #endif
index 382bbd384355d366ab7c2958b647b831a5726f22..d144095700e04e777612227d28dfb086e41721cb 100644 (file)
@@ -78,6 +78,7 @@
 #include "sldns/wire2str.h"
 #include "util/shm_side/shm_main.h"
 #include "dnscrypt/dnscrypt.h"
+#include "dnstap/dtstream.h"
 
 #ifdef HAVE_SYS_TYPES_H
 #  include <sys/types.h>
@@ -1876,6 +1877,19 @@ worker_init(struct worker* worker, struct config_file *cfg,
                ) {
                auth_xfer_pickup_initial(worker->env.auth_zones, &worker->env);
        }
+#ifdef USE_DNSTAP
+       if(worker->daemon->cfg->dnstap
+#ifndef THREADS_DISABLED
+               && worker->thread_num == 0
+#endif
+               ) {
+               if(!dt_io_thread_start(dtenv->dtio)) {
+                       log_err("could not start dnstap io thread");
+                       worker_delete(worker);
+                       return 0;
+               }
+       }
+#endif /* USE_DNSTAP */
        if(!worker->env.mesh || !worker->env.scratch_buffer) {
                worker_delete(worker);
                return 0;
@@ -1925,8 +1939,15 @@ worker_delete(struct worker* worker)
        }
        comm_base_delete(worker->base);
 #ifdef USE_DNSTAP
-       dt_deinit(&worker->dtenv);
+       if(worker->daemon->cfg->dnstap
+#ifndef THREADS_DISABLED
+               && worker->thread_num == 0
 #endif
+               ) {
+               dt_io_thread_stop(worker->dtenv.dtio);
+       }
+       dt_deinit(&worker->dtenv);
+#endif /* USE_DNSTAP */
        ub_randfree(worker->rndstate);
        alloc_clear(&worker->alloc);
        regional_destroy(worker->env.scratch);
diff --git a/dnstap/.dtstream.h.swp b/dnstap/.dtstream.h.swp
new file mode 100644 (file)
index 0000000..22f3859
Binary files /dev/null and b/dnstap/.dtstream.h.swp differ
index fccf4a721a9c54bb8ec2adbb5fd90c8909a0ff00..e942c419473505d1cdc84eca51beeeb7fd66f88e 100644 (file)
@@ -130,7 +130,7 @@ check_socket_file(const char* socket_path)
 }
 
 struct dt_env *
-dt_create(const char *socket_path, unsigned num_workers)
+dt_create(const char *socket_path, unsigned num_workers, struct config_file* cfg)
 {
 #ifdef UNBOUND_DEBUG
        fstrm_res res;
@@ -180,6 +180,16 @@ dt_create(const char *socket_path, unsigned num_workers)
        fstrm_unix_writer_options_destroy(&fuwopt);
        fstrm_writer_options_destroy(&fwopt);
 
+       env->dtio = dt_io_thread_create();
+       if(!env->dtio) {
+               log_err("malloc failure");
+               fstrm_writer_destroy(&fw);
+               fstrm_iothr_destroy(&env->iothr);
+               free(env);
+               return NULL;
+       }
+       dt_io_thread_apply_cfg(env->dtio, cfg);
+       dt_apply_cfg(env, cfg);
        return env;
 }
 
@@ -275,12 +285,17 @@ dt_init(struct dt_env *env)
                log_err("malloc failure");
                return 0;
        }
+       if(!dt_io_thread_register_queue(env->dtio, env->msgqueue)) {
+               log_err("malloc failure");
+               return 0;
+       }
        return 1;
 }
 
 void
 dt_deinit(struct dt_env* env)
 {
+       dt_io_thread_unregister_queue(env->dtio, env->msgqueue);
        dt_msg_queue_delete(env->msgqueue);
 }
 
@@ -291,6 +306,7 @@ dt_delete(struct dt_env *env)
                return;
        verbose(VERB_OPS, "closing dnstap socket");
        fstrm_iothr_destroy(&env->iothr);
+       dt_io_thread_delete(env->dtio);
        free(env->identity);
        free(env->version);
        free(env);
index 6183300b6744e314e015d0306ed90b9e1e0bdb29..428691ed951629eb78310dc1f383de363a8233c1 100644 (file)
@@ -48,6 +48,8 @@ struct dt_msg_queue;
 struct dt_env {
        /** dnstap I/O thread */
        struct fstrm_iothr *iothr;
+       /** the io thread (made by the struct daemon) */
+       struct dt_io_thread* dtio;
 
        /** dnstap I/O thread input queue */
        struct fstrm_iothr_queue *ioq;
@@ -90,10 +92,11 @@ struct dt_env {
  * share access to the dnstap I/O socket.
  * @param socket_path: path to dnstap logging socket, must be non-NULL.
  * @param num_workers: number of worker threads, must be > 0.
+ * @param cfg: with config settings.
  * @return dt_env object, NULL on failure.
  */
 struct dt_env *
-dt_create(const char *socket_path, unsigned num_workers);
+dt_create(const char *socket_path, unsigned num_workers, struct config_file* cfg);
 
 /**
  * Apply config settings.
index 6d28f4c673b4c551bed3e2440dee8fe32b8fdada..318e04bfc577a5d5bc7741f1e917b9937eb10028 100644 (file)
 
 #include "config.h"
 #include "dnstap/dtstream.h"
+#include "util/config_file.h"
+#include "util/ub_event.h"
+#include "util/net_help.h"
+#ifdef HAVE_SYS_UN_H
+#include <sys/un.h>
+#endif
 
 struct dt_msg_queue*
 dt_msg_queue_create(void)
@@ -135,3 +141,285 @@ dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
        lock_basic_unlock(&mq->lock);
 }
 
+struct dt_io_thread* dt_io_thread_create(void)
+{
+       struct dt_io_thread* dtio = calloc(1, sizeof(*dtio));
+       return dtio;
+}
+
+void dt_io_thread_delete(struct dt_io_thread* dtio)
+{
+       struct dt_io_list_item* item, *nextitem;
+       if(!dtio) return;
+       item=dtio->io_list;
+       while(item) {
+               nextitem = item->next;
+               free(item);
+               item = nextitem;
+       }
+       free(dtio->socket_path);
+       free(dtio);
+}
+
+void dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg)
+{
+       dtio->upstream_is_unix = 1;
+       dtio->socket_path = strdup(cfg->dnstap_socket_path);
+}
+
+int dt_io_thread_register_queue(struct dt_io_thread* dtio,
+        struct dt_msg_queue* mq)
+{
+       struct dt_io_list_item* item = malloc(sizeof(*item));
+       if(!item) return 0;
+       item->queue = mq;
+       item->next = dtio->io_list;
+       dtio->io_list = item;
+       return 1;
+}
+
+void dt_io_thread_unregister_queue(struct dt_io_thread* dtio,
+        struct dt_msg_queue* mq)
+{
+       struct dt_io_list_item* item=dtio->io_list, *prev=NULL;
+       while(item) {
+               if(item->queue == mq) {
+                       /* found it */
+                       if(prev) prev->next = item->next;
+                       else dtio->io_list = item->next;
+                       /* the queue itself only registered, not deleted */
+                       free(item);
+                       return;
+               }
+               prev = item;
+               item = item->next;
+       }
+}
+
+/** find a new message to write, search message queues, false if none */
+static int dtio_find_msg(struct dt_io_thread* dtio)
+{
+}
+
+/** write more of the current messsage. false if incomplete, true if
+ * the message is done */
+static int dtio_write_more(struct dt_io_thread* dtio)
+{
+}
+
+/** callback for the dnstap events, to write to the output */
+static void dtio_output_cb(int fd, short ATTR_UNUSED(bits), void* arg)
+{
+       struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
+       
+       /* see if there are messages that need writing */
+       if(!dtio->cur_msg) {
+               if(!dtio_find_msg(dtio))
+                       return; /* nothing to do */
+       }
+
+       /* write it */
+       if(dtio->cur_msg_done < dtio->cur_msg_len) {
+               if(!dtio_write_more(dtio))
+                       return;
+       }
+
+       /* done with the current message */
+       free(dtio->cur_msg);
+       dtio->cur_msg = NULL;
+       dtio->cur_msg_len = 0;
+       dtio->cur_msg_done = 0;
+}
+
+/** callback for the dnstap commandpipe, to stop the dnstap IO */
+static void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg)
+{
+       struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
+       uint8_t cmd;
+       ssize_t r;
+       if(dtio->want_to_exit)
+               return;
+       r = read(fd, &cmd, sizeof(cmd));
+       if(r == -1) {
+               if(errno == EINTR || errno == EAGAIN)
+                       return; /* ignore this */
+               log_err("dnstap io: failed to read: %s", strerror(errno));
+               /* and then fall through to quit the thread */
+       }
+       if(r == 0) {
+               verbose(VERB_ALGO, "dnstap io: cmd channel closed");
+       } else if(r == 1 && cmd == 0) {
+               verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit");
+       }
+       dtio->want_to_exit = 1;
+       if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base)
+               != 0) {
+               log_err("dnstap io: could not loopexit");
+       }
+}
+
+/** setup the event base for the dnstap io thread */
+static void dtio_setup_base(struct dt_io_thread* dtio, time_t* secs,
+       struct timeval* now)
+{
+       memset(now, 0, sizeof(*now));
+       dtio->event_base = ub_default_event_base(0, secs, now);
+       if(!dtio->event_base) {
+               fatal_exit("dnstap io: could not create event_base");
+       }
+}
+
+/** setup the cmd event for dnstap io */
+static void dtio_setup_cmd(struct dt_io_thread* dtio)
+{
+       struct ub_event* cmdev;
+       fd_set_nonblock(dtio->commandpipe[0]);
+       cmdev = ub_event_new(dtio->event_base, dtio->commandpipe[0],
+               UB_EV_READ | UB_EV_PERSIST, &dtio_cmd_cb, dtio);
+       if(!cmdev) {
+               fatal_exit("dnstap io: out of memory");
+       }
+       dtio->command_event = cmdev;
+       if(ub_event_add(cmdev, NULL) != 0) {
+               fatal_exit("dnstap io: out of memory (adding event)");
+       }
+}
+
+/** del the output file descriptor event for listening */
+static void dtio_del_output_event(struct dt_io_thread* dtio)
+{
+       if(!dtio->event_added)
+               return;
+       ub_event_del(dtio->event);
+       dtio->event_added = 0;
+}
+
+/** close and stop the output file descriptor event */
+static void dtio_close_output(struct dt_io_thread* dtio)
+{
+       if(!dtio->event)
+               return;
+       ub_event_free(dtio->event);
+       dtio->event = NULL;
+       close(dtio->fd);
+       dtio->fd = -1;
+}
+
+/** perform desetup and free stuff when the dnstap io thread exits */
+static void dtio_desetup(struct dt_io_thread* dtio)
+{
+       dtio_del_output_event(dtio);
+       dtio_close_output(dtio);
+       ub_event_del(dtio->command_event);
+       ub_event_free(dtio->command_event);
+       close(dtio->commandpipe[0]);
+       dtio->commandpipe[0] = -1;
+       ub_event_base_free(dtio->event_base);
+}
+
+/** open the output file descriptor */
+static void dtio_open_output(struct dt_io_thread* dtio)
+{
+       struct ub_event* ev;
+       struct sockaddr_un s;
+       dtio->fd = socket(AF_LOCAL, SOCK_STREAM, SOCK_CLOEXEC);
+       if(dtio->fd == -1) {
+               log_err("dnstap io: failed to create socket: %s",
+                       strerror(errno));
+               return;
+       }
+       memset(&s, 0, sizeof(s));
+#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
+        /* this member exists on BSDs, not Linux */
+        s.sun_len = (unsigned)sizeof(usock);
+#endif
+       s.sun_family = AF_LOCAL;
+       /* length is 92-108, 104 on FreeBSD */
+        (void)strlcpy(s.sun_path, dtio->socket_path, sizeof(s.sun_path));
+       if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s))
+               == -1) {
+               log_err("dnstap io: failed to connect: %s", strerror(errno));
+               return;
+       }
+       fd_set_nonblock(dtio->fd);
+
+       /* the EV_READ is to catch channel close, write to write packets */
+       ev = ub_event_new(dtio->event_base, dtio->fd,
+               UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb,
+               dtio);
+       if(!ev) {
+               fatal_exit("dnstap io: out of memory");
+       }
+       dtio->event = ev;
+
+}
+
+/** add the output file descriptor event for listening */
+static void dtio_add_output_event(struct dt_io_thread* dtio)
+{
+       if(ub_event_add(dtio->event, NULL) != 0) {
+               fatal_exit("dnstap io: out of memory (adding event)");
+       }
+       dtio->event_added = 1;
+}
+
+/** the IO thread function for the DNSTAP IO */
+static void* dnstap_io(void* arg)
+{
+       struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
+       time_t secs = 0;
+       struct timeval now;
+
+       /* setup */
+       dtio_setup_base(dtio, &secs, &now);
+       dtio_setup_cmd(dtio);
+       dtio_open_output(dtio);
+       dtio_add_output_event(dtio);
+       verbose(VERB_ALGO, "start dnstap io thread");
+
+       /* run */
+       if(ub_event_base_dispatch(dtio->event_base) < 0) {
+               log_err("dnstap io: dispatch failed, errno is %s",
+                       strerror(errno));
+       }
+
+       /* cleanup */
+       verbose(VERB_ALGO, "stop dnstap io thread");
+       dtio_desetup(dtio);
+       return NULL;
+}
+
+int dt_io_thread_start(struct dt_io_thread* dtio)
+{
+       /* set up the thread, can fail */
+       if(pipe(dtio->commandpipe) == -1) {
+               log_err("failed to create pipe: %s", strerror(errno));
+               return 0;
+       }
+
+       /* start the thread */
+       ub_thread_create(&dtio->tid, dnstap_io, dtio);
+       return 1;
+}
+
+void dt_io_thread_stop(struct dt_io_thread* dtio)
+{
+       uint8_t cmd = 0;
+       if(!dtio) return;
+       if(!dtio->event_base) return; /* not started */
+
+       while(1) {
+               ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
+               if(r == -1) {
+                       if(errno == EINTR || errno == EAGAIN)
+                               continue;
+                       log_err("dnstap io stop: write: %s", strerror(errno));
+                       break;
+               }
+               break;
+       }
+
+       close(dtio->commandpipe[1]);
+       dtio->commandpipe[1] = -1;
+       ub_thread_join(dtio->tid);
+}
index 6030b86d6814074b150537a7b84a347a4c330e88..5d4e57646399e47420237a2b395adebedaa32b3e 100644 (file)
@@ -47,6 +47,7 @@
 #include "util/locks.h"
 struct dt_msg_entry;
 struct dt_io_list_item;
+struct config_file;
 
 /**
  * A message buffer with dnstap messages queued up.  It is per-worker.
@@ -92,16 +93,29 @@ struct dt_io_thread {
        void* event_base;
        /** list of queues that is registered to get written */
        struct dt_io_list_item* io_list;
+       /** thread id, of the io thread */
+       ub_thread_type tid;
        /** file descriptor that the thread writes to */
        int fd;
        /** event structure that the thread uses */
        void* event;
+       /** the event is added */
+       int event_added;
+       /** the buffer that currently getting written, or NULL if no
+        * (partial) message written now */
+       void* cur_msg;
+       /** length of the current message */
+       size_t cur_msg_len;
+       /** number of bytes written for the current message */
+       size_t cur_msg_done;
 
        /** command pipe that stops the pipe if closed.  Used to quit
         * the program. [0] is read, [1] is written to. */
        int commandpipe[2];
        /** the event to listen to the commandpipe */
        void* command_event;
+       /** the io thread wants to exit */
+       int want_to_exit;
 
        /** If the log server is connected to over unix domain sockets,
         * eg. a file is named that is created to log onto. */
@@ -236,4 +250,56 @@ void dt_msg_queue_delete(struct dt_msg_queue* mq);
  */
 void dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len);
 
+/**
+ * Create IO thread.
+ * @return new io thread object. not yet started. or NULL malloc failure.
+ */
+struct dt_io_thread* dt_io_thread_create(void);
+
+/**
+ * Delete the IO thread structure.
+ * @param dtio: the io thread that is deleted.  It must not be running.
+ */
+void dt_io_thread_delete(struct dt_io_thread* dtio);
+
+/**
+ * Apply config to the dtio thread
+ * @param dtio: io thread, not yet started.
+ * @param cfg: config file struct.
+ */
+void dt_io_thread_apply_cfg(struct dt_io_thread* dtio,
+       struct config_file *cfg);
+
+/**
+ * Register a msg queue to the io thread.  It will be polled to see if
+ * there are messages and those then get removed and sent, when the thread
+ * is running.
+ * @param dtio: the io thread.
+ * @param mq: message queue to register.
+ * @return false on failure (malloc failure).
+ */
+int dt_io_thread_register_queue(struct dt_io_thread* dtio,
+       struct dt_msg_queue* mq);
+
+/**
+ * Unregister queue from io thread.
+ * @param dtio: the io thread.
+ * @param mq: message queue.
+ */
+void dt_io_thread_unregister_queue(struct dt_io_thread* dtio,
+        struct dt_msg_queue* mq);
+
+/**
+ * Start the io thread
+ * @param dtio: the io thread.
+ * @return false on failure.
+ */
+int dt_io_thread_start(struct dt_io_thread* dtio);
+
+/** 
+ * Stop the io thread
+ * @param dtio: the io thread.
+ */
+void dt_io_thread_stop(struct dt_io_thread* dtio);
+
 #endif /* DTSTREAM_H */