#include "util/config_file.h"
#include "services/listen_dnsport.h"
+/* @@@ TODO remove */
+#include "pthread.h"
+#include <signal.h>
+
struct daemon*
daemon_init()
{
return 1;
}
-void
-daemon_fork(struct daemon* daemon)
+/**
+ * Allocate empty worker structures. With backptr and thread-number,
+ * from 0..numthread initialised. Used as user arguments to new threads.
+ * @param daemon: the daemon with (new) config settings.
+ */
+static void
+daemon_create_workers(struct daemon* daemon)
{
+ int i;
+ log_assert(daemon && daemon->cfg);
/* only one thread for now */
- log_assert(daemon);
- daemon->num = 1;
+ daemon->num = daemon->cfg->num_threads;
+#if !defined(HAVE_PTHREAD) && !defined(HAVE_SOLARIS_THREADS)
+ if(daemon->num != 1) {
+ log_err("configed %d threads, but executable was compiled "
+ "with no thread support. Continuing with 1.", daemon->num);
+ daemon->num = 1;
+ }
+#endif /* no threads */
daemon->workers = (struct worker**)calloc((size_t)daemon->num,
sizeof(struct worker*));
- if(!(daemon->workers[0] = worker_init(daemon->cfg, daemon->ports,
- BUFSZ)))
- fatal_exit("could not initialize thread # %d", 0);
- daemon->workers[0]->daemon = daemon;
- daemon->workers[0]->thread_num = 0;
+ for(i=0; i<daemon->num; i++) {
+ if(!(daemon->workers[i] = worker_create(daemon, i)))
+ fatal_exit("malloc failure");
+ }
+}
+
+/**
+ * Function to start one thread.
+ * @param arg: user argument.
+ * @return: void* user return value could be used for thread_join results.
+ */
+static void*
+thread_start(void* arg)
+{
+ struct worker* worker = (struct worker*)arg;
+ int num = worker->thread_num;
+ ub_thread_blocksigs();
+ if(!worker_init(worker, worker->daemon->cfg, worker->daemon->ports,
+ BUFSZ, 0))
+ fatal_exit("Could not initialize thread #%d", num);
+
+ worker_work(worker);
+ return NULL;
+}
+/**
+ * Fork and init the other threads. Main thread returns for special handling.
+ * @param daemon: the daemon with other threads to fork.
+ */
+static void
+daemon_start_others(struct daemon* daemon)
+{
+ int i;
+ log_assert(daemon);
+ log_info("start others");
+ /* skip i=0, is this thread */
+ for(i=1; i<daemon->num; i++) {
+ ub_thread_create(&daemon->workers[i]->thr_id,
+ thread_start, daemon->workers[i]);
+ }
+}
+
+/**
+ * Stop the other threads.
+ * @param daemon: the daemon with other threads.
+ */
+static void
+daemon_stop_others(struct daemon* daemon)
+{
+ int i, err;
+ log_assert(daemon);
+ log_info("stop others");
+ /* skip i=0, is this thread */
+ /* use i=0 buffer for sending cmds; because we are #0 */
+ for(i=1; i<daemon->num; i++) {
+ log_info("killing thread %d", i);
+ worker_send_cmd(daemon->workers[i],
+ daemon->workers[0]->front->udp_buff, worker_cmd_quit);
+ }
+ /** wait for them to quit */
+ for(i=1; i<daemon->num; i++) {
+ /* join it to make sure its dead */
+ log_info("join %d", i);
+ if((err=pthread_join(daemon->workers[i]->thr_id, NULL)))
+ log_err("pthread_join: %s", strerror(err));
+ log_info("join success %d", i);
+ }
+}
+
+void
+daemon_fork(struct daemon* daemon)
+{
+ log_assert(daemon);
+ log_info("daemon_fork");
+ /* first create all the worker structures, so we can pass
+ * them to the newly created threads.
+ */
+ daemon_create_workers(daemon);
+
+ /* Now create the threads and init the workers.
+ * By the way, this is thread #0 (the main thread).
+ */
+ daemon_start_others(daemon);
+
+ /* Special handling for the main thread. This is the thread
+ * that handles signals.
+ */
+ if(!worker_init(daemon->workers[0], daemon->cfg, daemon->ports,
+ BUFSZ, 1))
+ fatal_exit("Could not initialize main thread");
+ /* see if other threads have started correctly? */
+
+ /* Start resolver service on main thread. */
log_info("start of service (%s).", PACKAGE_STRING);
worker_work(daemon->workers[0]);
+
+ /* we exited! a signal happened! Stop other threads */
+ daemon_stop_others(daemon);
+
if(daemon->workers[0]->need_to_restart)
daemon->need_to_exit = 0;
else daemon->need_to_exit = 1;
/** the size of ID and flags, opcode, rcode in dns packet */
#define ID_AND_FLAGS 4
+void
+worker_send_cmd(struct worker* worker, ldns_buffer* buffer,
+ enum worker_commands cmd)
+{
+ ldns_buffer_clear(buffer);
+ /* like DNS message, length data */
+ ldns_buffer_write_u16(buffer, sizeof(uint32_t));
+ ldns_buffer_write_u32(buffer, (uint32_t)cmd);
+ ldns_buffer_flip(buffer);
+ if(!write_socket(worker->cmd_send_fd, ldns_buffer_begin(buffer),
+ ldns_buffer_limit(buffer)))
+ log_err("write socket: %s", strerror(errno));
+}
+
/** reply to query with given error code */
static void
replyerror(int r, struct worker* worker)
return 0;
}
+/** process control messages from the main thread. */
+static int
+worker_handle_control_cmd(struct comm_point* c, void* arg, int error,
+ struct comm_reply* ATTR_UNUSED(reply_info))
+{
+ struct worker* worker = (struct worker*)arg;
+ enum worker_commands cmd;
+ if(error != NETEVENT_NOERROR) {
+ if(error == NETEVENT_CLOSED)
+ comm_base_exit(worker->base);
+ else log_info("control cmd: %d", error);
+ return 0;
+ }
+ log_info("control cmd");
+ if(ldns_buffer_limit(c->buffer) != sizeof(uint32_t)) {
+ fatal_exit("bad control msg length %d",
+ (int)ldns_buffer_limit(c->buffer));
+ }
+ cmd = ldns_buffer_read_u32(c->buffer);
+ switch(cmd) {
+ case worker_cmd_quit:
+ comm_base_exit(worker->base);
+ break;
+ default:
+ log_err("bad command %d", (int)cmd);
+ break;
+ }
+ return 0;
+}
+
/** handles callbacks from listening event interface */
static int
worker_handle_request(struct comm_point* c, void* arg, int error,
}
struct worker*
-worker_init(struct config_file *cfg, struct listen_port* ports,
- size_t buffer_size)
+worker_create(struct daemon* daemon, int id)
{
struct worker* worker = (struct worker*)calloc(1,
sizeof(struct worker));
- unsigned int seed;
+ int sv[2];
if(!worker)
return NULL;
+ worker->daemon = daemon;
+ worker->thread_num = id;
+ worker->cmd_send_fd = -1;
+ worker->cmd_recv_fd = -1;
+ /* create socketpair to communicate with worker */
+ if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) {
+ free(worker);
+ log_err("socketpair: %s", strerror(errno));
+ return NULL;
+ }
+ worker->cmd_send_fd = sv[0];
+ worker->cmd_recv_fd = sv[1];
+ return worker;
+}
+
+int
+worker_init(struct worker* worker, struct config_file *cfg,
+ struct listen_port* ports, size_t buffer_size, int do_sigs)
+{
+ unsigned int seed;
+ int startport;
worker->need_to_restart = 0;
worker->base = comm_base_create();
if(!worker->base) {
log_err("could not create event handling base");
worker_delete(worker);
- return NULL;
+ return 0;
}
- worker->comsig = comm_signal_create(worker->base, worker_sighandler,
- worker);
- if(!worker->comsig || !comm_signal_bind(worker->comsig, SIGHUP)
- || !comm_signal_bind(worker->comsig, SIGINT)
- || !comm_signal_bind(worker->comsig, SIGQUIT)) {
- log_err("could not create signal handlers");
- worker_delete(worker);
- return NULL;
+ if(do_sigs) {
+ worker->comsig = comm_signal_create(worker->base,
+ worker_sighandler, worker);
+ if(!worker->comsig || !comm_signal_bind(worker->comsig, SIGHUP)
+ || !comm_signal_bind(worker->comsig, SIGINT)
+ || !comm_signal_bind(worker->comsig, SIGQUIT)) {
+ log_err("could not create signal handlers");
+ worker_delete(worker);
+ return 0;
+ }
+ ub_thread_sig_unblock(SIGHUP);
+ ub_thread_sig_unblock(SIGINT);
+ ub_thread_sig_unblock(SIGQUIT);
+ } else { /* !do_sigs */
+ worker->comsig = 0;
}
worker->front = listen_create(worker->base, ports,
buffer_size, worker_handle_request, worker);
if(!worker->front) {
log_err("could not create listening sockets");
worker_delete(worker);
- return NULL;
+ return 0;
}
+ startport = cfg->outgoing_base_port +
+ cfg->outgoing_num_ports * worker->thread_num;
worker->back = outside_network_create(worker->base,
buffer_size, (size_t)cfg->outgoing_num_ports, cfg->ifs,
- cfg->num_ifs, cfg->do_ip4, cfg->do_ip6,
- cfg->outgoing_base_port);
+ cfg->num_ifs, cfg->do_ip4, cfg->do_ip6, startport);
if(!worker->back) {
log_err("could not create outgoing sockets");
worker_delete(worker);
- return NULL;
+ return 0;
}
/* init random(), large table size. */
if(!(worker->rndstate = (struct ub_randstate*)calloc(1,
sizeof(struct ub_randstate)))) {
log_err("malloc rndtable failed.");
worker_delete(worker);
- return NULL;
+ return 0;
}
- seed = (unsigned int)time(NULL) ^ (unsigned int)getpid();
+ seed = (unsigned int)time(NULL) ^ (unsigned int)getpid() ^
+ (unsigned int)worker->thread_num;
if(!ub_initstate(seed, worker->rndstate, RND_STATE_SIZE)) {
log_err("could not init random numbers.");
worker_delete(worker);
- return NULL;
+ return 0;
}
+ /* start listening to commands */
+ if(!(worker->cmd_com=comm_point_create_local(worker->base,
+ worker->cmd_recv_fd, buffer_size, worker_handle_control_cmd,
+ worker))) {
+ log_err("could not create control compt.");
+ worker_delete(worker);
+ return 0;
+ }
+
/* set forwarder address */
if(cfg->fwd_address && cfg->fwd_address[0]) {
if(!worker_set_fwd(worker, cfg->fwd_address, cfg->fwd_port)) {
fatal_exit("could not set forwarder address");
}
}
- return worker;
+ return 1;
}
void
{
if(!worker)
return;
+ close(worker->cmd_send_fd);
+ worker->cmd_send_fd = -1;
+ close(worker->cmd_recv_fd);
+ worker->cmd_recv_fd = -1;
listen_delete(worker->front);
outside_network_delete(worker->back);
comm_signal_delete(worker->comsig);
/** size of table used for random numbers. large to be more secure. */
#define RND_STATE_SIZE 256
+/** worker commands */
+enum worker_commands {
+ /** make the worker quit */
+ worker_cmd_quit
+};
+
+
/**
* Structure holding working information for unbound.
* Holds globally visible information.
struct daemon* daemon;
/** the thread number (in daemon array). */
int thread_num;
+ /** thread id */
+ ub_thread_t thr_id;
+ /** fd 0 of socketpair, write commands for worker to this one */
+ int cmd_send_fd;
+ /** fd 1 of socketpair, worker listens on this one */
+ int cmd_recv_fd;
/** the event base this worker works with */
struct comm_base* base;
/** the frontside listening interface where request events come in */
/** the backside outside network interface to the auth servers */
struct outside_network* back;
/** the signal handler */
- struct comm_signal *comsig;
+ struct comm_signal* comsig;
+ /** commpoint to listen to commands. */
+ struct comm_point* cmd_com;
/** number of requests currently active */
int num_requests;
int need_to_restart;
};
+/**
+ * Create the worker structure. Bare bones version, zeroed struct,
+ * with backpointers only. Use worker_init on it later.
+ * @param daemon: the daemon that this worker thread is part of.
+ * @param id: the thread number from 0.. numthreads-1.
+ * @return: the new worker or NULL on alloc failure.
+ */
+struct worker* worker_create(struct daemon* daemon, int id);
+
/**
* Initialize worker.
* Allocates event base, listens to ports
+ * @param worker: worker to initialize, created with worker_create.
* @param cfg: configuration settings.
* @param ports: list of shared query ports.
* @param buffer_size: size of datagram buffer.
- * @return: The worker, or NULL on error.
+ * @param do_sigs: if true, worker installs signal handlers.
+ * @return: false on error.
*/
-struct worker* worker_init(struct config_file *cfg, struct listen_port* ports,
- size_t buffer_size);
+int worker_init(struct worker* worker, struct config_file *cfg,
+ struct listen_port* ports, size_t buffer_size, int do_sigs);
/**
* Make worker work.
*/
int worker_set_fwd(struct worker* worker, const char* ip, int port);
+/**
+ * Send a command to a worker. Uses blocking writes.
+ * @param worker: worker to send command to.
+ * @param buffer: an empty buffer to use.
+ * @param cmd: command to send.
+ */
+void worker_send_cmd(struct worker* worker, ldns_buffer* buffer,
+ enum worker_commands cmd);
+
#endif /* DAEMON_WORKER_H */
26 February 2007: Wouter
- ub_random code used to select ID and port.
+ - log code prints thread id.
+ - unbound can thread itself, with reload(HUP) and quit working
+ correctly.
23 February 2007: Wouter
- Can do reloads on sigHUP. Everything is stopped, and freed,
server:
# whitespace is not necessary, but looks cleaner.
- # verbosity number, 0 is least verbose.
+ # verbosity number, 0 is least verbose. 1 is default.
verbosity: 2
# number of threads to create. 1 disables threading.
The verbosity number, level 0 means no verbosity, only errors. Level 1
gives operational information. Level 2 gives query level information,
output per query. Level 3 gives algorithm level information.
+Default is level 1. The verbosity can also be increased from the commandline,
+see
+.Xr unbound 8 .
.It \fBnum-threads:\fR <number>
The number of threads to create to serve clients. Use 1 for no threading.
.It \fBport:\fR <port number>
.It \fBlogfile:\fR <filename>
If "" is given, logging goes to stderr, or nowhere once daemonized.
The logfile is appended to, in the following format:
-[seconds since 1970] unbound[pid]: type: message.
+[seconds since 1970] unbound[pid:tid]: type: message.
.It \fBpidfile:\fR <filename>
The process id is written to the file. Default is "unbound.pid". So,
kill -HUP `cat /etc/unbound/unbound.pid` will trigger a reload,
#include "config.h"
#include "util/locks.h"
+#include <signal.h>
+
+void
+ub_thread_blocksigs()
+{
+#ifdef HAVE_PTHREAD
+ int err;
+ sigset_t sigset;
+ sigfillset(&sigset);
+ log_info("blocking signals");
+ if((err=pthread_sigmask(SIG_SETMASK, &sigset, NULL)))
+ fatal_exit("pthread_sigmask: %s", strerror(err));
+#else
+# ifdef HAVE_SOLARIS_THREADS
+ int err;
+ sigset_t sigset;
+ sigfillset(&sigset);
+ if((err=thr_sigsetmask(SIG_SETMASK, &sigset, NULL)))
+ fatal_exit("thr_sigsetmask: %s", strerror(err));
+# else
+ /* have nothing, do nothing */
+# endif /* HAVE_SOLARIS_THREADS */
+#endif /* HAVE_PTHREAD */
+}
+
+void ub_thread_sig_unblock(int sig)
+{
+#ifdef HAVE_PTHREAD
+ int err;
+ sigset_t sigset;
+ sigemptyset(&sigset);
+ sigaddset(&sigset, sig);
+ log_info("unblocking signal %d", sig);
+ if((err=pthread_sigmask(SIG_UNBLOCK, &sigset, NULL)))
+ fatal_exit("pthread_sigmask: %s", strerror(err));
+#else
+# ifdef HAVE_SOLARIS_THREADS
+ int err;
+ sigset_t sigset;
+ sigemptyset(&sigset);
+ sigaddset(&sigset, sig);
+ if((err=thr_sigsetmask(SIG_UNBLOCK, &sigset, NULL)))
+ fatal_exit("thr_sigsetmask: %s", strerror(err));
+# else
+ /* have nothing, do nothing */
+# endif /* HAVE_SOLARIS_THREADS */
+#endif /* HAVE_PTHREAD */
+}
typedef pthread_t ub_thread_t;
/** Pass where to store tread_t in thr. Use default NULL attributes. */
#define ub_thread_create(thr, func, arg) LOCKRET(pthread_create(thr, NULL, func, arg))
+/** get self id. */
+#define ub_thread_self() pthread_self()
#else /* we do not HAVE_PTHREAD */
#ifdef HAVE_SOLARIS_THREADS
/** Thread creation, create a default thread. */
typedef thread_t ub_thread_t;
#define ub_thread_create(thr, func, arg) LOCKRET(thr_create(NULL, NULL, func, arg, NULL, thr))
+#define ub_thread_self() thr_self()
#else /* we do not HAVE_SOLARIS_THREADS and no PTHREADS */
#define ub_thread_create(thr, func, arg) \
fatal_exit("%s %d called thread create, but no thread support " \
"has been compiled in.", __FILE__, __LINE__)
+#define ub_thread_self() 0
#endif /* HAVE_SOLARIS_THREADS */
#endif /* HAVE_PTHREAD */
+
+/**
+ * Block all signals for this thread.
+ * fatal exit on error.
+ */
+void ub_thread_blocksigs();
+
+/**
+ * unblock one signal for this thread.
+ */
+void ub_thread_sig_unblock(int sig);
+
#endif /* UTIL_LOCKS_H */
#include "config.h"
#include "util/log.h"
+#include "util/locks.h"
#ifdef HAVE_TIME_H
#include <time.h>
#endif
strerror(errno));
return;
}
+ verbose(VERB_DETAIL, "switching to logfile %s", filename);
if(logfile && logfile != stderr)
fclose(logfile);
logfile = f;
char message[MAXSYSLOGMSGLEN];
const char* ident="unbound";
vsnprintf(message, sizeof(message), format, args);
- fprintf(logfile, "[%d] %s[%d] %s: %s\n",
- (int)time(NULL), ident, (int)getpid(), type, message);
+ fprintf(logfile, "[%d] %s[%d:%x] %s: %s\n",
+ (int)time(NULL), ident, (int)getpid(),
+ (int)ub_thread_self(),
+ type, message);
fflush(logfile);
}
else return 0;
}
+int
+write_socket(int s, const void *buf, size_t size)
+{
+ const char* data = (const char*)buf;
+ size_t total_count = 0;
+
+ while (total_count < size) {
+ ssize_t count
+ = write(s, data + total_count, size - total_count);
+ if (count == -1) {
+ if (errno != EAGAIN && errno != EINTR) {
+ return 0;
+ } else {
+ continue;
+ }
+ }
+ total_count += count;
+ }
+ return 1;
+}
*/
int str_is_ip6(const char* str);
+/**
+ * Write (blocking) to a nonblocking socket.
+ * @param s: fd.
+ * @param buf: data buffer.
+ * @param size: length of data to send.
+ * @return: 0 on error. errno is set.
+ */
+int
+write_socket(int s, const void *buf, size_t size);
+
#endif /* NET_HELP_H */
*/
static void comm_signal_callback(int fd, short event, void* arg);
+/** libevent callback for AF_UNIX fds. */
+static void comm_point_local_handle_callback(int fd, short event, void* arg);
+
/** create a tcp handler with a parent */
static struct comm_point* comm_point_create_tcp_handler(
struct comm_base *base, struct comm_point* parent, size_t bufsize,
{
log_assert(c->type == comm_tcp);
ldns_buffer_clear(c->buffer);
- c->tcp_is_reading = 1;
+ if(c->tcp_do_toggle_rw)
+ c->tcp_is_reading = 1;
c->tcp_byte_count = 0;
comm_point_stop_listening(c);
/* for listening socket */
tcp_callback_reader(struct comm_point* c)
{
struct comm_reply rep;
- log_assert(c->type == comm_tcp);
+ log_assert(c->type == comm_tcp || c->type == comm_local);
ldns_buffer_flip(c->buffer);
- c->tcp_is_reading = 0;
+ if(c->tcp_do_toggle_rw)
+ c->tcp_is_reading = 0;
c->tcp_byte_count = 0;
- comm_point_stop_listening(c);
+ if(c->type == comm_tcp)
+ comm_point_stop_listening(c);
rep.c = c;
rep.addrlen = 0;
if( (*c->callback)(c, c->cb_arg, NETEVENT_NOERROR, &rep) ) {
/** Handle tcp reading callback.
* @param fd: file descriptor of socket.
* @param c: comm point to read from into buffer.
+ * @param short_ok: if true, very short packets are OK (for comm_local).
* @return: 0 on error
*/
static int
-comm_point_tcp_handle_read(int fd, struct comm_point* c)
+comm_point_tcp_handle_read(int fd, struct comm_point* c, int short_ok)
{
ssize_t r;
- log_assert(c->type == comm_tcp);
+ log_assert(c->type == comm_tcp || c->type == comm_local);
if(!c->tcp_is_reading)
return 0;
}
ldns_buffer_set_limit(c->buffer,
ldns_buffer_read_u16_at(c->buffer, 0));
- if(ldns_buffer_limit(c->buffer) < LDNS_HEADER_SIZE) {
+ if(!short_ok &&
+ ldns_buffer_limit(c->buffer) < LDNS_HEADER_SIZE) {
verbose(VERB_DETAIL, "tcp: dropped bogus too short.");
return 0;
}
log_assert(c->type == comm_tcp);
if(event&EV_READ) {
- if(!comm_point_tcp_handle_read(fd, c)) {
+ if(!comm_point_tcp_handle_read(fd, c, 0)) {
reclaim_tcp_handler(c);
if(!c->tcp_do_close)
(void)(*c->callback)(c, c->cb_arg,
log_err("Ignored event %d for tcphdl.", event);
}
+static void comm_point_local_handle_callback(int fd, short event, void* arg)
+{
+ struct comm_point* c = (struct comm_point*)arg;
+ log_assert(c->type == comm_local);
+
+ if(event&EV_READ) {
+ if(!comm_point_tcp_handle_read(fd, c, 1)) {
+ log_err("error in localhdl");
+ }
+ return;
+ }
+ log_err("Ignored event %d for localhdl.", event);
+}
+
struct comm_point*
comm_point_create_udp(struct comm_base *base, int fd, ldns_buffer* buffer,
comm_point_callback_t* callback, void* callback_arg)
c->type = comm_tcp;
c->tcp_do_close = 0;
c->do_not_close = 0;
- c->tcp_do_toggle_rw = 0;
+ c->tcp_do_toggle_rw = 1;
c->callback = callback;
c->cb_arg = callback_arg;
/* add to parent free list */
return c;
}
+struct comm_point*
+comm_point_create_local(struct comm_base *base, int fd, size_t bufsize,
+ comm_point_callback_t* callback, void* callback_arg)
+{
+ struct comm_point* c = (struct comm_point*)calloc(1,
+ sizeof(struct comm_point));
+ short evbits;
+ if(!c)
+ return NULL;
+ c->ev = (struct internal_event*)calloc(1,
+ sizeof(struct internal_event));
+ if(!c->ev) {
+ free(c);
+ return NULL;
+ }
+ c->fd = fd;
+ c->buffer = ldns_buffer_new(bufsize);
+ if(!c->buffer) {
+ free(c->ev);
+ free(c);
+ return NULL;
+ }
+ c->timeout = NULL;
+ c->tcp_is_reading = 1;
+ c->tcp_byte_count = 0;
+ c->tcp_parent = NULL;
+ c->max_tcp_count = 0;
+ c->tcp_handlers = NULL;
+ c->tcp_free = NULL;
+ c->type = comm_local;
+ c->tcp_do_close = 0;
+ c->do_not_close = 1;
+ c->tcp_do_toggle_rw = 0;
+ c->callback = callback;
+ c->cb_arg = callback_arg;
+ /* libevent stuff */
+ evbits = EV_PERSIST | EV_READ;
+ event_set(&c->ev->ev, c->fd, evbits, comm_point_local_handle_callback,
+ c);
+ if(event_base_set(base->eb->base, &c->ev->ev) != 0 ||
+ event_add(&c->ev->ev, c->timeout) != 0 )
+ {
+ log_err("could not add tcphdl event");
+ free(c->ev);
+ free(c);
+ return NULL;
+ }
+ return c;
+}
+
void
comm_point_close(struct comm_point* c)
{
/** TCP accept socket - only creates handlers if readable. */
comm_tcp_accept,
/** TCP handler socket - handle byteperbyte readwrite. */
- comm_tcp
+ comm_tcp,
+ /** AF_UNIX socket - for internal commands. */
+ comm_local
} type;
/* ---------- Behaviour ----------- */
int fd, int num, size_t bufsize,
comm_point_callback_t* callback, void* callback_arg);
+/**
+ * Create commpoint to listen to a local domain file descriptor.
+ * @param base: in which base to alloc the commpoint.
+ * @param fd: file descriptor of open AF_UNIX socket set to listen nonblocking.
+ * @param bufsize: size of buffer to create for handlers.
+ * @param callback: callback function pointer for the handler.
+ * @param callback_arg: will be passed to your callback function.
+ * @return: the commpoint or NULL on error.
+ */
+struct comm_point* comm_point_create_local(struct comm_base* base,
+ int fd, size_t bufsize,
+ comm_point_callback_t* callback, void* callback_arg);
+
/**
* Close a comm point fd.
* @param c: comm point to close.