]> git.ipfire.org Git - thirdparty/unbound.git/commitdiff
serialize, deserialize, raw commpoints.
authorWouter Wijngaards <wouter@nlnetlabs.nl>
Mon, 21 Jan 2008 16:03:59 +0000 (16:03 +0000)
committerWouter Wijngaards <wouter@nlnetlabs.nl>
Mon, 21 Jan 2008 16:03:59 +0000 (16:03 +0000)
case preserve note.

git-svn-id: file:///svn/unbound/trunk@881 be551aaa-1e26-0410-a405-d3ace91eadb9

doc/Changelog
doc/requirements.txt
libunbound/context.c
libunbound/context.h
libunbound/worker.c
libunbound/worker.h
util/fptr_wlist.c
util/netevent.c
util/netevent.h

index 2935af0a10394daabb15c49c6265e57cc1c32ff6..adfc97c449820dad3cab595f1eefe6039adfcd94 100644 (file)
@@ -1,3 +1,6 @@
+21 January 2008: Wouter
+       - libworker work, netevent raw commpoints, write_msg, serialize.
+
 18 January 2008: Wouter
        - touch up of manpage for libunbound.
        - support for IP_RECVDSTADDR (for *BSD ip4).
index c9a3d43bf996b3137b5b803f400801c2d168146f..49842aacc48ccaad054d28f158ebd9e2df143d04 100644 (file)
@@ -200,3 +200,9 @@ o If a client makes a query without RD bit, in the case of a returned
   ascertains that RRSIGs are OK (and not omitted), but does not
   check NSEC/NSEC3. 
 
+o Case preservation
+  Unbound preserves the casing received from authority servers as best 
+  as possible. It compresses without case, so case can get lost there.
+  The casing from the authority server is used in preference to the casing
+  of the query name. This is different from BIND. RFC4343 allows either 
+  behaviour.
index dc51b1f822e59320f8d5ac2569619b8c33d30182..5c6914427305ad11720ce426dce3878659aa4aaf 100644 (file)
@@ -42,6 +42,7 @@
 #include "libunbound/context.h"
 #include "util/module.h"
 #include "util/config_file.h"
+#include "util/net_help.h"
 #include "services/modstack.h"
 #include "services/localzone.h"
 #include "services/cache/rrset.h"
@@ -183,3 +184,158 @@ context_release_alloc(struct ub_val_ctx* ctx, struct alloc_cache* alloc)
        ctx->alloc_list = alloc;
        lock_basic_unlock(&ctx->cfglock);
 }
