]> git.ipfire.org Git - thirdparty/unbound.git/commitdiff
Threading.
authorWouter Wijngaards <wouter@nlnetlabs.nl>
Mon, 26 Feb 2007 14:49:11 +0000 (14:49 +0000)
committerWouter Wijngaards <wouter@nlnetlabs.nl>
Mon, 26 Feb 2007 14:49:11 +0000 (14:49 +0000)
git-svn-id: file:///svn/unbound/trunk@146 be551aaa-1e26-0410-a405-d3ace91eadb9

13 files changed:
daemon/daemon.c
daemon/worker.c
daemon/worker.h
doc/Changelog
doc/example.conf
doc/unbound.conf.5
util/locks.c
util/locks.h
util/log.c
util/net_help.c
util/net_help.h
util/netevent.c
util/netevent.h

index e309ad9eea9644c86e47904ae04ad4fc9af2a5a6..5920c2a9f705ad356b2b75598fdbdbc8fd8c4762 100644 (file)
 #include "util/config_file.h"
 #include "services/listen_dnsport.h"
 
+/* @@@ TODO remove */
+#include "pthread.h"
+#include <signal.h>
+
 struct daemon* 
 daemon_init()
 {
@@ -74,22 +78,126 @@ daemon_open_shared_ports(struct daemon* daemon)
        return 1;
 }
 
-void 
-daemon_fork(struct daemon* daemon)
+/**
+ * Allocate empty worker structures. With backptr and thread-number,
+ * from 0..numthread initialised. Used as user arguments to new threads.
+ * @param daemon: the daemon with (new) config settings.
+ */
+static void 
+daemon_create_workers(struct daemon* daemon)
 {
+       int i;
+       log_assert(daemon && daemon->cfg);
        /* only one thread for now */
-       log_assert(daemon);
-       daemon->num = 1;
+       daemon->num = daemon->cfg->num_threads;
+#if !defined(HAVE_PTHREAD) && !defined(HAVE_SOLARIS_THREADS)
+       if(daemon->num != 1) {
+               log_err("configed %d threads, but executable was compiled "
+               "with no thread support. Continuing with 1.", daemon->num);
+               daemon->num = 1;
+       }
+#endif /* no threads */
        daemon->workers = (struct worker**)calloc((size_t)daemon->num, 
                sizeof(struct worker*));
-       if(!(daemon->workers[0] = worker_init(daemon->cfg, daemon->ports, 
-               BUFSZ)))
-               fatal_exit("could not initialize thread # %d", 0);
-       daemon->workers[0]->daemon = daemon;
-       daemon->workers[0]->thread_num = 0;
+       for(i=0; i<daemon->num; i++) {
+               if(!(daemon->workers[i] = worker_create(daemon, i)))
+                       fatal_exit("malloc failure");
+       }
+}
+
+/**
+ * Function to start one thread. 
+ * @param arg: user argument.
+ * @return: void* user return value could be used for thread_join results.
+ */
+static void* 
+thread_start(void* arg)
+{
+       struct worker* worker = (struct worker*)arg;
+       int num = worker->thread_num;
+       ub_thread_blocksigs();
+       if(!worker_init(worker, worker->daemon->cfg, worker->daemon->ports,
+               BUFSZ, 0))
+               fatal_exit("Could not initialize thread #%d", num);
+
+       worker_work(worker);
+       return NULL;
+}
 
