From 05f9d35f006f72eec2b80b7e737e6b067426a979 Mon Sep 17 00:00:00 2001 From: Wouter Wijngaards Date: Mon, 21 Jan 2008 16:03:59 +0000 Subject: [PATCH] serialize, deserialize, raw commpoints. case preserve note. git-svn-id: file:///svn/unbound/trunk@881 be551aaa-1e26-0410-a405-d3ace91eadb9 --- doc/Changelog | 3 + doc/requirements.txt | 6 ++ libunbound/context.c | 156 ++++++++++++++++++++++++++++++++ libunbound/context.h | 97 ++++++++++++++++++++ libunbound/worker.c | 206 +++++++++++++++++++++++++++++++++++-------- libunbound/worker.h | 62 ++++++++++++- util/fptr_wlist.c | 1 + util/netevent.c | 59 ++++++++++++- util/netevent.h | 26 +++++- 9 files changed, 574 insertions(+), 42 deletions(-) diff --git a/doc/Changelog b/doc/Changelog index 2935af0a1..adfc97c44 100644 --- a/doc/Changelog +++ b/doc/Changelog @@ -1,3 +1,6 @@ +21 January 2008: Wouter + - libworker work, netevent raw commpoints, write_msg, serialize. + 18 January 2008: Wouter - touch up of manpage for libunbound. - support for IP_RECVDSTADDR (for *BSD ip4). diff --git a/doc/requirements.txt b/doc/requirements.txt index c9a3d43bf..49842aacc 100644 --- a/doc/requirements.txt +++ b/doc/requirements.txt @@ -200,3 +200,9 @@ o If a client makes a query without RD bit, in the case of a returned ascertains that RRSIGs are OK (and not omitted), but does not check NSEC/NSEC3. +o Case preservation + Unbound preserves the casing received from authority servers as best + as possible. It compresses without case, so case can get lost there. + The casing from the authority server is used in preference to the casing + of the query name. This is different from BIND. RFC4343 allows either + behaviour. diff --git a/libunbound/context.c b/libunbound/context.c index dc51b1f82..5c6914427 100644 --- a/libunbound/context.c +++ b/libunbound/context.c @@ -42,6 +42,7 @@ #include "libunbound/context.h" #include "util/module.h" #include "util/config_file.h" +#include "util/net_help.h" #include "services/modstack.h" #include "services/localzone.h" #include "services/cache/rrset.h" @@ -183,3 +184,158 @@ context_release_alloc(struct ub_val_ctx* ctx, struct alloc_cache* alloc) ctx->alloc_list = alloc; lock_basic_unlock(&ctx->cfglock); } + +uint8_t* +context_serialize_new_query(struct ctx_query* q, uint32_t* len) +{ + /* format for new query is + * o uint32 cmd + * o uint32 id + * o uint32 type + * o uint32 class + * o rest queryname (string) + */ + uint8_t* p; + size_t slen = strlen(q->res->qname) + 1/*end of string*/; + *len = sizeof(uint32_t)*4 + slen; + p = (uint8_t*)malloc(*len); + if(!p) return NULL; + ldns_write_uint32(p, UB_LIBCMD_NEWQUERY); + ldns_write_uint32(p+sizeof(uint32_t), (uint32_t)q->querynum); + ldns_write_uint32(p+2*sizeof(uint32_t), (uint32_t)q->res->qtype); + ldns_write_uint32(p+3*sizeof(uint32_t), (uint32_t)q->res->qclass); + memmove(p+4*sizeof(uint32_t), q->res->qname, slen); + return p; +} + +struct ctx_query* +context_deserialize_new_query(struct ub_val_ctx* ctx, uint8_t* p, uint32_t len) +{ + struct ctx_query* q = (struct ctx_query*)calloc(1, sizeof(*q)); + if(!q) return NULL; + if(len < 4*sizeof(uint32_t)+1) { + free(q); + return NULL; + } + log_assert( ldns_read_uint32(p) == UB_LIBCMD_NEWQUERY); + q->querynum = (int)ldns_read_uint32(p+sizeof(uint32_t)); + q->node.key = &q->querynum; + q->async = 1; + q->res = (struct ub_val_result*)calloc(1, sizeof(*q->res)); + if(!q->res) { + free(q); + return NULL; + } + q->res->qtype = (int)ldns_read_uint32(p+2*sizeof(uint32_t)); + q->res->qclass = (int)ldns_read_uint32(p+3*sizeof(uint32_t)); + q->res->qname = strdup((char*)(p+4*sizeof(uint32_t))); + if(!q->res->qname) { + free(q->res); + free(q); + return NULL; + } + + /** add to query list */ + ctx->num_async++; + (void)rbtree_insert(&ctx->queries, &q->node); + return q; +} + +uint8_t* +context_serialize_answer(struct ctx_query* q, int err, uint32_t* len) +{ + /* answer format + * o uint32 cmd + * o uint32 id + * o uint32 error_code + * o uint32 msg_security + * o the remainder is the answer msg from resolver lookup. + * remainder can be length 0. + */ + uint8_t* p; + *len = sizeof(uint32_t)*4 + q->msg_len; + p = (uint8_t*)malloc(*len); + if(!p) return NULL; + ldns_write_uint32(p, UB_LIBCMD_ANSWER); + ldns_write_uint32(p+sizeof(uint32_t), (uint32_t)q->querynum); + ldns_write_uint32(p+2*sizeof(uint32_t), (uint32_t)err); + ldns_write_uint32(p+3*sizeof(uint32_t), (uint32_t)q->msg_security); + memmove(p+4*sizeof(uint32_t), q->msg, q->msg_len); + return p; +} + +struct ctx_query* +context_deserialize_answer(struct ub_val_ctx* ctx, + uint8_t* p, uint32_t len, int* err) +{ + struct ctx_query* q = NULL ; + int id; + if(len < 4*sizeof(uint32_t)) return NULL; + log_assert( ldns_read_uint32(p) == UB_LIBCMD_ANSWER); + id = (int)ldns_read_uint32(p+sizeof(uint32_t)); + q = (struct ctx_query*)rbtree_search(&ctx->queries, &id); + if(!q) return NULL; + *err = (int)ldns_read_uint32(p+2*sizeof(uint32_t)); + q->msg_security = ldns_read_uint32(p+3*sizeof(uint32_t)); + if(len > 4*sizeof(uint32_t)) { + q->msg_len = len - 4*sizeof(uint32_t); + q->msg = (uint8_t*)memdup(p+4*sizeof(uint32_t), q->msg_len); + if(!q->msg) { + /* pass malloc failure to the user callback */ + q->msg_len = 0; + *err = UB_NOMEM; + return q; + } + } else { + q->msg_len = 0; + free(q->msg); + q->msg = NULL; + } + return q; +} + +uint8_t* +context_serialize_cancel(struct ctx_query* q, uint32_t* len) +{ + /* format of cancel: + * o uint32 cmd + * o uint32 async-id */ + uint8_t* p = (uint8_t*)malloc(2*sizeof(uint32_t)); + if(!p) return NULL; + *len = 2*sizeof(uint32_t); + ldns_write_uint32(p, UB_LIBCMD_CANCEL); + ldns_write_uint32(p+sizeof(uint32_t), (uint32_t)q->querynum); + return p; +} + +struct ctx_query* context_deserialize_cancel(struct ub_val_ctx* ctx, + uint8_t* p, uint32_t len) +{ + struct ctx_query* q; + int id; + if(len != 2*sizeof(uint32_t)) return NULL; + log_assert( ldns_read_uint32(p) == UB_LIBCMD_CANCEL); + id = (int)ldns_read_uint32(p+sizeof(uint32_t)); + q = (struct ctx_query*)rbtree_search(&ctx->queries, &id); + return q; +} + +uint8_t* +context_serialize_quit(uint32_t* len) +{ + uint8_t* p = (uint8_t*)malloc(sizeof(uint32_t)); + if(!p) + return NULL; + *len = sizeof(uint32_t); + ldns_write_uint32(p, UB_LIBCMD_QUIT); + return p; +} + +enum ub_ctx_cmd context_serial_getcmd(uint8_t* p, uint32_t len) +{ + uint32_t v; + if((size_t)len < sizeof(v)) + return UB_LIBCMD_QUIT; + v = ldns_read_uint32(p); + return v; +} diff --git a/libunbound/context.h b/libunbound/context.h index 1362ad01e..15c6a39fc 100644 --- a/libunbound/context.h +++ b/libunbound/context.h @@ -130,6 +130,8 @@ struct ctx_query { int querynum; /** was this an async query? */ int async; + /** has this query been cancelled? (for bg thread) */ + int cancelled; /** for async query, the callback function */ ub_val_callback_t cb; @@ -171,6 +173,25 @@ enum ub_ctx_err { UB_INITFAIL = -7 }; +/** + * Command codes for libunbound pipe. + * + * Serialization looks like this: + * o length (of remainder) uint32. + * o uint32 command code. + * o per command format. + */ +enum ub_ctx_cmd { + /** QUIT */ + UB_LIBCMD_QUIT = 0, + /** New query, sent to bg worker */ + UB_LIBCMD_NEWQUERY, + /** Cancel query, sent to bg worker */ + UB_LIBCMD_CANCEL, + /** Query result, originates from bg worker */ + UB_LIBCMD_ANSWER +}; + /** * finalize a context. * @param ctx: context to finalize. creates shared data. @@ -208,4 +229,80 @@ struct alloc_cache* context_obtain_alloc(struct ub_val_ctx* ctx); */ void context_release_alloc(struct ub_val_ctx* ctx, struct alloc_cache* alloc); +/** + * Serialize a context query that questions data. + * This serializes the query name, type, ... + * As well as command code 'new_query'. + * @param q: context query + * @param len: the length of the allocation is returned. + * @return: an alloc, or NULL on mem error. + */ +uint8_t* context_serialize_new_query(struct ctx_query* q, uint32_t* len); + +/** + * Serialize a context_query result to hand back to user. + * This serializes the query name, type, ..., and result. + * As well as command code 'answer'. + * @param q: context query + * @param err: error code to pass to client. + * @param len: the length of the allocation is returned. + * @return: an alloc, or NULL on mem error. + */ +uint8_t* context_serialize_answer(struct ctx_query* q, int err, uint32_t* len); + +/** + * Serialize a query cancellation. Serializes query async id + * as well as command code 'cancel' + * @param q: context query + * @param len: the length of the allocation is returned. + * @return: an alloc, or NULL on mem error. + */ +uint8_t* context_serialize_cancel(struct ctx_query* q, uint32_t* len); + +/** + * Serialize a 'quit' command. + * @param len: the length of the allocation is returned. + * @return: an alloc, or NULL on mem error. + */ +uint8_t* context_serialize_quit(uint32_t* len); + +/** + * Obtain command code from serialized buffer + * @param p: buffer serialized. + * @param len: length of buffer. + * @return command code or QUIT on error. + */ +enum ub_ctx_cmd context_serial_getcmd(uint8_t* p, uint32_t len); + +/** + * Deserialize a new_query buffer. + * @param ctx: context + * @param p: buffer serialized. + * @param len: length of buffer. + * @return new ctx_query or NULL for malloc failure. + */ +struct ctx_query* context_deserialize_new_query(struct ub_val_ctx* ctx, + uint8_t* p, uint32_t len); + +/** + * Deserialize an answer buffer. + * @param ctx: context + * @param p: buffer serialized. + * @param len: length of buffer. + * @param err: error code to be returned to client is passed. + * @return ctx_query with answer added or NULL for malloc failure. + */ +struct ctx_query* context_deserialize_answer(struct ub_val_ctx* ctx, + uint8_t* p, uint32_t len, int* err); + +/** + * Deserialize a cancel buffer. + * @param ctx: context + * @param p: buffer serialized. + * @param len: length of buffer. + * @return ctx_query to cancel or NULL for failure. + */ +struct ctx_query* context_deserialize_cancel(struct ub_val_ctx* ctx, + uint8_t* p, uint32_t len); + #endif /* LIBUNBOUND_CONTEXT_H */ diff --git a/libunbound/worker.c b/libunbound/worker.c index f2a48da93..455a8a38a 100644 --- a/libunbound/worker.c +++ b/libunbound/worker.c @@ -63,6 +63,9 @@ /** size of table used for random numbers. large to be more secure. */ #define RND_STATE_SIZE 256 +/** handle new query command for bg worker */ +static void handle_newq(struct libworker* w, uint8_t* buf, uint32_t len); + /** delete libworker struct */ static void libworker_delete(struct libworker* w) @@ -155,49 +158,87 @@ libworker_setup(struct ub_val_ctx* ctx) return w; } -static int -libworker_handle_control_cmd(struct comm_point* c, void* arg, - int err, struct comm_reply* ATTR_UNUSED(rep)) +/** handle cancel command for bg worker */ +static void +handle_cancel(struct libworker* w, uint8_t* buf, uint32_t len) { - /*struct ub_val_ctx* ctx = (struct ub_val_ctx*)arg;*/ - if(err != NETEVENT_NOERROR) { - if(err == NETEVENT_CLOSED) { - /* parent closed pipe, must have exited somehow */ - /* it is of no use to go on, exit */ - exit(0); - } - log_err("internal error: control cmd err %d", err); - exit(0); + struct ctx_query* q = context_deserialize_cancel(w->ctx, buf, len); + if(!q) { + log_err("deserialize cancel failed"); + return; } - return 0; + q->cancelled = 1; } -static int -libworker_handle_result_write(struct comm_point* c, void* arg, - int err, struct comm_reply* ATTR_UNUSED(rep)) +/** handle control command coming into server */ +int +libworker_handle_control_cmd(struct comm_point* c, void* arg, + int ATTR_UNUSED(err), struct comm_reply* ATTR_UNUSED(rep)) { - /*struct ub_val_ctx* ctx = (struct ub_val_ctx*)arg;*/ - if(err != NETEVENT_NOERROR) { - if(err == NETEVENT_CLOSED) { - /* parent closed pipe, must have exited somehow */ - /* it is of no use to go on, exit */ - exit(0); - } - log_err("internal error: pipe comm err %d", err); - exit(0); + struct libworker* w = (struct libworker*)arg; + uint32_t len = 0; + uint8_t* buf = NULL; + int r = libworker_read_msg(c->fd, &buf, &len, 1); + if(r==0) { + /* error has happened or */ + /* parent closed pipe, must have exited somehow */ + /* it is of no use to go on, exit */ + comm_base_exit(w->base); + return 0; + } + if(r==-1) /* nothing to read now, try later */ + return 0; + + switch(context_serial_getcmd(buf, len)) { + default: + case UB_LIBCMD_ANSWER: + log_err("unknown command for bg worker %d", + (int)context_serial_getcmd(buf, len)); + /* and fall through to quit */ + case UB_LIBCMD_QUIT: + comm_base_exit(w->base); + break; + case UB_LIBCMD_NEWQUERY: + handle_newq(w, buf, len); + break; + case UB_LIBCMD_CANCEL: + handle_cancel(w, buf, len); + break; } return 0; } -/** get bufsize for cfg */ -static size_t -getbufsz(struct ub_val_ctx* ctx) +/** handle opportunity to write result back */ +int +libworker_handle_result_write(struct comm_point* c, void* arg, + int ATTR_UNUSED(err), struct comm_reply* ATTR_UNUSED(rep)) { - size_t s = 65535; - lock_basic_lock(&ctx->cfglock); - s = ctx->env->cfg->msg_buffer_size; - lock_basic_unlock(&ctx->cfglock); - return s; + struct libworker* w = (struct libworker*)arg; + struct libworker_res_list* item = w->res_list; + int r; + if(!item) { + comm_point_stop_listening(c); + return 0; + } + r = libworker_write_msg(c->fd, item->buf, item->len, 1); + if(r == -1) + return 0; /* try again later */ + if(r == 0) { + /* error on pipe, must have exited somehow */ + /* it is of no use to go on, exit */ + comm_base_exit(w->base); + return 0; + } + /* done this result, remove it */ + free(item->buf); + item->buf = NULL; + w->res_list = w->res_list->next; + free(item); + if(!w->res_list) { + w->res_last = NULL; + comm_point_stop_listening(c); + } + return 0; } /** the background thread func */ @@ -207,24 +248,22 @@ libworker_dobg(void* arg) /* setup */ struct ub_val_ctx* ctx = (struct ub_val_ctx*)arg; struct libworker* w = libworker_setup(ctx); - size_t bufsz = getbufsz(ctx); log_thread_set(&w->thread_num); if(!w) { log_err("libunbound bg worker init failed, nomem"); return NULL; } lock_basic_lock(&ctx->qqpipe_lock); - if(!(w->cmd_com=comm_point_create_local(w->base, ctx->qqpipe[0], - bufsz, libworker_handle_control_cmd, w))) { + if(!(w->cmd_com=comm_point_create_raw(w->base, ctx->qqpipe[0], 0, + libworker_handle_control_cmd, w))) { lock_basic_unlock(&ctx->qqpipe_lock); log_err("libunbound bg worker init failed, no cmdcom"); return NULL; } lock_basic_unlock(&ctx->qqpipe_lock); lock_basic_lock(&ctx->rrpipe_lock); - /* TODO create writing local commpoint */ - if(!(w->res_com=comm_point_create_local(w->base, ctx->rrpipe[1], - bufsz, libworker_handle_result_write, w))) { + if(!(w->res_com=comm_point_create_raw(w->base, ctx->rrpipe[1], 1, + libworker_handle_result_write, w))) { lock_basic_unlock(&ctx->qqpipe_lock); log_err("libunbound bg worker init failed, no cmdcom"); return NULL; @@ -441,6 +480,18 @@ int libworker_fg(struct ub_val_ctx* ctx, struct ctx_query* q) return UB_NOERROR; } +/** handle new query command for bg worker */ +static void +handle_newq(struct libworker* w, uint8_t* buf, uint32_t len) +{ + struct ctx_query* q = context_deserialize_new_query(w->ctx, buf, len); + if(!q) { + log_err("failed to deserialize newq"); + return; + } + /* TODO start new query in bg mode */ +} + void libworker_alloc_cleanup(void* arg) { struct libworker* w = (struct libworker*)arg; @@ -547,6 +598,83 @@ libworker_handle_service_reply(struct comm_point* c, void* arg, int error, return 0; } +int +libworker_write_msg(int fd, uint8_t* buf, uint32_t len, int nonblock) +{ + ssize_t r; + /* test */ + if(nonblock) { + r = write(fd, &len, sizeof(len)); + if(r == -1) { + if(errno==EINTR || errno==EAGAIN) + return -1; + log_err("msg write failed: %s", strerror(errno)); + return -1; /* can still continue, perhaps */ + } + } else r = 0; + if(!fd_set_block(fd)) + return 0; + /* write remainder */ + if(r != (ssize_t)sizeof(len)) { + if(write(fd, (char*)(&len)+r, sizeof(len)-r) == -1) { + log_err("msg write failed: %s", strerror(errno)); + return 0; + } + } + if(write(fd, buf, len) == -1) { + log_err("msg write failed: %s", strerror(errno)); + return 0; + } + if(!fd_set_nonblock(fd)) + return 0; + return 1; +} + +int +libworker_read_msg(int fd, uint8_t** buf, uint32_t* len, int nonblock) +{ + ssize_t r; + + /* test */ + *len = 0; + if(nonblock) { + r = read(fd, len, sizeof(*len)); + if(r == -1) { + if(errno==EINTR || errno==EAGAIN) + return -1; + log_err("msg read failed: %s", strerror(errno)); + return -1; /* we can still continue, perhaps */ + } + if(r == 0) /* EOF */ + return 0; + } else r = 0; + if(!fd_set_block(fd)) + return 0; + /* read remainder */ + if(r != (ssize_t)sizeof(*len)) { + if((r=read(fd, (char*)(len)+r, sizeof(*len)-r)) == -1) { + log_err("msg read failed: %s", strerror(errno)); + return 0; + } + if(r == 0) /* EOF */ + return 0; + } + *buf = (uint8_t*)malloc(*len); + if(!*buf) { + log_err("out of memory"); + return 0; + } + if((r=read(fd, *buf, *len)) == -1) { + log_err("msg read failed: %s", strerror(errno)); + return 0; + } + if(r == 0) /* EOF */ + return 0; + if(!fd_set_nonblock(fd)) + return 0; + return 1; +} + /* --- fake callbacks for fptr_wlist to work --- */ int worker_handle_control_cmd(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg), int ATTR_UNUSED(error), diff --git a/libunbound/worker.h b/libunbound/worker.h index 3382b6dd9..036d71692 100644 --- a/libunbound/worker.h +++ b/libunbound/worker.h @@ -53,6 +53,7 @@ struct outbound_entry; struct module_qstate; struct comm_point; struct comm_reply; +struct libworker_res_list; /** * The library-worker status structure @@ -78,8 +79,13 @@ struct libworker { struct ub_randstate* rndstate; /** commpoint to listen to commands */ struct comm_point* cmd_com; - /** commpoint to write results back (nonblocking) */ + /** commpoint to write results back */ struct comm_point* res_com; + + /** list of outstanding results to be written back */ + struct libworker_res_list* res_list; + /** last in list */ + struct libworker_res_list* res_last; }; /** @@ -92,6 +98,18 @@ struct libworker_fg_data { struct ctx_query* q; }; +/** + * List of results (arbitrary command serializations) to write back + */ +struct libworker_res_list { + /** next in list */ + struct libworker_res_list* next; + /** serialized buffer to write */ + uint8_t* buf; + /** length to write */ + uint32_t len; +}; + /** * Create a background worker * @param ctx: is updated with pid/tid of the background worker. @@ -157,4 +175,46 @@ int libworker_handle_reply(struct comm_point* c, void* arg, int error, int libworker_handle_service_reply(struct comm_point* c, void* arg, int error, struct comm_reply* reply_info); +/** handle control command coming into server */ +int libworker_handle_control_cmd(struct comm_point* c, void* arg, + int err, struct comm_reply* rep); + +/** handle opportunity to write result back */ +int libworker_handle_result_write(struct comm_point* c, void* arg, + int err, struct comm_reply* rep); + +/** + * Write length bytes followed by message. + * @param fd: the socket to write on. Is nonblocking. + * Set to blocking by the function, + * and back to non-blocking at exit of function. + * @param buf: the message. + * @param len: length of message. + * @param nonblock: if set to true, the first write is nonblocking. + * If the first write fails the function returns -1. + * If set false, the first write is blocking. + * @return: all remainder writes are nonblocking. + * return 0 on error, in that case blocking/nonblocking of socket is + * unknown. + * return 1 if all OK. + */ +int libworker_write_msg(int fd, uint8_t* buf, uint32_t len, int nonblock); + +/** + * Read length bytes followed by message. + * @param fd: the socket to write on. Is nonblocking. + * Set to blocking by the function, + * and back to non-blocking at exit of function. + * @param buf: the message, malloced. + * @param len: length of message, returned. + * @param nonblock: if set to true, the first read is nonblocking. + * If the first read fails the function returns -1. + * If set false, the first read is blocking. + * @return: all remainder reads are nonblocking. + * return 0 on error, in that case blocking/nonblocking of socket is + * unknown. On EOF 0 is returned. + * return 1 if all OK. + */ +int libworker_read_msg(int fd, uint8_t** buf, uint32_t* len, int nonblock); + #endif /* LIBUNBOUND_WORKER_H */ diff --git a/util/fptr_wlist.c b/util/fptr_wlist.c index 83da3be0a..84b33f2c1 100644 --- a/util/fptr_wlist.c +++ b/util/fptr_wlist.c @@ -105,6 +105,7 @@ fptr_whitelist_event(void (*fptr)(int, short, void *)) else if(fptr == &comm_timer_callback) return 1; else if(fptr == &comm_signal_callback) return 1; else if(fptr == &comm_point_local_handle_callback) return 1; + else if(fptr == &comm_point_raw_handle_callback) return 1; return 0; } diff --git a/util/netevent.c b/util/netevent.c index c8921a50a..6a0a3d10e 100644 --- a/util/netevent.c +++ b/util/netevent.c @@ -743,6 +743,15 @@ void comm_point_local_handle_callback(int fd, short event, void* arg) log_err("Ignored event %d for localhdl.", event); } +void comm_point_raw_handle_callback(int ATTR_UNUSED(fd), + short ATTR_UNUSED(event), void* arg) +{ + struct comm_point* c = (struct comm_point*)arg; + log_assert(c->type == comm_raw); + + (void)(*c->callback)(c, c->cb_arg, NETEVENT_NOERROR, NULL); +} + struct comm_point* comm_point_create_udp(struct comm_base *base, int fd, ldns_buffer* buffer, comm_point_callback_t* callback, void* callback_arg) @@ -1048,7 +1057,55 @@ comm_point_create_local(struct comm_base *base, int fd, size_t bufsize, if(event_base_set(base->eb->base, &c->ev->ev) != 0 || event_add(&c->ev->ev, c->timeout) != 0 ) { - log_err("could not add tcphdl event"); + log_err("could not add localhdl event"); + free(c->ev); + free(c); + return NULL; + } + return c; +} + +struct comm_point* +comm_point_create_raw(struct comm_base* base, int fd, int writing, + comm_point_callback_t* callback, void* callback_arg) +{ + struct comm_point* c = (struct comm_point*)calloc(1, + sizeof(struct comm_point)); + short evbits; + if(!c) + return NULL; + c->ev = (struct internal_event*)calloc(1, + sizeof(struct internal_event)); + if(!c->ev) { + free(c); + return NULL; + } + c->fd = fd; + c->buffer = NULL; + c->timeout = NULL; + c->tcp_is_reading = 0; + c->tcp_byte_count = 0; + c->tcp_parent = NULL; + c->max_tcp_count = 0; + c->tcp_handlers = NULL; + c->tcp_free = NULL; + c->type = comm_raw; + c->tcp_do_close = 0; + c->do_not_close = 1; + c->tcp_do_toggle_rw = 0; + c->tcp_check_nb_connect = 0; + c->callback = callback; + c->cb_arg = callback_arg; + /* libevent stuff */ + if(writing) + evbits = EV_PERSIST | EV_WRITE; + else evbits = EV_PERSIST | EV_READ; + event_set(&c->ev->ev, c->fd, evbits, comm_point_raw_handle_callback, + c); + if(event_base_set(base->eb->base, &c->ev->ev) != 0 || + event_add(&c->ev->ev, c->timeout) != 0 ) + { + log_err("could not add rawhdl event"); free(c->ev); free(c); return NULL; diff --git a/util/netevent.h b/util/netevent.h index 6a5a22f6c..fb77eb9e9 100644 --- a/util/netevent.h +++ b/util/netevent.h @@ -166,7 +166,9 @@ struct comm_point { /** TCP handler socket - handle byteperbyte readwrite. */ comm_tcp, /** AF_UNIX socket - for internal commands. */ - comm_local + comm_local, + /** raw - not DNS format - for pipe readers and writers */ + comm_raw } /** variable with type of socket, UDP,TCP-accept,TCP,pipe */ type; @@ -350,6 +352,19 @@ struct comm_point* comm_point_create_local(struct comm_base* base, int fd, size_t bufsize, comm_point_callback_t* callback, void* callback_arg); +/** + * Create commpoint to listen to a local domain pipe descriptor. + * @param base: in which base to alloc the commpoint. + * @param fd: file descriptor. + * @param writing: true if you want to listen to writes, false for reads. + * @param callback: callback function pointer for the handler. + * @param callback_arg: will be passed to your callback function. + * @return: the commpoint or NULL on error. + */ +struct comm_point* comm_point_create_raw(struct comm_base* base, + int fd, int writing, + comm_point_callback_t* callback, void* callback_arg); + /** * Close a comm point fd. * @param c: comm point to close. @@ -569,5 +584,14 @@ void comm_signal_callback(int fd, short event, void* arg); */ void comm_point_local_handle_callback(int fd, short event, void* arg); +/** + * This routine is published for checks and tests, and is only used internally. + * libevent callback for raw fd access. + * @param fd: file descriptor. + * @param event: event bits from libevent: + * EV_READ, EV_WRITE, EV_SIGNAL, EV_TIMEOUT. + * @param arg: the comm_point structure. + */ +void comm_point_raw_handle_callback(int fd, short event, void* arg); #endif /* NET_EVENT_H */ -- 2.47.2