+
+uint8_t* 
+context_serialize_new_query(struct ctx_query* q, uint32_t* len)
+{
+       /* format for new query is
+        *      o uint32 cmd
+        *      o uint32 id
+        *      o uint32 type
+        *      o uint32 class
+        *      o rest queryname (string)
+        */
+       uint8_t* p;
+       size_t slen = strlen(q->res->qname) + 1/*end of string*/;
+       *len = sizeof(uint32_t)*4 + slen;
+       p = (uint8_t*)malloc(*len);
+       if(!p) return NULL;
+       ldns_write_uint32(p, UB_LIBCMD_NEWQUERY);
+       ldns_write_uint32(p+sizeof(uint32_t), (uint32_t)q->querynum);
+       ldns_write_uint32(p+2*sizeof(uint32_t), (uint32_t)q->res->qtype);
+       ldns_write_uint32(p+3*sizeof(uint32_t), (uint32_t)q->res->qclass);
+       memmove(p+4*sizeof(uint32_t), q->res->qname, slen);
+       return p;
+}
+
+struct ctx_query* 
+context_deserialize_new_query(struct ub_val_ctx* ctx, uint8_t* p, uint32_t len)
+{
+       struct ctx_query* q = (struct ctx_query*)calloc(1, sizeof(*q));
+       if(!q) return NULL;
+       if(len < 4*sizeof(uint32_t)+1) {
+               free(q);
+               return NULL;
+       }
+       log_assert( ldns_read_uint32(p) == UB_LIBCMD_NEWQUERY);
+       q->querynum = (int)ldns_read_uint32(p+sizeof(uint32_t));
+       q->node.key = &q->querynum;
+       q->async = 1;
+       q->res = (struct ub_val_result*)calloc(1, sizeof(*q->res));
+       if(!q->res) {
+               free(q);
+               return NULL;
+       }
+       q->res->qtype = (int)ldns_read_uint32(p+2*sizeof(uint32_t));
+       q->res->qclass = (int)ldns_read_uint32(p+3*sizeof(uint32_t));
+       q->res->qname = strdup((char*)(p+4*sizeof(uint32_t)));
+       if(!q->res->qname) {
+               free(q->res);
+               free(q);
+               return NULL;
+       }
+
+       /** add to query list */
+       ctx->num_async++;
+       (void)rbtree_insert(&ctx->queries, &q->node);
+       return q;
+}
+
+uint8_t* 
+context_serialize_answer(struct ctx_query* q, int err, uint32_t* len)
+{
+       /* answer format
+        *      o uint32 cmd
+        *      o uint32 id
+        *      o uint32 error_code
+        *      o uint32 msg_security
+        *      o the remainder is the answer msg from resolver lookup.
+        *        remainder can be length 0.
+        */
+       uint8_t* p;
+       *len = sizeof(uint32_t)*4 + q->msg_len;
+       p = (uint8_t*)malloc(*len);
+       if(!p) return NULL;
+       ldns_write_uint32(p, UB_LIBCMD_ANSWER);
+       ldns_write_uint32(p+sizeof(uint32_t), (uint32_t)q->querynum);
+       ldns_write_uint32(p+2*sizeof(uint32_t), (uint32_t)err);
+       ldns_write_uint32(p+3*sizeof(uint32_t), (uint32_t)q->msg_security);
+       memmove(p+4*sizeof(uint32_t), q->msg, q->msg_len);
+       return p;
+}
+
+struct ctx_query* 
+context_deserialize_answer(struct ub_val_ctx* ctx,
+        uint8_t* p, uint32_t len, int* err)
+{
+       struct ctx_query* q = NULL ;
+       int id;
+       if(len < 4*sizeof(uint32_t)) return NULL;
+       log_assert( ldns_read_uint32(p) == UB_LIBCMD_ANSWER);
+       id = (int)ldns_read_uint32(p+sizeof(uint32_t));
+       q = (struct ctx_query*)rbtree_search(&ctx->queries, &id);
+       if(!q) return NULL;
+       *err = (int)ldns_read_uint32(p+2*sizeof(uint32_t));
+       q->msg_security = ldns_read_uint32(p+3*sizeof(uint32_t));
+       if(len > 4*sizeof(uint32_t)) {
+               q->msg_len = len - 4*sizeof(uint32_t);
+               q->msg = (uint8_t*)memdup(p+4*sizeof(uint32_t), q->msg_len);
+               if(!q->msg) {
+                       /* pass malloc failure to the user callback */
+                       q->msg_len = 0;
+                       *err = UB_NOMEM;
+                       return q;
+               }
+       } else {
+               q->msg_len = 0;
+               free(q->msg);
+               q->msg = NULL;
+       }
+       return q;
+}
+
+uint8_t* 
+context_serialize_cancel(struct ctx_query* q, uint32_t* len)
+{
+       /* format of cancel:
+        *      o uint32 cmd
+        *      o uint32 async-id */
+       uint8_t* p = (uint8_t*)malloc(2*sizeof(uint32_t));
+       if(!p) return NULL;
+       *len = 2*sizeof(uint32_t);
+       ldns_write_uint32(p, UB_LIBCMD_CANCEL);
+       ldns_write_uint32(p+sizeof(uint32_t), (uint32_t)q->querynum);
+       return p;
+}
+
+struct ctx_query* context_deserialize_cancel(struct ub_val_ctx* ctx,
+        uint8_t* p, uint32_t len)
+{
+       struct ctx_query* q;
+       int id;
+       if(len != 2*sizeof(uint32_t)) return NULL;
+       log_assert( ldns_read_uint32(p) == UB_LIBCMD_CANCEL);
+       id = (int)ldns_read_uint32(p+sizeof(uint32_t));
+       q = (struct ctx_query*)rbtree_search(&ctx->queries, &id);
+       return q;
+}
+
+uint8_t* 
+context_serialize_quit(uint32_t* len)
+{
+       uint8_t* p = (uint8_t*)malloc(sizeof(uint32_t));
+       if(!p)
+               return NULL;
+       *len = sizeof(uint32_t);
+       ldns_write_uint32(p, UB_LIBCMD_QUIT);
+       return p;
+}
+
+enum ub_ctx_cmd context_serial_getcmd(uint8_t* p, uint32_t len)
+{
+       uint32_t v;
+       if((size_t)len < sizeof(v))
+               return UB_LIBCMD_QUIT;
+       v = ldns_read_uint32(p);
+       return v;
+}
index 1362ad01e8af3272c1344a7554936fcbaa94ad45..15c6a39fcb6dbb04f0e93fd9b49bb71b02169250 100644 (file)
@@ -130,6 +130,8 @@ struct ctx_query {
        int querynum;
        /** was this an async query? */
        int async;
+       /** has this query been cancelled? (for bg thread) */
+       int cancelled;
 
        /** for async query, the callback function */
        ub_val_callback_t cb;
@@ -171,6 +173,25 @@ enum ub_ctx_err {
        UB_INITFAIL = -7
 };
 
+/**
+ * Command codes for libunbound pipe.
+ *
+ * Serialization looks like this:
+ *     o length (of remainder) uint32.
+ *     o uint32 command code.
+ *     o per command format.
+ */
+enum ub_ctx_cmd {
+       /** QUIT */
+       UB_LIBCMD_QUIT = 0,
+       /** New query, sent to bg worker */
+       UB_LIBCMD_NEWQUERY,
+       /** Cancel query, sent to bg worker */
+       UB_LIBCMD_CANCEL,
+       /** Query result, originates from bg worker */
+       UB_LIBCMD_ANSWER
+};
+
 /** 
  * finalize a context.
  * @param ctx: context to finalize. creates shared data.
@@ -208,4 +229,80 @@ struct alloc_cache* context_obtain_alloc(struct ub_val_ctx* ctx);
  */
 void context_release_alloc(struct ub_val_ctx* ctx, struct alloc_cache* alloc);
 
+/**
+ * Serialize a context query that questions data.
+ * This serializes the query name, type, ...
+ * As well as command code 'new_query'.
+ * @param q: context query
+ * @param len: the length of the allocation is returned.
+ * @return: an alloc, or NULL on mem error.
+ */
+uint8_t* context_serialize_new_query(struct ctx_query* q, uint32_t* len);
+
+/**
+ * Serialize a context_query result to hand back to user.
+ * This serializes the query name, type, ..., and result.
+ * As well as command code 'answer'.
+ * @param q: context query
+ * @param err: error code to pass to client.
+ * @param len: the length of the allocation is returned.
+ * @return: an alloc, or NULL on mem error.
+ */
+uint8_t* context_serialize_answer(struct ctx_query* q, int err, uint32_t* len);
+
+/**
+ * Serialize a query cancellation. Serializes query async id
+ * as well as command code 'cancel'
+ * @param q: context query
+ * @param len: the length of the allocation is returned.
+ * @return: an alloc, or NULL on mem error.
+ */
+uint8_t* context_serialize_cancel(struct ctx_query* q, uint32_t* len);
+
+/**
+ * Serialize a 'quit' command.
+ * @param len: the length of the allocation is returned.
+ * @return: an alloc, or NULL on mem error.
+ */
+uint8_t* context_serialize_quit(uint32_t* len);
+
+/**
+ * Obtain command code from serialized buffer
+ * @param p: buffer serialized.
+ * @param len: length of buffer.
+ * @return command code or QUIT on error.
+ */
+enum ub_ctx_cmd context_serial_getcmd(uint8_t* p, uint32_t len);
+
+/**
+ * Deserialize a new_query buffer.
+ * @param ctx: context
+ * @param p: buffer serialized.
+ * @param len: length of buffer.
+ * @return new ctx_query or NULL for malloc failure.
+ */
+struct ctx_query* context_deserialize_new_query(struct ub_val_ctx* ctx, 
+       uint8_t* p, uint32_t len);
+
+/**
+ * Deserialize an answer buffer.
+ * @param ctx: context
+ * @param p: buffer serialized.
+ * @param len: length of buffer.
+ * @param err: error code to be returned to client is passed.
+ * @return ctx_query with answer added or NULL for malloc failure.
+ */
+struct ctx_query* context_deserialize_answer(struct ub_val_ctx* ctx, 
+       uint8_t* p, uint32_t len, int* err);
+
+/**
+ * Deserialize a cancel buffer.
+ * @param ctx: context
+ * @param p: buffer serialized.
+ * @param len: length of buffer.
+ * @return ctx_query to cancel or NULL for failure.
+ */
+struct ctx_query* context_deserialize_cancel(struct ub_val_ctx* ctx, 
+       uint8_t* p, uint32_t len);
+
 #endif /* LIBUNBOUND_CONTEXT_H */
index f2a48da93430933108923bd25ff08b6ceff8dbd6..455a8a38a05a595e93629022d5927b719eaafb80 100644 (file)
@@ -63,6 +63,9 @@
 /** size of table used for random numbers. large to be more secure. */
 #define RND_STATE_SIZE 256
 
+/** handle new query command for bg worker */
+static void handle_newq(struct libworker* w, uint8_t* buf, uint32_t len);
+
 /** delete libworker struct */
 static void
 libworker_delete(struct libworker* w)
@@ -155,49 +158,87 @@ libworker_setup(struct ub_val_ctx* ctx)
        return w;
 }
 