+/**
+ * Fork and init the other threads. Main thread returns for special handling.
+ * @param daemon: the daemon with other threads to fork.
+ */
+static void
+daemon_start_others(struct daemon* daemon)
+{
+       int i;
+       log_assert(daemon);
+       log_info("start others");
+       /* skip i=0, is this thread */
+       for(i=1; i<daemon->num; i++) {
+               ub_thread_create(&daemon->workers[i]->thr_id,
+                       thread_start, daemon->workers[i]);
+       }
+}
+
+/**
+ * Stop the other threads.
+ * @param daemon: the daemon with other threads.
+ */
+static void
+daemon_stop_others(struct daemon* daemon)
+{
+       int i, err;
+       log_assert(daemon);
+       log_info("stop others");
+       /* skip i=0, is this thread */
+       /* use i=0 buffer for sending cmds; because we are #0 */
+       for(i=1; i<daemon->num; i++) {
+               log_info("killing thread %d", i);
+               worker_send_cmd(daemon->workers[i], 
+                       daemon->workers[0]->front->udp_buff, worker_cmd_quit);
+       }
+       /** wait for them to quit */
+       for(i=1; i<daemon->num; i++) {
+               /* join it to make sure its dead */
+               log_info("join %d", i);
+               if((err=pthread_join(daemon->workers[i]->thr_id, NULL)))
+                       log_err("pthread_join: %s", strerror(err));
+               log_info("join success %d", i);
+       }
+}
+
+void 
+daemon_fork(struct daemon* daemon)
+{
+       log_assert(daemon);
+       log_info("daemon_fork");
+       /* first create all the worker structures, so we can pass
+        * them to the newly created threads. 
+        */
+       daemon_create_workers(daemon);
+       
+       /* Now create the threads and init the workers.
+        * By the way, this is thread #0 (the main thread).
+        */
+       daemon_start_others(daemon);
+
+       /* Special handling for the main thread. This is the thread
+        * that handles signals.
+        */
+       if(!worker_init(daemon->workers[0], daemon->cfg, daemon->ports,
+               BUFSZ, 1))
+               fatal_exit("Could not initialize main thread");
+       /* see if other threads have started correctly? */
+
+       /* Start resolver service on main thread. */
        log_info("start of service (%s).", PACKAGE_STRING);
        worker_work(daemon->workers[0]);
+
+       /* we exited! a signal happened! Stop other threads */
+       daemon_stop_others(daemon);
+
        if(daemon->workers[0]->need_to_restart)
                daemon->need_to_exit = 0;
        else    daemon->need_to_exit = 1;
index 210af5dad6994e83d3930fed98345e9dc0f5840f..4a7550f35ac826f13b60f7462d2826f52571e13c 100644 (file)
 /** the size of ID and flags, opcode, rcode in dns packet */
 #define ID_AND_FLAGS 4
 
+void 
+worker_send_cmd(struct worker* worker, ldns_buffer* buffer,
+       enum worker_commands cmd)
+{
+       ldns_buffer_clear(buffer);
+       /* like DNS message, length data */
+       ldns_buffer_write_u16(buffer, sizeof(uint32_t));
+       ldns_buffer_write_u32(buffer, (uint32_t)cmd);
+       ldns_buffer_flip(buffer);
+       if(!write_socket(worker->cmd_send_fd, ldns_buffer_begin(buffer),
+               ldns_buffer_limit(buffer)))
+               log_err("write socket: %s", strerror(errno));
+}
+
 /** reply to query with given error code */
 static void 
 replyerror(int r, struct worker* worker)
@@ -149,6 +163,36 @@ worker_check_request(ldns_buffer* pkt)
        return 0;
 }
 
+/** process control messages from the main thread. */
+static int 
+worker_handle_control_cmd(struct comm_point* c, void* arg, int error, 
+       struct comm_reply* ATTR_UNUSED(reply_info))
+{
+       struct worker* worker = (struct worker*)arg;
+       enum worker_commands cmd;
+       if(error != NETEVENT_NOERROR) {
+               if(error == NETEVENT_CLOSED)
+                       comm_base_exit(worker->base);
+               else    log_info("control cmd: %d", error);
+               return 0;
+       }
+       log_info("control cmd");
+       if(ldns_buffer_limit(c->buffer) != sizeof(uint32_t)) {
+               fatal_exit("bad control msg length %d", 
+                       (int)ldns_buffer_limit(c->buffer));
+       }
+       cmd = ldns_buffer_read_u32(c->buffer);
+       switch(cmd) {
+       case worker_cmd_quit:
+               comm_base_exit(worker->base);
+               break;
+       default:
+               log_err("bad command %d", (int)cmd);
+               break;
+       }
+       return 0;
+}
+
 /** handles callbacks from listening event interface */
 static int 
 worker_handle_request(struct comm_point* c, void* arg, int error,
@@ -210,59 +254,97 @@ worker_sighandler(int sig, void* arg)
 }
 
 struct worker* 
