cfg->outgoing_num_ports * worker->thread_num;
worker->back = outside_network_create(worker->base,
buffer_size, (size_t)cfg->outgoing_num_ports, cfg->ifs,
- cfg->num_ifs, cfg->do_ip4, cfg->do_ip6, startport);
+ cfg->num_ifs, cfg->do_ip4, cfg->do_ip6, startport,
+ cfg->do_tcp?cfg->outgoing_num_tcp:0);
if(!worker->back) {
log_err("could not create outgoing sockets");
worker_delete(worker);
+8 May 2007: Wouter
+ - outgoing network keeps list of available tcp buffers for outgoing
+ tcp queries.
+ - outgoing-num-tcp config option.
+ - outgoing network keeps waiting list of queries waiting for buffer.
+ - netevent supports outgoing tcp commpoints, nonblocking connects.
+
7 May 2007: Wouter
- EDNS read from query, used to make reply smaller.
- advertised edns value constants.
# But also takes more system resources (for open sockets).
# outgoing-range: 16
+ # number of outgoing simultaneous tcp buffers to hold per thread.
+ # outgoing-num-tcp: 10
+
# the amount of memory to use for the message cache.
# in bytes. default is 4 Mb
# msg-cache-size: 4194304
query interface. Must be at least 1. Default is 16.
Larger numbers give more protection against spoofing attempts, but need
extra resources from the operating system.
+.It \fBoutgoing-num-tcp:\fR <number>
+Number of outgoing TCP buffers to allocate per thread. Default is 10. If set
+to 0, or if do_tcp is "no", no TCP queries to authoritative servers are done.
.It \fBmsg-cache-size:\fR <number>
Number of bytes size of the message cache. Default is 4 megabytes.
.It \fBmsg-cache-slabs:\fR <number>
}
}
+/** use next free buffer to service a tcp query */
+static void
+outnet_tcp_take_into_use(struct waiting_tcp* w, uint8_t* pkt)
+{
+ struct pending_tcp* pend = w->outnet->tcp_free;
+ int s;
+ log_assert(pend);
+ log_assert(pkt);
+ /* open socket */
+#ifndef INET6
+ if(addr_is_ip6(addr))
+ s = socket(PF_INET6, SOCK_STREAM, IPPROTO_TCP);
+ else
+#endif
+ s = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
+ if(s == -1) {
+ log_err("outgoing tcp: socket: %s", strerror(errno));
+ log_addr(&w->addr, w->addrlen);
+ (void)(*w->cb)(NULL, w->cb_arg, NETEVENT_CLOSED, NULL);
+ free(w);
+ return;
+ }
+ fd_set_nonblock(s);
+ if(connect(s, (struct sockaddr*)&w->addr, w->addrlen) == -1) {
+ if(errno != EINPROGRESS) {
+ log_err("outgoing tcp: connect: %s", strerror(errno));
+ log_addr(&w->addr, w->addrlen);
+ close(s);
+ (void)(*w->cb)(NULL, w->cb_arg, NETEVENT_CLOSED, NULL);
+ free(w);
+ return;
+ }
+ }
+ w->pkt = NULL;
+ w->next_waiting = (void*)pend;
+ memmove(&pend->id, pkt, sizeof(uint16_t));
+ w->outnet->tcp_free = pend->next_free;
+ pend->next_free = NULL;
+ pend->query = w;
+ ldns_buffer_clear(pend->c->buffer);
+ ldns_buffer_write(pend->c->buffer, pkt, w->pkt_len);
+ ldns_buffer_flip(pend->c->buffer);
+ pend->c->tcp_is_reading = 0;
+ pend->c->tcp_byte_count = 0;
+ comm_point_start_listening(pend->c, s, -1);
+ return;
+}
+
+/** see if buffers can be used to service TCP queries. */
+static void
+use_free_buffer(struct outside_network* outnet)
+{
+ struct waiting_tcp* w;
+ while(outnet->tcp_free && outnet->tcp_wait_first) {
+ w = outnet->tcp_wait_first;
+ outnet->tcp_wait_first = w->next_waiting;
+ if(outnet->tcp_wait_last == w)
+ outnet->tcp_wait_last = NULL;
+ outnet_tcp_take_into_use(w, w->pkt);
+ }
+}
+
+/** callback for pending tcp connections */
+static int
+outnet_tcp_cb(struct comm_point* c, void* arg, int error,
+ struct comm_reply *reply_info)
+{
+ struct pending_tcp* pend = (struct pending_tcp*)arg;
+ struct outside_network* outnet = pend->query->outnet;
+ verbose(VERB_ALGO, "outnettcp cb");
+ if(error != NETEVENT_NOERROR) {
+ log_info("outnettcp got tcp error %d", error);
+ /* pass error below and exit */
+ } else {
+ /* check ID */
+ if(ldns_buffer_limit(c->buffer) < sizeof(uint16_t) ||
+ LDNS_ID_WIRE(ldns_buffer_begin(c->buffer))!=pend->id) {
+ log_info("outnettcp: bad ID in reply");
+ log_addr(&pend->query->addr, pend->query->addrlen);
+ error = NETEVENT_CLOSED;
+ }
+ }
+ (void)(*pend->query->cb)(c, pend->query->cb_arg, error, reply_info);
+ comm_point_close(c);
+ pend->next_free = outnet->tcp_free;
+ outnet->tcp_free = pend;
+ free(pend->query);
+ pend->query = NULL;
+ use_free_buffer(outnet);
+ return 0;
+}
+
/** callback for incoming udp answers from the network. */
static int
outnet_udp_cb(struct comm_point* c, void* arg, int error,
pending_delete(p->outnet, p);
}
+/** create pending_tcp buffers */
+static int
+create_pending_tcp(struct outside_network* outnet, size_t bufsize)
+{
+ size_t i;
+ if(outnet->num_tcp == 0)
+ return 1; /* no tcp needed, nothing to do */
+ if(!(outnet->tcp_conns = (struct pending_tcp **)calloc(
+ outnet->num_tcp, sizeof(struct pending_tcp*))))
+ return 0;
+ for(i=0; i<outnet->num_tcp; i++) {
+ if(!(outnet->tcp_conns[i] = (struct pending_tcp*)calloc(1,
+ sizeof(struct pending_tcp))))
+ return 0;
+ outnet->tcp_conns[i]->next_free = outnet->tcp_free;
+ outnet->tcp_free = outnet->tcp_conns[i];
+ outnet->tcp_conns[i]->c = comm_point_create_tcp_out(
+ bufsize, outnet_tcp_cb, outnet->tcp_conns[i]);
+ if(!outnet->tcp_conns[i]->c)
+ return 0;
+ }
+ return 1;
+}
+
struct outside_network*
outside_network_create(struct comm_base *base, size_t bufsize,
size_t num_ports, char** ifs, int num_ifs, int do_ip4,
- int do_ip6, int port_base)
+ int do_ip6, int port_base, size_t num_tcp)
{
struct outside_network* outnet = (struct outside_network*)
calloc(1, sizeof(struct outside_network));
return NULL;
}
outnet->base = base;
+ outnet->num_tcp = num_tcp;
#ifndef INET6
do_ip6 = 0;
#endif
outnet->num_udp4+1, sizeof(struct comm_point*))) ||
!(outnet->udp6_ports = (struct comm_point **)calloc(
outnet->num_udp6+1, sizeof(struct comm_point*))) ||
- !(outnet->pending = rbtree_create(pending_cmp)) ) {
+ !(outnet->pending = rbtree_create(pending_cmp)) ||
+ !create_pending_tcp(outnet, bufsize)) {
log_err("malloc failed");
outside_network_delete(outnet);
return NULL;
comm_point_delete(outnet->udp6_ports[i]);
free(outnet->udp6_ports);
}
+ if(outnet->tcp_conns) {
+ size_t i;
+ for(i=0; i<outnet->num_tcp; i++)
+ if(outnet->tcp_conns[i]) {
+ comm_point_delete(outnet->tcp_conns[i]->c);
+ free(outnet->tcp_conns[i]);
+ }
+ free(outnet->tcp_conns);
+ }
free(outnet);
}
tv.tv_usec = 0;
comm_timer_set(pend->timer, &tv);
}
+
+/** callback for outgoing TCP timer event */
+static void
+outnet_tcptimer(void* arg)
+{
+ struct waiting_tcp* w = (struct waiting_tcp*)arg;
+ struct outside_network* outnet = w->outnet;
+ if(w->pkt) {
+ /* it is on the waiting list */
+ struct waiting_tcp* p=outnet->tcp_wait_first, *prev=NULL;
+ while(p) {
+ if(p == w) {
+ if(prev) prev->next_waiting = w->next_waiting;
+ else outnet->tcp_wait_first=w->next_waiting;
+ outnet->tcp_wait_last = prev;
+ break;
+ }
+ prev = p;
+ p=p->next_waiting;
+ }
+ } else {
+ /* it was in use */
+ struct pending_tcp* pend=(struct pending_tcp*)w->next_waiting;
+ comm_point_close(pend->c);
+ pend->query = NULL;
+ pend->next_free = outnet->tcp_free;
+ outnet->tcp_free = pend;
+ }
+ (void)(*w->cb)(NULL, w->cb_arg, NETEVENT_TIMEOUT, NULL);
+ free(w);
+ use_free_buffer(outnet);
+}
+
+void
+pending_tcp_query(struct outside_network* outnet, ldns_buffer* packet,
+ struct sockaddr_storage* addr, socklen_t addrlen, int timeout,
+ comm_point_callback_t* callback, void* callback_arg,
+ struct ub_randstate* rnd)
+{
+ struct pending_tcp* pend = outnet->tcp_free;
+ struct waiting_tcp* w;
+ struct timeval tv;
+ uint16_t id;
+ /* if no buffer is free allocate space to store query */
+ w = (struct waiting_tcp*)malloc(sizeof(struct waiting_tcp)
+ + (pend?0:ldns_buffer_limit(packet)));
+ if(!w) {
+ /* callback user for the error */
+ (void)(*callback)(NULL, callback_arg, NETEVENT_CLOSED, NULL);
+ return;
+ }
+ if(!(w->timer = comm_timer_create(outnet->base, outnet_tcptimer, w))) {
+ free(w);
+ (void)(*callback)(NULL, callback_arg, NETEVENT_CLOSED, NULL);
+ return;
+ }
+ w->pkt = NULL;
+ w->pkt_len = ldns_buffer_limit(packet);
+ /* id uses lousy random() TODO use better and entropy */
+ id = ((unsigned)ub_random(rnd)>>8) & 0xffff;
+ LDNS_ID_SET(ldns_buffer_begin(packet), id);
+ memcpy(&w->addr, addr, addrlen);
+ w->addrlen = addrlen;
+ w->outnet = outnet;
+ w->cb = callback;
+ w->cb_arg = callback_arg;
+ tv.tv_sec = timeout;
+ tv.tv_usec = 0;
+ comm_timer_set(w->timer, &tv);
+ if(pend) {
+ /* we have a buffer available right now */
+ outnet_tcp_take_into_use(w, ldns_buffer_begin(packet));
+ } else {
+ /* queue up */
+ w->pkt = (uint8_t*)w + sizeof(struct waiting_tcp);
+ memmove(w->pkt, ldns_buffer_begin(packet), w->pkt_len);
+ w->next_waiting = NULL;
+ if(outnet->tcp_wait_last)
+ outnet->tcp_wait_last->next_waiting = w;
+ else outnet->tcp_wait_first = w;
+ outnet->tcp_wait_last = w;
+ }
+}
struct pending;
struct pending_timeout;
struct ub_randstate;
+struct pending_tcp;
+struct waiting_tcp;
/**
* Send queries to outside servers and wait for answers from servers.
/** number of udp6 ports */
size_t num_udp6;
- /** pending answers. sorted by id, addr */
+ /** pending udp answers. sorted by id, addr */
rbtree_t *pending;
+
+ /**
+ * Array of tcp pending used for outgoing TCP connections.
+ * Each can be used to establish a TCP connection with a server.
+ * The file descriptors are -1 if its free, need to be opened for
+ * the tcp connection. Can be used for ip4 and ip6.
+ */
+ struct pending_tcp **tcp_conns;
+ /** number of tcp communication points. */
+ size_t num_tcp;
+ /** list of tcp comm points that are free for use */
+ struct pending_tcp* tcp_free;
+ /** list of tcp queries waiting for a buffer */
+ struct waiting_tcp* tcp_wait_first;
+ /** last of waiting query list */
+ struct waiting_tcp* tcp_wait_last;
};
/**
struct outside_network* outnet;
};
+/**
+ * Pending TCP query to server.
+ */
+struct pending_tcp {
+ /** next in list of free tcp comm points, or NULL. */
+ struct pending_tcp* next_free;
+ /** the ID for the query; checked in reply */
+ uint16_t id;
+ /** tcp comm point it was sent on (and reply must come back on). */
+ struct comm_point* c;
+ /** the query being serviced, NULL if the pending_tcp is unused. */
+ struct waiting_tcp* query;
+};
+
+/**
+ * Query waiting for TCP buffer.
+ */
+struct waiting_tcp {
+ /**
+ * next in waiting list.
+ * if pkt==0, this points to the pending_tcp structure.
+ */
+ struct waiting_tcp* next_waiting;
+ /** timeout event; timer keeps running whether the query is
+ * waiting for a buffer or the tcp reply is pending */
+ struct comm_timer* timer;
+ /** the outside network it is part of */
+ struct outside_network* outnet;
+ /** remote address. */
+ struct sockaddr_storage addr;
+ /** length of addr field in use. */
+ socklen_t addrlen;
+ /**
+ * The query itself, the query packet to send.
+ * allocated after the waiting_tcp structure.
+ * set to NULL when the query is serviced and it part of pending_tcp.
+ * if this is NULL, the next_waiting points to the pending_tcp.
+ */
+ uint8_t* pkt;
+ /** length of query packet. */
+ size_t pkt_len;
+ /** callback for the timeout, error or reply to the message */
+ comm_point_callback_t* cb;
+ /** callback user argument */
+ void* cb_arg;
+};
+
/**
* Create outside_network structure with N udp ports.
* @param base: the communication base to use for event handling.
* @param do_ip6: service IP6.
* @param port_base: if -1 system assigns ports, otherwise try to get
* the ports numbered from this starting number.
+ * @param num_tcp: number of outgoing tcp buffers to preallocate.
* @return: the new structure (with no pending answers) or NULL on error.
*/
struct outside_network* outside_network_create(struct comm_base* base,
size_t bufsize, size_t num_ports, char** ifs, int num_ifs,
- int do_ip4, int do_ip6, int port_base);
+ int do_ip4, int do_ip6, int port_base, size_t num_tcp);
/**
* Delete outside_network structure.
comm_point_callback_t* callback, void* callback_arg,
struct ub_randstate* rnd);
+/**
+ * Send TCP query. May wait for TCP buffer. Selects ID to be random, and
+ * checks id.
+ * @param outnet: provides the event handling.
+ * @param packet: wireformat query to send to destination. copied from.
+ * @param addr: address to send to.
+ * @param addrlen: length of addr.
+ * @param timeout: in seconds from now.
+ * Timer starts running now. Timer may expire if all buffers are used,
+ * without any query been sent to the server yet.
+ * @param callback: function to call on error, timeout or reply.
+ * The routine does not return an error, instead it calls the callback,
+ * with an error code if an error happens.
+ * @param callback_arg: user argument for callback function.
+ * @param rnd: random state for generating ID.
+ */
+void pending_tcp_query(struct outside_network* outnet, ldns_buffer* packet,
+ struct sockaddr_storage* addr, socklen_t addrlen, int timeout,
+ comm_point_callback_t* callback, void* callback_arg,
+ struct ub_randstate* rnd);
+
/**
* Delete pending answer.
* @param outnet: outside network the pending query is part of.
outside_network_create(struct comm_base* base, size_t bufsize,
size_t ATTR_UNUSED(num_ports), char** ATTR_UNUSED(ifs),
int ATTR_UNUSED(num_ifs), int ATTR_UNUSED(do_ip4),
- int ATTR_UNUSED(do_ip6), int ATTR_UNUSED(port_base))
+ int ATTR_UNUSED(do_ip6), int ATTR_UNUSED(port_base),
+ size_t ATTR_UNUSED(num_tcp))
{
struct outside_network* outnet = calloc(1,
sizeof(struct outside_network));
cfg->do_tcp = 1;
cfg->outgoing_base_port = cfg->port + 1000;
cfg->outgoing_num_ports = 16;
+ cfg->outgoing_num_tcp = 10;
cfg->msg_cache_size = 4 * 1024 * 1024;
cfg->msg_cache_slabs = 4;
cfg->num_queries_per_thread = 1024;
int outgoing_base_port;
/** outgoing port range number of ports (per thread, per if) */
int outgoing_num_ports;
+ /** number of outgoing tcp buffers per (per thread) */
+ size_t outgoing_num_tcp;
/** size of the message cache */
size_t msg_cache_size;
port{COLON} { LEXOUT(("v(%s) ", yytext)); return VAR_PORT;}
outgoing-port{COLON} { LEXOUT(("v(%s) ", yytext)); return VAR_OUTGOING_PORT;}
outgoing-range{COLON} { LEXOUT(("v(%s) ", yytext)); return VAR_OUTGOING_RANGE;}
+outgoing-num-tcp{COLON} { LEXOUT(("v(%s) ", yytext)); return VAR_OUTGOING_NUM_TCP;}
do-ip4{COLON} { LEXOUT(("v(%s) ", yytext)); return VAR_DO_IP4;}
do-ip6{COLON} { LEXOUT(("v(%s) ", yytext)); return VAR_DO_IP6;}
do-udp{COLON} { LEXOUT(("v(%s) ", yytext)); return VAR_DO_UDP;}
%token VAR_FORWARD_TO VAR_FORWARD_TO_PORT VAR_CHROOT
%token VAR_USERNAME VAR_DIRECTORY VAR_LOGFILE VAR_PIDFILE
%token VAR_MSG_CACHE_SIZE VAR_MSG_CACHE_SLABS VAR_NUM_QUERIES_PER_THREAD
-%token VAR_RRSET_CACHE_SIZE VAR_RRSET_CACHE_SLABS
+%token VAR_RRSET_CACHE_SIZE VAR_RRSET_CACHE_SLABS VAR_OUTGOING_NUM_TCP
%%
toplevelvars: /* empty */ | toplevelvars toplevelvar ;
server_username | server_directory | server_logfile | server_pidfile |
server_msg_cache_size | server_msg_cache_slabs |
server_num_queries_per_thread | server_rrset_cache_size |
- server_rrset_cache_slabs
+ server_rrset_cache_slabs | server_outgoing_num_tcp
;
server_num_threads: VAR_NUM_THREADS STRING
{
free($2);
}
;
+server_outgoing_num_tcp: VAR_OUTGOING_NUM_TCP STRING
+ {
+ OUTYY(("P(server_outgoing_num_tcp:%s)\n", $2));
+ if(atoi($2) == 0 && strcmp($2, "0") != 0)
+ yyerror("number expected");
+ else cfg_parser->cfg->outgoing_num_tcp = atoi($2);
+ free($2);
+ }
+ ;
server_do_ip4: VAR_DO_IP4 STRING
{
OUTYY(("P(server_do_ip4:%s)\n", $2));
{
log_assert(c->type == comm_tcp);
comm_point_close(c);
- c->tcp_free = c->tcp_parent->tcp_free;
- c->tcp_parent->tcp_free = c;
- if(!c->tcp_free) {
- /* re-enable listening on accept socket */
- comm_point_start_listening(c->tcp_parent, -1, -1);
+ if(c->tcp_parent) {
+ c->tcp_free = c->tcp_parent->tcp_free;
+ c->tcp_parent->tcp_free = c;
+ if(!c->tcp_free) {
+ /* re-enable listening on accept socket */
+ comm_point_start_listening(c->tcp_parent, -1, -1);
+ }
}
}
c->tcp_is_reading = 1;
c->tcp_byte_count = 0;
comm_point_stop_listening(c);
- /* for listening socket */
- reclaim_tcp_handler(c);
+ if(c->tcp_parent) /* for listening socket */
+ reclaim_tcp_handler(c);
+ else /* its outgoing socket, start listening for reading */
+ comm_point_start_listening(c, -1, -1);
}
/** do the callback when reading is done */
return 1;
}
-/** Handle tcp writing callback.
+/**
+ * Handle tcp writing callback.
* @param fd: file descriptor of socket.
* @param c: comm point to write buffer out of.
* @return: 0 on error
log_assert(c->type == comm_tcp);
if(c->tcp_is_reading)
return 0;
+ if(c->tcp_byte_count == 0 && c->tcp_check_nb_connect) {
+ /* check for pending error from nonblocking connect */
+ /* from Stevens, unix network programming, vol1, 3rd ed, p450*/
+ int error = 0;
+ socklen_t len = (socklen_t)sizeof(error);
+ if(getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len) < 0){
+ error = errno; /* on solaris errno is error */
+ }
+ if(error == EINPROGRESS || error == EWOULDBLOCK)
+ return 1; /* try again later */
+ if(error != 0) {
+ log_err("tcp connect: %s", strerror(error));
+ return 0;
+ }
+ }
if(c->tcp_byte_count < sizeof(uint16_t)) {
uint16_t len = htons(ldns_buffer_limit(c->buffer));
c->tcp_do_close = 0;
c->do_not_close = 0;
c->tcp_do_toggle_rw = 0;
+ c->tcp_check_nb_connect = 0;
c->callback = callback;
c->cb_arg = callback_arg;
evbits = EV_READ | EV_PERSIST;
c->tcp_do_close = 0;
c->do_not_close = 0;
c->tcp_do_toggle_rw = 1;
+ c->tcp_check_nb_connect = 0;
c->callback = callback;
c->cb_arg = callback_arg;
/* add to parent free list */
c->tcp_do_close = 0;
c->do_not_close = 0;
c->tcp_do_toggle_rw = 0;
+ c->tcp_check_nb_connect = 0;
c->callback = NULL;
c->cb_arg = NULL;
evbits = EV_READ | EV_PERSIST;
return c;
}
+struct comm_point*
+comm_point_create_tcp_out(size_t bufsize,
+ comm_point_callback_t* callback, void* callback_arg)
+{
+ struct comm_point* c = (struct comm_point*)calloc(1,
+ sizeof(struct comm_point));
+ if(!c)
+ return NULL;
+ c->ev = (struct internal_event*)calloc(1,
+ sizeof(struct internal_event));
+ if(!c->ev) {
+ free(c);
+ return NULL;
+ }
+ c->fd = -1;
+ c->buffer = ldns_buffer_new(bufsize);
+ if(!c->buffer) {
+ free(c->ev);
+ free(c);
+ return NULL;
+ }
+ c->timeout = NULL;
+ c->tcp_is_reading = 0;
+ c->tcp_byte_count = 0;
+ c->tcp_parent = NULL;
+ c->max_tcp_count = 0;
+ c->tcp_handlers = NULL;
+ c->tcp_free = NULL;
+ c->type = comm_tcp;
+ c->tcp_do_close = 0;
+ c->do_not_close = 0;
+ c->tcp_do_toggle_rw = 1;
+ c->tcp_check_nb_connect = 1;
+ c->callback = callback;
+ c->cb_arg = callback_arg;
+ return c;
+}
+
struct comm_point*
comm_point_create_local(struct comm_base *base, int fd, size_t bufsize,
comm_point_callback_t* callback, void* callback_arg)
c->tcp_do_close = 0;
c->do_not_close = 1;
c->tcp_do_toggle_rw = 0;
+ c->tcp_check_nb_connect = 0;
c->callback = callback;
c->cb_arg = callback_arg;
/* libevent stuff */
{
if(!c)
return;
- if(event_del(&c->ev->ev) != 0) {
- log_err("could not event_del on close");
- }
+ if(c->fd != -1)
+ if(event_del(&c->ev->ev) != 0) {
+ log_err("could not event_del on close");
+ }
/* close fd after removing from event lists, or epoll.. is messed up */
if(c->fd != -1 && !c->do_not_close)
close(c->fd);
So that when that is done the callback is called. */
int tcp_do_toggle_rw;
+ /** if set, checks for pending error from nonblocking connect() call.*/
+ int tcp_check_nb_connect;
+
/** callback when done.
tcp_accept does not get called back, is NULL then.
If a timeout happens, callback with timeout=1 is called.
int fd, int num, size_t bufsize,
comm_point_callback_t* callback, void* callback_arg);
+/**
+ * Create an outgoing TCP commpoint. No file descriptor is opened, left at -1.
+ * @param bufsize: size of buffer to create for handlers.
+ * @param callback: callback function pointer for the handler.
+ * @param callback_arg: will be passed to your callback function.
+ * @return: the commpoint or NULL on error.
+ */
+struct comm_point* comm_point_create_tcp_out(size_t bufsize,
+ comm_point_callback_t* callback, void* callback_arg);
+
/**
* Create commpoint to listen to a local domain file descriptor.
* @param base: in which base to alloc the commpoint.