-static int 
-libworker_handle_control_cmd(struct comm_point* c, void* arg, 
-       int err, struct comm_reply* ATTR_UNUSED(rep))
+/** handle cancel command for bg worker */
+static void
+handle_cancel(struct libworker* w, uint8_t* buf, uint32_t len)
 {
-       /*struct ub_val_ctx* ctx = (struct ub_val_ctx*)arg;*/
-       if(err != NETEVENT_NOERROR) {
-               if(err == NETEVENT_CLOSED) {
-                       /* parent closed pipe, must have exited somehow */
-                       /* it is of no use to go on, exit */
-                       exit(0);
-               }
-               log_err("internal error: control cmd err %d", err);
-               exit(0);
+       struct ctx_query* q = context_deserialize_cancel(w->ctx, buf, len);
+       if(!q) {
+               log_err("deserialize cancel failed");
+               return;
        }
-       return 0;
+       q->cancelled = 1;
 }
 
-static int 
-libworker_handle_result_write(struct comm_point* c, void* arg, 
-       int err, struct comm_reply* ATTR_UNUSED(rep))
+/** handle control command coming into server */
+int 
+libworker_handle_control_cmd(struct comm_point* c, void* arg, 
+       int ATTR_UNUSED(err), struct comm_reply* ATTR_UNUSED(rep))
 {
-       /*struct ub_val_ctx* ctx = (struct ub_val_ctx*)arg;*/
-       if(err != NETEVENT_NOERROR) {
-               if(err == NETEVENT_CLOSED) {
-                       /* parent closed pipe, must have exited somehow */
-                       /* it is of no use to go on, exit */
-                       exit(0);
-               }
-               log_err("internal error: pipe comm err %d", err);
-               exit(0);
+       struct libworker* w = (struct libworker*)arg;
+       uint32_t len = 0;
+       uint8_t* buf = NULL;
+       int r = libworker_read_msg(c->fd, &buf, &len, 1);
+       if(r==0) {
+               /* error has happened or */
+               /* parent closed pipe, must have exited somehow */
+               /* it is of no use to go on, exit */
+               comm_base_exit(w->base);
+               return 0;
+       }
+       if(r==-1) /* nothing to read now, try later */
+               return 0;
+       
+       switch(context_serial_getcmd(buf, len)) {
+               default:
+               case UB_LIBCMD_ANSWER:
+                       log_err("unknown command for bg worker %d", 
+                               (int)context_serial_getcmd(buf, len));
+                       /* and fall through to quit */
+               case UB_LIBCMD_QUIT:
+                       comm_base_exit(w->base);
+                       break;
+               case UB_LIBCMD_NEWQUERY:
+                       handle_newq(w, buf, len);
+                       break;
+               case UB_LIBCMD_CANCEL:
+                       handle_cancel(w, buf, len);
+                       break;
        }
        return 0;
 }
 
-/** get bufsize for cfg */
-static size_t
-getbufsz(struct ub_val_ctx* ctx)
+/** handle opportunity to write result back */
+int 
+libworker_handle_result_write(struct comm_point* c, void* arg, 
+       int ATTR_UNUSED(err), struct comm_reply* ATTR_UNUSED(rep))
 {
-       size_t s = 65535;
-       lock_basic_lock(&ctx->cfglock);
-       s = ctx->env->cfg->msg_buffer_size;
-       lock_basic_unlock(&ctx->cfglock);
-       return s;
+       struct libworker* w = (struct libworker*)arg;
+       struct libworker_res_list* item = w->res_list;
+       int r;
+       if(!item) {
+               comm_point_stop_listening(c);
+               return 0;
+       }
+       r = libworker_write_msg(c->fd, item->buf, item->len, 1);
+       if(r == -1)
+               return 0; /* try again later */
+       if(r == 0) {
+               /* error on pipe, must have exited somehow */
+               /* it is of no use to go on, exit */
+               comm_base_exit(w->base);
+               return 0;
+       }
+       /* done this result, remove it */
+       free(item->buf);
+       item->buf = NULL;
+       w->res_list = w->res_list->next;
+       free(item);
+       if(!w->res_list) {
+               w->res_last = NULL;
+               comm_point_stop_listening(c);
+       }
+       return 0;
 }
 
 /** the background thread func */
@@ -207,24 +248,22 @@ libworker_dobg(void* arg)
        /* setup */
        struct ub_val_ctx* ctx = (struct ub_val_ctx*)arg;
        struct libworker* w = libworker_setup(ctx);
-       size_t bufsz = getbufsz(ctx);
        log_thread_set(&w->thread_num);
        if(!w) {
                log_err("libunbound bg worker init failed, nomem");
                return NULL;
        }
        lock_basic_lock(&ctx->qqpipe_lock);
-       if(!(w->cmd_com=comm_point_create_local(w->base, ctx->qqpipe[0]
-               bufsz, libworker_handle_control_cmd, w))) {
+       if(!(w->cmd_com=comm_point_create_raw(w->base, ctx->qqpipe[0], 0
+               libworker_handle_control_cmd, w))) {
                lock_basic_unlock(&ctx->qqpipe_lock);
                log_err("libunbound bg worker init failed, no cmdcom");
                return NULL;
        }
        lock_basic_unlock(&ctx->qqpipe_lock);
        lock_basic_lock(&ctx->rrpipe_lock);
-       /* TODO create writing local commpoint */
-       if(!(w->res_com=comm_point_create_local(w->base, ctx->rrpipe[1], 
-               bufsz, libworker_handle_result_write, w))) {
+       if(!(w->res_com=comm_point_create_raw(w->base, ctx->rrpipe[1], 1,
+               libworker_handle_result_write, w))) {
                lock_basic_unlock(&ctx->qqpipe_lock);
                log_err("libunbound bg worker init failed, no cmdcom");
                return NULL;
@@ -441,6 +480,18 @@ int libworker_fg(struct ub_val_ctx* ctx, struct ctx_query* q)
        return UB_NOERROR;
 }
 
+/** handle new query command for bg worker */
+static void
+handle_newq(struct libworker* w, uint8_t* buf, uint32_t len)
+{
+       struct ctx_query* q = context_deserialize_new_query(w->ctx, buf, len);
+       if(!q) {
+               log_err("failed to deserialize newq");
+               return;
+       }
+       /* TODO start new query in bg mode */
+}
+
 void libworker_alloc_cleanup(void* arg)
 {
        struct libworker* w = (struct libworker*)arg;
@@ -547,6 +598,83 @@ libworker_handle_service_reply(struct comm_point* c, void* arg, int error,
        return 0;
 }
 
+int 
+libworker_write_msg(int fd, uint8_t* buf, uint32_t len, int nonblock)
+{
+       ssize_t r;
+       /* test */
+       if(nonblock) {
+               r = write(fd, &len, sizeof(len));
+               if(r == -1) {
+                       if(errno==EINTR || errno==EAGAIN)
+                               return -1;
+                       log_err("msg write failed: %s", strerror(errno));
+                       return -1; /* can still continue, perhaps */
+               }
+       } else r = 0;
+       if(!fd_set_block(fd))
+               return 0;
+       /* write remainder */
+       if(r != (ssize_t)sizeof(len)) {
+               if(write(fd, (char*)(&len)+r, sizeof(len)-r) == -1) {
+                       log_err("msg write failed: %s", strerror(errno));
+                       return 0;
+               }
+       }
+       if(write(fd, buf, len) == -1) {
+               log_err("msg write failed: %s", strerror(errno));
+               return 0;
+       }
+       if(!fd_set_nonblock(fd))
+               return 0;
+       return 1;
+}
+
+int 
+libworker_read_msg(int fd, uint8_t** buf, uint32_t* len, int nonblock)
+{
+       ssize_t r;
+
+       /* test */
+       *len = 0;
+       if(nonblock) {
+               r = read(fd, len, sizeof(*len));
+               if(r == -1) {
+                       if(errno==EINTR || errno==EAGAIN)
+                               return -1;
+                       log_err("msg read failed: %s", strerror(errno));
+                       return -1; /* we can still continue, perhaps */
+               }
+               if(r == 0) /* EOF */
+                       return 0;
+       } else r = 0;
+       if(!fd_set_block(fd))
+               return 0;
+       /* read remainder */
+       if(r != (ssize_t)sizeof(*len)) {
+               if((r=read(fd, (char*)(len)+r, sizeof(*len)-r)) == -1) {
+                       log_err("msg read failed: %s", strerror(errno));
+                       return 0;
+               }
+               if(r == 0) /* EOF */
+                       return 0;
+       }
+       *buf = (uint8_t*)malloc(*len);
+       if(!*buf) {
+               log_err("out of memory");
+               return 0;
+       }
+       if((r=read(fd, *buf, *len)) == -1) {
+               log_err("msg read failed: %s", strerror(errno));
+               return 0;
+       }
+       if(r == 0) /* EOF */
+               return 0;
+       if(!fd_set_nonblock(fd))
+               return 0;
+       return 1;
+}
+
 /* --- fake callbacks for fptr_wlist to work --- */
 int worker_handle_control_cmd(struct comm_point* ATTR_UNUSED(c), 
        void* ATTR_UNUSED(arg), int ATTR_UNUSED(error),
index 3382b6dd92d7c0d29fd0ba320998fd3e14e713b9..036d7169276670a2757b81221ff892d70a13938d 100644 (file)
@@ -53,6 +53,7 @@ struct outbound_entry;
 struct module_qstate;
 struct comm_point;
 struct comm_reply;
+struct libworker_res_list;
 
 /** 
  * The library-worker status structure
@@ -78,8 +79,13 @@ struct libworker {
        struct ub_randstate* rndstate;
        /** commpoint to listen to commands */
        struct comm_point* cmd_com;
-       /** commpoint to write results back (nonblocking) */
+       /** commpoint to write results back */
        struct comm_point* res_com;
+
+       /** list of outstanding results to be written back */
+       struct libworker_res_list* res_list;
+       /** last in list */
+       struct libworker_res_list* res_last;
 };
 
 /**
@@ -92,6 +98,18 @@ struct libworker_fg_data {
        struct ctx_query* q;
 };
 
+/**
+ * List of results (arbitrary command serializations) to write back
+ */
+struct libworker_res_list {
+       /** next in list */
+       struct libworker_res_list* next;
+       /** serialized buffer to write */
+       uint8_t* buf;
+       /** length to write */
+       uint32_t len;
+};
+
 /**
  * Create a background worker
  * @param ctx: is updated with pid/tid of the background worker.
@@ -157,4 +175,46 @@ int libworker_handle_reply(struct comm_point* c, void* arg, int error,
 int libworker_handle_service_reply(struct comm_point* c, void* arg, int error,
         struct comm_reply* reply_info);
 
+/** handle control command coming into server */
+int libworker_handle_control_cmd(struct comm_point* c, void* arg, 
+       int err, struct comm_reply* rep);
+
+/** handle opportunity to write result back */
+int libworker_handle_result_write(struct comm_point* c, void* arg, 
+       int err, struct comm_reply* rep);
+
+/**
+ * Write length bytes followed by message.
+ * @param fd: the socket to write on. Is nonblocking.
+ *     Set to blocking by the function,
+ *     and back to non-blocking at exit of function.
+ * @param buf: the message.
+ * @param len: length of message.
+ * @param nonblock: if set to true, the first write is nonblocking.
+ *     If the first write fails the function returns -1.
+ *     If set false, the first write is blocking.
+ * @return: all remainder writes are nonblocking.
+ *     return 0 on error, in that case blocking/nonblocking of socket is
+ *             unknown.
+ *     return 1 if all OK.
+ */
+int libworker_write_msg(int fd, uint8_t* buf, uint32_t len, int nonblock);
+
+/**
+ * Read length bytes followed by message.
+ * @param fd: the socket to write on. Is nonblocking.
+ *     Set to blocking by the function,
+ *     and back to non-blocking at exit of function.
+ * @param buf: the message, malloced.
+ * @param len: length of message, returned.
+ * @param nonblock: if set to true, the first read is nonblocking.
+ *     If the first read fails the function returns -1.
+ *     If set false, the first read is blocking.
+ * @return: all remainder reads are nonblocking.
+ *     return 0 on error, in that case blocking/nonblocking of socket is 
+ *             unknown. On EOF 0 is returned.
+ *     return 1 if all OK.
+ */
+int libworker_read_msg(int fd, uint8_t** buf, uint32_t* len, int nonblock);
+
 #endif /* LIBUNBOUND_WORKER_H */
index 83da3be0a32ece69e3e33ae0a461e361d6c744a3..84b33f2c16e7a00388d68e7aa174a5e3e4e0f0e3 100644 (file)
@@ -105,6 +105,7 @@ fptr_whitelist_event(void (*fptr)(int, short, void *))
        else if(fptr == &comm_timer_callback) return 1;
        else if(fptr == &comm_signal_callback) return 1;
        else if(fptr == &comm_point_local_handle_callback) return 1;
+       else if(fptr == &comm_point_raw_handle_callback) return 1;
        return 0;
 }
 
index c8921a50ad76e7e9cedb979aae8bee6a08e8c6f7..6a0a3d10e4695ede1ecd04eab2f2fd231c924be8 100644 (file)
@@ -743,6 +743,15 @@ void comm_point_local_handle_callback(int fd, short event, void* arg)
        log_err("Ignored event %d for localhdl.", event);
 }
 
+void comm_point_raw_handle_callback(int ATTR_UNUSED(fd), 
+       short ATTR_UNUSED(event), void* arg)
+{
+       struct comm_point* c = (struct comm_point*)arg;
+       log_assert(c->type == comm_raw);
+
+       (void)(*c->callback)(c, c->cb_arg, NETEVENT_NOERROR, NULL);
+}
+
 struct comm_point* 
 comm_point_create_udp(struct comm_base *base, int fd, ldns_buffer* buffer,
        comm_point_callback_t* callback, void* callback_arg)
@@ -1048,7 +1057,55 @@ comm_point_create_local(struct comm_base *base, int fd, size_t bufsize,
        if(event_base_set(base->eb->base, &c->ev->ev) != 0 ||
                event_add(&c->ev->ev, c->timeout) != 0 )
        {
-               log_err("could not add tcphdl event");
+               log_err("could not add localhdl event");
+               free(c->ev);
+               free(c);
+               return NULL;
+       }
+       return c;
+}
+
+struct comm_point* 
+comm_point_create_raw(struct comm_base* base, int fd, int writing, 
+       comm_point_callback_t* callback, void* callback_arg)
+{
+       struct comm_point* c = (struct comm_point*)calloc(1,
+               sizeof(struct comm_point));
+       short evbits;
+       if(!c)
+               return NULL;
+       c->ev = (struct internal_event*)calloc(1,
+               sizeof(struct internal_event));
+       if(!c->ev) {
+               free(c);
+               return NULL;
+       }
+       c->fd = fd;
+       c->buffer = NULL;
+       c->timeout = NULL;
+       c->tcp_is_reading = 0;
+       c->tcp_byte_count = 0;
+       c->tcp_parent = NULL;
+       c->max_tcp_count = 0;
+       c->tcp_handlers = NULL;
+       c->tcp_free = NULL;
+       c->type = comm_raw;
+       c->tcp_do_close = 0;
+       c->do_not_close = 1;
+       c->tcp_do_toggle_rw = 0;
+       c->tcp_check_nb_connect = 0;
+       c->callback = callback;
+       c->cb_arg = callback_arg;
+       /* libevent stuff */
+       if(writing)
+               evbits = EV_PERSIST | EV_WRITE;
+       else    evbits = EV_PERSIST | EV_READ;
+       event_set(&c->ev->ev, c->fd, evbits, comm_point_raw_handle_callback, 
+               c);
+       if(event_base_set(base->eb->base, &c->ev->ev) != 0 ||
+               event_add(&c->ev->ev, c->timeout) != 0 )
+       {
+               log_err("could not add rawhdl event");
                free(c->ev);
                free(c);
                return NULL;
index 6a5a22f6c1f8f23f3cac97a5bf235d2f272663a2..fb77eb9e9028f3bd3b53d11dab738b6466a505a4 100644 (file)
@@ -166,7 +166,9 @@ struct comm_point {
                /** TCP handler socket - handle byteperbyte readwrite. */
                comm_tcp,
                /** AF_UNIX socket - for internal commands. */
-               comm_local
+               comm_local,
+               /** raw - not DNS format - for pipe readers and writers */
+               comm_raw
        } 
                /** variable with type of socket, UDP,TCP-accept,TCP,pipe */
                type;
@@ -350,6 +352,19 @@ struct comm_point* comm_point_create_local(struct comm_base* base,
        int fd, size_t bufsize, 
        comm_point_callback_t* callback, void* callback_arg);
 
+/**
+ * Create commpoint to listen to a local domain pipe descriptor.
+ * @param base: in which base to alloc the commpoint.
+ * @param fd: file descriptor.
+ * @param writing: true if you want to listen to writes, false for reads.
+ * @param callback: callback function pointer for the handler.
+ * @param callback_arg: will be passed to your callback function.
+ * @return: the commpoint or NULL on error.
+ */
+struct comm_point* comm_point_create_raw(struct comm_base* base,
+       int fd, int writing, 
+       comm_point_callback_t* callback, void* callback_arg);
+
 /**
  * Close a comm point fd.
  * @param c: comm point to close.
@@ -569,5 +584,14 @@ void comm_signal_callback(int fd, short event, void* arg);
  */
 void comm_point_local_handle_callback(int fd, short event, void* arg);
 
+/**
+ * This routine is published for checks and tests, and is only used internally.
+ * libevent callback for raw fd access.
+ * @param fd: file descriptor.
+ * @param event: event bits from libevent: 
+ *     EV_READ, EV_WRITE, EV_SIGNAL, EV_TIMEOUT.
+ * @param arg: the comm_point structure.
+ */
+void comm_point_raw_handle_callback(int fd, short event, void* arg);
 
 #endif /* NET_EVENT_H */