/* skip i=0, is this thread */
/* use i=0 buffer for sending cmds; because we are #0 */
for(i=1; i<daemon->num; i++) {
- worker_send_cmd(daemon->workers[i],
- daemon->workers[0]->front->udp_buff, worker_cmd_quit);
+ worker_send_cmd(daemon->workers[i], worker_cmd_quit);
}
/* wait for them to quit */
for(i=1; i<daemon->num; i++) {
}
void
-worker_send_cmd(struct worker* worker, ldns_buffer* buffer,
- enum worker_commands cmd)
+worker_send_cmd(struct worker* worker, enum worker_commands cmd)
{
- ldns_buffer_clear(buffer);
- /* like DNS message, length data */
- ldns_buffer_write_u16(buffer, sizeof(uint32_t));
- ldns_buffer_write_u32(buffer, (uint32_t)cmd);
- ldns_buffer_flip(buffer);
- if(!tube_send_cmd(worker->cmd, buffer)) {
+ uint32_t c = (uint32_t)cmd;
+ if(!tube_write_msg(worker->cmd, (uint8_t*)&c, sizeof(c), 0)) {
log_err("worker send cmd %d failed", (int)cmd);
}
}
}
void
-worker_handle_control_cmd(struct tube* ATTR_UNUSED(tube), ldns_buffer* buffer,
- int error, void* arg)
+worker_handle_control_cmd(struct tube* ATTR_UNUSED(tube), uint8_t* msg,
+ size_t len, int error, void* arg)
{
struct worker* worker = (struct worker*)arg;
enum worker_commands cmd;
if(error != NETEVENT_NOERROR) {
+ free(msg);
if(error == NETEVENT_CLOSED)
comm_base_exit(worker->base);
else log_info("control event: %d", error);
return;
}
- if(ldns_buffer_limit(buffer) != sizeof(uint32_t)) {
- fatal_exit("bad control msg length %d",
- (int)ldns_buffer_limit(buffer));
+ if(len != sizeof(uint32_t)) {
+ fatal_exit("bad control msg length %d", (int)len);
}
- cmd = ldns_buffer_read_u32(buffer);
+ cmd = ldns_read_uint32(msg);
+ free(msg);
switch(cmd) {
case worker_cmd_quit:
verbose(VERB_ALGO, "got control cmd quit");
}
if(worker->thread_num != 0) {
/* start listening to commands */
- if(!tube_listen_cmd(worker->cmd, worker->base,
- cfg->msg_buffer_size, &worker_handle_control_cmd,
- worker)) {
+ if(!tube_setup_bg_listen(worker->cmd, worker->base,
+ &worker_handle_control_cmd, worker)) {
log_err("could not create control compt.");
worker_delete(worker);
return 0;
return 0;
}
+void libworker_handle_control_cmd(struct tube* ATTR_UNUSED(tube),
+ uint8_t* ATTR_UNUSED(buffer), size_t ATTR_UNUSED(len),
+ int ATTR_UNUSED(error), void* ATTR_UNUSED(arg))
+{
+ log_assert(0);
+}
+
int context_query_cmp(const void* ATTR_UNUSED(a), const void* ATTR_UNUSED(b))
{
log_assert(0);
/**
* Send a command to a worker. Uses blocking writes.
* @param worker: worker to send command to.
- * @param buffer: an empty buffer to use.
* @param cmd: command to send.
*/
-void worker_send_cmd(struct worker* worker, ldns_buffer* buffer,
- enum worker_commands cmd);
+void worker_send_cmd(struct worker* worker, enum worker_commands cmd);
/**
* Worker signal handler function. User argument is the worker itself.
/**
* process control messages from the main thread.
* @param tube: tube control message came on.
- * @param buffer: buffer with message in it.
+ * @param msg: message contents.
+ * @param len: length of message.
* @param error: if error (NETEVENT_*) happened.
* @param arg: user argument
*/
-void worker_handle_control_cmd(struct tube* tube, ldns_buffer* buffer,
+void worker_handle_control_cmd(struct tube* tube, uint8_t* msg, size_t len,
int error, void* arg);
/** handles callbacks from listening event interface */
+22 July 2008: Wouter
+ - moved pipe actions to util/tube.c. easier porting and shared code.
+ - check _raw() commpoint callbacks with fptr_wlist.
+ - iana port update.
+
21 July 2008: Wouter
- #198: nicer entropy warning message. manpage OS hints.
#include "libunbound/unbound.h"
#include "util/data/packed_rrset.h"
struct libworker;
+struct tube;
/**
* The context structure
/* --- pipes --- */
/** mutex on query write pipe */
lock_basic_t qqpipe_lock;
- /** the query write pipe, [0] read from, [1] write on */
- int qqpipe[2];
+ /** the query write pipe */
+ struct tube* qq_pipe;
/** mutex on result read pipe */
lock_basic_t rrpipe_lock;
- /** the result read pipe, [0] read from, [1] write on */
- int rrpipe[2];
+ /** the result read pipe */
+ struct tube* rr_pipe;
/* --- shared data --- */
/** mutex for access to env.cfg, finalized and dothread */
#include "util/log.h"
#include "util/random.h"
#include "util/net_help.h"
+#include "util/tube.h"
#include "services/modstack.h"
#include "services/localzone.h"
#include "services/cache/infra.h"
return NULL;
}
seed = 0;
- if(socketpair(AF_UNIX, SOCK_STREAM, 0, ctx->qqpipe) == -1) {
- ub_randfree(ctx->seed_rnd);
- free(ctx);
- return NULL;
- }
- if(socketpair(AF_UNIX, SOCK_STREAM, 0, ctx->rrpipe) == -1) {
+ if((ctx->qq_pipe = tube_create()) == NULL) {
int e = errno;
- close(ctx->qqpipe[0]);
- close(ctx->qqpipe[1]);
ub_randfree(ctx->seed_rnd);
free(ctx);
errno = e;
return NULL;
}
-#ifndef USE_WINSOCK
- if(!fd_set_nonblock(ctx->rrpipe[0]) ||
- !fd_set_nonblock(ctx->rrpipe[1]) ||
- !fd_set_nonblock(ctx->qqpipe[0]) ||
- !fd_set_nonblock(ctx->qqpipe[1])) {
+ if((ctx->rr_pipe = tube_create()) == NULL) {
int e = errno;
- close(ctx->rrpipe[0]);
- close(ctx->rrpipe[1]);
- close(ctx->qqpipe[0]);
- close(ctx->qqpipe[1]);
+ tube_delete(ctx->qq_pipe);
ub_randfree(ctx->seed_rnd);
free(ctx);
errno = e;
return NULL;
}
-#endif /* !USE_WINSOCK - it is a pipe(nonsocket) on windows) */
lock_basic_init(&ctx->qqpipe_lock);
lock_basic_init(&ctx->rrpipe_lock);
lock_basic_init(&ctx->cfglock);
ctx->env = (struct module_env*)calloc(1, sizeof(*ctx->env));
if(!ctx->env) {
- close(ctx->rrpipe[0]);
- close(ctx->rrpipe[1]);
- close(ctx->qqpipe[0]);
- close(ctx->qqpipe[1]);
+ tube_delete(ctx->qq_pipe);
+ tube_delete(ctx->rr_pipe);
ub_randfree(ctx->seed_rnd);
free(ctx);
errno = ENOMEM;
}
ctx->env->cfg = config_create_forlib();
if(!ctx->env->cfg) {
- close(ctx->rrpipe[0]);
- close(ctx->rrpipe[1]);
- close(ctx->qqpipe[0]);
- close(ctx->qqpipe[1]);
+ tube_delete(ctx->qq_pipe);
+ tube_delete(ctx->rr_pipe);
free(ctx->env);
ub_randfree(ctx->seed_rnd);
free(ctx);
uint32_t cmd = UB_LIBCMD_QUIT;
lock_basic_unlock(&ctx->cfglock);
lock_basic_lock(&ctx->qqpipe_lock);
- (void)libworker_write_msg(ctx->qqpipe[1], (uint8_t*)&cmd,
+ (void)tube_write_msg(ctx->qq_pipe, (uint8_t*)&cmd,
(uint32_t)sizeof(cmd), 0);
lock_basic_unlock(&ctx->qqpipe_lock);
lock_basic_lock(&ctx->rrpipe_lock);
- while(libworker_read_msg(ctx->rrpipe[0], &msg, &len, 0)) {
+ while(tube_read_msg(ctx->rr_pipe, &msg, &len, 0)) {
/* discard all results except a quit confirm */
if(context_serial_getcmd(msg, len) == UB_LIBCMD_QUIT) {
free(msg);
lock_basic_destroy(&ctx->qqpipe_lock);
lock_basic_destroy(&ctx->rrpipe_lock);
lock_basic_destroy(&ctx->cfglock);
- if(ctx->qqpipe[0] != -1)
- close(ctx->qqpipe[0]);
- if(ctx->qqpipe[1] != -1)
- close(ctx->qqpipe[1]);
- if(ctx->rrpipe[0] != -1)
- close(ctx->rrpipe[0]);
- if(ctx->rrpipe[1] != -1)
- close(ctx->rrpipe[1]);
- ctx->qqpipe[0] = -1;
- ctx->qqpipe[1] = -1;
- ctx->rrpipe[0] = -1;
- ctx->rrpipe[1] = -1;
+ tube_delete(ctx->qq_pipe);
+ tube_delete(ctx->rr_pipe);
if(ctx->env) {
slabhash_delete(ctx->env->msg_cache);
rrset_cache_delete(ctx->env->rrset_cache);
return UB_NOERROR;
}
-/** perform a select() on the result read pipe */
-static int
-pollit(struct ub_ctx* ctx, struct timeval* t)
-{
- fd_set r;
-#ifndef S_SPLINT_S
- FD_ZERO(&r);
- FD_SET(FD_SET_T ctx->rrpipe[0], &r);
-#endif
- if(select(ctx->rrpipe[0]+1, &r, NULL, NULL, t) == -1) {
- return 0;
- }
- errno = 0;
- return FD_ISSET(ctx->rrpipe[0], &r);
-}
-
int
ub_poll(struct ub_ctx* ctx)
{
- struct timeval t;
- memset(&t, 0, sizeof(t));
/* no need to hold lock while testing for readability. */
- return pollit(ctx, &t);
+ return tube_poll(ctx->rr_pipe);
}
int
ub_fd(struct ub_ctx* ctx)
{
- return ctx->rrpipe[0];
+ return tube_read_fd(ctx->rr_pipe);
}
/** process answer from bg worker */
while(1) {
msg = NULL;
lock_basic_lock(&ctx->rrpipe_lock);
- r = libworker_read_msg(ctx->rrpipe[0], &msg, &len, 1);
+ r = tube_read_msg(ctx->rr_pipe, &msg, &len, 1);
lock_basic_unlock(&ctx->rrpipe_lock);
if(r == 0)
return UB_PIPE;
uint8_t* msg;
uint32_t len;
/* this is basically the same loop as _process(), but with changes.
- * holds the rrpipe lock and waits with pollit */
+ * holds the rrpipe lock and waits with tube_wait */
while(1) {
lock_basic_lock(&ctx->rrpipe_lock);
lock_basic_lock(&ctx->cfglock);
* o possibly decrementing num_async
* do callback without lock
*/
- r = pollit(ctx, NULL);
+ r = tube_wait(ctx->rr_pipe);
if(r) {
- r = libworker_read_msg(ctx->rrpipe[0], &msg, &len, 1);
+ r = tube_read_msg(ctx->rr_pipe, &msg, &len, 1);
if(r == 0) {
lock_basic_unlock(&ctx->rrpipe_lock);
return UB_PIPE;
lock_basic_unlock(&ctx->cfglock);
lock_basic_lock(&ctx->qqpipe_lock);
- if(!libworker_write_msg(ctx->qqpipe[1], msg, len, 0)) {
+ if(!tube_write_msg(ctx->qq_pipe, msg, len, 0)) {
lock_basic_unlock(&ctx->qqpipe_lock);
free(msg);
return UB_PIPE;
}
/* send cancel to background worker */
lock_basic_lock(&ctx->qqpipe_lock);
- if(!libworker_write_msg(ctx->qqpipe[1], msg, len, 0)) {
+ if(!tube_write_msg(ctx->qq_pipe, msg, len, 0)) {
lock_basic_unlock(&ctx->qqpipe_lock);
free(msg);
return UB_PIPE;
ub_randfree(w->env->rnd);
free(w->env);
}
- free(w->cmd_msg);
outside_network_delete(w->back);
- comm_point_delete(w->cmd_com);
- comm_point_delete(w->res_com);
comm_base_delete(w->base);
free(w);
}
}
/** handle control command coming into server */
-int
-libworker_handle_control_cmd(struct comm_point* c, void* arg,
- int ATTR_UNUSED(err), struct comm_reply* ATTR_UNUSED(rep))
+void
+libworker_handle_control_cmd(struct tube* ATTR_UNUSED(tube),
+ uint8_t* msg, size_t len, int err, void* arg)
{
struct libworker* w = (struct libworker*)arg;
- ssize_t r;
- if(w->cmd_read < sizeof(w->cmd_len)) {
- /* complete reading the length of control msg */
- r = read(c->fd, ((uint8_t*)&w->cmd_len) + w->cmd_read,
- sizeof(w->cmd_len) - w->cmd_read);
- if(r==0) {
- /* error has happened or */
- /* parent closed pipe, must have exited somehow */
- /* it is of no use to go on, exit */
- comm_base_exit(w->base);
- return 0;
- }
- if(r==-1) {
- if(errno != EAGAIN && errno != EINTR) {
- log_err("rpipe error: %s", strerror(errno));
- }
- /* nothing to read now, try later */
- return 0;
- }
- w->cmd_read += r;
- if(w->cmd_read < sizeof(w->cmd_len)) {
- /* not complete, try later */
- return 0;
- }
- w->cmd_msg = (uint8_t*)calloc(1, w->cmd_len);
- if(!w->cmd_msg) {
- log_err("malloc failure");
- w->cmd_read = 0;
- return 0;
- }
- }
- /* cmd_len has been read, read remainder */
- r = read(c->fd, w->cmd_msg + w->cmd_read - sizeof(w->cmd_len),
- w->cmd_len - (w->cmd_read - sizeof(w->cmd_len)));
- if(r==0) {
- /* error has happened or */
- /* parent closed pipe, must have exited somehow */
- /* it is of no use to go on, exit */
- comm_base_exit(w->base);
- return 0;
- }
- if(r==-1) {
- /* nothing to read now, try later */
- if(errno != EAGAIN && errno != EINTR) {
- log_err("rpipe error: %s", strerror(errno));
- }
- return 0;
- }
- w->cmd_read += r;
- if(w->cmd_read < sizeof(w->cmd_len) + w->cmd_len) {
- /* not complete, try later */
- return 0;
- }
- w->cmd_read = 0;
- libworker_do_cmd(w, w->cmd_msg, w->cmd_len); /* also frees the buf */
- w->cmd_msg = NULL;
- return 0;
-}
-
-/** handle opportunity to write result back */
-int
-libworker_handle_result_write(struct comm_point* c, void* arg,
- int ATTR_UNUSED(err), struct comm_reply* ATTR_UNUSED(rep))
-{
- struct libworker* w = (struct libworker*)arg;
- struct libworker_res_list* item = w->res_list;
- ssize_t r;
- if(!item) {
- comm_point_stop_listening(c);
- return 0;
- }
- if(w->res_write < sizeof(item->len)) {
- r = write(c->fd, ((uint8_t*)&item->len) + w->res_write,
- sizeof(item->len) - w->res_write);
- if(r == -1) {
- if(errno != EAGAIN && errno != EINTR) {
- log_err("wpipe error: %s", strerror(errno));
- }
- return 0; /* try again later */
- }
- if(r == 0) {
- /* error on pipe, must have exited somehow */
- /* it is of no use to go on, exit */
- comm_base_exit(w->base);
- return 0;
- }
- w->res_write += r;
- if(w->res_write < sizeof(item->len))
- return 0;
- }
- r = write(c->fd, item->buf + w->res_write - sizeof(item->len),
- item->len - (w->res_write - sizeof(item->len)));
- if(r == -1) {
- if(errno != EAGAIN && errno != EINTR) {
- log_err("wpipe error: %s", strerror(errno));
- }
- return 0; /* try again later */
- }
- if(r == 0) {
- /* error on pipe, must have exited somehow */
+ if(err != 0) {
+ free(msg);
/* it is of no use to go on, exit */
comm_base_exit(w->base);
- return 0;
- }
- w->res_write += r;
- if(w->res_write < sizeof(item->len) + item->len)
- return 0;
- /* done this result, remove it */
- free(item->buf);
- item->buf = NULL;
- w->res_list = w->res_list->next;
- free(item);
- if(!w->res_list) {
- w->res_last = NULL;
- comm_point_stop_listening(c);
+ return;
}
- w->res_write = 0;
- return 0;
+ libworker_do_cmd(w, msg, len); /* also frees the buf */
}
/** the background thread func */
{
/* setup */
uint32_t m;
- int fd;
struct libworker* w = (struct libworker*)arg;
struct ub_ctx* ctx = w->ctx;
log_thread_set(&w->thread_num);
/* we are forked */
w->is_bg_thread = 0;
/* close non-used parts of the pipes */
- if(ctx->qqpipe[1] != -1) {
- close(ctx->qqpipe[1]);
- ctx->qqpipe[1] = -1;
- }
- if(ctx->rrpipe[0] != -1) {
- close(ctx->rrpipe[0]);
- ctx->rrpipe[0] = -1;
- }
+ tube_close_write(ctx->qq_pipe);
+ tube_close_read(ctx->rr_pipe);
#endif
if(!w) {
log_err("libunbound bg worker init failed, nomem");
return NULL;
}
- if(!(w->cmd_com=comm_point_create_raw(w->base, ctx->qqpipe[0], 0,
- libworker_handle_control_cmd, w))) {
- log_err("libunbound bg worker init failed, no cmdcom");
+ if(!tube_setup_bg_listen(ctx->qq_pipe, w->base,
+ libworker_handle_control_cmd, w)) {
+ log_err("libunbound bg worker init failed, no bglisten");
return NULL;
}
- if(!(w->res_com=comm_point_create_raw(w->base, ctx->rrpipe[1], 1,
- libworker_handle_result_write, w))) {
- log_err("libunbound bg worker init failed, no rescom");
+ if(!tube_setup_bg_write(ctx->rr_pipe, w->base)) {
+ log_err("libunbound bg worker init failed, no bgwrite");
return NULL;
}
comm_base_dispatch(w->base);
/* cleanup */
- fd = ctx->rrpipe[1];
- ctx->rrpipe[1] = -1;
m = UB_LIBCMD_QUIT;
+ tube_remove_bg_listen(w->ctx->qq_pipe);
+ tube_remove_bg_write(w->ctx->rr_pipe);
libworker_delete(w);
- close(ctx->qqpipe[0]);
- ctx->qqpipe[0] = -1;
- (void)libworker_write_msg(fd, (uint8_t*)&m, (uint32_t)sizeof(m), 0);
- close(fd);
+ (void)tube_write_msg(ctx->rr_pipe, (uint8_t*)&m,
+ (uint32_t)sizeof(m), 0);
+#ifdef THREADS_DISABLED
+ /* close pipes from forked process before exit */
+ tube_close_read(ctx->qq_pipe);
+ tube_close_write(ctx->rr_pipe);
+#endif
return NULL;
}
w = libworker_setup(ctx, 1);
if(!w) fatal_exit("out of memory");
/* close non-used parts of the pipes */
- close(ctx->qqpipe[1]);
- close(ctx->rrpipe[0]);
- ctx->qqpipe[1] = -1;
- ctx->rrpipe[0] = -1;
+ tube_close_write(ctx->qq_pipe);
+ tube_close_read(ctx->rr_pipe);
(void)libworker_dobg(w);
exit(0);
break;
{
uint8_t* msg = NULL;
uint32_t len = 0;
- struct libworker_res_list* item;
/* serialize and delete unneeded q */
if(w->is_bg_thread) {
log_err("out of memory for async answer");
return;
}
- item = (struct libworker_res_list*)malloc(sizeof(*item));
- if(!item) {
- free(msg);
+ if(!tube_queue_item(w->ctx->rr_pipe, msg, len)) {
log_err("out of memory for async answer");
return;
}
- item->buf = msg;
- item->len = len;
- item->next = NULL;
- /* add at back of list, since the first one may be partially written */
- if(w->res_last)
- w->res_last->next = item;
- else w->res_list = item;
- w->res_last = item;
- if(w->res_list == w->res_last) {
- /* first added item, start the write process */
- comm_point_start_listening(w->res_com, -1, -1);
- }
}
/** callback with bg results */
return 0;
}
-int
-libworker_write_msg(int fd, uint8_t* buf, uint32_t len, int nonblock)
-{
- ssize_t r;
- /* test */
- if(nonblock) {
- r = write(fd, &len, sizeof(len));
- if(r == -1) {
- if(errno==EINTR || errno==EAGAIN)
- return -1;
- log_err("msg write failed: %s", strerror(errno));
- return -1; /* can still continue, perhaps */
- }
- } else r = 0;
- if(!fd_set_block(fd))
- return 0;
- /* write remainder */
- if(r != (ssize_t)sizeof(len)) {
- if(write(fd, (char*)(&len)+r, sizeof(len)-r) == -1) {
- log_err("msg write failed: %s", strerror(errno));
- (void)fd_set_nonblock(fd);
- return 0;
- }
- }
- if(write(fd, buf, len) == -1) {
- log_err("msg write failed: %s", strerror(errno));
- (void)fd_set_nonblock(fd);
- return 0;
- }
- if(!fd_set_nonblock(fd))
- return 0;
- return 1;
-}
-
-int
-libworker_read_msg(int fd, uint8_t** buf, uint32_t* len, int nonblock)
-{
- ssize_t r;
-
- /* test */
- *len = 0;
- if(nonblock) {
- r = read(fd, len, sizeof(*len));
- if(r == -1) {
- if(errno==EINTR || errno==EAGAIN)
- return -1;
- log_err("msg read failed: %s", strerror(errno));
- return -1; /* we can still continue, perhaps */
- }
- if(r == 0) /* EOF */
- return 0;
- } else r = 0;
- if(!fd_set_block(fd))
- return 0;
- /* read remainder */
- if(r != (ssize_t)sizeof(*len)) {
- if((r=read(fd, (char*)(len)+r, sizeof(*len)-r)) == -1) {
- log_err("msg read failed: %s", strerror(errno));
- (void)fd_set_nonblock(fd);
- return 0;
- }
- if(r == 0) /* EOF */ {
- (void)fd_set_nonblock(fd);
- return 0;
- }
- }
- *buf = (uint8_t*)malloc(*len);
- if(!*buf) {
- log_err("out of memory");
- (void)fd_set_nonblock(fd);
- return 0;
- }
- if((r=read(fd, *buf, *len)) == -1) {
- log_err("msg read failed: %s", strerror(errno));
- (void)fd_set_nonblock(fd);
- free(*buf);
- return 0;
- }
- if(r == 0) { /* EOF */
- (void)fd_set_nonblock(fd);
- free(*buf);
- return 0;
- }
- if(!fd_set_nonblock(fd)) {
- free(*buf);
- return 0;
- }
- return 1;
-}
-
/* --- fake callbacks for fptr_wlist to work --- */
void worker_handle_control_cmd(struct tube* ATTR_UNUSED(tube),
- ldns_buffer* ATTR_UNUSED(buffer), int ATTR_UNUSED(error),
- void* ATTR_UNUSED(arg))
+ uint8_t* ATTR_UNUSED(buffer), size_t ATTR_UNUSED(len),
+ int ATTR_UNUSED(error), void* ATTR_UNUSED(arg))
{
log_assert(0);
}
struct module_qstate;
struct comm_point;
struct comm_reply;
-struct libworker_res_list;
struct regional;
+struct tube;
/**
* The library-worker status structure
struct outside_network* back;
/** random() table for this worker. */
struct ub_randstate* rndstate;
-
- /** commpoint to listen to commands */
- struct comm_point* cmd_com;
- /** are we currently reading a command, 0 if not, else bytecount */
- size_t cmd_read;
- /** size of current read command, may be partially read */
- uint32_t cmd_len;
- /** the current read command content, malloced, can be partially read*/
- uint8_t* cmd_msg;
-
- /** commpoint to write results back */
- struct comm_point* res_com;
- /** are we curently writing a result, 0 if not, else bytecount into
- * the res_list first entry. */
- size_t res_write;
- /** list of outstanding results to be written back */
- struct libworker_res_list* res_list;
- /** last in list */
- struct libworker_res_list* res_last;
-};
-
-/**
- * List of results (arbitrary command serializations) to write back
- */
-struct libworker_res_list {
- /** next in list */
- struct libworker_res_list* next;
- /** serialized buffer to write */
- uint8_t* buf;
- /** length to write */
- uint32_t len;
};
/**
struct comm_reply* reply_info);
/** handle control command coming into server */
-int libworker_handle_control_cmd(struct comm_point* c, void* arg,
- int err, struct comm_reply* rep);
+void libworker_handle_control_cmd(struct tube* tube, uint8_t* msg, size_t len,
+ int err, void* arg);
/** handle opportunity to write result back */
-int libworker_handle_result_write(struct comm_point* c, void* arg,
- int err, struct comm_reply* rep);
-
-/**
- * Write length bytes followed by message.
- * @param fd: the socket to write on. Is nonblocking.
- * Set to blocking by the function,
- * and back to non-blocking at exit of function.
- * @param buf: the message.
- * @param len: length of message.
- * @param nonblock: if set to true, the first write is nonblocking.
- * If the first write fails the function returns -1.
- * If set false, the first write is blocking.
- * @return: all remainder writes are nonblocking.
- * return 0 on error, in that case blocking/nonblocking of socket is
- * unknown.
- * return 1 if all OK.
- */
-int libworker_write_msg(int fd, uint8_t* buf, uint32_t len, int nonblock);
-
-/**
- * Read length bytes followed by message.
- * @param fd: the socket to write on. Is nonblocking.
- * Set to blocking by the function,
- * and back to non-blocking at exit of function.
- * @param buf: the message, malloced.
- * @param len: length of message, returned.
- * @param nonblock: if set to true, the first read is nonblocking.
- * If the first read fails the function returns -1.
- * If set false, the first read is blocking.
- * @return: all remainder reads are nonblocking.
- * return 0 on error, in that case blocking/nonblocking of socket is
- * unknown. On EOF 0 is returned.
- * return 1 if all OK.
- */
-int libworker_read_msg(int fd, uint8_t** buf, uint32_t* len, int nonblock);
+void libworker_handle_result_write(struct tube* tube, uint8_t* msg, size_t len,
+ int err, void* arg);
/**
* fill result from parsed message, on error fills servfail
struct tube;
void worker_handle_control_cmd(struct tube* ATTR_UNUSED(tube),
- ldns_buffer* ATTR_UNUSED(buffer), int ATTR_UNUSED(error),
- void* ATTR_UNUSED(arg))
+ uint8_t* ATTR_UNUSED(buffer), size_t ATTR_UNUSED(len),
+ int ATTR_UNUSED(error), void* ATTR_UNUSED(arg))
{
log_assert(0);
}
return 0;
}
+void libworker_handle_control_cmd(struct tube* ATTR_UNUSED(tube),
+ uint8_t* ATTR_UNUSED(buffer), size_t ATTR_UNUSED(len),
+ int ATTR_UNUSED(error), void* ATTR_UNUSED(arg))
+{
+ log_assert(0);
+}
+
int context_query_cmp(const void* ATTR_UNUSED(a), const void* ATTR_UNUSED(b))
{
log_assert(0);
return calloc(1, 1);
}
+struct comm_point* comm_point_create_raw(struct comm_base* ATTR_UNUSED(base),
+ int ATTR_UNUSED(fd), int ATTR_UNUSED(writing),
+ comm_point_callback_t* ATTR_UNUSED(callback),
+ void* ATTR_UNUSED(callback_arg))
+{
+ /* no pipe comm possible */
+ return calloc(1, 1);
+}
+
+void comm_point_start_listening(struct comm_point* ATTR_UNUSED(c),
+ int ATTR_UNUSED(newfd), int ATTR_UNUSED(sec))
+{
+ /* no bg write pipe comm possible */
+}
+
+void comm_point_stop_listening(struct comm_point* ATTR_UNUSED(c))
+{
+ /* no bg write pipe comm possible */
+}
+
/* only cmd com _local gets deleted */
void comm_point_delete(struct comm_point* c)
{
return 0;
}
+int
+fptr_whitelist_comm_point_raw(comm_point_callback_t *fptr)
+{
+ if(fptr == &tube_handle_listen) return 1;
+ else if(fptr == &tube_handle_write) return 1;
+ return 0;
+}
+
int
fptr_whitelist_comm_timer(void (*fptr)(void*))
{
int fptr_whitelist_tube_listen(tube_callback_t* fptr)
{
if(fptr == &worker_handle_control_cmd) return 1;
+ else if(fptr == &libworker_handle_control_cmd) return 1;
return 0;
}
*/
int fptr_whitelist_comm_point(comm_point_callback_t *fptr);
+/**
+ * Check function pointer whitelist for raw comm_point callback values.
+ *
+ * @param fptr: function pointer to check.
+ * @return false if not in whitelist.
+ */
+int fptr_whitelist_comm_point_raw(comm_point_callback_t *fptr);
+
/**
* Check function pointer whitelist for comm_timer callback values.
*
24249,
24321,
24386,
+24465,
24554,
24677,
24678,
log_assert(c->type == comm_raw);
comm_base_now(c->ev->base);
+ fptr_ok(fptr_whitelist_comm_point_raw(c->callback));
(void)(*c->callback)(c, c->cb_arg, NETEVENT_NOERROR, NULL);
}
{
struct tube* tube = (struct tube*)calloc(1, sizeof(*tube));
int sv[2];
- if(!tube) return 0;
+ if(!tube) {
+ int err = errno;
+ log_err("tube_create: out of memory");
+ errno = err;
+ return NULL;
+ }
tube->sr = -1;
tube->sw = -1;
if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) {
+ int err = errno;
log_err("socketpair: %s", strerror(errno));
free(tube);
+ errno = err;
return NULL;
}
tube->sr = sv[0];
tube->sw = sv[1];
if(!fd_set_nonblock(tube->sr) || !fd_set_nonblock(tube->sw)) {
+ int err = errno;
log_err("tube: cannot set nonblocking");
tube_delete(tube);
+ errno = err;
return NULL;
}
return tube;
void tube_delete(struct tube* tube)
{
if(!tube) return;
- if(tube->listen_com) {
- comm_point_delete(tube->listen_com);
- }
+ tube_remove_bg_listen(tube);
+ tube_remove_bg_write(tube);
/* close fds after deleting commpoints, to be sure.
* Also epoll does not like closing fd before event_del */
- if(tube->sr != -1) close(tube->sr);
- if(tube->sw != -1) close(tube->sw);
- tube->sr = -1;
- tube->sw = -1;
+ tube_close_read(tube);
+ tube_close_write(tube);
free(tube);
}
+void tube_close_read(struct tube* tube)
+{
+ if(tube->sr != -1) {
+ close(tube->sr);
+ tube->sr = -1;
+ }
+}
+
+void tube_close_write(struct tube* tube)
+{
+ if(tube->sw != -1) {
+ close(tube->sw);
+ tube->sw = -1;
+ }
+}
+
+void tube_remove_bg_listen(struct tube* tube)
+{
+ if(tube->listen_com) {
+ comm_point_delete(tube->listen_com);
+ tube->listen_com = NULL;
+ }
+ if(tube->cmd_msg) {
+ free(tube->cmd_msg);
+ tube->cmd_msg = NULL;
+ }
+}
+
+void tube_remove_bg_write(struct tube* tube)
+{
+ if(tube->res_com) {
+ comm_point_delete(tube->res_com);
+ tube->res_com = NULL;
+ }
+ if(tube->res_list) {
+ struct tube_res_list* np, *p = tube->res_list;
+ tube->res_list = NULL;
+ tube->res_last = NULL;
+ while(p) {
+ np = p->next;
+ free(p->buf);
+ free(p);
+ p = np;
+ }
+ }
+}
+
int
tube_handle_listen(struct comm_point* c, void* arg, int error,
struct comm_reply* ATTR_UNUSED(reply_info))
{
struct tube* tube = (struct tube*)arg;
+ ssize_t r;
if(error != NETEVENT_NOERROR) {
fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
- (*tube->listen_cb)(tube, NULL, error, tube->listen_arg);
+ (*tube->listen_cb)(tube, NULL, 0, error, tube->listen_arg);
return 0;
}
+
+ if(tube->cmd_read < sizeof(tube->cmd_len)) {
+ /* complete reading the length of control msg */
+ r = read(c->fd, ((uint8_t*)&tube->cmd_len) + tube->cmd_read,
+ sizeof(tube->cmd_len) - tube->cmd_read);
+ if(r==0) {
+ /* error has happened or */
+ /* parent closed pipe, must have exited somehow */
+ fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
+ (*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED,
+ tube->listen_arg);
+ return 0;
+ }
+ if(r==-1) {
+ if(errno != EAGAIN && errno != EINTR) {
+ log_err("rpipe error: %s", strerror(errno));
+ }
+ /* nothing to read now, try later */
+ return 0;
+ }
+ tube->cmd_read += r;
+ if(tube->cmd_read < sizeof(tube->cmd_len)) {
+ /* not complete, try later */
+ return 0;
+ }
+ tube->cmd_msg = (uint8_t*)calloc(1, tube->cmd_len);
+ if(!tube->cmd_msg) {
+ log_err("malloc failure");
+ tube->cmd_read = 0;
+ return 0;
+ }
+ }
+ /* cmd_len has been read, read remainder */
+ r = read(c->fd, tube->cmd_msg+tube->cmd_read-sizeof(tube->cmd_len),
+ tube->cmd_len - (tube->cmd_read - sizeof(tube->cmd_len)));
+ if(r==0) {
+ /* error has happened or */
+ /* parent closed pipe, must have exited somehow */
+ fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
+ (*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED,
+ tube->listen_arg);
+ return 0;
+ }
+ if(r==-1) {
+ /* nothing to read now, try later */
+ if(errno != EAGAIN && errno != EINTR) {
+ log_err("rpipe error: %s", strerror(errno));
+ }
+ return 0;
+ }
+ tube->cmd_read += r;
+ if(tube->cmd_read < sizeof(tube->cmd_len) + tube->cmd_len) {
+ /* not complete, try later */
+ return 0;
+ }
+ tube->cmd_read = 0;
+
fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
- (*tube->listen_cb)(tube, c->buffer, error, tube->listen_arg);
+ (*tube->listen_cb)(tube, tube->cmd_msg, tube->cmd_len,
+ NETEVENT_NOERROR, tube->listen_arg);
+ /* also frees the buf */
+ tube->cmd_msg = NULL;
return 0;
}
-int tube_listen_cmd(struct tube* tube, struct comm_base* base,
- size_t msg_buffer_sz, tube_callback_t* cb, void* arg)
+int
+tube_handle_write(struct comm_point* c, void* arg, int error,
+ struct comm_reply* ATTR_UNUSED(reply_info))
+{
+ struct tube* tube = (struct tube*)arg;
+ struct tube_res_list* item = tube->res_list;
+ ssize_t r;
+ if(error != NETEVENT_NOERROR) {
+ log_err("tube_handle_write net error %d", error);
+ return 0;
+ }
+
+ if(!item) {
+ comm_point_stop_listening(c);
+ return 0;
+ }
+
+ if(tube->res_write < sizeof(item->len)) {
+ r = write(c->fd, ((uint8_t*)&item->len) + tube->res_write,
+ sizeof(item->len) - tube->res_write);
+ if(r == -1) {
+ if(errno != EAGAIN && errno != EINTR) {
+ log_err("wpipe error: %s", strerror(errno));
+ }
+ return 0; /* try again later */
+ }
+ if(r == 0) {
+ /* error on pipe, must have exited somehow */
+ /* cannot signal this to pipe user */
+ return 0;
+ }
+ tube->res_write += r;
+ if(tube->res_write < sizeof(item->len))
+ return 0;
+ }
+ r = write(c->fd, item->buf + tube->res_write - sizeof(item->len),
+ item->len - (tube->res_write - sizeof(item->len)));
+ if(r == -1) {
+ if(errno != EAGAIN && errno != EINTR) {
+ log_err("wpipe error: %s", strerror(errno));
+ }
+ return 0; /* try again later */
+ }
+ if(r == 0) {
+ /* error on pipe, must have exited somehow */
+ /* cannot signal this to pipe user */
+ return 0;
+ }
+ tube->res_write += r;
+ if(tube->res_write < sizeof(item->len) + item->len)
+ return 0;
+ /* done this result, remove it */
+ free(item->buf);
+ item->buf = NULL;
+ tube->res_list = tube->res_list->next;
+ free(item);
+ if(!tube->res_list) {
+ tube->res_last = NULL;
+ comm_point_stop_listening(c);
+ }
+ tube->res_write = 0;
+ return 0;
+}
+
+int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len,
+ int nonblock)
+{
+ ssize_t r;
+ int fd = tube->sw;
+
+ /* test */
+ if(nonblock) {
+ r = write(fd, &len, sizeof(len));
+ if(r == -1) {
+ if(errno==EINTR || errno==EAGAIN)
+ return -1;
+ log_err("tube msg write failed: %s", strerror(errno));
+ return -1; /* can still continue, perhaps */
+ }
+ } else r = 0;
+ if(!fd_set_block(fd))
+ return 0;
+ /* write remainder */
+ if(r != (ssize_t)sizeof(len)) {
+ if(write(fd, (char*)(&len)+r, sizeof(len)-r) == -1) {
+ log_err("tube msg write failed: %s", strerror(errno));
+ (void)fd_set_nonblock(fd);
+ return 0;
+ }
+ }
+ if(write(fd, buf, len) == -1) {
+ log_err("tube msg write failed: %s", strerror(errno));
+ (void)fd_set_nonblock(fd);
+ return 0;
+ }
+ if(!fd_set_nonblock(fd))
+ return 0;
+ return 1;
+}
+
+int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len,
+ int nonblock)
+{
+ ssize_t r;
+ int fd = tube->sr;
+
+ /* test */
+ *len = 0;
+ if(nonblock) {
+ r = read(fd, len, sizeof(*len));
+ if(r == -1) {
+ if(errno==EINTR || errno==EAGAIN)
+ return -1;
+ log_err("tube msg read failed: %s", strerror(errno));
+ return -1; /* we can still continue, perhaps */
+ }
+ if(r == 0) /* EOF */
+ return 0;
+ } else r = 0;
+ if(!fd_set_block(fd))
+ return 0;
+ /* read remainder */
+ if(r != (ssize_t)sizeof(*len)) {
+ if((r=read(fd, (char*)(len)+r, sizeof(*len)-r)) == -1) {
+ log_err("tube msg read failed: %s", strerror(errno));
+ (void)fd_set_nonblock(fd);
+ return 0;
+ }
+ if(r == 0) /* EOF */ {
+ (void)fd_set_nonblock(fd);
+ return 0;
+ }
+ }
+ *buf = (uint8_t*)malloc(*len);
+ if(!*buf) {
+ log_err("tube read out of memory");
+ (void)fd_set_nonblock(fd);
+ return 0;
+ }
+ if((r=read(fd, *buf, *len)) == -1) {
+ log_err("tube msg read failed: %s", strerror(errno));
+ (void)fd_set_nonblock(fd);
+ free(*buf);
+ return 0;
+ }
+ if(r == 0) { /* EOF */
+ (void)fd_set_nonblock(fd);
+ free(*buf);
+ return 0;
+ }
+ if(!fd_set_nonblock(fd)) {
+ free(*buf);
+ return 0;
+ }
+ return 1;
+}
+
+/** perform a select() on the fd */
+static int
+pollit(int fd, struct timeval* t)
+{
+ fd_set r;
+#ifndef S_SPLINT_S
+ FD_ZERO(&r);
+ FD_SET(FD_SET_T fd, &r);
+#endif
+ if(select(fd+1, &r, NULL, NULL, t) == -1) {
+ return 0;
+ }
+ errno = 0;
+ return FD_ISSET(fd, &r);
+}
+
+int tube_poll(struct tube* tube)
+{
+ struct timeval t;
+ memset(&t, 0, sizeof(t));
+ return pollit(tube->sr, &t);
+}
+
+int tube_wait(struct tube* tube)
+{
+ return pollit(tube->sr, NULL);
+}
+
+int tube_read_fd(struct tube* tube)
+{
+ return tube->sr;
+}
+
+int tube_setup_bg_listen(struct tube* tube, struct comm_base* base,
+ tube_callback_t* cb, void* arg)
{
tube->listen_cb = cb;
tube->listen_arg = arg;
- if(!(tube->listen_com = comm_point_create_local(base, tube->sr,
- msg_buffer_sz, tube_handle_listen, tube))) {
- log_err("tube_listen_cmd: commpoint creation failed");
+ if(!(tube->listen_com = comm_point_create_raw(base, tube->sr,
+ 0, tube_handle_listen, tube))) {
+ int err = errno;
+ log_err("tube_setup_bg_l: commpoint creation failed");
+ errno = err;
+ return 0;
+ }
+ return 1;
+}
+
+int tube_setup_bg_write(struct tube* tube, struct comm_base* base)
+{
+ if(!(tube->res_com = comm_point_create_raw(base, tube->sw,
+ 1, tube_handle_write, tube))) {
+ int err = errno;
+ log_err("tube_setup_bg_w: commpoint creation failed");
+ errno = err;
return 0;
}
return 1;
}
-int tube_send_cmd(struct tube* tube, ldns_buffer* buffer)
+int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len)
{
- if(!write_socket(tube->sw, ldns_buffer_begin(buffer),
- ldns_buffer_limit(buffer))) {
- log_err("write socket: %s", strerror(errno));
+ struct tube_res_list* item =
+ (struct tube_res_list*)malloc(sizeof(*item));
+ if(!item) {
+ free(msg);
+ log_err("out of memory for async answer");
return 0;
}
+ item->buf = msg;
+ item->len = len;
+ item->next = NULL;
+ /* add at back of list, since the first one may be partially written */
+ if(tube->res_last)
+ tube->res_last->next = item;
+ else tube->res_list = item;
+ tube->res_last = item;
+ if(tube->res_list == tube->res_last) {
+ /* first added item, start the write process */
+ comm_point_start_listening(tube->res_com, -1, -1);
+ }
return 1;
}
struct comm_reply;
struct comm_base;
struct tube;
+struct tube_res_list;
/**
* Callback from pipe listen function
- * void mycallback(tube, buffer, error, argument);
- * if error is true (NETEVENT_*), buffer is probably NULL.
+ * void mycallback(tube, msg, len, error, user_argument);
+ * if error is true (NETEVENT_*), msg is probably NULL.
*/
-typedef void tube_callback_t(struct tube*, ldns_buffer*, int, void*);
+typedef void tube_callback_t(struct tube*, uint8_t*, size_t, int, void*);
/**
* A pipe
tube_callback_t* listen_cb;
/** listen callback user arg */
void* listen_arg;
+ /** are we currently reading a command, 0 if not, else bytecount */
+ size_t cmd_read;
+ /** size of current read command, may be partially read */
+ uint32_t cmd_len;
+ /** the current read command content, malloced, can be partially read*/
+ uint8_t* cmd_msg;
+
+ /** background write queue, commpoint to write results back */
+ struct comm_point* res_com;
+ /** are we curently writing a result, 0 if not, else bytecount into
+ * the res_list first entry. */
+ size_t res_write;
+ /** list of outstanding results to be written back */
+ struct tube_res_list* res_list;
+ /** last in list */
+ struct tube_res_list* res_last;
+};
+
+/**
+ * List of results (arbitrary command serializations) to write back
+ */
+struct tube_res_list {
+ /** next in list */
+ struct tube_res_list* next;
+ /** serialized buffer to write */
+ uint8_t* buf;
+ /** length to write */
+ uint32_t len;
};
/**
void tube_delete(struct tube* tube);
/**
- * Start listening for information over the pipe
+ * Write length bytes followed by message.
+ * @param tube: the tube to write on.
+ * If that tube is a pipe, its write fd is used as
+ * the socket to write on. Is nonblocking.
+ * Set to blocking by the function,
+ * and back to non-blocking at exit of function.
+ * @param buf: the message.
+ * @param len: length of message.
+ * @param nonblock: if set to true, the first write is nonblocking.
+ * If the first write fails the function returns -1.
+ * If set false, the first write is blocking.
+ * @return: all remainder writes are nonblocking.
+ * return 0 on error, in that case blocking/nonblocking of socket is
+ * unknown.
+ * return 1 if all OK.
+ */
+int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len,
+ int nonblock);
+
+/**
+ * Read length bytes followed by message.
+ * @param tube: The tube to read on.
+ * If that tube is a pipe, its read fd is used as
+ * the socket to read on. Is nonblocking.
+ * Set to blocking by the function,
+ * and back to non-blocking at exit of function.
+ * @param buf: the message, malloced.
+ * @param len: length of message, returned.
+ * @param nonblock: if set to true, the first read is nonblocking.
+ * If the first read fails the function returns -1.
+ * If set false, the first read is blocking.
+ * @return: all remainder reads are nonblocking.
+ * return 0 on error, in that case blocking/nonblocking of socket is
+ * unknown. On EOF 0 is returned.
+ * return 1 if all OK.
+ */
+int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len,
+ int nonblock);
+
+/**
+ * Close read part of the pipe.
+ * The tube can no longer be read from.
+ * @param tube: tube to operate on.
+ */
+void tube_close_read(struct tube* tube);
+
+/**
+ * Close write part of the pipe.
+ * The tube can no longer be written to.
+ * @param tube: tube to operate on.
+ */
+void tube_close_write(struct tube* tube);
+
+/**
+ * See if data is ready for reading on the tube without blocking.
+ * @param tube: tube to check for readable items
+ * @return true if readable items are present. False if not (or error).
+ * true on pipe_closed.
+ */
+int tube_poll(struct tube* tube);
+
+/**
+ * Wait for data to be ready for reading on the tube. is blocking.
+ * No timeout.
+ * @param tube: the tube to wait on.
+ * @return: if there was something to read (false on error).
+ * true on pipe_closed.
+ */
+int tube_wait(struct tube* tube);
+
+/**
+ * Get FD that is readable when new information arrives.
+ * @param tube
+ * @return file descriptor.
+ */
+int tube_read_fd(struct tube* tube);
+
+/**
+ * Start listening for information over the pipe.
+ * Background registration of a read listener, callback when read completed.
+ * Do not mix with tube_read_msg style direct reads from the pipe.
* @param tube: tube to listen on
* @param base: what base to register event callback.
- * @param msg_buffer_sz: what message buffer size to use.
* @param cb: callback routine.
* @param arg: user argument for callback routine.
* @return true if successful, false on error.
*/
-int tube_listen_cmd(struct tube* tube, struct comm_base* base,
- size_t msg_buffer_sz, tube_callback_t* cb, void* arg);
+int tube_setup_bg_listen(struct tube* tube, struct comm_base* base,
+ tube_callback_t* cb, void* arg);
/**
- * Send a command over a pipe, blocking operation.
- * @param tube: tube to send the info on.
- * @param buffer: buffer to send. starts with network order uint16 with
- * length of remainder of buffer.
- * The receiver does not receive the length uint16 in the buffer
- * (the buffer is sized appropriately).
- * @return 0 on error, true on success.
+ * Remove bg listen setup from event base.
+ * @param tube: what tube to cleanup
+ */
+void tube_remove_bg_listen(struct tube* tube);
+
+/**
+ * Start background write handler for the pipe.
+ * Do not mix with tube_write_msg style direct writes to the pipe.
+ * @param tube: tube to write on
+ * @param base: what base to register event handler on.
+ * @return true if successful, false on error.
*/
-int tube_send_cmd(struct tube* tube, ldns_buffer* buffer);
+int tube_setup_bg_write(struct tube* tube, struct comm_base* base);
-/** decl for fptr_wlist of tube pipe listen handler */
+/**
+ * Remove bg write setup from event base.
+ * @param tube: what tube to cleanup
+ */
+void tube_remove_bg_write(struct tube* tube);
+
+
+/**
+ * Append data item to background list of writes.
+ * Mallocs a list entry behind the scenes.
+ * Not locked behind the scenes, call from one thread or lock on outside.
+ * @param tube: what tube to queue on.
+ * @param msg: memory message to send. Is free()d after use.
+ * Put at the end of the to-send queue.
+ * @param len: length of item.
+ * @return 0 on failure (msg freed).
+ */
+int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len);
+
+/** for fptr wlist, callback function */
int tube_handle_listen(struct comm_point* c, void* arg, int error,
struct comm_reply* reply_info);
+/** for fptr wlist, callback function */
+int tube_handle_write(struct comm_point* c, void* arg, int error,
+ struct comm_reply* reply_info);
+
#endif /* UTIL_TUBE_H */