]> git.ipfire.org Git - thirdparty/unbound.git/commitdiff
nonblock bg pipes.
authorWouter Wijngaards <wouter@nlnetlabs.nl>
Thu, 31 Jan 2008 10:40:58 +0000 (10:40 +0000)
committerWouter Wijngaards <wouter@nlnetlabs.nl>
Thu, 31 Jan 2008 10:40:58 +0000 (10:40 +0000)
git-svn-id: file:///svn/unbound/trunk@913 be551aaa-1e26-0410-a405-d3ace91eadb9

doc/Changelog
libunbound/worker.c
libunbound/worker.h

index 63573bee4b7f48b883a28161a3b424a94cec9e28..842985cf0c23de7834b48cd95742df149d726be7 100644 (file)
@@ -1,4 +1,9 @@
-29 January 2008: Wouter
+31 January 2008: Wouter
+       - bg thread/process reads and writes the pipe nonblocking all the time
+         so that even if the pipe is buffered or so, the bg thread does not
+         block, and services both pipes and queries.
+
+30 January 2008: Wouter
        - check trailing / on chrootdir in checkconf.
        - check if root hints and anchor files are in chrootdir.
        - no route to host tcp error is verbosity level 2. 
index 80b17c0f8ae6c12a8c312043460e1874cdd76fa6..632416d67aa3fd360b3a8d395bad55008990e87b 100644 (file)
@@ -78,6 +78,7 @@ libworker_delete(struct libworker* w)
                ub_randfree(w->env->rnd);
                free(w->env);
        }
+       free(w->cmd_msg);
        outside_network_delete(w->back);
        comm_point_delete(w->cmd_com);
        comm_point_delete(w->res_com);
@@ -195,15 +196,70 @@ handle_cancel(struct libworker* w, uint8_t* buf, uint32_t len)
        free(buf);
 }
 