-worker_init(struct config_file *cfg, struct listen_port* ports,
-       size_t buffer_size)
+worker_create(struct daemon* daemon, int id)
 {
        struct worker* worker = (struct worker*)calloc(1, 
                sizeof(struct worker));
-       unsigned int seed;
+       int sv[2];
        if(!worker) 
                return NULL;
+       worker->daemon = daemon;
+       worker->thread_num = id;
+       worker->cmd_send_fd = -1;
+       worker->cmd_recv_fd = -1;
+       /* create socketpair to communicate with worker */
+       if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) {
+               free(worker);
+               log_err("socketpair: %s", strerror(errno));
+               return NULL;
+       }
+       worker->cmd_send_fd = sv[0];
+       worker->cmd_recv_fd = sv[1];
+       return worker;
+}
+
+int
+worker_init(struct worker* worker, struct config_file *cfg, 
+       struct listen_port* ports, size_t buffer_size, int do_sigs)
+{
+       unsigned int seed;
+       int startport;
        worker->need_to_restart = 0;
        worker->base = comm_base_create();
        if(!worker->base) {
                log_err("could not create event handling base");
                worker_delete(worker);
-               return NULL;
+               return 0;
        }
-       worker->comsig = comm_signal_create(worker->base, worker_sighandler, 
-               worker);
-       if(!worker->comsig || !comm_signal_bind(worker->comsig, SIGHUP)
-               || !comm_signal_bind(worker->comsig, SIGINT)
-               || !comm_signal_bind(worker->comsig, SIGQUIT)) {
-               log_err("could not create signal handlers");
-               worker_delete(worker);
-               return NULL;
+       if(do_sigs) {
+               worker->comsig = comm_signal_create(worker->base, 
+                       worker_sighandler, worker);
+               if(!worker->comsig || !comm_signal_bind(worker->comsig, SIGHUP)
+                       || !comm_signal_bind(worker->comsig, SIGINT)
+                       || !comm_signal_bind(worker->comsig, SIGQUIT)) {
+                       log_err("could not create signal handlers");
+                       worker_delete(worker);
+                       return 0;
+               }
+               ub_thread_sig_unblock(SIGHUP);
+               ub_thread_sig_unblock(SIGINT);
+               ub_thread_sig_unblock(SIGQUIT);
+       } else { /* !do_sigs */
+               worker->comsig = 0;
        }
        worker->front = listen_create(worker->base, ports,
                buffer_size, worker_handle_request, worker);
        if(!worker->front) {
                log_err("could not create listening sockets");
                worker_delete(worker);
-               return NULL;
+               return 0;
        }
+       startport  = cfg->outgoing_base_port + 
+               cfg->outgoing_num_ports * worker->thread_num;
        worker->back = outside_network_create(worker->base,
                buffer_size, (size_t)cfg->outgoing_num_ports, cfg->ifs, 
-               cfg->num_ifs, cfg->do_ip4, cfg->do_ip6, 
-               cfg->outgoing_base_port);
+               cfg->num_ifs, cfg->do_ip4, cfg->do_ip6, startport);
        if(!worker->back) {
                log_err("could not create outgoing sockets");
                worker_delete(worker);
-               return NULL;
+               return 0;
        }
        /* init random(), large table size. */
        if(!(worker->rndstate = (struct ub_randstate*)calloc(1,
                sizeof(struct ub_randstate)))) {
                log_err("malloc rndtable failed.");
                worker_delete(worker);
-               return NULL;
+               return 0;
        }
-       seed = (unsigned int)time(NULL) ^ (unsigned int)getpid();
+       seed = (unsigned int)time(NULL) ^ (unsigned int)getpid() ^
+               (unsigned int)worker->thread_num;
        if(!ub_initstate(seed, worker->rndstate, RND_STATE_SIZE)) {
                log_err("could not init random numbers.");
                worker_delete(worker);
-               return NULL;
+               return 0;
        }
+       /* start listening to commands */
+       if(!(worker->cmd_com=comm_point_create_local(worker->base, 
+               worker->cmd_recv_fd, buffer_size, worker_handle_control_cmd, 
+               worker))) {
+               log_err("could not create control compt.");
+               worker_delete(worker);
+               return 0;
+       }
+
        /* set forwarder address */
        if(cfg->fwd_address && cfg->fwd_address[0]) {
                if(!worker_set_fwd(worker, cfg->fwd_address, cfg->fwd_port)) {
@@ -270,7 +352,7 @@ worker_init(struct config_file *cfg, struct listen_port* ports,
                        fatal_exit("could not set forwarder address");
                }
        }
-       return worker;
+       return 1;
 }
 
 void 
@@ -284,6 +366,10 @@ worker_delete(struct worker* worker)
 {
        if(!worker) 
                return;
+       close(worker->cmd_send_fd);
+       worker->cmd_send_fd = -1;
+       close(worker->cmd_recv_fd);
+       worker->cmd_recv_fd = -1;
        listen_delete(worker->front);
        outside_network_delete(worker->back);
        comm_signal_delete(worker->comsig);
index 666cd3e08aff9ee01e3e5c7432fa5b07deb3bbec..ec01f5fd8fa22bdc0d985036f53c1770bd008b9b 100644 (file)
@@ -56,6 +56,13 @@ struct ub_randstate;
 /** size of table used for random numbers. large to be more secure. */
 #define RND_STATE_SIZE 256
 
+/** worker commands */
+enum worker_commands {
+       /** make the worker quit */
+       worker_cmd_quit
+};
+
+
 /**
  * Structure holding working information for unbound.
  * Holds globally visible information.
@@ -65,6 +72,12 @@ struct worker {
        struct daemon* daemon;
        /** the thread number (in daemon array). */
        int thread_num;
+       /** thread id */
+       ub_thread_t thr_id;
+       /** fd 0 of socketpair, write commands for worker to this one */
+       int cmd_send_fd;
+       /** fd 1 of socketpair, worker listens on this one */
+       int cmd_recv_fd;
        /** the event base this worker works with */
        struct comm_base* base;
        /** the frontside listening interface where request events come in */
@@ -72,7 +85,9 @@ struct worker {
        /** the backside outside network interface to the auth servers */
        struct outside_network* back;
        /** the signal handler */
-       struct comm_signal *comsig;
+       struct comm_signal* comsig;
+       /** commpoint to listen to commands. */
+       struct comm_point* cmd_com;
 
        /** number of requests currently active */
        int num_requests;
@@ -92,16 +107,27 @@ struct worker {
        int need_to_restart;
 };
 
+/**
+ * Create the worker structure. Bare bones version, zeroed struct,
+ * with backpointers only. Use worker_init on it later.
+ * @param daemon: the daemon that this worker thread is part of.
+ * @param id: the thread number from 0.. numthreads-1.
+ * @return: the new worker or NULL on alloc failure.
+ */
+struct worker* worker_create(struct daemon* daemon, int id);
+
 /**
  * Initialize worker.
  * Allocates event base, listens to ports
+ * @param worker: worker to initialize, created with worker_create.
  * @param cfg: configuration settings.
  * @param ports: list of shared query ports.
  * @param buffer_size: size of datagram buffer.
- * @return: The worker, or NULL on error.
+ * @param do_sigs: if true, worker installs signal handlers.
+ * @return: false on error.
  */
-struct worker* worker_init(struct config_file *cfg, struct listen_port* ports,
-       size_t buffer_size);
+int worker_init(struct worker* worker, struct config_file *cfg, 
+       struct listen_port* ports, size_t buffer_size, int do_sigs);
 
 /**
  * Make worker work.
@@ -122,4 +148,13 @@ void worker_delete(struct worker* worker);
  */
 int worker_set_fwd(struct worker* worker, const char* ip, int port);
 
+/**
+ * Send a command to a worker. Uses blocking writes.
+ * @param worker: worker to send command to.
+ * @param buffer: an empty buffer to use.
+ * @param cmd: command to send.
+ */
+void worker_send_cmd(struct worker* worker, ldns_buffer* buffer,
+        enum worker_commands cmd);
+
 #endif /* DAEMON_WORKER_H */
index 3456b983dba49627537d1c1a1a5e882522afe307..2ed52f9ef75da69be835590f912932cdc1ce7c18 100644 (file)
@@ -1,5 +1,8 @@
 26 February 2007: Wouter
        - ub_random code used to select ID and port.
+       - log code prints thread id.
+       - unbound can thread itself, with reload(HUP) and quit working
+         correctly.
 
 23 February 2007: Wouter
        - Can do reloads on sigHUP. Everything is stopped, and freed,
index e52f37ad690d205e45250fe3ee0cd1b038b217ed..81869f03d75f5715663057e41ce472488d097dc2 100644 (file)
@@ -12,7 +12,7 @@
 server:
        # whitespace is not necessary, but looks cleaner.
 
-       # verbosity number, 0 is least verbose.
+       # verbosity number, 0 is least verbose. 1 is default.
        verbosity: 2
        
        # number of threads to create. 1 disables threading.
index 00e83c495182a6c1269abc2e4fd72a77cb4b7795..bafc6f8a68428df4482f625c7a0254462b68debf 100644 (file)
@@ -45,6 +45,9 @@ clause.
 The verbosity number, level 0 means no verbosity, only errors. Level 1 
 gives operational information. Level 2 gives query level information, 
 output per query. Level 3 gives algorithm level information.  
+Default is level 1. The verbosity can also be increased from the commandline,
+see
+.Xr unbound 8 .
 .It \fBnum-threads:\fR <number>
 The number of threads to create to serve clients. Use 1 for no threading.
 .It \fBport:\fR <port number>
@@ -87,7 +90,7 @@ Sets the working directory for the program.
 .It \fBlogfile:\fR <filename>
 If "" is given, logging goes to stderr, or nowhere once daemonized.
 The logfile is appended to, in the following format: 
-[seconds since 1970] unbound[pid]: type: message. 
+[seconds since 1970] unbound[pid:tid]: type: message. 
 .It \fBpidfile:\fR <filename>
 The process id is written to the file. Default is "unbound.pid". So,
 kill -HUP `cat /etc/unbound/unbound.pid` will trigger a reload,
index 1dbdedd862dbceb0167ba4f36ce8c737b113b03d..7de126ac6d03e1720e0b505b03ce798e06b0b569 100644 (file)
 
 #include "config.h"
 #include "util/locks.h"
+#include <signal.h>
+
+void 
+ub_thread_blocksigs()
+{
+#ifdef HAVE_PTHREAD
+       int err;
+       sigset_t sigset;
+       sigfillset(&sigset);
+       log_info("blocking signals");
+       if((err=pthread_sigmask(SIG_SETMASK, &sigset, NULL)))
+               fatal_exit("pthread_sigmask: %s", strerror(err));
+#else
+#  ifdef HAVE_SOLARIS_THREADS
+       int err;
+       sigset_t sigset;
+       sigfillset(&sigset);
+       if((err=thr_sigsetmask(SIG_SETMASK, &sigset, NULL)))
+               fatal_exit("thr_sigsetmask: %s", strerror(err));
+#  else 
+       /* have nothing, do nothing */
+#  endif /* HAVE_SOLARIS_THREADS */
+#endif /* HAVE_PTHREAD */
+}
+
+void ub_thread_sig_unblock(int sig)
+{
+#ifdef HAVE_PTHREAD
+       int err;
+       sigset_t sigset;
+       sigemptyset(&sigset);
+       sigaddset(&sigset, sig);
+       log_info("unblocking signal %d", sig);
+       if((err=pthread_sigmask(SIG_UNBLOCK, &sigset, NULL)))
+               fatal_exit("pthread_sigmask: %s", strerror(err));
+#else
+#  ifdef HAVE_SOLARIS_THREADS
+       int err;
+       sigset_t sigset;
+       sigemptyset(&sigset);
+       sigaddset(&sigset, sig);
+       if((err=thr_sigsetmask(SIG_UNBLOCK, &sigset, NULL)))
+               fatal_exit("thr_sigsetmask: %s", strerror(err));
+#  else 
+       /* have nothing, do nothing */
+#  endif /* HAVE_SOLARIS_THREADS */
+#endif /* HAVE_PTHREAD */
+}
 
index 317187cf701f7f336f04107f733e02a934518b66..e901e79e5e5eeb1472e7364deb87d815ad8f0ed7 100644 (file)
@@ -121,6 +121,8 @@ typedef pthread_spinlock_t lock_quick_t;
 typedef pthread_t ub_thread_t;
 /** Pass where to store tread_t in thr. Use default NULL attributes. */
 #define ub_thread_create(thr, func, arg) LOCKRET(pthread_create(thr, NULL, func, arg))
+/** get self id. */
+#define ub_thread_self() pthread_self()
 
 #else /* we do not HAVE_PTHREAD */
 #ifdef HAVE_SOLARIS_THREADS
@@ -150,6 +152,7 @@ typedef mutex_t lock_quick_t;
 /** Thread creation, create a default thread. */
 typedef thread_t ub_thread_t;
 #define ub_thread_create(thr, func, arg) LOCKRET(thr_create(NULL, NULL, func, arg, NULL, thr))
+#define ub_thread_self() thr_self()
 
 #else /* we do not HAVE_SOLARIS_THREADS and no PTHREADS */
 
@@ -182,7 +185,20 @@ typedef int ub_thread_t;
 #define ub_thread_create(thr, func, arg) \
        fatal_exit("%s %d called thread create, but no thread support "  \
                "has been compiled in.",  __FILE__, __LINE__)
+#define ub_thread_self() 0
 
 #endif /* HAVE_SOLARIS_THREADS */
 #endif /* HAVE_PTHREAD */
+
+/**
+ * Block all signals for this thread.
+ * fatal exit on error.
+ */
+void ub_thread_blocksigs();
+
+/**
+ * unblock one signal for this thread.
+ */
+void ub_thread_sig_unblock(int sig);
+
 #endif /* UTIL_LOCKS_H */
index 17350dea58160d07c949edbf7772d00aeaec2811..de0dd3bddc1e76312b7c95664429219d7cd1020a 100644 (file)
@@ -39,6 +39,7 @@
 
 #include "config.h"
 #include "util/log.h"
+#include "util/locks.h"
 #ifdef HAVE_TIME_H
 #include <time.h>
 #endif
@@ -64,6 +65,7 @@ log_init(const char* filename)
                        strerror(errno));
                return;
        }
+       verbose(VERB_DETAIL, "switching to logfile %s", filename);
        if(logfile && logfile != stderr)
                fclose(logfile);
        logfile = f;
@@ -75,8 +77,10 @@ log_vmsg(const char* type, const char *format, va_list args)
        char message[MAXSYSLOGMSGLEN];
        const char* ident="unbound";
        vsnprintf(message, sizeof(message), format, args);
-       fprintf(logfile, "[%d] %s[%d] %s: %s\n",
-               (int)time(NULL), ident, (int)getpid(), type, message);
+       fprintf(logfile, "[%d] %s[%d:%x] %s: %s\n",
+               (int)time(NULL), ident, (int)getpid(), 
+               (int)ub_thread_self(),
+               type, message);
        fflush(logfile);
 }
 
