From: Wouter Wijngaards Date: Mon, 26 Feb 2007 14:49:11 +0000 (+0000) Subject: Threading. X-Git-Tag: release-0.1~25 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=e4d39152b334a81d92c0c1c5c3bed38ac7d7f04e;p=thirdparty%2Funbound.git Threading. git-svn-id: file:///svn/unbound/trunk@146 be551aaa-1e26-0410-a405-d3ace91eadb9 --- diff --git a/daemon/daemon.c b/daemon/daemon.c index e309ad9ee..5920c2a9f 100644 --- a/daemon/daemon.c +++ b/daemon/daemon.c @@ -49,6 +49,10 @@ #include "util/config_file.h" #include "services/listen_dnsport.h" +/* @@@ TODO remove */ +#include "pthread.h" +#include + struct daemon* daemon_init() { @@ -74,22 +78,126 @@ daemon_open_shared_ports(struct daemon* daemon) return 1; } -void -daemon_fork(struct daemon* daemon) +/** + * Allocate empty worker structures. With backptr and thread-number, + * from 0..numthread initialised. Used as user arguments to new threads. + * @param daemon: the daemon with (new) config settings. + */ +static void +daemon_create_workers(struct daemon* daemon) { + int i; + log_assert(daemon && daemon->cfg); /* only one thread for now */ - log_assert(daemon); - daemon->num = 1; + daemon->num = daemon->cfg->num_threads; +#if !defined(HAVE_PTHREAD) && !defined(HAVE_SOLARIS_THREADS) + if(daemon->num != 1) { + log_err("configed %d threads, but executable was compiled " + "with no thread support. Continuing with 1.", daemon->num); + daemon->num = 1; + } +#endif /* no threads */ daemon->workers = (struct worker**)calloc((size_t)daemon->num, sizeof(struct worker*)); - if(!(daemon->workers[0] = worker_init(daemon->cfg, daemon->ports, - BUFSZ))) - fatal_exit("could not initialize thread # %d", 0); - daemon->workers[0]->daemon = daemon; - daemon->workers[0]->thread_num = 0; + for(i=0; inum; i++) { + if(!(daemon->workers[i] = worker_create(daemon, i))) + fatal_exit("malloc failure"); + } +} + +/** + * Function to start one thread. + * @param arg: user argument. + * @return: void* user return value could be used for thread_join results. + */ +static void* +thread_start(void* arg) +{ + struct worker* worker = (struct worker*)arg; + int num = worker->thread_num; + ub_thread_blocksigs(); + if(!worker_init(worker, worker->daemon->cfg, worker->daemon->ports, + BUFSZ, 0)) + fatal_exit("Could not initialize thread #%d", num); + + worker_work(worker); + return NULL; +} +/** + * Fork and init the other threads. Main thread returns for special handling. + * @param daemon: the daemon with other threads to fork. + */ +static void +daemon_start_others(struct daemon* daemon) +{ + int i; + log_assert(daemon); + log_info("start others"); + /* skip i=0, is this thread */ + for(i=1; inum; i++) { + ub_thread_create(&daemon->workers[i]->thr_id, + thread_start, daemon->workers[i]); + } +} + +/** + * Stop the other threads. + * @param daemon: the daemon with other threads. + */ +static void +daemon_stop_others(struct daemon* daemon) +{ + int i, err; + log_assert(daemon); + log_info("stop others"); + /* skip i=0, is this thread */ + /* use i=0 buffer for sending cmds; because we are #0 */ + for(i=1; inum; i++) { + log_info("killing thread %d", i); + worker_send_cmd(daemon->workers[i], + daemon->workers[0]->front->udp_buff, worker_cmd_quit); + } + /** wait for them to quit */ + for(i=1; inum; i++) { + /* join it to make sure its dead */ + log_info("join %d", i); + if((err=pthread_join(daemon->workers[i]->thr_id, NULL))) + log_err("pthread_join: %s", strerror(err)); + log_info("join success %d", i); + } +} + +void +daemon_fork(struct daemon* daemon) +{ + log_assert(daemon); + log_info("daemon_fork"); + /* first create all the worker structures, so we can pass + * them to the newly created threads. + */ + daemon_create_workers(daemon); + + /* Now create the threads and init the workers. + * By the way, this is thread #0 (the main thread). + */ + daemon_start_others(daemon); + + /* Special handling for the main thread. This is the thread + * that handles signals. + */ + if(!worker_init(daemon->workers[0], daemon->cfg, daemon->ports, + BUFSZ, 1)) + fatal_exit("Could not initialize main thread"); + /* see if other threads have started correctly? */ + + /* Start resolver service on main thread. */ log_info("start of service (%s).", PACKAGE_STRING); worker_work(daemon->workers[0]); + + /* we exited! a signal happened! Stop other threads */ + daemon_stop_others(daemon); + if(daemon->workers[0]->need_to_restart) daemon->need_to_exit = 0; else daemon->need_to_exit = 1; diff --git a/daemon/worker.c b/daemon/worker.c index 210af5dad..4a7550f35 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -60,6 +60,20 @@ /** the size of ID and flags, opcode, rcode in dns packet */ #define ID_AND_FLAGS 4 +void +worker_send_cmd(struct worker* worker, ldns_buffer* buffer, + enum worker_commands cmd) +{ + ldns_buffer_clear(buffer); + /* like DNS message, length data */ + ldns_buffer_write_u16(buffer, sizeof(uint32_t)); + ldns_buffer_write_u32(buffer, (uint32_t)cmd); + ldns_buffer_flip(buffer); + if(!write_socket(worker->cmd_send_fd, ldns_buffer_begin(buffer), + ldns_buffer_limit(buffer))) + log_err("write socket: %s", strerror(errno)); +} + /** reply to query with given error code */ static void replyerror(int r, struct worker* worker) @@ -149,6 +163,36 @@ worker_check_request(ldns_buffer* pkt) return 0; } +/** process control messages from the main thread. */ +static int +worker_handle_control_cmd(struct comm_point* c, void* arg, int error, + struct comm_reply* ATTR_UNUSED(reply_info)) +{ + struct worker* worker = (struct worker*)arg; + enum worker_commands cmd; + if(error != NETEVENT_NOERROR) { + if(error == NETEVENT_CLOSED) + comm_base_exit(worker->base); + else log_info("control cmd: %d", error); + return 0; + } + log_info("control cmd"); + if(ldns_buffer_limit(c->buffer) != sizeof(uint32_t)) { + fatal_exit("bad control msg length %d", + (int)ldns_buffer_limit(c->buffer)); + } + cmd = ldns_buffer_read_u32(c->buffer); + switch(cmd) { + case worker_cmd_quit: + comm_base_exit(worker->base); + break; + default: + log_err("bad command %d", (int)cmd); + break; + } + return 0; +} + /** handles callbacks from listening event interface */ static int worker_handle_request(struct comm_point* c, void* arg, int error, @@ -210,59 +254,97 @@ worker_sighandler(int sig, void* arg) } struct worker* -worker_init(struct config_file *cfg, struct listen_port* ports, - size_t buffer_size) +worker_create(struct daemon* daemon, int id) { struct worker* worker = (struct worker*)calloc(1, sizeof(struct worker)); - unsigned int seed; + int sv[2]; if(!worker) return NULL; + worker->daemon = daemon; + worker->thread_num = id; + worker->cmd_send_fd = -1; + worker->cmd_recv_fd = -1; + /* create socketpair to communicate with worker */ + if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) { + free(worker); + log_err("socketpair: %s", strerror(errno)); + return NULL; + } + worker->cmd_send_fd = sv[0]; + worker->cmd_recv_fd = sv[1]; + return worker; +} + +int +worker_init(struct worker* worker, struct config_file *cfg, + struct listen_port* ports, size_t buffer_size, int do_sigs) +{ + unsigned int seed; + int startport; worker->need_to_restart = 0; worker->base = comm_base_create(); if(!worker->base) { log_err("could not create event handling base"); worker_delete(worker); - return NULL; + return 0; } - worker->comsig = comm_signal_create(worker->base, worker_sighandler, - worker); - if(!worker->comsig || !comm_signal_bind(worker->comsig, SIGHUP) - || !comm_signal_bind(worker->comsig, SIGINT) - || !comm_signal_bind(worker->comsig, SIGQUIT)) { - log_err("could not create signal handlers"); - worker_delete(worker); - return NULL; + if(do_sigs) { + worker->comsig = comm_signal_create(worker->base, + worker_sighandler, worker); + if(!worker->comsig || !comm_signal_bind(worker->comsig, SIGHUP) + || !comm_signal_bind(worker->comsig, SIGINT) + || !comm_signal_bind(worker->comsig, SIGQUIT)) { + log_err("could not create signal handlers"); + worker_delete(worker); + return 0; + } + ub_thread_sig_unblock(SIGHUP); + ub_thread_sig_unblock(SIGINT); + ub_thread_sig_unblock(SIGQUIT); + } else { /* !do_sigs */ + worker->comsig = 0; } worker->front = listen_create(worker->base, ports, buffer_size, worker_handle_request, worker); if(!worker->front) { log_err("could not create listening sockets"); worker_delete(worker); - return NULL; + return 0; } + startport = cfg->outgoing_base_port + + cfg->outgoing_num_ports * worker->thread_num; worker->back = outside_network_create(worker->base, buffer_size, (size_t)cfg->outgoing_num_ports, cfg->ifs, - cfg->num_ifs, cfg->do_ip4, cfg->do_ip6, - cfg->outgoing_base_port); + cfg->num_ifs, cfg->do_ip4, cfg->do_ip6, startport); if(!worker->back) { log_err("could not create outgoing sockets"); worker_delete(worker); - return NULL; + return 0; } /* init random(), large table size. */ if(!(worker->rndstate = (struct ub_randstate*)calloc(1, sizeof(struct ub_randstate)))) { log_err("malloc rndtable failed."); worker_delete(worker); - return NULL; + return 0; } - seed = (unsigned int)time(NULL) ^ (unsigned int)getpid(); + seed = (unsigned int)time(NULL) ^ (unsigned int)getpid() ^ + (unsigned int)worker->thread_num; if(!ub_initstate(seed, worker->rndstate, RND_STATE_SIZE)) { log_err("could not init random numbers."); worker_delete(worker); - return NULL; + return 0; } + /* start listening to commands */ + if(!(worker->cmd_com=comm_point_create_local(worker->base, + worker->cmd_recv_fd, buffer_size, worker_handle_control_cmd, + worker))) { + log_err("could not create control compt."); + worker_delete(worker); + return 0; + } + /* set forwarder address */ if(cfg->fwd_address && cfg->fwd_address[0]) { if(!worker_set_fwd(worker, cfg->fwd_address, cfg->fwd_port)) { @@ -270,7 +352,7 @@ worker_init(struct config_file *cfg, struct listen_port* ports, fatal_exit("could not set forwarder address"); } } - return worker; + return 1; } void @@ -284,6 +366,10 @@ worker_delete(struct worker* worker) { if(!worker) return; + close(worker->cmd_send_fd); + worker->cmd_send_fd = -1; + close(worker->cmd_recv_fd); + worker->cmd_recv_fd = -1; listen_delete(worker->front); outside_network_delete(worker->back); comm_signal_delete(worker->comsig); diff --git a/daemon/worker.h b/daemon/worker.h index 666cd3e08..ec01f5fd8 100644 --- a/daemon/worker.h +++ b/daemon/worker.h @@ -56,6 +56,13 @@ struct ub_randstate; /** size of table used for random numbers. large to be more secure. */ #define RND_STATE_SIZE 256 +/** worker commands */ +enum worker_commands { + /** make the worker quit */ + worker_cmd_quit +}; + + /** * Structure holding working information for unbound. * Holds globally visible information. @@ -65,6 +72,12 @@ struct worker { struct daemon* daemon; /** the thread number (in daemon array). */ int thread_num; + /** thread id */ + ub_thread_t thr_id; + /** fd 0 of socketpair, write commands for worker to this one */ + int cmd_send_fd; + /** fd 1 of socketpair, worker listens on this one */ + int cmd_recv_fd; /** the event base this worker works with */ struct comm_base* base; /** the frontside listening interface where request events come in */ @@ -72,7 +85,9 @@ struct worker { /** the backside outside network interface to the auth servers */ struct outside_network* back; /** the signal handler */ - struct comm_signal *comsig; + struct comm_signal* comsig; + /** commpoint to listen to commands. */ + struct comm_point* cmd_com; /** number of requests currently active */ int num_requests; @@ -92,16 +107,27 @@ struct worker { int need_to_restart; }; +/** + * Create the worker structure. Bare bones version, zeroed struct, + * with backpointers only. Use worker_init on it later. + * @param daemon: the daemon that this worker thread is part of. + * @param id: the thread number from 0.. numthreads-1. + * @return: the new worker or NULL on alloc failure. + */ +struct worker* worker_create(struct daemon* daemon, int id); + /** * Initialize worker. * Allocates event base, listens to ports + * @param worker: worker to initialize, created with worker_create. * @param cfg: configuration settings. * @param ports: list of shared query ports. * @param buffer_size: size of datagram buffer. - * @return: The worker, or NULL on error. + * @param do_sigs: if true, worker installs signal handlers. + * @return: false on error. */ -struct worker* worker_init(struct config_file *cfg, struct listen_port* ports, - size_t buffer_size); +int worker_init(struct worker* worker, struct config_file *cfg, + struct listen_port* ports, size_t buffer_size, int do_sigs); /** * Make worker work. @@ -122,4 +148,13 @@ void worker_delete(struct worker* worker); */ int worker_set_fwd(struct worker* worker, const char* ip, int port); +/** + * Send a command to a worker. Uses blocking writes. + * @param worker: worker to send command to. + * @param buffer: an empty buffer to use. + * @param cmd: command to send. + */ +void worker_send_cmd(struct worker* worker, ldns_buffer* buffer, + enum worker_commands cmd); + #endif /* DAEMON_WORKER_H */ diff --git a/doc/Changelog b/doc/Changelog index 3456b983d..2ed52f9ef 100644 --- a/doc/Changelog +++ b/doc/Changelog @@ -1,5 +1,8 @@ 26 February 2007: Wouter - ub_random code used to select ID and port. + - log code prints thread id. + - unbound can thread itself, with reload(HUP) and quit working + correctly. 23 February 2007: Wouter - Can do reloads on sigHUP. Everything is stopped, and freed, diff --git a/doc/example.conf b/doc/example.conf index e52f37ad6..81869f03d 100644 --- a/doc/example.conf +++ b/doc/example.conf @@ -12,7 +12,7 @@ server: # whitespace is not necessary, but looks cleaner. - # verbosity number, 0 is least verbose. + # verbosity number, 0 is least verbose. 1 is default. verbosity: 2 # number of threads to create. 1 disables threading. diff --git a/doc/unbound.conf.5 b/doc/unbound.conf.5 index 00e83c495..bafc6f8a6 100644 --- a/doc/unbound.conf.5 +++ b/doc/unbound.conf.5 @@ -45,6 +45,9 @@ clause. The verbosity number, level 0 means no verbosity, only errors. Level 1 gives operational information. Level 2 gives query level information, output per query. Level 3 gives algorithm level information. +Default is level 1. The verbosity can also be increased from the commandline, +see +.Xr unbound 8 . .It \fBnum-threads:\fR The number of threads to create to serve clients. Use 1 for no threading. .It \fBport:\fR @@ -87,7 +90,7 @@ Sets the working directory for the program. .It \fBlogfile:\fR If "" is given, logging goes to stderr, or nowhere once daemonized. The logfile is appended to, in the following format: -[seconds since 1970] unbound[pid]: type: message. +[seconds since 1970] unbound[pid:tid]: type: message. .It \fBpidfile:\fR The process id is written to the file. Default is "unbound.pid". So, kill -HUP `cat /etc/unbound/unbound.pid` will trigger a reload, diff --git a/util/locks.c b/util/locks.c index 1dbdedd86..7de126ac6 100644 --- a/util/locks.c +++ b/util/locks.c @@ -41,4 +41,52 @@ #include "config.h" #include "util/locks.h" +#include + +void +ub_thread_blocksigs() +{ +#ifdef HAVE_PTHREAD + int err; + sigset_t sigset; + sigfillset(&sigset); + log_info("blocking signals"); + if((err=pthread_sigmask(SIG_SETMASK, &sigset, NULL))) + fatal_exit("pthread_sigmask: %s", strerror(err)); +#else +# ifdef HAVE_SOLARIS_THREADS + int err; + sigset_t sigset; + sigfillset(&sigset); + if((err=thr_sigsetmask(SIG_SETMASK, &sigset, NULL))) + fatal_exit("thr_sigsetmask: %s", strerror(err)); +# else + /* have nothing, do nothing */ +# endif /* HAVE_SOLARIS_THREADS */ +#endif /* HAVE_PTHREAD */ +} + +void ub_thread_sig_unblock(int sig) +{ +#ifdef HAVE_PTHREAD + int err; + sigset_t sigset; + sigemptyset(&sigset); + sigaddset(&sigset, sig); + log_info("unblocking signal %d", sig); + if((err=pthread_sigmask(SIG_UNBLOCK, &sigset, NULL))) + fatal_exit("pthread_sigmask: %s", strerror(err)); +#else +# ifdef HAVE_SOLARIS_THREADS + int err; + sigset_t sigset; + sigemptyset(&sigset); + sigaddset(&sigset, sig); + if((err=thr_sigsetmask(SIG_UNBLOCK, &sigset, NULL))) + fatal_exit("thr_sigsetmask: %s", strerror(err)); +# else + /* have nothing, do nothing */ +# endif /* HAVE_SOLARIS_THREADS */ +#endif /* HAVE_PTHREAD */ +} diff --git a/util/locks.h b/util/locks.h index 317187cf7..e901e79e5 100644 --- a/util/locks.h +++ b/util/locks.h @@ -121,6 +121,8 @@ typedef pthread_spinlock_t lock_quick_t; typedef pthread_t ub_thread_t; /** Pass where to store tread_t in thr. Use default NULL attributes. */ #define ub_thread_create(thr, func, arg) LOCKRET(pthread_create(thr, NULL, func, arg)) +/** get self id. */ +#define ub_thread_self() pthread_self() #else /* we do not HAVE_PTHREAD */ #ifdef HAVE_SOLARIS_THREADS @@ -150,6 +152,7 @@ typedef mutex_t lock_quick_t; /** Thread creation, create a default thread. */ typedef thread_t ub_thread_t; #define ub_thread_create(thr, func, arg) LOCKRET(thr_create(NULL, NULL, func, arg, NULL, thr)) +#define ub_thread_self() thr_self() #else /* we do not HAVE_SOLARIS_THREADS and no PTHREADS */ @@ -182,7 +185,20 @@ typedef int ub_thread_t; #define ub_thread_create(thr, func, arg) \ fatal_exit("%s %d called thread create, but no thread support " \ "has been compiled in.", __FILE__, __LINE__) +#define ub_thread_self() 0 #endif /* HAVE_SOLARIS_THREADS */ #endif /* HAVE_PTHREAD */ + +/** + * Block all signals for this thread. + * fatal exit on error. + */ +void ub_thread_blocksigs(); + +/** + * unblock one signal for this thread. + */ +void ub_thread_sig_unblock(int sig); + #endif /* UTIL_LOCKS_H */ diff --git a/util/log.c b/util/log.c index 17350dea5..de0dd3bdd 100644 --- a/util/log.c +++ b/util/log.c @@ -39,6 +39,7 @@ #include "config.h" #include "util/log.h" +#include "util/locks.h" #ifdef HAVE_TIME_H #include #endif @@ -64,6 +65,7 @@ log_init(const char* filename) strerror(errno)); return; } + verbose(VERB_DETAIL, "switching to logfile %s", filename); if(logfile && logfile != stderr) fclose(logfile); logfile = f; @@ -75,8 +77,10 @@ log_vmsg(const char* type, const char *format, va_list args) char message[MAXSYSLOGMSGLEN]; const char* ident="unbound"; vsnprintf(message, sizeof(message), format, args); - fprintf(logfile, "[%d] %s[%d] %s: %s\n", - (int)time(NULL), ident, (int)getpid(), type, message); + fprintf(logfile, "[%d] %s[%d:%x] %s: %s\n", + (int)time(NULL), ident, (int)getpid(), + (int)ub_thread_self(), + type, message); fflush(logfile); } diff --git a/util/net_help.c b/util/net_help.c index ef2ee72ac..8ffdf44ca 100644 --- a/util/net_help.c +++ b/util/net_help.c @@ -49,3 +49,23 @@ str_is_ip6(const char* str) else return 0; } +int +write_socket(int s, const void *buf, size_t size) +{ + const char* data = (const char*)buf; + size_t total_count = 0; + + while (total_count < size) { + ssize_t count + = write(s, data + total_count, size - total_count); + if (count == -1) { + if (errno != EAGAIN && errno != EINTR) { + return 0; + } else { + continue; + } + } + total_count += count; + } + return 1; +} diff --git a/util/net_help.h b/util/net_help.h index a78f1b951..8fdcc5481 100644 --- a/util/net_help.h +++ b/util/net_help.h @@ -49,4 +49,14 @@ */ int str_is_ip6(const char* str); +/** + * Write (blocking) to a nonblocking socket. + * @param s: fd. + * @param buf: data buffer. + * @param size: length of data to send. + * @return: 0 on error. errno is set. + */ +int +write_socket(int s, const void *buf, size_t size); + #endif /* NET_HELP_H */ diff --git a/util/netevent.c b/util/netevent.c index f0a895835..f183be6a7 100644 --- a/util/netevent.c +++ b/util/netevent.c @@ -137,6 +137,9 @@ static void comm_timer_callback(int fd, short event, void* arg); */ static void comm_signal_callback(int fd, short event, void* arg); +/** libevent callback for AF_UNIX fds. */ +static void comm_point_local_handle_callback(int fd, short event, void* arg); + /** create a tcp handler with a parent */ static struct comm_point* comm_point_create_tcp_handler( struct comm_base *base, struct comm_point* parent, size_t bufsize, @@ -327,7 +330,8 @@ tcp_callback_writer(struct comm_point* c) { log_assert(c->type == comm_tcp); ldns_buffer_clear(c->buffer); - c->tcp_is_reading = 1; + if(c->tcp_do_toggle_rw) + c->tcp_is_reading = 1; c->tcp_byte_count = 0; comm_point_stop_listening(c); /* for listening socket */ @@ -339,11 +343,13 @@ static void tcp_callback_reader(struct comm_point* c) { struct comm_reply rep; - log_assert(c->type == comm_tcp); + log_assert(c->type == comm_tcp || c->type == comm_local); ldns_buffer_flip(c->buffer); - c->tcp_is_reading = 0; + if(c->tcp_do_toggle_rw) + c->tcp_is_reading = 0; c->tcp_byte_count = 0; - comm_point_stop_listening(c); + if(c->type == comm_tcp) + comm_point_stop_listening(c); rep.c = c; rep.addrlen = 0; if( (*c->callback)(c, c->cb_arg, NETEVENT_NOERROR, &rep) ) { @@ -354,13 +360,14 @@ tcp_callback_reader(struct comm_point* c) /** Handle tcp reading callback. * @param fd: file descriptor of socket. * @param c: comm point to read from into buffer. + * @param short_ok: if true, very short packets are OK (for comm_local). * @return: 0 on error */ static int -comm_point_tcp_handle_read(int fd, struct comm_point* c) +comm_point_tcp_handle_read(int fd, struct comm_point* c, int short_ok) { ssize_t r; - log_assert(c->type == comm_tcp); + log_assert(c->type == comm_tcp || c->type == comm_local); if(!c->tcp_is_reading) return 0; @@ -386,7 +393,8 @@ comm_point_tcp_handle_read(int fd, struct comm_point* c) } ldns_buffer_set_limit(c->buffer, ldns_buffer_read_u16_at(c->buffer, 0)); - if(ldns_buffer_limit(c->buffer) < LDNS_HEADER_SIZE) { + if(!short_ok && + ldns_buffer_limit(c->buffer) < LDNS_HEADER_SIZE) { verbose(VERB_DETAIL, "tcp: dropped bogus too short."); return 0; } @@ -462,7 +470,7 @@ comm_point_tcp_handle_callback(int fd, short event, void* arg) log_assert(c->type == comm_tcp); if(event&EV_READ) { - if(!comm_point_tcp_handle_read(fd, c)) { + if(!comm_point_tcp_handle_read(fd, c, 0)) { reclaim_tcp_handler(c); if(!c->tcp_do_close) (void)(*c->callback)(c, c->cb_arg, @@ -490,6 +498,20 @@ comm_point_tcp_handle_callback(int fd, short event, void* arg) log_err("Ignored event %d for tcphdl.", event); } +static void comm_point_local_handle_callback(int fd, short event, void* arg) +{ + struct comm_point* c = (struct comm_point*)arg; + log_assert(c->type == comm_local); + + if(event&EV_READ) { + if(!comm_point_tcp_handle_read(fd, c, 1)) { + log_err("error in localhdl"); + } + return; + } + log_err("Ignored event %d for localhdl.", event); +} + struct comm_point* comm_point_create_udp(struct comm_base *base, int fd, ldns_buffer* buffer, comm_point_callback_t* callback, void* callback_arg) @@ -571,7 +593,7 @@ comm_point_create_tcp_handler(struct comm_base *base, c->type = comm_tcp; c->tcp_do_close = 0; c->do_not_close = 0; - c->tcp_do_toggle_rw = 0; + c->tcp_do_toggle_rw = 1; c->callback = callback; c->cb_arg = callback_arg; /* add to parent free list */ @@ -653,6 +675,56 @@ comm_point_create_tcp(struct comm_base *base, int fd, int num, size_t bufsize, return c; } +struct comm_point* +comm_point_create_local(struct comm_base *base, int fd, size_t bufsize, + comm_point_callback_t* callback, void* callback_arg) +{ + struct comm_point* c = (struct comm_point*)calloc(1, + sizeof(struct comm_point)); + short evbits; + if(!c) + return NULL; + c->ev = (struct internal_event*)calloc(1, + sizeof(struct internal_event)); + if(!c->ev) { + free(c); + return NULL; + } + c->fd = fd; + c->buffer = ldns_buffer_new(bufsize); + if(!c->buffer) { + free(c->ev); + free(c); + return NULL; + } + c->timeout = NULL; + c->tcp_is_reading = 1; + c->tcp_byte_count = 0; + c->tcp_parent = NULL; + c->max_tcp_count = 0; + c->tcp_handlers = NULL; + c->tcp_free = NULL; + c->type = comm_local; + c->tcp_do_close = 0; + c->do_not_close = 1; + c->tcp_do_toggle_rw = 0; + c->callback = callback; + c->cb_arg = callback_arg; + /* libevent stuff */ + evbits = EV_PERSIST | EV_READ; + event_set(&c->ev->ev, c->fd, evbits, comm_point_local_handle_callback, + c); + if(event_base_set(base->eb->base, &c->ev->ev) != 0 || + event_add(&c->ev->ev, c->timeout) != 0 ) + { + log_err("could not add tcphdl event"); + free(c->ev); + free(c); + return NULL; + } + return c; +} + void comm_point_close(struct comm_point* c) { diff --git a/util/netevent.h b/util/netevent.h index 9b3098b6f..c6495549f 100644 --- a/util/netevent.h +++ b/util/netevent.h @@ -135,7 +135,9 @@ struct comm_point { /** TCP accept socket - only creates handlers if readable. */ comm_tcp_accept, /** TCP handler socket - handle byteperbyte readwrite. */ - comm_tcp + comm_tcp, + /** AF_UNIX socket - for internal commands. */ + comm_local } type; /* ---------- Behaviour ----------- */ @@ -286,6 +288,19 @@ struct comm_point* comm_point_create_tcp(struct comm_base* base, int fd, int num, size_t bufsize, comm_point_callback_t* callback, void* callback_arg); +/** + * Create commpoint to listen to a local domain file descriptor. + * @param base: in which base to alloc the commpoint. + * @param fd: file descriptor of open AF_UNIX socket set to listen nonblocking. + * @param bufsize: size of buffer to create for handlers. + * @param callback: callback function pointer for the handler. + * @param callback_arg: will be passed to your callback function. + * @return: the commpoint or NULL on error. + */ +struct comm_point* comm_point_create_local(struct comm_base* base, + int fd, size_t bufsize, + comm_point_callback_t* callback, void* callback_arg); + /** * Close a comm point fd. * @param c: comm point to close.