+21 January 2008: Wouter
+ - libworker work, netevent raw commpoints, write_msg, serialize.
+
18 January 2008: Wouter
- touch up of manpage for libunbound.
- support for IP_RECVDSTADDR (for *BSD ip4).
ascertains that RRSIGs are OK (and not omitted), but does not
check NSEC/NSEC3.
+o Case preservation
+ Unbound preserves the casing received from authority servers as best
+ as possible. It compresses without case, so case can get lost there.
+ The casing from the authority server is used in preference to the casing
+ of the query name. This is different from BIND. RFC4343 allows either
+ behaviour.
#include "libunbound/context.h"
#include "util/module.h"
#include "util/config_file.h"
+#include "util/net_help.h"
#include "services/modstack.h"
#include "services/localzone.h"
#include "services/cache/rrset.h"
ctx->alloc_list = alloc;
lock_basic_unlock(&ctx->cfglock);
}
+
+uint8_t*
+context_serialize_new_query(struct ctx_query* q, uint32_t* len)
+{
+ /* format for new query is
+ * o uint32 cmd
+ * o uint32 id
+ * o uint32 type
+ * o uint32 class
+ * o rest queryname (string)
+ */
+ uint8_t* p;
+ size_t slen = strlen(q->res->qname) + 1/*end of string*/;
+ *len = sizeof(uint32_t)*4 + slen;
+ p = (uint8_t*)malloc(*len);
+ if(!p) return NULL;
+ ldns_write_uint32(p, UB_LIBCMD_NEWQUERY);
+ ldns_write_uint32(p+sizeof(uint32_t), (uint32_t)q->querynum);
+ ldns_write_uint32(p+2*sizeof(uint32_t), (uint32_t)q->res->qtype);
+ ldns_write_uint32(p+3*sizeof(uint32_t), (uint32_t)q->res->qclass);
+ memmove(p+4*sizeof(uint32_t), q->res->qname, slen);
+ return p;
+}
+
+struct ctx_query*
+context_deserialize_new_query(struct ub_val_ctx* ctx, uint8_t* p, uint32_t len)
+{
+ struct ctx_query* q = (struct ctx_query*)calloc(1, sizeof(*q));
+ if(!q) return NULL;
+ if(len < 4*sizeof(uint32_t)+1) {
+ free(q);
+ return NULL;
+ }
+ log_assert( ldns_read_uint32(p) == UB_LIBCMD_NEWQUERY);
+ q->querynum = (int)ldns_read_uint32(p+sizeof(uint32_t));
+ q->node.key = &q->querynum;
+ q->async = 1;
+ q->res = (struct ub_val_result*)calloc(1, sizeof(*q->res));
+ if(!q->res) {
+ free(q);
+ return NULL;
+ }
+ q->res->qtype = (int)ldns_read_uint32(p+2*sizeof(uint32_t));
+ q->res->qclass = (int)ldns_read_uint32(p+3*sizeof(uint32_t));
+ q->res->qname = strdup((char*)(p+4*sizeof(uint32_t)));
+ if(!q->res->qname) {
+ free(q->res);
+ free(q);
+ return NULL;
+ }
+
+ /** add to query list */
+ ctx->num_async++;
+ (void)rbtree_insert(&ctx->queries, &q->node);
+ return q;
+}
+
+uint8_t*
+context_serialize_answer(struct ctx_query* q, int err, uint32_t* len)
+{
+ /* answer format
+ * o uint32 cmd
+ * o uint32 id
+ * o uint32 error_code
+ * o uint32 msg_security
+ * o the remainder is the answer msg from resolver lookup.
+ * remainder can be length 0.
+ */
+ uint8_t* p;
+ *len = sizeof(uint32_t)*4 + q->msg_len;
+ p = (uint8_t*)malloc(*len);
+ if(!p) return NULL;
+ ldns_write_uint32(p, UB_LIBCMD_ANSWER);
+ ldns_write_uint32(p+sizeof(uint32_t), (uint32_t)q->querynum);
+ ldns_write_uint32(p+2*sizeof(uint32_t), (uint32_t)err);
+ ldns_write_uint32(p+3*sizeof(uint32_t), (uint32_t)q->msg_security);
+ memmove(p+4*sizeof(uint32_t), q->msg, q->msg_len);
+ return p;
+}
+
+struct ctx_query*
+context_deserialize_answer(struct ub_val_ctx* ctx,
+ uint8_t* p, uint32_t len, int* err)
+{
+ struct ctx_query* q = NULL ;
+ int id;
+ if(len < 4*sizeof(uint32_t)) return NULL;
+ log_assert( ldns_read_uint32(p) == UB_LIBCMD_ANSWER);
+ id = (int)ldns_read_uint32(p+sizeof(uint32_t));
+ q = (struct ctx_query*)rbtree_search(&ctx->queries, &id);
+ if(!q) return NULL;
+ *err = (int)ldns_read_uint32(p+2*sizeof(uint32_t));
+ q->msg_security = ldns_read_uint32(p+3*sizeof(uint32_t));
+ if(len > 4*sizeof(uint32_t)) {
+ q->msg_len = len - 4*sizeof(uint32_t);
+ q->msg = (uint8_t*)memdup(p+4*sizeof(uint32_t), q->msg_len);
+ if(!q->msg) {
+ /* pass malloc failure to the user callback */
+ q->msg_len = 0;
+ *err = UB_NOMEM;
+ return q;
+ }
+ } else {
+ q->msg_len = 0;
+ free(q->msg);
+ q->msg = NULL;
+ }
+ return q;
+}
+
+uint8_t*
+context_serialize_cancel(struct ctx_query* q, uint32_t* len)
+{
+ /* format of cancel:
+ * o uint32 cmd
+ * o uint32 async-id */
+ uint8_t* p = (uint8_t*)malloc(2*sizeof(uint32_t));
+ if(!p) return NULL;
+ *len = 2*sizeof(uint32_t);
+ ldns_write_uint32(p, UB_LIBCMD_CANCEL);
+ ldns_write_uint32(p+sizeof(uint32_t), (uint32_t)q->querynum);
+ return p;
+}
+
+struct ctx_query* context_deserialize_cancel(struct ub_val_ctx* ctx,
+ uint8_t* p, uint32_t len)
+{
+ struct ctx_query* q;
+ int id;
+ if(len != 2*sizeof(uint32_t)) return NULL;
+ log_assert( ldns_read_uint32(p) == UB_LIBCMD_CANCEL);
+ id = (int)ldns_read_uint32(p+sizeof(uint32_t));
+ q = (struct ctx_query*)rbtree_search(&ctx->queries, &id);
+ return q;
+}
+
+uint8_t*
+context_serialize_quit(uint32_t* len)
+{
+ uint8_t* p = (uint8_t*)malloc(sizeof(uint32_t));
+ if(!p)
+ return NULL;
+ *len = sizeof(uint32_t);
+ ldns_write_uint32(p, UB_LIBCMD_QUIT);
+ return p;
+}
+
+enum ub_ctx_cmd context_serial_getcmd(uint8_t* p, uint32_t len)
+{
+ uint32_t v;
+ if((size_t)len < sizeof(v))
+ return UB_LIBCMD_QUIT;
+ v = ldns_read_uint32(p);
+ return v;
+}
int querynum;
/** was this an async query? */
int async;
+ /** has this query been cancelled? (for bg thread) */
+ int cancelled;
/** for async query, the callback function */
ub_val_callback_t cb;
UB_INITFAIL = -7
};
+/**
+ * Command codes for libunbound pipe.
+ *
+ * Serialization looks like this:
+ * o length (of remainder) uint32.
+ * o uint32 command code.
+ * o per command format.
+ */
+enum ub_ctx_cmd {
+ /** QUIT */
+ UB_LIBCMD_QUIT = 0,
+ /** New query, sent to bg worker */
+ UB_LIBCMD_NEWQUERY,
+ /** Cancel query, sent to bg worker */
+ UB_LIBCMD_CANCEL,
+ /** Query result, originates from bg worker */
+ UB_LIBCMD_ANSWER
+};
+
/**
* finalize a context.
* @param ctx: context to finalize. creates shared data.
*/
void context_release_alloc(struct ub_val_ctx* ctx, struct alloc_cache* alloc);
+/**
+ * Serialize a context query that questions data.
+ * This serializes the query name, type, ...
+ * As well as command code 'new_query'.
+ * @param q: context query
+ * @param len: the length of the allocation is returned.
+ * @return: an alloc, or NULL on mem error.
+ */
+uint8_t* context_serialize_new_query(struct ctx_query* q, uint32_t* len);
+
+/**
+ * Serialize a context_query result to hand back to user.
+ * This serializes the query name, type, ..., and result.
+ * As well as command code 'answer'.
+ * @param q: context query
+ * @param err: error code to pass to client.
+ * @param len: the length of the allocation is returned.
+ * @return: an alloc, or NULL on mem error.
+ */
+uint8_t* context_serialize_answer(struct ctx_query* q, int err, uint32_t* len);
+
+/**
+ * Serialize a query cancellation. Serializes query async id
+ * as well as command code 'cancel'
+ * @param q: context query
+ * @param len: the length of the allocation is returned.
+ * @return: an alloc, or NULL on mem error.
+ */
+uint8_t* context_serialize_cancel(struct ctx_query* q, uint32_t* len);
+
+/**
+ * Serialize a 'quit' command.
+ * @param len: the length of the allocation is returned.
+ * @return: an alloc, or NULL on mem error.
+ */
+uint8_t* context_serialize_quit(uint32_t* len);
+
+/**
+ * Obtain command code from serialized buffer
+ * @param p: buffer serialized.
+ * @param len: length of buffer.
+ * @return command code or QUIT on error.
+ */
+enum ub_ctx_cmd context_serial_getcmd(uint8_t* p, uint32_t len);
+
+/**
+ * Deserialize a new_query buffer.
+ * @param ctx: context
+ * @param p: buffer serialized.
+ * @param len: length of buffer.
+ * @return new ctx_query or NULL for malloc failure.
+ */
+struct ctx_query* context_deserialize_new_query(struct ub_val_ctx* ctx,
+ uint8_t* p, uint32_t len);
+
+/**
+ * Deserialize an answer buffer.
+ * @param ctx: context
+ * @param p: buffer serialized.
+ * @param len: length of buffer.
+ * @param err: error code to be returned to client is passed.
+ * @return ctx_query with answer added or NULL for malloc failure.
+ */
+struct ctx_query* context_deserialize_answer(struct ub_val_ctx* ctx,
+ uint8_t* p, uint32_t len, int* err);
+
+/**
+ * Deserialize a cancel buffer.
+ * @param ctx: context
+ * @param p: buffer serialized.
+ * @param len: length of buffer.
+ * @return ctx_query to cancel or NULL for failure.
+ */
+struct ctx_query* context_deserialize_cancel(struct ub_val_ctx* ctx,
+ uint8_t* p, uint32_t len);
+
#endif /* LIBUNBOUND_CONTEXT_H */
/** size of table used for random numbers. large to be more secure. */
#define RND_STATE_SIZE 256
+/** handle new query command for bg worker */
+static void handle_newq(struct libworker* w, uint8_t* buf, uint32_t len);
+
/** delete libworker struct */
static void
libworker_delete(struct libworker* w)
return w;
}
-static int
-libworker_handle_control_cmd(struct comm_point* c, void* arg,
- int err, struct comm_reply* ATTR_UNUSED(rep))
+/** handle cancel command for bg worker */
+static void
+handle_cancel(struct libworker* w, uint8_t* buf, uint32_t len)
{
- /*struct ub_val_ctx* ctx = (struct ub_val_ctx*)arg;*/
- if(err != NETEVENT_NOERROR) {
- if(err == NETEVENT_CLOSED) {
- /* parent closed pipe, must have exited somehow */
- /* it is of no use to go on, exit */
- exit(0);
- }
- log_err("internal error: control cmd err %d", err);
- exit(0);
+ struct ctx_query* q = context_deserialize_cancel(w->ctx, buf, len);
+ if(!q) {
+ log_err("deserialize cancel failed");
+ return;
}
- return 0;
+ q->cancelled = 1;
}
-static int
-libworker_handle_result_write(struct comm_point* c, void* arg,
- int err, struct comm_reply* ATTR_UNUSED(rep))
+/** handle control command coming into server */
+int
+libworker_handle_control_cmd(struct comm_point* c, void* arg,
+ int ATTR_UNUSED(err), struct comm_reply* ATTR_UNUSED(rep))
{
- /*struct ub_val_ctx* ctx = (struct ub_val_ctx*)arg;*/
- if(err != NETEVENT_NOERROR) {
- if(err == NETEVENT_CLOSED) {
- /* parent closed pipe, must have exited somehow */
- /* it is of no use to go on, exit */
- exit(0);
- }
- log_err("internal error: pipe comm err %d", err);
- exit(0);
+ struct libworker* w = (struct libworker*)arg;
+ uint32_t len = 0;
+ uint8_t* buf = NULL;
+ int r = libworker_read_msg(c->fd, &buf, &len, 1);
+ if(r==0) {
+ /* error has happened or */
+ /* parent closed pipe, must have exited somehow */
+ /* it is of no use to go on, exit */
+ comm_base_exit(w->base);
+ return 0;
+ }
+ if(r==-1) /* nothing to read now, try later */
+ return 0;
+
+ switch(context_serial_getcmd(buf, len)) {
+ default:
+ case UB_LIBCMD_ANSWER:
+ log_err("unknown command for bg worker %d",
+ (int)context_serial_getcmd(buf, len));
+ /* and fall through to quit */
+ case UB_LIBCMD_QUIT:
+ comm_base_exit(w->base);
+ break;
+ case UB_LIBCMD_NEWQUERY:
+ handle_newq(w, buf, len);
+ break;
+ case UB_LIBCMD_CANCEL:
+ handle_cancel(w, buf, len);
+ break;
}
return 0;
}
-/** get bufsize for cfg */
-static size_t
-getbufsz(struct ub_val_ctx* ctx)
+/** handle opportunity to write result back */
+int
+libworker_handle_result_write(struct comm_point* c, void* arg,
+ int ATTR_UNUSED(err), struct comm_reply* ATTR_UNUSED(rep))
{
- size_t s = 65535;
- lock_basic_lock(&ctx->cfglock);
- s = ctx->env->cfg->msg_buffer_size;
- lock_basic_unlock(&ctx->cfglock);
- return s;
+ struct libworker* w = (struct libworker*)arg;
+ struct libworker_res_list* item = w->res_list;
+ int r;
+ if(!item) {
+ comm_point_stop_listening(c);
+ return 0;
+ }
+ r = libworker_write_msg(c->fd, item->buf, item->len, 1);
+ if(r == -1)
+ return 0; /* try again later */
+ if(r == 0) {
+ /* error on pipe, must have exited somehow */
+ /* it is of no use to go on, exit */
+ comm_base_exit(w->base);
+ return 0;
+ }
+ /* done this result, remove it */
+ free(item->buf);
+ item->buf = NULL;
+ w->res_list = w->res_list->next;
+ free(item);
+ if(!w->res_list) {
+ w->res_last = NULL;
+ comm_point_stop_listening(c);
+ }
+ return 0;
}
/** the background thread func */
/* setup */
struct ub_val_ctx* ctx = (struct ub_val_ctx*)arg;
struct libworker* w = libworker_setup(ctx);
- size_t bufsz = getbufsz(ctx);
log_thread_set(&w->thread_num);
if(!w) {
log_err("libunbound bg worker init failed, nomem");
return NULL;
}
lock_basic_lock(&ctx->qqpipe_lock);
- if(!(w->cmd_com=comm_point_create_local(w->base, ctx->qqpipe[0],
- bufsz, libworker_handle_control_cmd, w))) {
+ if(!(w->cmd_com=comm_point_create_raw(w->base, ctx->qqpipe[0], 0,
+ libworker_handle_control_cmd, w))) {
lock_basic_unlock(&ctx->qqpipe_lock);
log_err("libunbound bg worker init failed, no cmdcom");
return NULL;
}
lock_basic_unlock(&ctx->qqpipe_lock);
lock_basic_lock(&ctx->rrpipe_lock);
- /* TODO create writing local commpoint */
- if(!(w->res_com=comm_point_create_local(w->base, ctx->rrpipe[1],
- bufsz, libworker_handle_result_write, w))) {
+ if(!(w->res_com=comm_point_create_raw(w->base, ctx->rrpipe[1], 1,
+ libworker_handle_result_write, w))) {
lock_basic_unlock(&ctx->qqpipe_lock);
log_err("libunbound bg worker init failed, no cmdcom");
return NULL;
return UB_NOERROR;
}
+/** handle new query command for bg worker */
+static void
+handle_newq(struct libworker* w, uint8_t* buf, uint32_t len)
+{
+ struct ctx_query* q = context_deserialize_new_query(w->ctx, buf, len);
+ if(!q) {
+ log_err("failed to deserialize newq");
+ return;
+ }
+ /* TODO start new query in bg mode */
+}
+
void libworker_alloc_cleanup(void* arg)
{
struct libworker* w = (struct libworker*)arg;
return 0;
}
+int
+libworker_write_msg(int fd, uint8_t* buf, uint32_t len, int nonblock)
+{
+ ssize_t r;
+ /* test */
+ if(nonblock) {
+ r = write(fd, &len, sizeof(len));
+ if(r == -1) {
+ if(errno==EINTR || errno==EAGAIN)
+ return -1;
+ log_err("msg write failed: %s", strerror(errno));
+ return -1; /* can still continue, perhaps */
+ }
+ } else r = 0;
+ if(!fd_set_block(fd))
+ return 0;
+ /* write remainder */
+ if(r != (ssize_t)sizeof(len)) {
+ if(write(fd, (char*)(&len)+r, sizeof(len)-r) == -1) {
+ log_err("msg write failed: %s", strerror(errno));
+ return 0;
+ }
+ }
+ if(write(fd, buf, len) == -1) {
+ log_err("msg write failed: %s", strerror(errno));
+ return 0;
+ }
+ if(!fd_set_nonblock(fd))
+ return 0;
+ return 1;
+}
+
+int
+libworker_read_msg(int fd, uint8_t** buf, uint32_t* len, int nonblock)
+{
+ ssize_t r;
+
+ /* test */
+ *len = 0;
+ if(nonblock) {
+ r = read(fd, len, sizeof(*len));
+ if(r == -1) {
+ if(errno==EINTR || errno==EAGAIN)
+ return -1;
+ log_err("msg read failed: %s", strerror(errno));
+ return -1; /* we can still continue, perhaps */
+ }
+ if(r == 0) /* EOF */
+ return 0;
+ } else r = 0;
+ if(!fd_set_block(fd))
+ return 0;
+ /* read remainder */
+ if(r != (ssize_t)sizeof(*len)) {
+ if((r=read(fd, (char*)(len)+r, sizeof(*len)-r)) == -1) {
+ log_err("msg read failed: %s", strerror(errno));
+ return 0;
+ }
+ if(r == 0) /* EOF */
+ return 0;
+ }
+ *buf = (uint8_t*)malloc(*len);
+ if(!*buf) {
+ log_err("out of memory");
+ return 0;
+ }
+ if((r=read(fd, *buf, *len)) == -1) {
+ log_err("msg read failed: %s", strerror(errno));
+ return 0;
+ }
+ if(r == 0) /* EOF */
+ return 0;
+ if(!fd_set_nonblock(fd))
+ return 0;
+ return 1;
+}
+
/* --- fake callbacks for fptr_wlist to work --- */
int worker_handle_control_cmd(struct comm_point* ATTR_UNUSED(c),
void* ATTR_UNUSED(arg), int ATTR_UNUSED(error),
struct module_qstate;
struct comm_point;
struct comm_reply;
+struct libworker_res_list;
/**
* The library-worker status structure
struct ub_randstate* rndstate;
/** commpoint to listen to commands */
struct comm_point* cmd_com;
- /** commpoint to write results back (nonblocking) */
+ /** commpoint to write results back */
struct comm_point* res_com;
+
+ /** list of outstanding results to be written back */
+ struct libworker_res_list* res_list;
+ /** last in list */
+ struct libworker_res_list* res_last;
};
/**
struct ctx_query* q;
};
+/**
+ * List of results (arbitrary command serializations) to write back
+ */
+struct libworker_res_list {
+ /** next in list */
+ struct libworker_res_list* next;
+ /** serialized buffer to write */
+ uint8_t* buf;
+ /** length to write */
+ uint32_t len;
+};
+
/**
* Create a background worker
* @param ctx: is updated with pid/tid of the background worker.
int libworker_handle_service_reply(struct comm_point* c, void* arg, int error,
struct comm_reply* reply_info);
+/** handle control command coming into server */
+int libworker_handle_control_cmd(struct comm_point* c, void* arg,
+ int err, struct comm_reply* rep);
+
+/** handle opportunity to write result back */
+int libworker_handle_result_write(struct comm_point* c, void* arg,
+ int err, struct comm_reply* rep);
+
+/**
+ * Write length bytes followed by message.
+ * @param fd: the socket to write on. Is nonblocking.
+ * Set to blocking by the function,
+ * and back to non-blocking at exit of function.
+ * @param buf: the message.
+ * @param len: length of message.
+ * @param nonblock: if set to true, the first write is nonblocking.
+ * If the first write fails the function returns -1.
+ * If set false, the first write is blocking.
+ * @return: all remainder writes are nonblocking.
+ * return 0 on error, in that case blocking/nonblocking of socket is
+ * unknown.
+ * return 1 if all OK.
+ */
+int libworker_write_msg(int fd, uint8_t* buf, uint32_t len, int nonblock);
+
+/**
+ * Read length bytes followed by message.
+ * @param fd: the socket to write on. Is nonblocking.
+ * Set to blocking by the function,
+ * and back to non-blocking at exit of function.
+ * @param buf: the message, malloced.
+ * @param len: length of message, returned.
+ * @param nonblock: if set to true, the first read is nonblocking.
+ * If the first read fails the function returns -1.
+ * If set false, the first read is blocking.
+ * @return: all remainder reads are nonblocking.
+ * return 0 on error, in that case blocking/nonblocking of socket is
+ * unknown. On EOF 0 is returned.
+ * return 1 if all OK.
+ */
+int libworker_read_msg(int fd, uint8_t** buf, uint32_t* len, int nonblock);
+
#endif /* LIBUNBOUND_WORKER_H */
else if(fptr == &comm_timer_callback) return 1;
else if(fptr == &comm_signal_callback) return 1;
else if(fptr == &comm_point_local_handle_callback) return 1;
+ else if(fptr == &comm_point_raw_handle_callback) return 1;
return 0;
}
log_err("Ignored event %d for localhdl.", event);
}
+void comm_point_raw_handle_callback(int ATTR_UNUSED(fd),
+ short ATTR_UNUSED(event), void* arg)
+{
+ struct comm_point* c = (struct comm_point*)arg;
+ log_assert(c->type == comm_raw);
+
+ (void)(*c->callback)(c, c->cb_arg, NETEVENT_NOERROR, NULL);
+}
+
struct comm_point*
comm_point_create_udp(struct comm_base *base, int fd, ldns_buffer* buffer,
comm_point_callback_t* callback, void* callback_arg)
if(event_base_set(base->eb->base, &c->ev->ev) != 0 ||
event_add(&c->ev->ev, c->timeout) != 0 )
{
- log_err("could not add tcphdl event");
+ log_err("could not add localhdl event");
+ free(c->ev);
+ free(c);
+ return NULL;
+ }
+ return c;
+}
+
+struct comm_point*
+comm_point_create_raw(struct comm_base* base, int fd, int writing,
+ comm_point_callback_t* callback, void* callback_arg)
+{
+ struct comm_point* c = (struct comm_point*)calloc(1,
+ sizeof(struct comm_point));
+ short evbits;
+ if(!c)
+ return NULL;
+ c->ev = (struct internal_event*)calloc(1,
+ sizeof(struct internal_event));
+ if(!c->ev) {
+ free(c);
+ return NULL;
+ }
+ c->fd = fd;
+ c->buffer = NULL;
+ c->timeout = NULL;
+ c->tcp_is_reading = 0;
+ c->tcp_byte_count = 0;
+ c->tcp_parent = NULL;
+ c->max_tcp_count = 0;
+ c->tcp_handlers = NULL;
+ c->tcp_free = NULL;
+ c->type = comm_raw;
+ c->tcp_do_close = 0;
+ c->do_not_close = 1;
+ c->tcp_do_toggle_rw = 0;
+ c->tcp_check_nb_connect = 0;
+ c->callback = callback;
+ c->cb_arg = callback_arg;
+ /* libevent stuff */
+ if(writing)
+ evbits = EV_PERSIST | EV_WRITE;
+ else evbits = EV_PERSIST | EV_READ;
+ event_set(&c->ev->ev, c->fd, evbits, comm_point_raw_handle_callback,
+ c);
+ if(event_base_set(base->eb->base, &c->ev->ev) != 0 ||
+ event_add(&c->ev->ev, c->timeout) != 0 )
+ {
+ log_err("could not add rawhdl event");
free(c->ev);
free(c);
return NULL;
/** TCP handler socket - handle byteperbyte readwrite. */
comm_tcp,
/** AF_UNIX socket - for internal commands. */
- comm_local
+ comm_local,
+ /** raw - not DNS format - for pipe readers and writers */
+ comm_raw
}
/** variable with type of socket, UDP,TCP-accept,TCP,pipe */
type;
int fd, size_t bufsize,
comm_point_callback_t* callback, void* callback_arg);
+/**
+ * Create commpoint to listen to a local domain pipe descriptor.
+ * @param base: in which base to alloc the commpoint.
+ * @param fd: file descriptor.
+ * @param writing: true if you want to listen to writes, false for reads.
+ * @param callback: callback function pointer for the handler.
+ * @param callback_arg: will be passed to your callback function.
+ * @return: the commpoint or NULL on error.
+ */
+struct comm_point* comm_point_create_raw(struct comm_base* base,
+ int fd, int writing,
+ comm_point_callback_t* callback, void* callback_arg);
+
/**
* Close a comm point fd.
* @param c: comm point to close.
*/
void comm_point_local_handle_callback(int fd, short event, void* arg);
+/**
+ * This routine is published for checks and tests, and is only used internally.
+ * libevent callback for raw fd access.
+ * @param fd: file descriptor.
+ * @param event: event bits from libevent:
+ * EV_READ, EV_WRITE, EV_SIGNAL, EV_TIMEOUT.
+ * @param arg: the comm_point structure.
+ */
+void comm_point_raw_handle_callback(int fd, short event, void* arg);
#endif /* NET_EVENT_H */