index ef2ee72acbe32d6edc4e5a121c36b1a61fc63998..8ffdf44caa870524d03d3b1290b9abe1f236e873 100644 (file)
@@ -49,3 +49,23 @@ str_is_ip6(const char* str)
        else    return 0;
 }
 
+int 
+write_socket(int s, const void *buf, size_t size)
+{
+        const char* data = (const char*)buf;
+        size_t total_count = 0;
+
+        while (total_count < size) {
+                ssize_t count
+                        = write(s, data + total_count, size - total_count);
+                if (count == -1) {
+                        if (errno != EAGAIN && errno != EINTR) {
+                                return 0;
+                        } else {
+                                continue;
+                        }
+                }
+                total_count += count;
+        }
+        return 1;
+}
index a78f1b9519f2d4e72194109c0f461535133e85cd..8fdcc548172a8b6a14345f0f6acb37c2bb4b2c14 100644 (file)
  */
 int str_is_ip6(const char* str);
 
+/**
+ * Write (blocking) to a nonblocking socket.
+ * @param s: fd.
+ * @param buf: data buffer.
+ * @param size: length of data to send.
+ * @return: 0 on error. errno is set.
+ */
+int
+write_socket(int s, const void *buf, size_t size);
+
 #endif /* NET_HELP_H */
