From: Wouter Wijngaards Date: Tue, 8 May 2007 13:25:21 +0000 (+0000) Subject: TCP outgoing services. X-Git-Tag: release-0.3~9 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=6c3c370b2ac39e10e2d665fd5561d0f7c29c8eed;p=thirdparty%2Funbound.git TCP outgoing services. git-svn-id: file:///svn/unbound/trunk@294 be551aaa-1e26-0410-a405-d3ace91eadb9 --- diff --git a/daemon/worker.c b/daemon/worker.c index e6a1f4718..4e78104f4 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -672,7 +672,8 @@ worker_init(struct worker* worker, struct config_file *cfg, cfg->outgoing_num_ports * worker->thread_num; worker->back = outside_network_create(worker->base, buffer_size, (size_t)cfg->outgoing_num_ports, cfg->ifs, - cfg->num_ifs, cfg->do_ip4, cfg->do_ip6, startport); + cfg->num_ifs, cfg->do_ip4, cfg->do_ip6, startport, + cfg->do_tcp?cfg->outgoing_num_tcp:0); if(!worker->back) { log_err("could not create outgoing sockets"); worker_delete(worker); diff --git a/doc/Changelog b/doc/Changelog index a058fadfb..7acb5cf8f 100644 --- a/doc/Changelog +++ b/doc/Changelog @@ -1,3 +1,10 @@ +8 May 2007: Wouter + - outgoing network keeps list of available tcp buffers for outgoing + tcp queries. + - outgoing-num-tcp config option. + - outgoing network keeps waiting list of queries waiting for buffer. + - netevent supports outgoing tcp commpoints, nonblocking connects. + 7 May 2007: Wouter - EDNS read from query, used to make reply smaller. - advertised edns value constants. diff --git a/doc/example.conf b/doc/example.conf index 25bd71e42..8d0094eae 100644 --- a/doc/example.conf +++ b/doc/example.conf @@ -39,6 +39,9 @@ server: # But also takes more system resources (for open sockets). # outgoing-range: 16 + # number of outgoing simultaneous tcp buffers to hold per thread. + # outgoing-num-tcp: 10 + # the amount of memory to use for the message cache. # in bytes. default is 4 Mb # msg-cache-size: 4194304 diff --git a/doc/unbound.conf.5 b/doc/unbound.conf.5 index e6dd79c62..c4a4b2ce0 100644 --- a/doc/unbound.conf.5 +++ b/doc/unbound.conf.5 @@ -63,6 +63,9 @@ Number of ports to open. This number is opened per thread for every outgoing query interface. Must be at least 1. Default is 16. Larger numbers give more protection against spoofing attempts, but need extra resources from the operating system. +.It \fBoutgoing-num-tcp:\fR +Number of outgoing TCP buffers to allocate per thread. Default is 10. If set +to 0, or if do_tcp is "no", no TCP queries to authoritative servers are done. .It \fBmsg-cache-size:\fR Number of bytes size of the message cache. Default is 4 megabytes. .It \fBmsg-cache-slabs:\fR diff --git a/services/outside_network.c b/services/outside_network.c index a4cadeb08..c2a738891 100644 --- a/services/outside_network.c +++ b/services/outside_network.c @@ -109,6 +109,98 @@ pending_cmp(const void* key1, const void* key2) } } +/** use next free buffer to service a tcp query */ +static void +outnet_tcp_take_into_use(struct waiting_tcp* w, uint8_t* pkt) +{ + struct pending_tcp* pend = w->outnet->tcp_free; + int s; + log_assert(pend); + log_assert(pkt); + /* open socket */ +#ifndef INET6 + if(addr_is_ip6(addr)) + s = socket(PF_INET6, SOCK_STREAM, IPPROTO_TCP); + else +#endif + s = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); + if(s == -1) { + log_err("outgoing tcp: socket: %s", strerror(errno)); + log_addr(&w->addr, w->addrlen); + (void)(*w->cb)(NULL, w->cb_arg, NETEVENT_CLOSED, NULL); + free(w); + return; + } + fd_set_nonblock(s); + if(connect(s, (struct sockaddr*)&w->addr, w->addrlen) == -1) { + if(errno != EINPROGRESS) { + log_err("outgoing tcp: connect: %s", strerror(errno)); + log_addr(&w->addr, w->addrlen); + close(s); + (void)(*w->cb)(NULL, w->cb_arg, NETEVENT_CLOSED, NULL); + free(w); + return; + } + } + w->pkt = NULL; + w->next_waiting = (void*)pend; + memmove(&pend->id, pkt, sizeof(uint16_t)); + w->outnet->tcp_free = pend->next_free; + pend->next_free = NULL; + pend->query = w; + ldns_buffer_clear(pend->c->buffer); + ldns_buffer_write(pend->c->buffer, pkt, w->pkt_len); + ldns_buffer_flip(pend->c->buffer); + pend->c->tcp_is_reading = 0; + pend->c->tcp_byte_count = 0; + comm_point_start_listening(pend->c, s, -1); + return; +} + +/** see if buffers can be used to service TCP queries. */ +static void +use_free_buffer(struct outside_network* outnet) +{ + struct waiting_tcp* w; + while(outnet->tcp_free && outnet->tcp_wait_first) { + w = outnet->tcp_wait_first; + outnet->tcp_wait_first = w->next_waiting; + if(outnet->tcp_wait_last == w) + outnet->tcp_wait_last = NULL; + outnet_tcp_take_into_use(w, w->pkt); + } +} + +/** callback for pending tcp connections */ +static int +outnet_tcp_cb(struct comm_point* c, void* arg, int error, + struct comm_reply *reply_info) +{ + struct pending_tcp* pend = (struct pending_tcp*)arg; + struct outside_network* outnet = pend->query->outnet; + verbose(VERB_ALGO, "outnettcp cb"); + if(error != NETEVENT_NOERROR) { + log_info("outnettcp got tcp error %d", error); + /* pass error below and exit */ + } else { + /* check ID */ + if(ldns_buffer_limit(c->buffer) < sizeof(uint16_t) || + LDNS_ID_WIRE(ldns_buffer_begin(c->buffer))!=pend->id) { + log_info("outnettcp: bad ID in reply"); + log_addr(&pend->query->addr, pend->query->addrlen); + error = NETEVENT_CLOSED; + } + } + (void)(*pend->query->cb)(c, pend->query->cb_arg, error, reply_info); + comm_point_close(c); + pend->next_free = outnet->tcp_free; + outnet->tcp_free = pend; + free(pend->query); + pend->query = NULL; + use_free_buffer(outnet); + return 0; +} + /** callback for incoming udp answers from the network. */ static int outnet_udp_cb(struct comm_point* c, void* arg, int error, @@ -271,10 +363,34 @@ pending_udp_timer_cb(void *arg) pending_delete(p->outnet, p); } +/** create pending_tcp buffers */ +static int +create_pending_tcp(struct outside_network* outnet, size_t bufsize) +{ + size_t i; + if(outnet->num_tcp == 0) + return 1; /* no tcp needed, nothing to do */ + if(!(outnet->tcp_conns = (struct pending_tcp **)calloc( + outnet->num_tcp, sizeof(struct pending_tcp*)))) + return 0; + for(i=0; inum_tcp; i++) { + if(!(outnet->tcp_conns[i] = (struct pending_tcp*)calloc(1, + sizeof(struct pending_tcp)))) + return 0; + outnet->tcp_conns[i]->next_free = outnet->tcp_free; + outnet->tcp_free = outnet->tcp_conns[i]; + outnet->tcp_conns[i]->c = comm_point_create_tcp_out( + bufsize, outnet_tcp_cb, outnet->tcp_conns[i]); + if(!outnet->tcp_conns[i]->c) + return 0; + } + return 1; +} + struct outside_network* outside_network_create(struct comm_base *base, size_t bufsize, size_t num_ports, char** ifs, int num_ifs, int do_ip4, - int do_ip6, int port_base) + int do_ip6, int port_base, size_t num_tcp) { struct outside_network* outnet = (struct outside_network*) calloc(1, sizeof(struct outside_network)); @@ -284,6 +400,7 @@ outside_network_create(struct comm_base *base, size_t bufsize, return NULL; } outnet->base = base; + outnet->num_tcp = num_tcp; #ifndef INET6 do_ip6 = 0; #endif @@ -295,7 +412,8 @@ outside_network_create(struct comm_base *base, size_t bufsize, outnet->num_udp4+1, sizeof(struct comm_point*))) || !(outnet->udp6_ports = (struct comm_point **)calloc( outnet->num_udp6+1, sizeof(struct comm_point*))) || - !(outnet->pending = rbtree_create(pending_cmp)) ) { + !(outnet->pending = rbtree_create(pending_cmp)) || + !create_pending_tcp(outnet, bufsize)) { log_err("malloc failed"); outside_network_delete(outnet); return NULL; @@ -376,6 +494,15 @@ outside_network_delete(struct outside_network* outnet) comm_point_delete(outnet->udp6_ports[i]); free(outnet->udp6_ports); } + if(outnet->tcp_conns) { + size_t i; + for(i=0; inum_tcp; i++) + if(outnet->tcp_conns[i]) { + comm_point_delete(outnet->tcp_conns[i]->c); + free(outnet->tcp_conns[i]); + } + free(outnet->tcp_conns); + } free(outnet); } @@ -531,3 +658,86 @@ pending_udp_query(struct outside_network* outnet, ldns_buffer* packet, tv.tv_usec = 0; comm_timer_set(pend->timer, &tv); } + +/** callback for outgoing TCP timer event */ +static void +outnet_tcptimer(void* arg) +{ + struct waiting_tcp* w = (struct waiting_tcp*)arg; + struct outside_network* outnet = w->outnet; + if(w->pkt) { + /* it is on the waiting list */ + struct waiting_tcp* p=outnet->tcp_wait_first, *prev=NULL; + while(p) { + if(p == w) { + if(prev) prev->next_waiting = w->next_waiting; + else outnet->tcp_wait_first=w->next_waiting; + outnet->tcp_wait_last = prev; + break; + } + prev = p; + p=p->next_waiting; + } + } else { + /* it was in use */ + struct pending_tcp* pend=(struct pending_tcp*)w->next_waiting; + comm_point_close(pend->c); + pend->query = NULL; + pend->next_free = outnet->tcp_free; + outnet->tcp_free = pend; + } + (void)(*w->cb)(NULL, w->cb_arg, NETEVENT_TIMEOUT, NULL); + free(w); + use_free_buffer(outnet); +} + +void +pending_tcp_query(struct outside_network* outnet, ldns_buffer* packet, + struct sockaddr_storage* addr, socklen_t addrlen, int timeout, + comm_point_callback_t* callback, void* callback_arg, + struct ub_randstate* rnd) +{ + struct pending_tcp* pend = outnet->tcp_free; + struct waiting_tcp* w; + struct timeval tv; + uint16_t id; + /* if no buffer is free allocate space to store query */ + w = (struct waiting_tcp*)malloc(sizeof(struct waiting_tcp) + + (pend?0:ldns_buffer_limit(packet))); + if(!w) { + /* callback user for the error */ + (void)(*callback)(NULL, callback_arg, NETEVENT_CLOSED, NULL); + return; + } + if(!(w->timer = comm_timer_create(outnet->base, outnet_tcptimer, w))) { + free(w); + (void)(*callback)(NULL, callback_arg, NETEVENT_CLOSED, NULL); + return; + } + w->pkt = NULL; + w->pkt_len = ldns_buffer_limit(packet); + /* id uses lousy random() TODO use better and entropy */ + id = ((unsigned)ub_random(rnd)>>8) & 0xffff; + LDNS_ID_SET(ldns_buffer_begin(packet), id); + memcpy(&w->addr, addr, addrlen); + w->addrlen = addrlen; + w->outnet = outnet; + w->cb = callback; + w->cb_arg = callback_arg; + tv.tv_sec = timeout; + tv.tv_usec = 0; + comm_timer_set(w->timer, &tv); + if(pend) { + /* we have a buffer available right now */ + outnet_tcp_take_into_use(w, ldns_buffer_begin(packet)); + } else { + /* queue up */ + w->pkt = (uint8_t*)w + sizeof(struct waiting_tcp); + memmove(w->pkt, ldns_buffer_begin(packet), w->pkt_len); + w->next_waiting = NULL; + if(outnet->tcp_wait_last) + outnet->tcp_wait_last->next_waiting = w; + else outnet->tcp_wait_first = w; + outnet->tcp_wait_last = w; + } +} diff --git a/services/outside_network.h b/services/outside_network.h index 5aaecec58..1b15cd625 100644 --- a/services/outside_network.h +++ b/services/outside_network.h @@ -49,6 +49,8 @@ struct pending; struct pending_timeout; struct ub_randstate; +struct pending_tcp; +struct waiting_tcp; /** * Send queries to outside servers and wait for answers from servers. @@ -77,8 +79,24 @@ struct outside_network { /** number of udp6 ports */ size_t num_udp6; - /** pending answers. sorted by id, addr */ + /** pending udp answers. sorted by id, addr */ rbtree_t *pending; + + /** + * Array of tcp pending used for outgoing TCP connections. + * Each can be used to establish a TCP connection with a server. + * The file descriptors are -1 if its free, need to be opened for + * the tcp connection. Can be used for ip4 and ip6. + */ + struct pending_tcp **tcp_conns; + /** number of tcp communication points. */ + size_t num_tcp; + /** list of tcp comm points that are free for use */ + struct pending_tcp* tcp_free; + /** list of tcp queries waiting for a buffer */ + struct waiting_tcp* tcp_wait_first; + /** last of waiting query list */ + struct waiting_tcp* tcp_wait_last; }; /** @@ -105,6 +123,53 @@ struct pending { struct outside_network* outnet; }; +/** + * Pending TCP query to server. + */ +struct pending_tcp { + /** next in list of free tcp comm points, or NULL. */ + struct pending_tcp* next_free; + /** the ID for the query; checked in reply */ + uint16_t id; + /** tcp comm point it was sent on (and reply must come back on). */ + struct comm_point* c; + /** the query being serviced, NULL if the pending_tcp is unused. */ + struct waiting_tcp* query; +}; + +/** + * Query waiting for TCP buffer. + */ +struct waiting_tcp { + /** + * next in waiting list. + * if pkt==0, this points to the pending_tcp structure. + */ + struct waiting_tcp* next_waiting; + /** timeout event; timer keeps running whether the query is + * waiting for a buffer or the tcp reply is pending */ + struct comm_timer* timer; + /** the outside network it is part of */ + struct outside_network* outnet; + /** remote address. */ + struct sockaddr_storage addr; + /** length of addr field in use. */ + socklen_t addrlen; + /** + * The query itself, the query packet to send. + * allocated after the waiting_tcp structure. + * set to NULL when the query is serviced and it part of pending_tcp. + * if this is NULL, the next_waiting points to the pending_tcp. + */ + uint8_t* pkt; + /** length of query packet. */ + size_t pkt_len; + /** callback for the timeout, error or reply to the message */ + comm_point_callback_t* cb; + /** callback user argument */ + void* cb_arg; +}; + /** * Create outside_network structure with N udp ports. * @param base: the communication base to use for event handling. @@ -117,11 +182,12 @@ struct pending { * @param do_ip6: service IP6. * @param port_base: if -1 system assigns ports, otherwise try to get * the ports numbered from this starting number. + * @param num_tcp: number of outgoing tcp buffers to preallocate. * @return: the new structure (with no pending answers) or NULL on error. */ struct outside_network* outside_network_create(struct comm_base* base, size_t bufsize, size_t num_ports, char** ifs, int num_ifs, - int do_ip4, int do_ip6, int port_base); + int do_ip4, int do_ip6, int port_base, size_t num_tcp); /** * Delete outside_network structure. @@ -148,6 +214,27 @@ void pending_udp_query(struct outside_network* outnet, ldns_buffer* packet, comm_point_callback_t* callback, void* callback_arg, struct ub_randstate* rnd); +/** + * Send TCP query. May wait for TCP buffer. Selects ID to be random, and + * checks id. + * @param outnet: provides the event handling. + * @param packet: wireformat query to send to destination. copied from. + * @param addr: address to send to. + * @param addrlen: length of addr. + * @param timeout: in seconds from now. + * Timer starts running now. Timer may expire if all buffers are used, + * without any query been sent to the server yet. + * @param callback: function to call on error, timeout or reply. + * The routine does not return an error, instead it calls the callback, + * with an error code if an error happens. + * @param callback_arg: user argument for callback function. + * @param rnd: random state for generating ID. + */ +void pending_tcp_query(struct outside_network* outnet, ldns_buffer* packet, + struct sockaddr_storage* addr, socklen_t addrlen, int timeout, + comm_point_callback_t* callback, void* callback_arg, + struct ub_randstate* rnd); + /** * Delete pending answer. * @param outnet: outside network the pending query is part of. diff --git a/testcode/fake_event.c b/testcode/fake_event.c index 9563c2304..9368bb978 100644 --- a/testcode/fake_event.c +++ b/testcode/fake_event.c @@ -633,7 +633,8 @@ struct outside_network* outside_network_create(struct comm_base* base, size_t bufsize, size_t ATTR_UNUSED(num_ports), char** ATTR_UNUSED(ifs), int ATTR_UNUSED(num_ifs), int ATTR_UNUSED(do_ip4), - int ATTR_UNUSED(do_ip6), int ATTR_UNUSED(port_base)) + int ATTR_UNUSED(do_ip6), int ATTR_UNUSED(port_base), + size_t ATTR_UNUSED(num_tcp)) { struct outside_network* outnet = calloc(1, sizeof(struct outside_network)); diff --git a/util/config_file.c b/util/config_file.c index 113f9abba..0128afb98 100644 --- a/util/config_file.c +++ b/util/config_file.c @@ -77,6 +77,7 @@ config_create() cfg->do_tcp = 1; cfg->outgoing_base_port = cfg->port + 1000; cfg->outgoing_num_ports = 16; + cfg->outgoing_num_tcp = 10; cfg->msg_cache_size = 4 * 1024 * 1024; cfg->msg_cache_slabs = 4; cfg->num_queries_per_thread = 1024; diff --git a/util/config_file.h b/util/config_file.h index f4837de40..47ed20a83 100644 --- a/util/config_file.h +++ b/util/config_file.h @@ -68,6 +68,8 @@ struct config_file { int outgoing_base_port; /** outgoing port range number of ports (per thread, per if) */ int outgoing_num_ports; + /** number of outgoing tcp buffers per (per thread) */ + size_t outgoing_num_tcp; /** size of the message cache */ size_t msg_cache_size; diff --git a/util/configlexer.lex b/util/configlexer.lex index 639ec7e03..f0cf4d80a 100644 --- a/util/configlexer.lex +++ b/util/configlexer.lex @@ -102,6 +102,7 @@ verbosity{COLON} { LEXOUT(("v(%s) ", yytext)); return VAR_VERBOSITY;} port{COLON} { LEXOUT(("v(%s) ", yytext)); return VAR_PORT;} outgoing-port{COLON} { LEXOUT(("v(%s) ", yytext)); return VAR_OUTGOING_PORT;} outgoing-range{COLON} { LEXOUT(("v(%s) ", yytext)); return VAR_OUTGOING_RANGE;} +outgoing-num-tcp{COLON} { LEXOUT(("v(%s) ", yytext)); return VAR_OUTGOING_NUM_TCP;} do-ip4{COLON} { LEXOUT(("v(%s) ", yytext)); return VAR_DO_IP4;} do-ip6{COLON} { LEXOUT(("v(%s) ", yytext)); return VAR_DO_IP6;} do-udp{COLON} { LEXOUT(("v(%s) ", yytext)); return VAR_DO_UDP;} diff --git a/util/configparser.y b/util/configparser.y index f483ecdac..d57198b86 100644 --- a/util/configparser.y +++ b/util/configparser.y @@ -73,7 +73,7 @@ extern struct config_parser_state* cfg_parser; %token VAR_FORWARD_TO VAR_FORWARD_TO_PORT VAR_CHROOT %token VAR_USERNAME VAR_DIRECTORY VAR_LOGFILE VAR_PIDFILE %token VAR_MSG_CACHE_SIZE VAR_MSG_CACHE_SLABS VAR_NUM_QUERIES_PER_THREAD -%token VAR_RRSET_CACHE_SIZE VAR_RRSET_CACHE_SLABS +%token VAR_RRSET_CACHE_SIZE VAR_RRSET_CACHE_SLABS VAR_OUTGOING_NUM_TCP %% toplevelvars: /* empty */ | toplevelvars toplevelvar ; @@ -97,7 +97,7 @@ content_server: server_num_threads | server_verbosity | server_port | server_username | server_directory | server_logfile | server_pidfile | server_msg_cache_size | server_msg_cache_slabs | server_num_queries_per_thread | server_rrset_cache_size | - server_rrset_cache_slabs + server_rrset_cache_slabs | server_outgoing_num_tcp ; server_num_threads: VAR_NUM_THREADS STRING { @@ -157,6 +157,15 @@ server_outgoing_range: VAR_OUTGOING_RANGE STRING free($2); } ; +server_outgoing_num_tcp: VAR_OUTGOING_NUM_TCP STRING + { + OUTYY(("P(server_outgoing_num_tcp:%s)\n", $2)); + if(atoi($2) == 0 && strcmp($2, "0") != 0) + yyerror("number expected"); + else cfg_parser->cfg->outgoing_num_tcp = atoi($2); + free($2); + } + ; server_do_ip4: VAR_DO_IP4 STRING { OUTYY(("P(server_do_ip4:%s)\n", $2)); diff --git a/util/netevent.c b/util/netevent.c index c80f42f68..184998658 100644 --- a/util/netevent.c +++ b/util/netevent.c @@ -318,11 +318,13 @@ reclaim_tcp_handler(struct comm_point* c) { log_assert(c->type == comm_tcp); comm_point_close(c); - c->tcp_free = c->tcp_parent->tcp_free; - c->tcp_parent->tcp_free = c; - if(!c->tcp_free) { - /* re-enable listening on accept socket */ - comm_point_start_listening(c->tcp_parent, -1, -1); + if(c->tcp_parent) { + c->tcp_free = c->tcp_parent->tcp_free; + c->tcp_parent->tcp_free = c; + if(!c->tcp_free) { + /* re-enable listening on accept socket */ + comm_point_start_listening(c->tcp_parent, -1, -1); + } } } @@ -336,8 +338,10 @@ tcp_callback_writer(struct comm_point* c) c->tcp_is_reading = 1; c->tcp_byte_count = 0; comm_point_stop_listening(c); - /* for listening socket */ - reclaim_tcp_handler(c); + if(c->tcp_parent) /* for listening socket */ + reclaim_tcp_handler(c); + else /* its outgoing socket, start listening for reading */ + comm_point_start_listening(c, -1, -1); } /** do the callback when reading is done */ @@ -421,7 +425,8 @@ comm_point_tcp_handle_read(int fd, struct comm_point* c, int short_ok) return 1; } -/** Handle tcp writing callback. +/** + * Handle tcp writing callback. * @param fd: file descriptor of socket. * @param c: comm point to write buffer out of. * @return: 0 on error @@ -433,6 +438,21 @@ comm_point_tcp_handle_write(int fd, struct comm_point* c) log_assert(c->type == comm_tcp); if(c->tcp_is_reading) return 0; + if(c->tcp_byte_count == 0 && c->tcp_check_nb_connect) { + /* check for pending error from nonblocking connect */ + /* from Stevens, unix network programming, vol1, 3rd ed, p450*/ + int error = 0; + socklen_t len = (socklen_t)sizeof(error); + if(getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len) < 0){ + error = errno; /* on solaris errno is error */ + } + if(error == EINPROGRESS || error == EWOULDBLOCK) + return 1; /* try again later */ + if(error != 0) { + log_err("tcp connect: %s", strerror(error)); + return 0; + } + } if(c->tcp_byte_count < sizeof(uint16_t)) { uint16_t len = htons(ldns_buffer_limit(c->buffer)); @@ -553,6 +573,7 @@ comm_point_create_udp(struct comm_base *base, int fd, ldns_buffer* buffer, c->tcp_do_close = 0; c->do_not_close = 0; c->tcp_do_toggle_rw = 0; + c->tcp_check_nb_connect = 0; c->callback = callback; c->cb_arg = callback_arg; evbits = EV_READ | EV_PERSIST; @@ -607,6 +628,7 @@ comm_point_create_tcp_handler(struct comm_base *base, c->tcp_do_close = 0; c->do_not_close = 0; c->tcp_do_toggle_rw = 1; + c->tcp_check_nb_connect = 0; c->callback = callback; c->cb_arg = callback_arg; /* add to parent free list */ @@ -662,6 +684,7 @@ comm_point_create_tcp(struct comm_base *base, int fd, int num, size_t bufsize, c->tcp_do_close = 0; c->do_not_close = 0; c->tcp_do_toggle_rw = 0; + c->tcp_check_nb_connect = 0; c->callback = NULL; c->cb_arg = NULL; evbits = EV_READ | EV_PERSIST; @@ -688,6 +711,44 @@ comm_point_create_tcp(struct comm_base *base, int fd, int num, size_t bufsize, return c; } +struct comm_point* +comm_point_create_tcp_out(size_t bufsize, + comm_point_callback_t* callback, void* callback_arg) +{ + struct comm_point* c = (struct comm_point*)calloc(1, + sizeof(struct comm_point)); + if(!c) + return NULL; + c->ev = (struct internal_event*)calloc(1, + sizeof(struct internal_event)); + if(!c->ev) { + free(c); + return NULL; + } + c->fd = -1; + c->buffer = ldns_buffer_new(bufsize); + if(!c->buffer) { + free(c->ev); + free(c); + return NULL; + } + c->timeout = NULL; + c->tcp_is_reading = 0; + c->tcp_byte_count = 0; + c->tcp_parent = NULL; + c->max_tcp_count = 0; + c->tcp_handlers = NULL; + c->tcp_free = NULL; + c->type = comm_tcp; + c->tcp_do_close = 0; + c->do_not_close = 0; + c->tcp_do_toggle_rw = 1; + c->tcp_check_nb_connect = 1; + c->callback = callback; + c->cb_arg = callback_arg; + return c; +} + struct comm_point* comm_point_create_local(struct comm_base *base, int fd, size_t bufsize, comm_point_callback_t* callback, void* callback_arg) @@ -721,6 +782,7 @@ comm_point_create_local(struct comm_base *base, int fd, size_t bufsize, c->tcp_do_close = 0; c->do_not_close = 1; c->tcp_do_toggle_rw = 0; + c->tcp_check_nb_connect = 0; c->callback = callback; c->cb_arg = callback_arg; /* libevent stuff */ @@ -743,9 +805,10 @@ comm_point_close(struct comm_point* c) { if(!c) return; - if(event_del(&c->ev->ev) != 0) { - log_err("could not event_del on close"); - } + if(c->fd != -1) + if(event_del(&c->ev->ev) != 0) { + log_err("could not event_del on close"); + } /* close fd after removing from event lists, or epoll.. is messed up */ if(c->fd != -1 && !c->do_not_close) close(c->fd); diff --git a/util/netevent.h b/util/netevent.h index 2f8295743..4e031d7cb 100644 --- a/util/netevent.h +++ b/util/netevent.h @@ -155,6 +155,9 @@ struct comm_point { So that when that is done the callback is called. */ int tcp_do_toggle_rw; + /** if set, checks for pending error from nonblocking connect() call.*/ + int tcp_check_nb_connect; + /** callback when done. tcp_accept does not get called back, is NULL then. If a timeout happens, callback with timeout=1 is called. @@ -288,6 +291,16 @@ struct comm_point* comm_point_create_tcp(struct comm_base* base, int fd, int num, size_t bufsize, comm_point_callback_t* callback, void* callback_arg); +/** + * Create an outgoing TCP commpoint. No file descriptor is opened, left at -1. + * @param bufsize: size of buffer to create for handlers. + * @param callback: callback function pointer for the handler. + * @param callback_arg: will be passed to your callback function. + * @return: the commpoint or NULL on error. + */ +struct comm_point* comm_point_create_tcp_out(size_t bufsize, + comm_point_callback_t* callback, void* callback_arg); + /** * Create commpoint to listen to a local domain file descriptor. * @param base: in which base to alloc the commpoint.