+/** do control command coming into bg server */
+static void
+libworker_do_cmd(struct libworker* w, uint8_t* msg, uint32_t len)
+{
+       switch(context_serial_getcmd(msg, len)) {
+               default:
+               case UB_LIBCMD_ANSWER:
+                       log_err("unknown command for bg worker %d", 
+                               (int)context_serial_getcmd(msg, len));
+                       /* and fall through to quit */
+               case UB_LIBCMD_QUIT:
+                       free(msg);
+                       comm_base_exit(w->base);
+                       break;
+               case UB_LIBCMD_NEWQUERY:
+                       handle_newq(w, msg, len);
+                       break;
+               case UB_LIBCMD_CANCEL:
+                       handle_cancel(w, msg, len);
+                       break;
+       }
+}
+
 /** handle control command coming into server */
 int 
 libworker_handle_control_cmd(struct comm_point* c, void* arg, 
        int ATTR_UNUSED(err), struct comm_reply* ATTR_UNUSED(rep))
 {
        struct libworker* w = (struct libworker*)arg;
-       uint32_t len = 0;
-       uint8_t* buf = NULL;
-       int r = libworker_read_msg(c->fd, &buf, &len, 1);
+       ssize_t r;
+
+       if(w->cmd_read < sizeof(w->cmd_len)) {
+               /* complete reading the length of control msg */
+               r = read(c->fd, ((uint8_t*)&w->cmd_len) + w->cmd_read,
+                       sizeof(w->cmd_len) - w->cmd_read);
+               if(r==0) {
+                       /* error has happened or */
+                       /* parent closed pipe, must have exited somehow */
+                       /* it is of no use to go on, exit */
+                       comm_base_exit(w->base);
+                       return 0;
+               }
+               if(r==-1) {
+                       if(errno != EAGAIN && errno != EINTR) {
+                               log_err("rpipe error: %s", strerror(errno));
+                       }
+                       /* nothing to read now, try later */
+                       return 0;
+               }
+               w->cmd_read += r;
+               if(w->cmd_read < sizeof(w->cmd_len)) {
+                       /* not complete, try later */
+                       return 0;
+               }
+               w->cmd_msg = (uint8_t*)calloc(1, w->cmd_len);
+               if(!w->cmd_msg) {
+                       log_err("malloc failure");
+                       w->cmd_read = 0;
+                       return 0;
+               }
+       }
+       /* cmd_len has been read, read remainder */
+       r = read(c->fd, w->cmd_msg + w->cmd_read - sizeof(w->cmd_len),
+               w->cmd_len - (w->cmd_read - sizeof(w->cmd_len)));
        if(r==0) {
                /* error has happened or */
                /* parent closed pipe, must have exited somehow */
@@ -211,26 +267,21 @@ libworker_handle_control_cmd(struct comm_point* c, void* arg,
                comm_base_exit(w->base);
                return 0;
        }
-       if(r==-1) /* nothing to read now, try later */
+       if(r==-1) {
+               /* nothing to read now, try later */
+               if(errno != EAGAIN && errno != EINTR) {
+                       log_err("rpipe error: %s", strerror(errno));
+               }
                return 0;
-       
-       switch(context_serial_getcmd(buf, len)) {
-               default:
-               case UB_LIBCMD_ANSWER:
-                       log_err("unknown command for bg worker %d", 
-                               (int)context_serial_getcmd(buf, len));
-                       /* and fall through to quit */
-               case UB_LIBCMD_QUIT:
-                       free(buf);
-                       comm_base_exit(w->base);
-                       break;
-               case UB_LIBCMD_NEWQUERY:
-                       handle_newq(w, buf, len);
-                       break;
-               case UB_LIBCMD_CANCEL:
-                       handle_cancel(w, buf, len);
-                       break;
        }
+       w->cmd_read += r;
+       if(w->cmd_read < sizeof(w->cmd_len) + w->cmd_len) {
+               /* not complete, try later */
+               return 0;
+       }
+       w->cmd_read = 0;
+       libworker_do_cmd(w, w->cmd_msg, w->cmd_len); /* also frees the buf */
+       w->cmd_msg = NULL;
        return 0;
 }
 
@@ -241,20 +292,47 @@ libworker_handle_result_write(struct comm_point* c, void* arg,
 {
        struct libworker* w = (struct libworker*)arg;
        struct libworker_res_list* item = w->res_list;
-       int r;
+       ssize_t r;
        if(!item) {
                comm_point_stop_listening(c);
                return 0;
        }
-       r = libworker_write_msg(c->fd, item->buf, item->len, 1);
-       if(r == -1)
+       if(w->res_write < sizeof(item->len)) {
+               r = write(c->fd, ((uint8_t*)&item->len) + w->res_write,
+                       sizeof(item->len) - w->res_write);
+               if(r == -1) {
+                       if(errno != EAGAIN && errno != EINTR) {
+                               log_err("wpipe error: %s", strerror(errno));
+                       }
+                       return 0; /* try again later */
+               }
+               if(r == 0) {
+                       /* error on pipe, must have exited somehow */
+                       /* it is of no use to go on, exit */
+                       comm_base_exit(w->base);
+                       return 0;
+               }
+               w->res_write += r;
+               if(w->res_write < sizeof(item->len))
+                       return 0;
+       }
+       r = write(c->fd, item->buf + w->res_write - sizeof(item->len),
+               item->len - (w->res_write - sizeof(item->len)));
+       if(r == -1) {
+               if(errno != EAGAIN && errno != EINTR) {
+                       log_err("wpipe error: %s", strerror(errno));
+               }
                return 0; /* try again later */
+       }
        if(r == 0) {
                /* error on pipe, must have exited somehow */
                /* it is of no use to go on, exit */
                comm_base_exit(w->base);
                return 0;
        }
+       w->res_write += r;
+       if(w->res_write < sizeof(item->len) + item->len)
+               return 0;
        /* done this result, remove it */
        free(item->buf);
        item->buf = NULL;
@@ -264,6 +342,7 @@ libworker_handle_result_write(struct comm_point* c, void* arg,
                w->res_last = NULL;
                comm_point_stop_listening(c);
        }
+       w->res_write = 0;
        return 0;
 }
 
@@ -591,7 +670,7 @@ add_bg_result(struct libworker* w, struct ctx_query* q, ldns_buffer* pkt,
        item->buf = msg;
        item->len = len;
        item->next = NULL;
-       /* add at back of list */
+       /* add at back of list, since the first one may be partially written */
        if(w->res_last)
                w->res_last->next = item;
        else    w->res_list = item;
index 275f10463e4c174b54c407837ee0be701fbe8d37..d32b6070941f647c82422b2f2c8d04798592b0e5 100644 (file)
@@ -79,14 +79,23 @@ struct libworker {
        struct comm_base* base;
        /** the backside outside network interface to the auth servers */
        struct outside_network* back;
-
        /** random() table for this worker. */
        struct ub_randstate* rndstate;
+       
        /** commpoint to listen to commands */
        struct comm_point* cmd_com;
+       /** are we currently reading a command, 0 if not, else bytecount */
+       size_t cmd_read;
+       /** size of current read command, may be partially read */
+       uint32_t cmd_len;
+       /** the current read command content, malloced, can be partially read*/
+       uint8_t* cmd_msg;
+
        /** commpoint to write results back */
        struct comm_point* res_com;
-
+       /** are we curently writing a result, 0 if not, else bytecount into
+        * the res_list first entry. */
+       size_t res_write;
        /** list of outstanding results to be written back */
        struct libworker_res_list* res_list;
        /** last in list */