]> git.ipfire.org Git - thirdparty/unbound.git/commitdiff
- moved pipe actions to util/tube.c. easier porting and shared code.
authorWouter Wijngaards <wouter@nlnetlabs.nl>
Wed, 23 Jul 2008 09:23:03 +0000 (09:23 +0000)
committerWouter Wijngaards <wouter@nlnetlabs.nl>
Wed, 23 Jul 2008 09:23:03 +0000 (09:23 +0000)
- check _raw() commpoint callbacks with fptr_wlist.
- iana port update.

git-svn-id: file:///svn/unbound/trunk@1163 be551aaa-1e26-0410-a405-d3ace91eadb9

16 files changed:
daemon/daemon.c
daemon/worker.c
daemon/worker.h
doc/Changelog
libunbound/context.h
libunbound/libunbound.c
libunbound/libworker.c
libunbound/libworker.h
smallapp/worker_cb.c
testcode/fake_event.c
util/fptr_wlist.c
util/fptr_wlist.h
util/iana_ports.inc
util/netevent.c
util/tube.c
util/tube.h

index f34a1c0d7b51985371344b53223a45b393e5a8bb..00a32d3c506250790f3e176b5a649eaac7521d93 100644 (file)
@@ -357,8 +357,7 @@ daemon_stop_others(struct daemon* daemon)
        /* skip i=0, is this thread */
        /* use i=0 buffer for sending cmds; because we are #0 */
        for(i=1; i<daemon->num; i++) {
-               worker_send_cmd(daemon->workers[i], 
-                       daemon->workers[0]->front->udp_buff, worker_cmd_quit);
+               worker_send_cmd(daemon->workers[i], worker_cmd_quit);
        }
        /* wait for them to quit */
        for(i=1; i<daemon->num; i++) {
index d0e276c5704901e47456e3d3f9e58a1f87fd441e..910ca1eccdb2a998c95f24b1e3cdebd47565ff58 100644 (file)
@@ -191,15 +191,10 @@ worker_mem_report(struct worker* ATTR_UNUSED(worker),
 }
 
 void 
-worker_send_cmd(struct worker* worker, ldns_buffer* buffer,
-       enum worker_commands cmd)
+worker_send_cmd(struct worker* worker, enum worker_commands cmd)
 {
-       ldns_buffer_clear(buffer);
-       /* like DNS message, length data */
-       ldns_buffer_write_u16(buffer, sizeof(uint32_t));
-       ldns_buffer_write_u32(buffer, (uint32_t)cmd);
-       ldns_buffer_flip(buffer);
-       if(!tube_send_cmd(worker->cmd, buffer)) {
+       uint32_t c = (uint32_t)cmd;
+       if(!tube_write_msg(worker->cmd, (uint8_t*)&c, sizeof(c), 0)) {
                log_err("worker send cmd %d failed", (int)cmd);
        }
 }
@@ -320,22 +315,23 @@ worker_check_request(ldns_buffer* pkt, struct worker* worker)
 }
 
 void 
-worker_handle_control_cmd(struct tube* ATTR_UNUSED(tube), ldns_buffer* buffer,
-       int error, void* arg)
+worker_handle_control_cmd(struct tube* ATTR_UNUSED(tube), uint8_t* msg,
+       size_t len, int error, void* arg)
 {
        struct worker* worker = (struct worker*)arg;
        enum worker_commands cmd;
        if(error != NETEVENT_NOERROR) {
+               free(msg);
                if(error == NETEVENT_CLOSED)
                        comm_base_exit(worker->base);
                else    log_info("control event: %d", error);
                return;
        }
-       if(ldns_buffer_limit(buffer) != sizeof(uint32_t)) {
-               fatal_exit("bad control msg length %d", 
-                       (int)ldns_buffer_limit(buffer));
+       if(len != sizeof(uint32_t)) {
+               fatal_exit("bad control msg length %d", (int)len);
        }
-       cmd = ldns_buffer_read_u32(buffer);
+       cmd = ldns_read_uint32(msg);
+       free(msg);
        switch(cmd) {
        case worker_cmd_quit:
                verbose(VERB_ALGO, "got control cmd quit");
@@ -998,9 +994,8 @@ worker_init(struct worker* worker, struct config_file *cfg,
        }
        if(worker->thread_num != 0) {
                /* start listening to commands */
-               if(!tube_listen_cmd(worker->cmd, worker->base,
-                       cfg->msg_buffer_size, &worker_handle_control_cmd,
-                       worker)) {
+               if(!tube_setup_bg_listen(worker->cmd, worker->base,
+                       &worker_handle_control_cmd, worker)) {
                        log_err("could not create control compt.");
                        worker_delete(worker);
                        return 0;
@@ -1179,6 +1174,13 @@ int libworker_handle_service_reply(struct comm_point* ATTR_UNUSED(c),
        return 0;
 }
 
+void libworker_handle_control_cmd(struct tube* ATTR_UNUSED(tube),
+        uint8_t* ATTR_UNUSED(buffer), size_t ATTR_UNUSED(len),
+        int ATTR_UNUSED(error), void* ATTR_UNUSED(arg))
+{
+       log_assert(0);
+}
+
 int context_query_cmp(const void* ATTR_UNUSED(a), const void* ATTR_UNUSED(b))
 {
        log_assert(0);
index 2823d400383c4b40ba872097892b83d7ad459715..20e11002fc27e98f19398c51f723d9ac9532d9bd 100644 (file)
@@ -150,11 +150,9 @@ void worker_delete(struct worker* worker);
 /**
  * Send a command to a worker. Uses blocking writes.
  * @param worker: worker to send command to.
- * @param buffer: an empty buffer to use.
  * @param cmd: command to send.
  */
-void worker_send_cmd(struct worker* worker, ldns_buffer* buffer,
-        enum worker_commands cmd);
+void worker_send_cmd(struct worker* worker, enum worker_commands cmd);
 
 /**
  * Worker signal handler function. User argument is the worker itself.
@@ -199,11 +197,12 @@ struct outbound_entry* worker_send_query(uint8_t* qname, size_t qnamelen,
 /** 
  * process control messages from the main thread. 
  * @param tube: tube control message came on.
- * @param buffer: buffer with message in it.
+ * @param msg: message contents.
+ * @param len: length of message.
  * @param error: if error (NETEVENT_*) happened.
  * @param arg: user argument
  */
-void worker_handle_control_cmd(struct tube* tube, ldns_buffer* buffer,
+void worker_handle_control_cmd(struct tube* tube, uint8_t* msg, size_t len,
        int error, void* arg);
 
 /** handles callbacks from listening event interface */
index 5a277aa7ff00b2cb64110fdcb78d86584da1548a..054d390b820847d3969647560f8ba811ecd0e88b 100644 (file)
@@ -1,3 +1,8 @@
+22 July 2008: Wouter
+       - moved pipe actions to util/tube.c. easier porting and shared code.
+       - check _raw() commpoint callbacks with fptr_wlist.
+       - iana port update.
+
 21 July 2008: Wouter
        - #198: nicer entropy warning message. manpage OS hints.
 
index 13a6c4f36945741bb33f528a174973e763193f58..e1542a6ea634a12b5f931bf91a01af6dd1d73abc 100644 (file)
@@ -47,6 +47,7 @@
 #include "libunbound/unbound.h"
 #include "util/data/packed_rrset.h"
 struct libworker;
+struct tube;
 
 /**
  * The context structure
@@ -59,12 +60,12 @@ struct ub_ctx {
        /* --- pipes --- */
        /** mutex on query write pipe */
        lock_basic_t qqpipe_lock;
-       /** the query write pipe, [0] read from, [1] write on */
-       int qqpipe[2];
+       /** the query write pipe */
+       struct tube* qq_pipe;
        /** mutex on result read pipe */
        lock_basic_t rrpipe_lock;
-       /** the result read pipe, [0] read from, [1] write on */
-       int rrpipe[2];
+       /** the result read pipe */
+       struct tube* rr_pipe;
 
        /* --- shared data --- */
        /** mutex for access to env.cfg, finalized and dothread */
index 901e87033c53560c5680d69ef0071e8eeb249ffd..380fc9e3dde51708429f789aa6924a8a7ad39f0b 100644 (file)
@@ -54,6 +54,7 @@
 #include "util/log.h"
 #include "util/random.h"
 #include "util/net_help.h"
+#include "util/tube.h"
 #include "services/modstack.h"
 #include "services/localzone.h"
 #include "services/cache/infra.h"
@@ -95,45 +96,28 @@ ub_ctx_create()
                return NULL;
        }
        seed = 0;
-       if(socketpair(AF_UNIX, SOCK_STREAM, 0, ctx->qqpipe) == -1) {
-               ub_randfree(ctx->seed_rnd);
-               free(ctx);
-               return NULL;
-       }
-       if(socketpair(AF_UNIX, SOCK_STREAM, 0, ctx->rrpipe) == -1) {
+       if((ctx->qq_pipe = tube_create()) == NULL) {
                int e = errno;
-               close(ctx->qqpipe[0]);
-               close(ctx->qqpipe[1]);
                ub_randfree(ctx->seed_rnd);
                free(ctx);
                errno = e;
                return NULL;
        }
-#ifndef USE_WINSOCK
-       if(!fd_set_nonblock(ctx->rrpipe[0]) ||
-          !fd_set_nonblock(ctx->rrpipe[1]) ||
-          !fd_set_nonblock(ctx->qqpipe[0]) ||
-          !fd_set_nonblock(ctx->qqpipe[1])) {
+       if((ctx->rr_pipe = tube_create()) == NULL) {
                int e = errno;
-               close(ctx->rrpipe[0]);
-               close(ctx->rrpipe[1]);
-               close(ctx->qqpipe[0]);
-               close(ctx->qqpipe[1]);
+               tube_delete(ctx->qq_pipe);
                ub_randfree(ctx->seed_rnd);
                free(ctx);
                errno = e;
                return NULL;
        }
-#endif /* !USE_WINSOCK - it is a pipe(nonsocket) on windows) */
        lock_basic_init(&ctx->qqpipe_lock);
        lock_basic_init(&ctx->rrpipe_lock);
        lock_basic_init(&ctx->cfglock);
        ctx->env = (struct module_env*)calloc(1, sizeof(*ctx->env));
        if(!ctx->env) {
-               close(ctx->rrpipe[0]);
-               close(ctx->rrpipe[1]);
-               close(ctx->qqpipe[0]);
-               close(ctx->qqpipe[1]);
+               tube_delete(ctx->qq_pipe);
+               tube_delete(ctx->rr_pipe);
                ub_randfree(ctx->seed_rnd);
                free(ctx);
                errno = ENOMEM;
@@ -141,10 +125,8 @@ ub_ctx_create()
        }
        ctx->env->cfg = config_create_forlib();
        if(!ctx->env->cfg) {
-               close(ctx->rrpipe[0]);
-               close(ctx->rrpipe[1]);
-               close(ctx->qqpipe[0]);
-               close(ctx->qqpipe[1]);
+               tube_delete(ctx->qq_pipe);
+               tube_delete(ctx->rr_pipe);
                free(ctx->env);
                ub_randfree(ctx->seed_rnd);
                free(ctx);
@@ -180,11 +162,11 @@ ub_ctx_delete(struct ub_ctx* ctx)
                uint32_t cmd = UB_LIBCMD_QUIT;
                lock_basic_unlock(&ctx->cfglock);
                lock_basic_lock(&ctx->qqpipe_lock);
-               (void)libworker_write_msg(ctx->qqpipe[1], (uint8_t*)&cmd, 
+               (void)tube_write_msg(ctx->qq_pipe, (uint8_t*)&cmd, 
                        (uint32_t)sizeof(cmd), 0);
                lock_basic_unlock(&ctx->qqpipe_lock);
                lock_basic_lock(&ctx->rrpipe_lock);
-               while(libworker_read_msg(ctx->rrpipe[0], &msg, &len, 0)) {
+               while(tube_read_msg(ctx->rr_pipe, &msg, &len, 0)) {
                        /* discard all results except a quit confirm */
                        if(context_serial_getcmd(msg, len) == UB_LIBCMD_QUIT) {
                                free(msg);
@@ -222,18 +204,8 @@ ub_ctx_delete(struct ub_ctx* ctx)
        lock_basic_destroy(&ctx->qqpipe_lock);
        lock_basic_destroy(&ctx->rrpipe_lock);
        lock_basic_destroy(&ctx->cfglock);
-       if(ctx->qqpipe[0] != -1)
-               close(ctx->qqpipe[0]);
-       if(ctx->qqpipe[1] != -1)
-               close(ctx->qqpipe[1]);
-       if(ctx->rrpipe[0] != -1)
-               close(ctx->rrpipe[0]);
-       if(ctx->rrpipe[1] != -1)
-               close(ctx->rrpipe[1]);
-       ctx->qqpipe[0] = -1;
-       ctx->qqpipe[1] = -1;
-       ctx->rrpipe[0] = -1;
-       ctx->rrpipe[1] = -1;
+       tube_delete(ctx->qq_pipe);
+       tube_delete(ctx->rr_pipe);
        if(ctx->env) {
                slabhash_delete(ctx->env->msg_cache);
                rrset_cache_delete(ctx->env->rrset_cache);
@@ -376,35 +348,17 @@ ub_ctx_async(struct ub_ctx* ctx, int dothread)
        return UB_NOERROR;
 }
 
-/** perform a select() on the result read pipe */
-static int 
-pollit(struct ub_ctx* ctx, struct timeval* t)
-{
-       fd_set r;
-#ifndef S_SPLINT_S
-       FD_ZERO(&r);
-       FD_SET(FD_SET_T ctx->rrpipe[0], &r);
-#endif
-       if(select(ctx->rrpipe[0]+1, &r, NULL, NULL, t) == -1) {
-               return 0;
-       }
-       errno = 0;
-       return FD_ISSET(ctx->rrpipe[0], &r);
-}
-
 int 
 ub_poll(struct ub_ctx* ctx)
 {
-       struct timeval t;
-       memset(&t, 0, sizeof(t));
        /* no need to hold lock while testing for readability. */
-       return pollit(ctx, &t);
+       return tube_poll(ctx->rr_pipe);
 }
 
 int 
 ub_fd(struct ub_ctx* ctx)
 {
-       return ctx->rrpipe[0];
+       return tube_read_fd(ctx->rr_pipe);
 }
 
 /** process answer from bg worker */
@@ -501,7 +455,7 @@ ub_process(struct ub_ctx* ctx)
        while(1) {
                msg = NULL;
                lock_basic_lock(&ctx->rrpipe_lock);
-               r = libworker_read_msg(ctx->rrpipe[0], &msg, &len, 1);
+               r = tube_read_msg(ctx->rr_pipe, &msg, &len, 1);
                lock_basic_unlock(&ctx->rrpipe_lock);
                if(r == 0)
                        return UB_PIPE;
@@ -527,7 +481,7 @@ ub_wait(struct ub_ctx* ctx)
        uint8_t* msg;
        uint32_t len;
        /* this is basically the same loop as _process(), but with changes.
-        * holds the rrpipe lock and waits with pollit */
+        * holds the rrpipe lock and waits with tube_wait */
        while(1) {
                lock_basic_lock(&ctx->rrpipe_lock);
                lock_basic_lock(&ctx->cfglock);
@@ -544,9 +498,9 @@ ub_wait(struct ub_ctx* ctx)
                 *      o possibly decrementing num_async
                 * do callback without lock
                 */
-               r = pollit(ctx, NULL);
+               r = tube_wait(ctx->rr_pipe);
                if(r) {
-                       r = libworker_read_msg(ctx->rrpipe[0], &msg, &len, 1);
+                       r = tube_read_msg(ctx->rr_pipe, &msg, &len, 1);
                        if(r == 0) {
                                lock_basic_unlock(&ctx->rrpipe_lock);
                                return UB_PIPE;
@@ -667,7 +621,7 @@ ub_resolve_async(struct ub_ctx* ctx, char* name, int rrtype,
        lock_basic_unlock(&ctx->cfglock);
        
        lock_basic_lock(&ctx->qqpipe_lock);
-       if(!libworker_write_msg(ctx->qqpipe[1], msg, len, 0)) {
+       if(!tube_write_msg(ctx->qq_pipe, msg, len, 0)) {
                lock_basic_unlock(&ctx->qqpipe_lock);
                free(msg);
                return UB_PIPE;
@@ -705,7 +659,7 @@ ub_cancel(struct ub_ctx* ctx, int async_id)
                }
                /* send cancel to background worker */
                lock_basic_lock(&ctx->qqpipe_lock);
-               if(!libworker_write_msg(ctx->qqpipe[1], msg, len, 0)) {
+               if(!tube_write_msg(ctx->qq_pipe, msg, len, 0)) {
                        lock_basic_unlock(&ctx->qqpipe_lock);
                        free(msg);
                        return UB_PIPE;
index 629eb3f6b513577720c32f860d631438ed84e66e..a4fe7a74b764c13cdc1d7d255b53d405712d22e0 100644 (file)
@@ -79,10 +79,7 @@ libworker_delete(struct libworker* w)
                ub_randfree(w->env->rnd);
                free(w->env);
        }
-       free(w->cmd_msg);
        outside_network_delete(w->back);
-       comm_point_delete(w->cmd_com);
-       comm_point_delete(w->res_com);
        comm_base_delete(w->base);
        free(w);
 }
@@ -231,130 +228,19 @@ libworker_do_cmd(struct libworker* w, uint8_t* msg, uint32_t len)
 }
 
 /** handle control command coming into server */
-int 
-libworker_handle_control_cmd(struct comm_point* c, void* arg
-       int ATTR_UNUSED(err), struct comm_reply* ATTR_UNUSED(rep))
+void 
+libworker_handle_control_cmd(struct tube* ATTR_UNUSED(tube)
+       uint8_t* msg, size_t len, int err, void* arg)
 {
        struct libworker* w = (struct libworker*)arg;
-       ssize_t r;
 
-       if(w->cmd_read < sizeof(w->cmd_len)) {
-               /* complete reading the length of control msg */
-               r = read(c->fd, ((uint8_t*)&w->cmd_len) + w->cmd_read,
-                       sizeof(w->cmd_len) - w->cmd_read);
-               if(r==0) {
-                       /* error has happened or */
-                       /* parent closed pipe, must have exited somehow */
-                       /* it is of no use to go on, exit */
-                       comm_base_exit(w->base);
-                       return 0;
-               }
-               if(r==-1) {
-                       if(errno != EAGAIN && errno != EINTR) {
-                               log_err("rpipe error: %s", strerror(errno));
-                       }
-                       /* nothing to read now, try later */
-                       return 0;
-               }
-               w->cmd_read += r;
-               if(w->cmd_read < sizeof(w->cmd_len)) {
-                       /* not complete, try later */
-                       return 0;
-               }
-               w->cmd_msg = (uint8_t*)calloc(1, w->cmd_len);
-               if(!w->cmd_msg) {
-                       log_err("malloc failure");
-                       w->cmd_read = 0;
-                       return 0;
-               }
-       }
-       /* cmd_len has been read, read remainder */
-       r = read(c->fd, w->cmd_msg + w->cmd_read - sizeof(w->cmd_len),
-               w->cmd_len - (w->cmd_read - sizeof(w->cmd_len)));
-       if(r==0) {
-               /* error has happened or */
-               /* parent closed pipe, must have exited somehow */
-               /* it is of no use to go on, exit */
-               comm_base_exit(w->base);
-               return 0;
-       }
-       if(r==-1) {
-               /* nothing to read now, try later */
-               if(errno != EAGAIN && errno != EINTR) {
-                       log_err("rpipe error: %s", strerror(errno));
-               }
-               return 0;
-       }
-       w->cmd_read += r;
-       if(w->cmd_read < sizeof(w->cmd_len) + w->cmd_len) {
-               /* not complete, try later */
-               return 0;
-       }
-       w->cmd_read = 0;
-       libworker_do_cmd(w, w->cmd_msg, w->cmd_len); /* also frees the buf */
-       w->cmd_msg = NULL;
-       return 0;
-}
-
-/** handle opportunity to write result back */
-int 
-libworker_handle_result_write(struct comm_point* c, void* arg, 
-       int ATTR_UNUSED(err), struct comm_reply* ATTR_UNUSED(rep))
-{
-       struct libworker* w = (struct libworker*)arg;
-       struct libworker_res_list* item = w->res_list;
-       ssize_t r;
-       if(!item) {
-               comm_point_stop_listening(c);
-               return 0;
-       }
-       if(w->res_write < sizeof(item->len)) {
-               r = write(c->fd, ((uint8_t*)&item->len) + w->res_write,
-                       sizeof(item->len) - w->res_write);
-               if(r == -1) {
-                       if(errno != EAGAIN && errno != EINTR) {
-                               log_err("wpipe error: %s", strerror(errno));
-                       }
-                       return 0; /* try again later */
-               }
-               if(r == 0) {
-                       /* error on pipe, must have exited somehow */
-                       /* it is of no use to go on, exit */
-                       comm_base_exit(w->base);
-                       return 0;
-               }
-               w->res_write += r;
-               if(w->res_write < sizeof(item->len))
-                       return 0;
-       }
-       r = write(c->fd, item->buf + w->res_write - sizeof(item->len),
-               item->len - (w->res_write - sizeof(item->len)));
-       if(r == -1) {
-               if(errno != EAGAIN && errno != EINTR) {
-                       log_err("wpipe error: %s", strerror(errno));
-               }
-               return 0; /* try again later */
-       }
-       if(r == 0) {
-               /* error on pipe, must have exited somehow */
+       if(err != 0) {
+               free(msg);
                /* it is of no use to go on, exit */
                comm_base_exit(w->base);
-               return 0;
-       }
-       w->res_write += r;
-       if(w->res_write < sizeof(item->len) + item->len)
-               return 0;
-       /* done this result, remove it */
-       free(item->buf);
-       item->buf = NULL;
-       w->res_list = w->res_list->next;
-       free(item);
-       if(!w->res_list) {
-               w->res_last = NULL;
-               comm_point_stop_listening(c);
+               return;
        }
-       w->res_write = 0;
-       return 0;
+       libworker_do_cmd(w, msg, len); /* also frees the buf */
 }
 
 /** the background thread func */
@@ -363,7 +249,6 @@ libworker_dobg(void* arg)
 {
        /* setup */
        uint32_t m;
-       int fd;
        struct libworker* w = (struct libworker*)arg;
        struct ub_ctx* ctx = w->ctx;
        log_thread_set(&w->thread_num);
@@ -371,27 +256,20 @@ libworker_dobg(void* arg)
        /* we are forked */
        w->is_bg_thread = 0;
        /* close non-used parts of the pipes */
-       if(ctx->qqpipe[1] != -1) {
-               close(ctx->qqpipe[1]);
-               ctx->qqpipe[1] = -1;
-       }
-       if(ctx->rrpipe[0] != -1) {
-               close(ctx->rrpipe[0]);
-               ctx->rrpipe[0] = -1;
-       }
+       tube_close_write(ctx->qq_pipe);
+       tube_close_read(ctx->rr_pipe);
 #endif
        if(!w) {
                log_err("libunbound bg worker init failed, nomem");
                return NULL;
        }
-       if(!(w->cmd_com=comm_point_create_raw(w->base, ctx->qqpipe[0], 0
-               libworker_handle_control_cmd, w))) {
-               log_err("libunbound bg worker init failed, no cmdcom");
+       if(!tube_setup_bg_listen(ctx->qq_pipe, w->base
+               libworker_handle_control_cmd, w)) {
+               log_err("libunbound bg worker init failed, no bglisten");
                return NULL;
        }
-       if(!(w->res_com=comm_point_create_raw(w->base, ctx->rrpipe[1], 1,
-               libworker_handle_result_write, w))) {
-               log_err("libunbound bg worker init failed, no rescom");
+       if(!tube_setup_bg_write(ctx->rr_pipe, w->base)) {
+               log_err("libunbound bg worker init failed, no bgwrite");
                return NULL;
        }
 
@@ -399,14 +277,17 @@ libworker_dobg(void* arg)
        comm_base_dispatch(w->base);
 
        /* cleanup */
-       fd = ctx->rrpipe[1];
-       ctx->rrpipe[1] = -1;
        m = UB_LIBCMD_QUIT;
+       tube_remove_bg_listen(w->ctx->qq_pipe);
+       tube_remove_bg_write(w->ctx->rr_pipe);
        libworker_delete(w);
-       close(ctx->qqpipe[0]);
-       ctx->qqpipe[0] = -1;
-       (void)libworker_write_msg(fd, (uint8_t*)&m, (uint32_t)sizeof(m), 0);
-       close(fd);
+       (void)tube_write_msg(ctx->rr_pipe, (uint8_t*)&m, 
+               (uint32_t)sizeof(m), 0);
+#ifdef THREADS_DISABLED
+       /* close pipes from forked process before exit */
+       tube_close_read(ctx->qq_pipe);
+       tube_close_write(ctx->rr_pipe);
+#endif
        return NULL;
 }
 
@@ -435,10 +316,8 @@ int libworker_bg(struct ub_ctx* ctx)
                                w = libworker_setup(ctx, 1);
                                if(!w) fatal_exit("out of memory");
                                /* close non-used parts of the pipes */
-                               close(ctx->qqpipe[1]);
-                               close(ctx->rrpipe[0]);
-                               ctx->qqpipe[1] = -1;
-                               ctx->rrpipe[0] = -1;
+                               tube_close_write(ctx->qq_pipe);
+                               tube_close_read(ctx->rr_pipe);
                                (void)libworker_dobg(w);
                                exit(0);
                                break;
@@ -655,7 +534,6 @@ add_bg_result(struct libworker* w, struct ctx_query* q, ldns_buffer* pkt,
 {
        uint8_t* msg = NULL;
        uint32_t len = 0;
-       struct libworker_res_list* item;
 
        /* serialize and delete unneeded q */
        if(w->is_bg_thread) {
@@ -677,24 +555,10 @@ add_bg_result(struct libworker* w, struct ctx_query* q, ldns_buffer* pkt,
                log_err("out of memory for async answer");
                return;
        }
-       item = (struct libworker_res_list*)malloc(sizeof(*item));
-       if(!item) {
-               free(msg);
+       if(!tube_queue_item(w->ctx->rr_pipe, msg, len)) {
                log_err("out of memory for async answer");
                return;
        }
-       item->buf = msg;
-       item->len = len;
-       item->next = NULL;
-       /* add at back of list, since the first one may be partially written */
-       if(w->res_last)
-               w->res_last->next = item;
-       else    w->res_list = item;
-       w->res_last = item;
-       if(w->res_list == w->res_last) {
-               /* first added item, start the write process */
-               comm_point_start_listening(w->res_com, -1, -1);
-       }
 }
 
 /** callback with bg results */
@@ -873,100 +737,10 @@ libworker_handle_service_reply(struct comm_point* c, void* arg, int error,
        return 0;
 }
 
-int 
-libworker_write_msg(int fd, uint8_t* buf, uint32_t len, int nonblock)
-{
-       ssize_t r;
-       /* test */
-       if(nonblock) {
-               r = write(fd, &len, sizeof(len));
-               if(r == -1) {
-                       if(errno==EINTR || errno==EAGAIN)
-                               return -1;
-                       log_err("msg write failed: %s", strerror(errno));
-                       return -1; /* can still continue, perhaps */
-               }
-       } else r = 0;
-       if(!fd_set_block(fd))
-               return 0;
-       /* write remainder */
-       if(r != (ssize_t)sizeof(len)) {
-               if(write(fd, (char*)(&len)+r, sizeof(len)-r) == -1) {
-                       log_err("msg write failed: %s", strerror(errno));
-                       (void)fd_set_nonblock(fd);
-                       return 0;
-               }
-       }
-       if(write(fd, buf, len) == -1) {
-               log_err("msg write failed: %s", strerror(errno));
-               (void)fd_set_nonblock(fd);
-               return 0;
-       }
-       if(!fd_set_nonblock(fd))
-               return 0;
-       return 1;
-}
-
-int 
-libworker_read_msg(int fd, uint8_t** buf, uint32_t* len, int nonblock)
-{
-       ssize_t r;
-
-       /* test */
-       *len = 0;
-       if(nonblock) {
-               r = read(fd, len, sizeof(*len));
-               if(r == -1) {
-                       if(errno==EINTR || errno==EAGAIN)
-                               return -1;
-                       log_err("msg read failed: %s", strerror(errno));
-                       return -1; /* we can still continue, perhaps */
-               }
-               if(r == 0) /* EOF */
-                       return 0;
-       } else r = 0;
-       if(!fd_set_block(fd))
-               return 0;
-       /* read remainder */
-       if(r != (ssize_t)sizeof(*len)) {
-               if((r=read(fd, (char*)(len)+r, sizeof(*len)-r)) == -1) {
-                       log_err("msg read failed: %s", strerror(errno));
-                       (void)fd_set_nonblock(fd);
-                       return 0;
-               }
-               if(r == 0) /* EOF */ {
-                       (void)fd_set_nonblock(fd);
-                       return 0;
-               }
-       }
-       *buf = (uint8_t*)malloc(*len);
-       if(!*buf) {
-               log_err("out of memory");
-               (void)fd_set_nonblock(fd);
-               return 0;
-       }
-       if((r=read(fd, *buf, *len)) == -1) {
-               log_err("msg read failed: %s", strerror(errno));
-               (void)fd_set_nonblock(fd);
-               free(*buf);
-               return 0;
-       }
-       if(r == 0) { /* EOF */
-               (void)fd_set_nonblock(fd);
-               free(*buf);
-               return 0;
-       }
-       if(!fd_set_nonblock(fd)) {
-               free(*buf);
-               return 0;
-       }
-       return 1;
-}
-
 /* --- fake callbacks for fptr_wlist to work --- */
 void worker_handle_control_cmd(struct tube* ATTR_UNUSED(tube), 
-       ldns_buffer* ATTR_UNUSED(buffer), int ATTR_UNUSED(error), 
-       void* ATTR_UNUSED(arg))
+       uint8_t* ATTR_UNUSED(buffer), size_t ATTR_UNUSED(len),
+       int ATTR_UNUSED(error), void* ATTR_UNUSED(arg))
 {
        log_assert(0);
 }
index 6b2543717a1eeadde83fa5e7391bf9c6ba6ef924..224f26775bc4a09e58a229c4387b9e266041bb75 100644 (file)
@@ -55,8 +55,8 @@ struct outbound_entry;
 struct module_qstate;
 struct comm_point;
 struct comm_reply;
-struct libworker_res_list;
 struct regional;
+struct tube;
 
 /** 
  * The library-worker status structure
@@ -81,37 +81,6 @@ struct libworker {
        struct outside_network* back;
        /** random() table for this worker. */
        struct ub_randstate* rndstate;
-       
-       /** commpoint to listen to commands */
-       struct comm_point* cmd_com;
-       /** are we currently reading a command, 0 if not, else bytecount */
-       size_t cmd_read;
-       /** size of current read command, may be partially read */
-       uint32_t cmd_len;
-       /** the current read command content, malloced, can be partially read*/
-       uint8_t* cmd_msg;
-
-       /** commpoint to write results back */
-       struct comm_point* res_com;
-       /** are we curently writing a result, 0 if not, else bytecount into
-        * the res_list first entry. */
-       size_t res_write;
-       /** list of outstanding results to be written back */
-       struct libworker_res_list* res_list;
-       /** last in list */
-       struct libworker_res_list* res_last;
-};
-
-/**
- * List of results (arbitrary command serializations) to write back
- */
-struct libworker_res_list {
-       /** next in list */
-       struct libworker_res_list* next;
-       /** serialized buffer to write */
-       uint8_t* buf;
-       /** length to write */
-       uint32_t len;
 };
 
 /**
@@ -180,46 +149,12 @@ int libworker_handle_service_reply(struct comm_point* c, void* arg, int error,
         struct comm_reply* reply_info);
 
 /** handle control command coming into server */
-int libworker_handle_control_cmd(struct comm_point* c, void* arg, 
-       int err, struct comm_reply* rep);
+void libworker_handle_control_cmd(struct tube* tube, uint8_t* msg, size_t len,
+       int err, void* arg);
 
 /** handle opportunity to write result back */
-int libworker_handle_result_write(struct comm_point* c, void* arg, 
-       int err, struct comm_reply* rep);
-
-/**
- * Write length bytes followed by message.
- * @param fd: the socket to write on. Is nonblocking.
- *     Set to blocking by the function,
- *     and back to non-blocking at exit of function.
- * @param buf: the message.
- * @param len: length of message.
- * @param nonblock: if set to true, the first write is nonblocking.
- *     If the first write fails the function returns -1.
- *     If set false, the first write is blocking.
- * @return: all remainder writes are nonblocking.
- *     return 0 on error, in that case blocking/nonblocking of socket is
- *             unknown.
- *     return 1 if all OK.
- */
-int libworker_write_msg(int fd, uint8_t* buf, uint32_t len, int nonblock);
-
-/**
- * Read length bytes followed by message.
- * @param fd: the socket to write on. Is nonblocking.
- *     Set to blocking by the function,
- *     and back to non-blocking at exit of function.
- * @param buf: the message, malloced.
- * @param len: length of message, returned.
- * @param nonblock: if set to true, the first read is nonblocking.
- *     If the first read fails the function returns -1.
- *     If set false, the first read is blocking.
- * @return: all remainder reads are nonblocking.
- *     return 0 on error, in that case blocking/nonblocking of socket is 
- *             unknown. On EOF 0 is returned.
- *     return 1 if all OK.
- */
-int libworker_read_msg(int fd, uint8_t** buf, uint32_t* len, int nonblock);
+void libworker_handle_result_write(struct tube* tube, uint8_t* msg, size_t len,
+       int err, void* arg);
 
 /** 
  * fill result from parsed message, on error fills servfail 
index 54d1f1fc81e17808550dcfb0bb18cb5d63783665..fa371466648c22930188b5caa97357bdc51aaf54 100644 (file)
@@ -48,8 +48,8 @@ struct module_qstate;
 struct tube;
 
 void worker_handle_control_cmd(struct tube* ATTR_UNUSED(tube),
-       ldns_buffer* ATTR_UNUSED(buffer), int ATTR_UNUSED(error),
-       void* ATTR_UNUSED(arg))
+       uint8_t* ATTR_UNUSED(buffer), size_t ATTR_UNUSED(len),
+       int ATTR_UNUSED(error), void* ATTR_UNUSED(arg))
 {
        log_assert(0);
 }
@@ -150,6 +150,13 @@ int libworker_handle_service_reply(struct comm_point* ATTR_UNUSED(c),
        return 0;
 }
 
+void libworker_handle_control_cmd(struct tube* ATTR_UNUSED(tube),
+        uint8_t* ATTR_UNUSED(buffer), size_t ATTR_UNUSED(len),
+        int ATTR_UNUSED(error), void* ATTR_UNUSED(arg))
+{
+        log_assert(0);
+}
+
 int context_query_cmp(const void* ATTR_UNUSED(a), const void* ATTR_UNUSED(b))
 {
        log_assert(0);
index f68772918625b79208576a621a708989678de047..dfc8cc8a2f3a80d9ebf4e4b3016ff577fb7905e2 100644 (file)
@@ -945,6 +945,26 @@ struct comm_point* comm_point_create_local(struct comm_base* ATTR_UNUSED(base),
        return calloc(1, 1);
 }
 
+struct comm_point* comm_point_create_raw(struct comm_base* ATTR_UNUSED(base),
+        int ATTR_UNUSED(fd), int ATTR_UNUSED(writing),
+        comm_point_callback_t* ATTR_UNUSED(callback), 
+       void* ATTR_UNUSED(callback_arg))
+{
+       /* no pipe comm possible */
+       return calloc(1, 1);
+}
+
+void comm_point_start_listening(struct comm_point* ATTR_UNUSED(c), 
+       int ATTR_UNUSED(newfd), int ATTR_UNUSED(sec))
+{
+       /* no bg write pipe comm possible */
+}
+
+void comm_point_stop_listening(struct comm_point* ATTR_UNUSED(c))
+{
+       /* no bg write pipe comm possible */
+}
+
 /* only cmd com _local gets deleted */
 void comm_point_delete(struct comm_point* c)
 {
index 98caef554229f67b26ea9f3b478167f4852e83ba..23013386a389a5fc2442537b7b493d43e0838d94 100644 (file)
@@ -80,6 +80,14 @@ fptr_whitelist_comm_point(comm_point_callback_t *fptr)
        return 0;
 }
 
+int 
+fptr_whitelist_comm_point_raw(comm_point_callback_t *fptr)
+{
+       if(fptr == &tube_handle_listen) return 1;
+       else if(fptr == &tube_handle_write) return 1;
+       return 0;
+}
+
 int 
 fptr_whitelist_comm_timer(void (*fptr)(void*))
 {
@@ -330,5 +338,6 @@ fptr_whitelist_alloc_cleanup(void (*fptr)(void*))
 int fptr_whitelist_tube_listen(tube_callback_t* fptr)
 {
        if(fptr == &worker_handle_control_cmd) return 1;
+       else if(fptr == &libworker_handle_control_cmd) return 1;
        return 0;
 }
index b8451260ee300cfe128bb2133e766d69aea58eac..28507b5a291f7aed19569242c021ddb32d185f34 100644 (file)
  */
 int fptr_whitelist_comm_point(comm_point_callback_t *fptr);
 
+/**
+ * Check function pointer whitelist for raw comm_point callback values.
+ *
+ * @param fptr: function pointer to check.
+ * @return false if not in whitelist.
+ */
+int fptr_whitelist_comm_point_raw(comm_point_callback_t *fptr);
+
 /**
  * Check function pointer whitelist for comm_timer callback values.
  *
index ae9c8a967d77985c739d2e7e6e6c962b726608bc..3c41790f89c14691053bfa5a1087ec110379ffdc 100644 (file)
 24249,
 24321,
 24386,
+24465,
 24554,
 24677,
 24678,
index 47bec1863a69e4d456975ad13f954b6200ac909d..8ca541109083660232ff573478c9430bf7b03cdf 100644 (file)
@@ -943,6 +943,7 @@ void comm_point_raw_handle_callback(int ATTR_UNUSED(fd),
        log_assert(c->type == comm_raw);
        comm_base_now(c->ev->base);
 
+       fptr_ok(fptr_whitelist_comm_point_raw(c->callback));
        (void)(*c->callback)(c, c->cb_arg, NETEVENT_NOERROR, NULL);
 }
 
index 4f58054f5ab9904be85de0272bf1faf70b2ee552..5022ee65f61246ea016ff6a5510a3de850fd5ff0 100644 (file)
@@ -49,19 +49,28 @@ struct tube* tube_create(void)
 {
        struct tube* tube = (struct tube*)calloc(1, sizeof(*tube));
        int sv[2];
-       if(!tube) return 0;
+       if(!tube) {
+               int err = errno;
+               log_err("tube_create: out of memory");
+               errno = err;
+               return NULL;
+       }
        tube->sr = -1;
        tube->sw = -1;
        if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) {
+               int err = errno;
                log_err("socketpair: %s", strerror(errno));
                free(tube);
+               errno = err;
                return NULL;
        }
        tube->sr = sv[0];
        tube->sw = sv[1];
        if(!fd_set_nonblock(tube->sr) || !fd_set_nonblock(tube->sw)) {
+               int err = errno;
                log_err("tube: cannot set nonblocking");
                tube_delete(tube);
+               errno = err;
                return NULL;
        }
        return tube;
@@ -70,52 +79,375 @@ struct tube* tube_create(void)
 void tube_delete(struct tube* tube)
 {
        if(!tube) return;
-       if(tube->listen_com) {
-               comm_point_delete(tube->listen_com);
-       }
+       tube_remove_bg_listen(tube);
+       tube_remove_bg_write(tube);
        /* close fds after deleting commpoints, to be sure.
         *            Also epoll does not like closing fd before event_del */
-       if(tube->sr != -1) close(tube->sr);
-       if(tube->sw != -1) close(tube->sw);
-       tube->sr = -1;
-       tube->sw = -1;
+       tube_close_read(tube);
+       tube_close_write(tube);
        free(tube);
 }
 
+void tube_close_read(struct tube* tube)
+{
+       if(tube->sr != -1) {
+               close(tube->sr);
+               tube->sr = -1;
+       }
+}
+
+void tube_close_write(struct tube* tube)
+{
+       if(tube->sw != -1) {
+               close(tube->sw);
+               tube->sw = -1;
+       }
+}
+
+void tube_remove_bg_listen(struct tube* tube)
+{
+       if(tube->listen_com) {
+               comm_point_delete(tube->listen_com);
+               tube->listen_com = NULL;
+       }
+       if(tube->cmd_msg) {
+               free(tube->cmd_msg);
+               tube->cmd_msg = NULL;
+       }
+}
+
+void tube_remove_bg_write(struct tube* tube)
+{
+       if(tube->res_com) {
+               comm_point_delete(tube->res_com);
+               tube->res_com = NULL;
+       }
+       if(tube->res_list) {
+               struct tube_res_list* np, *p = tube->res_list;
+               tube->res_list = NULL;
+               tube->res_last = NULL;
+               while(p) {
+                       np = p->next;
+                       free(p->buf);
+                       free(p);
+                       p = np;
+               }
+       }
+}
+
 int
 tube_handle_listen(struct comm_point* c, void* arg, int error,
         struct comm_reply* ATTR_UNUSED(reply_info))
 {
        struct tube* tube = (struct tube*)arg;
+       ssize_t r;
        if(error != NETEVENT_NOERROR) {
                fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
-               (*tube->listen_cb)(tube, NULL, error, tube->listen_arg);
+               (*tube->listen_cb)(tube, NULL, 0, error, tube->listen_arg);
                return 0;
        }
+
+       if(tube->cmd_read < sizeof(tube->cmd_len)) {
+               /* complete reading the length of control msg */
+               r = read(c->fd, ((uint8_t*)&tube->cmd_len) + tube->cmd_read,
+                       sizeof(tube->cmd_len) - tube->cmd_read);
+               if(r==0) {
+                       /* error has happened or */
+                       /* parent closed pipe, must have exited somehow */
+                       fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
+                       (*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED, 
+                               tube->listen_arg);
+                       return 0;
+               }
+               if(r==-1) {
+                       if(errno != EAGAIN && errno != EINTR) {
+                               log_err("rpipe error: %s", strerror(errno));
+                       }
+                       /* nothing to read now, try later */
+                       return 0;
+               }
+               tube->cmd_read += r;
+               if(tube->cmd_read < sizeof(tube->cmd_len)) {
+                       /* not complete, try later */
+                       return 0;
+               }
+               tube->cmd_msg = (uint8_t*)calloc(1, tube->cmd_len);
+               if(!tube->cmd_msg) {
+                       log_err("malloc failure");
+                       tube->cmd_read = 0;
+                       return 0;
+               }
+       }
+       /* cmd_len has been read, read remainder */
+       r = read(c->fd, tube->cmd_msg+tube->cmd_read-sizeof(tube->cmd_len),
+               tube->cmd_len - (tube->cmd_read - sizeof(tube->cmd_len)));
+       if(r==0) {
+               /* error has happened or */
+               /* parent closed pipe, must have exited somehow */
+               fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
+               (*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED, 
+                       tube->listen_arg);
+               return 0;
+       }
+       if(r==-1) {
+               /* nothing to read now, try later */
+               if(errno != EAGAIN && errno != EINTR) {
+                       log_err("rpipe error: %s", strerror(errno));
+               }
+               return 0;
+       }
+       tube->cmd_read += r;
+       if(tube->cmd_read < sizeof(tube->cmd_len) + tube->cmd_len) {
+               /* not complete, try later */
+               return 0;
+       }
+       tube->cmd_read = 0;
+
        fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
-       (*tube->listen_cb)(tube, c->buffer, error, tube->listen_arg);
+       (*tube->listen_cb)(tube, tube->cmd_msg, tube->cmd_len, 
+               NETEVENT_NOERROR, tube->listen_arg);
+               /* also frees the buf */
+       tube->cmd_msg = NULL;
        return 0;
 }
 
-int tube_listen_cmd(struct tube* tube, struct comm_base* base,
-        size_t msg_buffer_sz, tube_callback_t* cb, void* arg)
+int
+tube_handle_write(struct comm_point* c, void* arg, int error,
+        struct comm_reply* ATTR_UNUSED(reply_info))
+{
+       struct tube* tube = (struct tube*)arg;
+       struct tube_res_list* item = tube->res_list;
+       ssize_t r;
+       if(error != NETEVENT_NOERROR) {
+               log_err("tube_handle_write net error %d", error);
+               return 0;
+       }
+
+       if(!item) {
+               comm_point_stop_listening(c);
+               return 0;
+       }
+
+       if(tube->res_write < sizeof(item->len)) {
+               r = write(c->fd, ((uint8_t*)&item->len) + tube->res_write,
+                       sizeof(item->len) - tube->res_write);
+               if(r == -1) {
+                       if(errno != EAGAIN && errno != EINTR) {
+                               log_err("wpipe error: %s", strerror(errno));
+                       }
+                       return 0; /* try again later */
+               }
+               if(r == 0) {
+                       /* error on pipe, must have exited somehow */
+                       /* cannot signal this to pipe user */
+                       return 0;
+               }
+               tube->res_write += r;
+               if(tube->res_write < sizeof(item->len))
+                       return 0;
+       }
+       r = write(c->fd, item->buf + tube->res_write - sizeof(item->len),
+               item->len - (tube->res_write - sizeof(item->len)));
+       if(r == -1) {
+               if(errno != EAGAIN && errno != EINTR) {
+                       log_err("wpipe error: %s", strerror(errno));
+               }
+               return 0; /* try again later */
+       }
+       if(r == 0) {
+               /* error on pipe, must have exited somehow */
+               /* cannot signal this to pipe user */
+               return 0;
+       }
+       tube->res_write += r;
+       if(tube->res_write < sizeof(item->len) + item->len)
+               return 0;
+       /* done this result, remove it */
+       free(item->buf);
+       item->buf = NULL;
+       tube->res_list = tube->res_list->next;
+       free(item);
+       if(!tube->res_list) {
+               tube->res_last = NULL;
+               comm_point_stop_listening(c);
+       }
+       tube->res_write = 0;
+       return 0;
+}
+
+int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len, 
+        int nonblock)
+{
+       ssize_t r;
+       int fd = tube->sw;
+
+       /* test */
+       if(nonblock) {
+               r = write(fd, &len, sizeof(len));
+               if(r == -1) {
+                       if(errno==EINTR || errno==EAGAIN)
+                               return -1;
+                       log_err("tube msg write failed: %s", strerror(errno));
+                       return -1; /* can still continue, perhaps */
+               }
+       } else r = 0;
+       if(!fd_set_block(fd))
+               return 0;
+       /* write remainder */
+       if(r != (ssize_t)sizeof(len)) {
+               if(write(fd, (char*)(&len)+r, sizeof(len)-r) == -1) {
+                       log_err("tube msg write failed: %s", strerror(errno));
+                       (void)fd_set_nonblock(fd);
+                       return 0;
+               }
+       }
+       if(write(fd, buf, len) == -1) {
+               log_err("tube msg write failed: %s", strerror(errno));
+               (void)fd_set_nonblock(fd);
+               return 0;
+       }
+       if(!fd_set_nonblock(fd))
+               return 0;
+       return 1;
+}
+
+int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len, 
+        int nonblock)
+{
+       ssize_t r;
+       int fd = tube->sr;
+
+       /* test */
+       *len = 0;
+       if(nonblock) {
+               r = read(fd, len, sizeof(*len));
+               if(r == -1) {
+                       if(errno==EINTR || errno==EAGAIN)
+                               return -1;
+                       log_err("tube msg read failed: %s", strerror(errno));
+                       return -1; /* we can still continue, perhaps */
+               }
+               if(r == 0) /* EOF */
+                       return 0;
+       } else r = 0;
+       if(!fd_set_block(fd))
+               return 0;
+       /* read remainder */
+       if(r != (ssize_t)sizeof(*len)) {
+               if((r=read(fd, (char*)(len)+r, sizeof(*len)-r)) == -1) {
+                       log_err("tube msg read failed: %s", strerror(errno));
+                       (void)fd_set_nonblock(fd);
+                       return 0;
+               }
+               if(r == 0) /* EOF */ {
+                       (void)fd_set_nonblock(fd);
+                       return 0;
+               }
+       }
+       *buf = (uint8_t*)malloc(*len);
+       if(!*buf) {
+               log_err("tube read out of memory");
+               (void)fd_set_nonblock(fd);
+               return 0;
+       }
+       if((r=read(fd, *buf, *len)) == -1) {
+               log_err("tube msg read failed: %s", strerror(errno));
+               (void)fd_set_nonblock(fd);
+               free(*buf);
+               return 0;
+       }
+       if(r == 0) { /* EOF */
+               (void)fd_set_nonblock(fd);
+               free(*buf);
+               return 0;
+       }
+       if(!fd_set_nonblock(fd)) {
+               free(*buf);
+               return 0;
+       }
+       return 1;
+}
+
+/** perform a select() on the fd */
+static int
+pollit(int fd, struct timeval* t)
+{
+       fd_set r;
+#ifndef S_SPLINT_S
+       FD_ZERO(&r);
+       FD_SET(FD_SET_T fd, &r);
+#endif
+       if(select(fd+1, &r, NULL, NULL, t) == -1) {
+               return 0;
+       }
+       errno = 0;
+       return FD_ISSET(fd, &r);
+}
+
+int tube_poll(struct tube* tube)
+{
+       struct timeval t;
+       memset(&t, 0, sizeof(t));
+       return pollit(tube->sr, &t);
+}
+
+int tube_wait(struct tube* tube)
+{
+       return pollit(tube->sr, NULL);
+}
+
+int tube_read_fd(struct tube* tube)
+{
+       return tube->sr;
+}
+
+int tube_setup_bg_listen(struct tube* tube, struct comm_base* base,
+        tube_callback_t* cb, void* arg)
 {
        tube->listen_cb = cb;
        tube->listen_arg = arg;
-       if(!(tube->listen_com = comm_point_create_local(base, tube->sr, 
-               msg_buffer_sz, tube_handle_listen, tube))) {
-               log_err("tube_listen_cmd: commpoint creation failed");
+       if(!(tube->listen_com = comm_point_create_raw(base, tube->sr, 
+               0, tube_handle_listen, tube))) {
+               int err = errno;
+               log_err("tube_setup_bg_l: commpoint creation failed");
+               errno = err;
+               return 0;
+       }
+       return 1;
+}
+
+int tube_setup_bg_write(struct tube* tube, struct comm_base* base)
+{
+       if(!(tube->res_com = comm_point_create_raw(base, tube->sw, 
+               1, tube_handle_write, tube))) {
+               int err = errno;
+               log_err("tube_setup_bg_w: commpoint creation failed");
+               errno = err;
                return 0;
        }
        return 1;
 }
 
-int tube_send_cmd(struct tube* tube, ldns_buffer* buffer)
+int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len)
 {
-       if(!write_socket(tube->sw, ldns_buffer_begin(buffer),
-               ldns_buffer_limit(buffer))) {
-               log_err("write socket: %s", strerror(errno));
+       struct tube_res_list* item = 
+               (struct tube_res_list*)malloc(sizeof(*item));
+       if(!item) {
+               free(msg);
+               log_err("out of memory for async answer");
                return 0;
        }
+       item->buf = msg;
+       item->len = len;
+       item->next = NULL;
+       /* add at back of list, since the first one may be partially written */
+       if(tube->res_last)
+               tube->res_last->next = item;
+       else    tube->res_list = item;
+       tube->res_last = item;
+       if(tube->res_list == tube->res_last) {
+               /* first added item, start the write process */
+               comm_point_start_listening(tube->res_com, -1, -1);
+       }
        return 1;
 }
index fecd1cf330c1ca301cf5b9f1975f5ddfb7577bd4..e3ddc850ca98b934ac864c2ed9d40fafcfa33fa9 100644 (file)
 struct comm_reply;
 struct comm_base;
 struct tube;
+struct tube_res_list;
 
 /**
  * Callback from pipe listen function
- * void mycallback(tube, buffer, error, argument);
- * if error is true (NETEVENT_*), buffer is probably NULL.
+ * void mycallback(tube, msg, len, error, user_argument);
+ * if error is true (NETEVENT_*), msg is probably NULL.
  */
-typedef void tube_callback_t(struct tube*, ldns_buffer*, int, void*);
+typedef void tube_callback_t(struct tube*, uint8_t*, size_t, int, void*);
 
 /**
  * A pipe
@@ -67,6 +68,34 @@ struct tube {
        tube_callback_t* listen_cb;
        /** listen callback user arg */
        void* listen_arg;
+       /** are we currently reading a command, 0 if not, else bytecount */
+       size_t cmd_read;
+       /** size of current read command, may be partially read */
+       uint32_t cmd_len;
+       /** the current read command content, malloced, can be partially read*/
+       uint8_t* cmd_msg;
+
+       /** background write queue, commpoint to write results back */
+       struct comm_point* res_com;
+       /** are we curently writing a result, 0 if not, else bytecount into
+        * the res_list first entry. */
+       size_t res_write;
+       /** list of outstanding results to be written back */
+       struct tube_res_list* res_list;
+       /** last in list */
+       struct tube_res_list* res_last;
+};
+
+/**
+ * List of results (arbitrary command serializations) to write back
+ */
+struct tube_res_list {
+       /** next in list */
+       struct tube_res_list* next;
+       /** serialized buffer to write */
+       uint8_t* buf;
+       /** length to write */
+       uint32_t len;
 };
 
 /**
@@ -82,30 +111,136 @@ struct tube* tube_create(void);
 void tube_delete(struct tube* tube);
 
 /**
- * Start listening for information over the pipe
+ * Write length bytes followed by message.
+ * @param tube: the tube to write on.
+ *     If that tube is a pipe, its write fd is used as
+ *     the socket to write on. Is nonblocking.
+ *      Set to blocking by the function,
+ *      and back to non-blocking at exit of function.
+ * @param buf: the message.
+ * @param len: length of message.
+ * @param nonblock: if set to true, the first write is nonblocking.
+ *      If the first write fails the function returns -1.
+ *      If set false, the first write is blocking.
+ * @return: all remainder writes are nonblocking.
+ *      return 0 on error, in that case blocking/nonblocking of socket is
+ *              unknown.
+ *      return 1 if all OK.
+ */
+int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len, 
+       int nonblock);
+
+/**
+ * Read length bytes followed by message.
+ * @param tube: The tube to read on.
+ *     If that tube is a pipe, its read fd is used as
+ *     the socket to read on. Is nonblocking.
+ *      Set to blocking by the function,
+ *      and back to non-blocking at exit of function.
+ * @param buf: the message, malloced.
+ * @param len: length of message, returned.
+ * @param nonblock: if set to true, the first read is nonblocking.
+ *      If the first read fails the function returns -1.
+ *      If set false, the first read is blocking.
+ * @return: all remainder reads are nonblocking.
+ *      return 0 on error, in that case blocking/nonblocking of socket is 
+ *              unknown. On EOF 0 is returned.
+ *      return 1 if all OK.
+ */
+int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len, 
+       int nonblock);
+
+/**
+ * Close read part of the pipe.
+ * The tube can no longer be read from.
+ * @param tube: tube to operate on.
+ */
+void tube_close_read(struct tube* tube);
+
+/**
+ * Close write part of the pipe.
+ * The tube can no longer be written to.
+ * @param tube: tube to operate on.
+ */
+void tube_close_write(struct tube* tube);
+
+/**
+ * See if data is ready for reading on the tube without blocking.
+ * @param tube: tube to check for readable items
+ * @return true if readable items are present. False if not (or error).
+ *     true on pipe_closed.
+ */
+int tube_poll(struct tube* tube);
+
+/**
+ * Wait for data to be ready for reading on the tube. is blocking.
+ * No timeout.
+ * @param tube: the tube to wait on.
+ * @return: if there was something to read (false on error).
+ *     true on pipe_closed.
+ */
+int tube_wait(struct tube* tube);
+
+/**
+ * Get FD that is readable when new information arrives.
+ * @param tube
+ * @return file descriptor.
+ */
+int tube_read_fd(struct tube* tube);
+
+/**
+ * Start listening for information over the pipe.
+ * Background registration of a read listener, callback when read completed.
+ * Do not mix with tube_read_msg style direct reads from the pipe.
  * @param tube: tube to listen on
  * @param base: what base to register event callback.
- * @param msg_buffer_sz: what message buffer size to use.
  * @param cb: callback routine.
  * @param arg: user argument for callback routine.
  * @return true if successful, false on error.
  */
-int tube_listen_cmd(struct tube* tube, struct comm_base* base, 
-       size_t msg_buffer_sz, tube_callback_t* cb, void* arg);
+int tube_setup_bg_listen(struct tube* tube, struct comm_base* base,
+       tube_callback_t* cb, void* arg);
 
 /**
- * Send a command over a pipe, blocking operation.
- * @param tube: tube to send the info on.
- * @param buffer: buffer to send. starts with network order uint16 with 
- *     length of remainder of buffer.
- *     The receiver does not receive the length uint16 in the buffer
- *     (the buffer is sized appropriately).
- * @return 0 on error, true on success.
+ * Remove bg listen setup from event base.
+ * @param tube: what tube to cleanup
+ */
+void tube_remove_bg_listen(struct tube* tube);
+
+/**
+ * Start background write handler for the pipe.
+ * Do not mix with tube_write_msg style direct writes to the pipe.
+ * @param tube: tube to write on
+ * @param base: what base to register event handler on.
+ * @return true if successful, false on error.
  */
-int tube_send_cmd(struct tube* tube, ldns_buffer* buffer);
+int tube_setup_bg_write(struct tube* tube, struct comm_base* base);
 
-/** decl for fptr_wlist of tube pipe listen handler */
+/**
+ * Remove bg write setup from event base.
+ * @param tube: what tube to cleanup
+ */
+void tube_remove_bg_write(struct tube* tube);
+
+
+/**
+ * Append data item to background list of writes.
+ * Mallocs a list entry behind the scenes.
+ * Not locked behind the scenes, call from one thread or lock on outside.
+ * @param tube: what tube to queue on.
+ * @param msg: memory message to send. Is free()d after use.
+ *     Put at the end of the to-send queue.
+ * @param len: length of item.
+ * @return 0 on failure (msg freed).
+ */
+int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len);
+
+/** for fptr wlist, callback function */
 int tube_handle_listen(struct comm_point* c, void* arg, int error, 
        struct comm_reply* reply_info);
 
+/** for fptr wlist, callback function */
+int tube_handle_write(struct comm_point* c, void* arg, int error, 
+       struct comm_reply* reply_info);
+
 #endif /* UTIL_TUBE_H */