index f0a8958359c3c079a2a8afdc5dd80fa747fd995c..f183be6a786a473b2ca665c80f7591b2a412a603 100644 (file)
@@ -137,6 +137,9 @@ static void comm_timer_callback(int fd, short event, void* arg);
  */
 static void comm_signal_callback(int fd, short event, void* arg);
 
+/** libevent callback for AF_UNIX fds. */
+static void comm_point_local_handle_callback(int fd, short event, void* arg);
+
 /** create a tcp handler with a parent */
 static struct comm_point* comm_point_create_tcp_handler(
        struct comm_base *base, struct comm_point* parent, size_t bufsize,
@@ -327,7 +330,8 @@ tcp_callback_writer(struct comm_point* c)
 {
        log_assert(c->type == comm_tcp);
        ldns_buffer_clear(c->buffer);
-       c->tcp_is_reading = 1;
+       if(c->tcp_do_toggle_rw)
+               c->tcp_is_reading = 1;
        c->tcp_byte_count = 0;
        comm_point_stop_listening(c);
        /* for listening socket */
@@ -339,11 +343,13 @@ static void
 tcp_callback_reader(struct comm_point* c)
 {
        struct comm_reply rep;
-       log_assert(c->type == comm_tcp);
+       log_assert(c->type == comm_tcp || c->type == comm_local);
        ldns_buffer_flip(c->buffer);
-       c->tcp_is_reading = 0;
+       if(c->tcp_do_toggle_rw)
+               c->tcp_is_reading = 0;
        c->tcp_byte_count = 0;
-       comm_point_stop_listening(c);
+       if(c->type == comm_tcp)
+               comm_point_stop_listening(c);
        rep.c = c;
        rep.addrlen = 0;
        if( (*c->callback)(c, c->cb_arg, NETEVENT_NOERROR, &rep) ) {
@@ -354,13 +360,14 @@ tcp_callback_reader(struct comm_point* c)
 /** Handle tcp reading callback. 
  * @param fd: file descriptor of socket.
  * @param c: comm point to read from into buffer.
+ * @param short_ok: if true, very short packets are OK (for comm_local).
  * @return: 0 on error 
  */
 static int
-comm_point_tcp_handle_read(int fd, struct comm_point* c)
+comm_point_tcp_handle_read(int fd, struct comm_point* c, int short_ok)
 {
        ssize_t r;
-       log_assert(c->type == comm_tcp);
+       log_assert(c->type == comm_tcp || c->type == comm_local);
        if(!c->tcp_is_reading)
                return 0;
 
@@ -386,7 +393,8 @@ comm_point_tcp_handle_read(int fd, struct comm_point* c)
                }
                ldns_buffer_set_limit(c->buffer, 
                        ldns_buffer_read_u16_at(c->buffer, 0));
-               if(ldns_buffer_limit(c->buffer) < LDNS_HEADER_SIZE) {
+               if(!short_ok && 
+                       ldns_buffer_limit(c->buffer) < LDNS_HEADER_SIZE) {
                        verbose(VERB_DETAIL, "tcp: dropped bogus too short.");
                        return 0;
                }
@@ -462,7 +470,7 @@ comm_point_tcp_handle_callback(int fd, short event, void* arg)
        log_assert(c->type == comm_tcp);
 
        if(event&EV_READ) {
-               if(!comm_point_tcp_handle_read(fd, c)) {
+               if(!comm_point_tcp_handle_read(fd, c, 0)) {
                        reclaim_tcp_handler(c);
                        if(!c->tcp_do_close)
                                (void)(*c->callback)(c, c->cb_arg, 
@@ -490,6 +498,20 @@ comm_point_tcp_handle_callback(int fd, short event, void* arg)
        log_err("Ignored event %d for tcphdl.", event);
 }
 
+static void comm_point_local_handle_callback(int fd, short event, void* arg)
+{
+       struct comm_point* c = (struct comm_point*)arg;
+       log_assert(c->type == comm_local);
+
+       if(event&EV_READ) {
+               if(!comm_point_tcp_handle_read(fd, c, 1)) {
+                       log_err("error in localhdl");
+               }
+               return;
+       }
+       log_err("Ignored event %d for localhdl.", event);
+}
+
 struct comm_point* 
 comm_point_create_udp(struct comm_base *base, int fd, ldns_buffer* buffer,
        comm_point_callback_t* callback, void* callback_arg)
@@ -571,7 +593,7 @@ comm_point_create_tcp_handler(struct comm_base *base,
        c->type = comm_tcp;
        c->tcp_do_close = 0;
        c->do_not_close = 0;
-       c->tcp_do_toggle_rw = 0;
+       c->tcp_do_toggle_rw = 1;
        c->callback = callback;
        c->cb_arg = callback_arg;
        /* add to parent free list */
@@ -653,6 +675,56 @@ comm_point_create_tcp(struct comm_base *base, int fd, int num, size_t bufsize,
        return c;
 }
 
+struct comm_point* 
+comm_point_create_local(struct comm_base *base, int fd, size_t bufsize,
+        comm_point_callback_t* callback, void* callback_arg)
+{
+       struct comm_point* c = (struct comm_point*)calloc(1,
+               sizeof(struct comm_point));
+       short evbits;
+       if(!c)
+               return NULL;
+       c->ev = (struct internal_event*)calloc(1,
+               sizeof(struct internal_event));
+       if(!c->ev) {
+               free(c);
+               return NULL;
+       }
+       c->fd = fd;
+       c->buffer = ldns_buffer_new(bufsize);
+       if(!c->buffer) {
+               free(c->ev);
+               free(c);
+               return NULL;
+       }
+       c->timeout = NULL;
+       c->tcp_is_reading = 1;
+       c->tcp_byte_count = 0;
+       c->tcp_parent = NULL;
+       c->max_tcp_count = 0;
+       c->tcp_handlers = NULL;
+       c->tcp_free = NULL;
+       c->type = comm_local;
+       c->tcp_do_close = 0;
+       c->do_not_close = 1;
+       c->tcp_do_toggle_rw = 0;
+       c->callback = callback;
+       c->cb_arg = callback_arg;
+       /* libevent stuff */
+       evbits = EV_PERSIST | EV_READ;
+       event_set(&c->ev->ev, c->fd, evbits, comm_point_local_handle_callback, 
+               c);
+       if(event_base_set(base->eb->base, &c->ev->ev) != 0 ||
+               event_add(&c->ev->ev, c->timeout) != 0 )
+       {
+               log_err("could not add tcphdl event");
+               free(c->ev);
+               free(c);
+               return NULL;
+       }
+       return c;
+}
+
 void 
 comm_point_close(struct comm_point* c)
 {
index 9b3098b6feab4840215a79d3d96ff3611236a041..c6495549f594e0efa863827ae8e185729a2785c3 100644 (file)
@@ -135,7 +135,9 @@ struct comm_point {
                /** TCP accept socket - only creates handlers if readable. */
                comm_tcp_accept, 
                /** TCP handler socket - handle byteperbyte readwrite. */
-               comm_tcp  
+               comm_tcp,
+               /** AF_UNIX socket - for internal commands. */
+               comm_local
        } type;
 
        /* ---------- Behaviour ----------- */
@@ -286,6 +288,19 @@ struct comm_point* comm_point_create_tcp(struct comm_base* base,
        int fd, int num, size_t bufsize, 
        comm_point_callback_t* callback, void* callback_arg);
 
+/**
+ * Create commpoint to listen to a local domain file descriptor.
+ * @param base: in which base to alloc the commpoint.
+ * @param fd: file descriptor of open AF_UNIX socket set to listen nonblocking.
+ * @param bufsize: size of buffer to create for handlers.
+ * @param callback: callback function pointer for the handler.
+ * @param callback_arg: will be passed to your callback function.
+ * @return: the commpoint or NULL on error.
+ */
+struct comm_point* comm_point_create_local(struct comm_base* base,
+       int fd, size_t bufsize, 
+       comm_point_callback_t* callback, void* callback_arg);
+
 /**
  * Close a comm point fd.
  * @param c: comm point to close.