From: W.C.A. Wijngaards Date: Tue, 21 Jan 2020 16:01:25 +0000 (+0100) Subject: iothread work. X-Git-Tag: 1.11.0rc1~120^2~106 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=efc79beb2d8bae70c62fa4ca703587bd810590ff;p=thirdparty%2Funbound.git iothread work. --- diff --git a/daemon/daemon.c b/daemon/daemon.c index 0b1200a2e..65d51e182 100644 --- a/daemon/daemon.c +++ b/daemon/daemon.c @@ -452,10 +452,9 @@ daemon_create_workers(struct daemon* daemon) if(daemon->cfg->dnstap) { #ifdef USE_DNSTAP daemon->dtenv = dt_create(daemon->cfg->dnstap_socket_path, - (unsigned int)daemon->num); + (unsigned int)daemon->num, daemon->cfg); if (!daemon->dtenv) fatal_exit("dt_create failed"); - dt_apply_cfg(daemon->dtenv, daemon->cfg); #else fatal_exit("dnstap enabled in config but not built with dnstap support"); #endif diff --git a/daemon/worker.c b/daemon/worker.c index 382bbd384..d14409570 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -78,6 +78,7 @@ #include "sldns/wire2str.h" #include "util/shm_side/shm_main.h" #include "dnscrypt/dnscrypt.h" +#include "dnstap/dtstream.h" #ifdef HAVE_SYS_TYPES_H # include @@ -1876,6 +1877,19 @@ worker_init(struct worker* worker, struct config_file *cfg, ) { auth_xfer_pickup_initial(worker->env.auth_zones, &worker->env); } +#ifdef USE_DNSTAP + if(worker->daemon->cfg->dnstap +#ifndef THREADS_DISABLED + && worker->thread_num == 0 +#endif + ) { + if(!dt_io_thread_start(dtenv->dtio)) { + log_err("could not start dnstap io thread"); + worker_delete(worker); + return 0; + } + } +#endif /* USE_DNSTAP */ if(!worker->env.mesh || !worker->env.scratch_buffer) { worker_delete(worker); return 0; @@ -1925,8 +1939,15 @@ worker_delete(struct worker* worker) } comm_base_delete(worker->base); #ifdef USE_DNSTAP - dt_deinit(&worker->dtenv); + if(worker->daemon->cfg->dnstap +#ifndef THREADS_DISABLED + && worker->thread_num == 0 #endif + ) { + dt_io_thread_stop(worker->dtenv.dtio); + } + dt_deinit(&worker->dtenv); +#endif /* USE_DNSTAP */ ub_randfree(worker->rndstate); alloc_clear(&worker->alloc); regional_destroy(worker->env.scratch); diff --git a/dnstap/.dtstream.h.swp b/dnstap/.dtstream.h.swp new file mode 100644 index 000000000..22f3859ad Binary files /dev/null and b/dnstap/.dtstream.h.swp differ diff --git a/dnstap/dnstap.c b/dnstap/dnstap.c index fccf4a721..e942c4194 100644 --- a/dnstap/dnstap.c +++ b/dnstap/dnstap.c @@ -130,7 +130,7 @@ check_socket_file(const char* socket_path) } struct dt_env * -dt_create(const char *socket_path, unsigned num_workers) +dt_create(const char *socket_path, unsigned num_workers, struct config_file* cfg) { #ifdef UNBOUND_DEBUG fstrm_res res; @@ -180,6 +180,16 @@ dt_create(const char *socket_path, unsigned num_workers) fstrm_unix_writer_options_destroy(&fuwopt); fstrm_writer_options_destroy(&fwopt); + env->dtio = dt_io_thread_create(); + if(!env->dtio) { + log_err("malloc failure"); + fstrm_writer_destroy(&fw); + fstrm_iothr_destroy(&env->iothr); + free(env); + return NULL; + } + dt_io_thread_apply_cfg(env->dtio, cfg); + dt_apply_cfg(env, cfg); return env; } @@ -275,12 +285,17 @@ dt_init(struct dt_env *env) log_err("malloc failure"); return 0; } + if(!dt_io_thread_register_queue(env->dtio, env->msgqueue)) { + log_err("malloc failure"); + return 0; + } return 1; } void dt_deinit(struct dt_env* env) { + dt_io_thread_unregister_queue(env->dtio, env->msgqueue); dt_msg_queue_delete(env->msgqueue); } @@ -291,6 +306,7 @@ dt_delete(struct dt_env *env) return; verbose(VERB_OPS, "closing dnstap socket"); fstrm_iothr_destroy(&env->iothr); + dt_io_thread_delete(env->dtio); free(env->identity); free(env->version); free(env); diff --git a/dnstap/dnstap.h b/dnstap/dnstap.h index 6183300b6..428691ed9 100644 --- a/dnstap/dnstap.h +++ b/dnstap/dnstap.h @@ -48,6 +48,8 @@ struct dt_msg_queue; struct dt_env { /** dnstap I/O thread */ struct fstrm_iothr *iothr; + /** the io thread (made by the struct daemon) */ + struct dt_io_thread* dtio; /** dnstap I/O thread input queue */ struct fstrm_iothr_queue *ioq; @@ -90,10 +92,11 @@ struct dt_env { * share access to the dnstap I/O socket. * @param socket_path: path to dnstap logging socket, must be non-NULL. * @param num_workers: number of worker threads, must be > 0. + * @param cfg: with config settings. * @return dt_env object, NULL on failure. */ struct dt_env * -dt_create(const char *socket_path, unsigned num_workers); +dt_create(const char *socket_path, unsigned num_workers, struct config_file* cfg); /** * Apply config settings. diff --git a/dnstap/dtstream.c b/dnstap/dtstream.c index 6d28f4c67..318e04bfc 100644 --- a/dnstap/dtstream.c +++ b/dnstap/dtstream.c @@ -43,6 +43,12 @@ #include "config.h" #include "dnstap/dtstream.h" +#include "util/config_file.h" +#include "util/ub_event.h" +#include "util/net_help.h" +#ifdef HAVE_SYS_UN_H +#include +#endif struct dt_msg_queue* dt_msg_queue_create(void) @@ -135,3 +141,285 @@ dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len) lock_basic_unlock(&mq->lock); } +struct dt_io_thread* dt_io_thread_create(void) +{ + struct dt_io_thread* dtio = calloc(1, sizeof(*dtio)); + return dtio; +} + +void dt_io_thread_delete(struct dt_io_thread* dtio) +{ + struct dt_io_list_item* item, *nextitem; + if(!dtio) return; + item=dtio->io_list; + while(item) { + nextitem = item->next; + free(item); + item = nextitem; + } + free(dtio->socket_path); + free(dtio); +} + +void dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg) +{ + dtio->upstream_is_unix = 1; + dtio->socket_path = strdup(cfg->dnstap_socket_path); +} + +int dt_io_thread_register_queue(struct dt_io_thread* dtio, + struct dt_msg_queue* mq) +{ + struct dt_io_list_item* item = malloc(sizeof(*item)); + if(!item) return 0; + item->queue = mq; + item->next = dtio->io_list; + dtio->io_list = item; + return 1; +} + +void dt_io_thread_unregister_queue(struct dt_io_thread* dtio, + struct dt_msg_queue* mq) +{ + struct dt_io_list_item* item=dtio->io_list, *prev=NULL; + while(item) { + if(item->queue == mq) { + /* found it */ + if(prev) prev->next = item->next; + else dtio->io_list = item->next; + /* the queue itself only registered, not deleted */ + free(item); + return; + } + prev = item; + item = item->next; + } +} + +/** find a new message to write, search message queues, false if none */ +static int dtio_find_msg(struct dt_io_thread* dtio) +{ +} + +/** write more of the current messsage. false if incomplete, true if + * the message is done */ +static int dtio_write_more(struct dt_io_thread* dtio) +{ +} + +/** callback for the dnstap events, to write to the output */ +static void dtio_output_cb(int fd, short ATTR_UNUSED(bits), void* arg) +{ + struct dt_io_thread* dtio = (struct dt_io_thread*)arg; + + /* see if there are messages that need writing */ + if(!dtio->cur_msg) { + if(!dtio_find_msg(dtio)) + return; /* nothing to do */ + } + + /* write it */ + if(dtio->cur_msg_done < dtio->cur_msg_len) { + if(!dtio_write_more(dtio)) + return; + } + + /* done with the current message */ + free(dtio->cur_msg); + dtio->cur_msg = NULL; + dtio->cur_msg_len = 0; + dtio->cur_msg_done = 0; +} + +/** callback for the dnstap commandpipe, to stop the dnstap IO */ +static void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg) +{ + struct dt_io_thread* dtio = (struct dt_io_thread*)arg; + uint8_t cmd; + ssize_t r; + if(dtio->want_to_exit) + return; + r = read(fd, &cmd, sizeof(cmd)); + if(r == -1) { + if(errno == EINTR || errno == EAGAIN) + return; /* ignore this */ + log_err("dnstap io: failed to read: %s", strerror(errno)); + /* and then fall through to quit the thread */ + } + if(r == 0) { + verbose(VERB_ALGO, "dnstap io: cmd channel closed"); + } else if(r == 1 && cmd == 0) { + verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit"); + } + dtio->want_to_exit = 1; + if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base) + != 0) { + log_err("dnstap io: could not loopexit"); + } +} + +/** setup the event base for the dnstap io thread */ +static void dtio_setup_base(struct dt_io_thread* dtio, time_t* secs, + struct timeval* now) +{ + memset(now, 0, sizeof(*now)); + dtio->event_base = ub_default_event_base(0, secs, now); + if(!dtio->event_base) { + fatal_exit("dnstap io: could not create event_base"); + } +} + +/** setup the cmd event for dnstap io */ +static void dtio_setup_cmd(struct dt_io_thread* dtio) +{ + struct ub_event* cmdev; + fd_set_nonblock(dtio->commandpipe[0]); + cmdev = ub_event_new(dtio->event_base, dtio->commandpipe[0], + UB_EV_READ | UB_EV_PERSIST, &dtio_cmd_cb, dtio); + if(!cmdev) { + fatal_exit("dnstap io: out of memory"); + } + dtio->command_event = cmdev; + if(ub_event_add(cmdev, NULL) != 0) { + fatal_exit("dnstap io: out of memory (adding event)"); + } +} + +/** del the output file descriptor event for listening */ +static void dtio_del_output_event(struct dt_io_thread* dtio) +{ + if(!dtio->event_added) + return; + ub_event_del(dtio->event); + dtio->event_added = 0; +} + +/** close and stop the output file descriptor event */ +static void dtio_close_output(struct dt_io_thread* dtio) +{ + if(!dtio->event) + return; + ub_event_free(dtio->event); + dtio->event = NULL; + close(dtio->fd); + dtio->fd = -1; +} + +/** perform desetup and free stuff when the dnstap io thread exits */ +static void dtio_desetup(struct dt_io_thread* dtio) +{ + dtio_del_output_event(dtio); + dtio_close_output(dtio); + ub_event_del(dtio->command_event); + ub_event_free(dtio->command_event); + close(dtio->commandpipe[0]); + dtio->commandpipe[0] = -1; + ub_event_base_free(dtio->event_base); +} + +/** open the output file descriptor */ +static void dtio_open_output(struct dt_io_thread* dtio) +{ + struct ub_event* ev; + struct sockaddr_un s; + dtio->fd = socket(AF_LOCAL, SOCK_STREAM, SOCK_CLOEXEC); + if(dtio->fd == -1) { + log_err("dnstap io: failed to create socket: %s", + strerror(errno)); + return; + } + memset(&s, 0, sizeof(s)); +#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN + /* this member exists on BSDs, not Linux */ + s.sun_len = (unsigned)sizeof(usock); +#endif + s.sun_family = AF_LOCAL; + /* length is 92-108, 104 on FreeBSD */ + (void)strlcpy(s.sun_path, dtio->socket_path, sizeof(s.sun_path)); + if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s)) + == -1) { + log_err("dnstap io: failed to connect: %s", strerror(errno)); + return; + } + fd_set_nonblock(dtio->fd); + + /* the EV_READ is to catch channel close, write to write packets */ + ev = ub_event_new(dtio->event_base, dtio->fd, + UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb, + dtio); + if(!ev) { + fatal_exit("dnstap io: out of memory"); + } + dtio->event = ev; + +} + +/** add the output file descriptor event for listening */ +static void dtio_add_output_event(struct dt_io_thread* dtio) +{ + if(ub_event_add(dtio->event, NULL) != 0) { + fatal_exit("dnstap io: out of memory (adding event)"); + } + dtio->event_added = 1; +} + +/** the IO thread function for the DNSTAP IO */ +static void* dnstap_io(void* arg) +{ + struct dt_io_thread* dtio = (struct dt_io_thread*)arg; + time_t secs = 0; + struct timeval now; + + /* setup */ + dtio_setup_base(dtio, &secs, &now); + dtio_setup_cmd(dtio); + dtio_open_output(dtio); + dtio_add_output_event(dtio); + verbose(VERB_ALGO, "start dnstap io thread"); + + /* run */ + if(ub_event_base_dispatch(dtio->event_base) < 0) { + log_err("dnstap io: dispatch failed, errno is %s", + strerror(errno)); + } + + /* cleanup */ + verbose(VERB_ALGO, "stop dnstap io thread"); + dtio_desetup(dtio); + return NULL; +} + +int dt_io_thread_start(struct dt_io_thread* dtio) +{ + /* set up the thread, can fail */ + if(pipe(dtio->commandpipe) == -1) { + log_err("failed to create pipe: %s", strerror(errno)); + return 0; + } + + /* start the thread */ + ub_thread_create(&dtio->tid, dnstap_io, dtio); + return 1; +} + +void dt_io_thread_stop(struct dt_io_thread* dtio) +{ + uint8_t cmd = 0; + if(!dtio) return; + if(!dtio->event_base) return; /* not started */ + + while(1) { + ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd)); + if(r == -1) { + if(errno == EINTR || errno == EAGAIN) + continue; + log_err("dnstap io stop: write: %s", strerror(errno)); + break; + } + break; + } + + close(dtio->commandpipe[1]); + dtio->commandpipe[1] = -1; + ub_thread_join(dtio->tid); +} diff --git a/dnstap/dtstream.h b/dnstap/dtstream.h index 6030b86d6..5d4e57646 100644 --- a/dnstap/dtstream.h +++ b/dnstap/dtstream.h @@ -47,6 +47,7 @@ #include "util/locks.h" struct dt_msg_entry; struct dt_io_list_item; +struct config_file; /** * A message buffer with dnstap messages queued up. It is per-worker. @@ -92,16 +93,29 @@ struct dt_io_thread { void* event_base; /** list of queues that is registered to get written */ struct dt_io_list_item* io_list; + /** thread id, of the io thread */ + ub_thread_type tid; /** file descriptor that the thread writes to */ int fd; /** event structure that the thread uses */ void* event; + /** the event is added */ + int event_added; + /** the buffer that currently getting written, or NULL if no + * (partial) message written now */ + void* cur_msg; + /** length of the current message */ + size_t cur_msg_len; + /** number of bytes written for the current message */ + size_t cur_msg_done; /** command pipe that stops the pipe if closed. Used to quit * the program. [0] is read, [1] is written to. */ int commandpipe[2]; /** the event to listen to the commandpipe */ void* command_event; + /** the io thread wants to exit */ + int want_to_exit; /** If the log server is connected to over unix domain sockets, * eg. a file is named that is created to log onto. */ @@ -236,4 +250,56 @@ void dt_msg_queue_delete(struct dt_msg_queue* mq); */ void dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len); +/** + * Create IO thread. + * @return new io thread object. not yet started. or NULL malloc failure. + */ +struct dt_io_thread* dt_io_thread_create(void); + +/** + * Delete the IO thread structure. + * @param dtio: the io thread that is deleted. It must not be running. + */ +void dt_io_thread_delete(struct dt_io_thread* dtio); + +/** + * Apply config to the dtio thread + * @param dtio: io thread, not yet started. + * @param cfg: config file struct. + */ +void dt_io_thread_apply_cfg(struct dt_io_thread* dtio, + struct config_file *cfg); + +/** + * Register a msg queue to the io thread. It will be polled to see if + * there are messages and those then get removed and sent, when the thread + * is running. + * @param dtio: the io thread. + * @param mq: message queue to register. + * @return false on failure (malloc failure). + */ +int dt_io_thread_register_queue(struct dt_io_thread* dtio, + struct dt_msg_queue* mq); + +/** + * Unregister queue from io thread. + * @param dtio: the io thread. + * @param mq: message queue. + */ +void dt_io_thread_unregister_queue(struct dt_io_thread* dtio, + struct dt_msg_queue* mq); + +/** + * Start the io thread + * @param dtio: the io thread. + * @return false on failure. + */ +int dt_io_thread_start(struct dt_io_thread* dtio); + +/** + * Stop the io thread + * @param dtio: the io thread. + */ +void dt_io_thread_stop(struct dt_io_thread* dtio); + #endif /* DTSTREAM_H */