]> git.ipfire.org Git - thirdparty/unbound.git/commitdiff
TCP outgoing services.
authorWouter Wijngaards <wouter@nlnetlabs.nl>
Tue, 8 May 2007 13:25:21 +0000 (13:25 +0000)
committerWouter Wijngaards <wouter@nlnetlabs.nl>
Tue, 8 May 2007 13:25:21 +0000 (13:25 +0000)
git-svn-id: file:///svn/unbound/trunk@294 be551aaa-1e26-0410-a405-d3ace91eadb9

13 files changed:
daemon/worker.c
doc/Changelog
doc/example.conf
doc/unbound.conf.5
services/outside_network.c
services/outside_network.h
testcode/fake_event.c
util/config_file.c
util/config_file.h
util/configlexer.lex
util/configparser.y
util/netevent.c
util/netevent.h

index e6a1f471897f932ccb00b534c0c1d119a596dda5..4e78104f4e80ab769f8a6606878707d9d573d18d 100644 (file)
@@ -672,7 +672,8 @@ worker_init(struct worker* worker, struct config_file *cfg,
                cfg->outgoing_num_ports * worker->thread_num;
        worker->back = outside_network_create(worker->base,
                buffer_size, (size_t)cfg->outgoing_num_ports, cfg->ifs, 
-               cfg->num_ifs, cfg->do_ip4, cfg->do_ip6, startport);
+               cfg->num_ifs, cfg->do_ip4, cfg->do_ip6, startport, 
+               cfg->do_tcp?cfg->outgoing_num_tcp:0);
        if(!worker->back) {
                log_err("could not create outgoing sockets");
                worker_delete(worker);
index a058fadfb28145716480cb8c21b21118a51b3c96..7acb5cf8f5cc85b748dde7dd8c401f92479173a4 100644 (file)
@@ -1,3 +1,10 @@
+8 May 2007: Wouter
+       - outgoing network keeps list of available tcp buffers for outgoing 
+         tcp queries.
+       - outgoing-num-tcp config option.
+       - outgoing network keeps waiting list of queries waiting for buffer.
+       - netevent supports outgoing tcp commpoints, nonblocking connects.
+
 7 May 2007: Wouter
        - EDNS read from query, used to make reply smaller.
        - advertised edns value constants.
index 25bd71e42f3b8dae51759010204045e1a1efd177..8d0094eaee564c3d1e0535eb2d3f15dd96b2ddcb 100644 (file)
@@ -39,6 +39,9 @@ server:
        # But also takes more system resources (for open sockets).
        # outgoing-range: 16
 
+       # number of outgoing simultaneous tcp buffers to hold per thread.
+       # outgoing-num-tcp: 10
+
        # the amount of memory to use for the message cache.
        # in bytes. default is 4 Mb
        # msg-cache-size: 4194304
index e6dd79c62f06fb7ffcc8489ebcd6f6b9e6ad2f94..c4a4b2ce0c70cb7bda92cbbd9b56999482070150 100644 (file)
@@ -63,6 +63,9 @@ Number of ports to open. This number is opened per thread for every outgoing
 query interface. Must be at least 1. Default is 16.
 Larger numbers give more protection against spoofing attempts, but need
 extra resources from the operating system.
+.It \fBoutgoing-num-tcp:\fR <number>
+Number of outgoing TCP buffers to allocate per thread. Default is 10. If set
+to 0, or if do_tcp is "no", no TCP queries to authoritative servers are done.
 .It \fBmsg-cache-size:\fR <number>
 Number of bytes size of the message cache. Default is 4 megabytes.
 .It \fBmsg-cache-slabs:\fR <number>
index a4cadeb0828dcf5803a73b356efb48281600c4b4..c2a738891c7921f51b0ce0b483964bda37cc7ee1 100644 (file)
@@ -109,6 +109,98 @@ pending_cmp(const void* key1, const void* key2)
        }
 }
 
+/** use next free buffer to service a tcp query */
+static void
+outnet_tcp_take_into_use(struct waiting_tcp* w, uint8_t* pkt)
+{
+       struct pending_tcp* pend = w->outnet->tcp_free;
+       int s;
+       log_assert(pend);
+       log_assert(pkt);
+       /* open socket */
+#ifndef INET6
+       if(addr_is_ip6(addr))
+               s = socket(PF_INET6, SOCK_STREAM, IPPROTO_TCP);
+       else
+#endif
+               s = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
+       if(s == -1) {
+               log_err("outgoing tcp: socket: %s", strerror(errno));
+               log_addr(&w->addr, w->addrlen);
+               (void)(*w->cb)(NULL, w->cb_arg, NETEVENT_CLOSED, NULL);
+               free(w);
+               return;
+       }
+       fd_set_nonblock(s);
+       if(connect(s, (struct sockaddr*)&w->addr, w->addrlen) == -1) {
+               if(errno != EINPROGRESS) {
+                       log_err("outgoing tcp: connect: %s", strerror(errno));
+                       log_addr(&w->addr, w->addrlen);
+                       close(s);
+                       (void)(*w->cb)(NULL, w->cb_arg, NETEVENT_CLOSED, NULL);
+                       free(w);
+                       return;
+               }
+       }
+       w->pkt = NULL;
+       w->next_waiting = (void*)pend;
+       memmove(&pend->id, pkt, sizeof(uint16_t));
+       w->outnet->tcp_free = pend->next_free;
+       pend->next_free = NULL;
+       pend->query = w;
+       ldns_buffer_clear(pend->c->buffer);
+       ldns_buffer_write(pend->c->buffer, pkt, w->pkt_len);
+       ldns_buffer_flip(pend->c->buffer);
+       pend->c->tcp_is_reading = 0;
+       pend->c->tcp_byte_count = 0;
+       comm_point_start_listening(pend->c, s, -1);
+       return;
+}
+
+/** see if buffers can be used to service TCP queries. */
+static void
+use_free_buffer(struct outside_network* outnet)
+{
+       struct waiting_tcp* w;
+       while(outnet->tcp_free && outnet->tcp_wait_first) {
+               w = outnet->tcp_wait_first;
+               outnet->tcp_wait_first = w->next_waiting;
+               if(outnet->tcp_wait_last == w)
+                       outnet->tcp_wait_last = NULL;
+               outnet_tcp_take_into_use(w, w->pkt);
+       }
+}
+
+/** callback for pending tcp connections */
+static int 
+outnet_tcp_cb(struct comm_point* c, void* arg, int error,
+       struct comm_reply *reply_info)
+{
+       struct pending_tcp* pend = (struct pending_tcp*)arg;
+       struct outside_network* outnet = pend->query->outnet;
+       verbose(VERB_ALGO, "outnettcp cb");
+       if(error != NETEVENT_NOERROR) {
+               log_info("outnettcp got tcp error %d", error);
+               /* pass error below and exit */
+       } else {
+               /* check ID */
+               if(ldns_buffer_limit(c->buffer) < sizeof(uint16_t) ||
+                       LDNS_ID_WIRE(ldns_buffer_begin(c->buffer))!=pend->id) {
+                       log_info("outnettcp: bad ID in reply");
+                       log_addr(&pend->query->addr, pend->query->addrlen);
+                       error = NETEVENT_CLOSED;
+               }
+       }
+       (void)(*pend->query->cb)(c, pend->query->cb_arg, error, reply_info);
+       comm_point_close(c);
+       pend->next_free = outnet->tcp_free;
+       outnet->tcp_free = pend;
+       free(pend->query);
+       pend->query = NULL;
+       use_free_buffer(outnet);
+       return 0;
+}
+
 /** callback for incoming udp answers from the network. */
 static int 
 outnet_udp_cb(struct comm_point* c, void* arg, int error,
@@ -271,10 +363,34 @@ pending_udp_timer_cb(void *arg)
        pending_delete(p->outnet, p);
 }
 
+/** create pending_tcp buffers */
+static int
+create_pending_tcp(struct outside_network* outnet, size_t bufsize)
+{
+       size_t i;
+       if(outnet->num_tcp == 0)
+               return 1; /* no tcp needed, nothing to do */
+       if(!(outnet->tcp_conns = (struct pending_tcp **)calloc(
+                       outnet->num_tcp, sizeof(struct pending_tcp*))))
+               return 0;
+       for(i=0; i<outnet->num_tcp; i++) {
+               if(!(outnet->tcp_conns[i] = (struct pending_tcp*)calloc(1, 
+                       sizeof(struct pending_tcp))))
+                       return 0;
+               outnet->tcp_conns[i]->next_free = outnet->tcp_free;
+               outnet->tcp_free = outnet->tcp_conns[i];
+               outnet->tcp_conns[i]->c = comm_point_create_tcp_out(
+                       bufsize, outnet_tcp_cb, outnet->tcp_conns[i]);
+               if(!outnet->tcp_conns[i]->c)
+                       return 0;
+       }
+       return 1;
+}
+
 struct outside_network* 
 outside_network_create(struct comm_base *base, size_t bufsize, 
        size_t num_ports, char** ifs, int num_ifs, int do_ip4, 
-       int do_ip6, int port_base)
+       int do_ip6, int port_base, size_t num_tcp)
 {
        struct outside_network* outnet = (struct outside_network*)
                calloc(1, sizeof(struct outside_network));
@@ -284,6 +400,7 @@ outside_network_create(struct comm_base *base, size_t bufsize,
                return NULL;
        }
        outnet->base = base;
+       outnet->num_tcp = num_tcp;
 #ifndef INET6
        do_ip6 = 0;
 #endif
@@ -295,7 +412,8 @@ outside_network_create(struct comm_base *base, size_t bufsize,
                        outnet->num_udp4+1, sizeof(struct comm_point*))) ||
                !(outnet->udp6_ports = (struct comm_point **)calloc(
                        outnet->num_udp6+1, sizeof(struct comm_point*))) ||
-               !(outnet->pending = rbtree_create(pending_cmp)) ) {
+               !(outnet->pending = rbtree_create(pending_cmp)) ||
+               !create_pending_tcp(outnet, bufsize)) {
                log_err("malloc failed");
                outside_network_delete(outnet);
                return NULL;
@@ -376,6 +494,15 @@ outside_network_delete(struct outside_network* outnet)
                        comm_point_delete(outnet->udp6_ports[i]);
                free(outnet->udp6_ports);
        }
+       if(outnet->tcp_conns) {
+               size_t i;
+               for(i=0; i<outnet->num_tcp; i++)
+                       if(outnet->tcp_conns[i]) {
+                               comm_point_delete(outnet->tcp_conns[i]->c);
+                               free(outnet->tcp_conns[i]);
+                       }
+               free(outnet->tcp_conns);
+       }
        free(outnet);
 }
 
@@ -531,3 +658,86 @@ pending_udp_query(struct outside_network* outnet, ldns_buffer* packet,
        tv.tv_usec = 0;
        comm_timer_set(pend->timer, &tv);
 }
+
+/** callback for outgoing TCP timer event */
+static void
+outnet_tcptimer(void* arg)
+{
+       struct waiting_tcp* w = (struct waiting_tcp*)arg;
+       struct outside_network* outnet = w->outnet;
+       if(w->pkt) {
+               /* it is on the waiting list */
+               struct waiting_tcp* p=outnet->tcp_wait_first, *prev=NULL;
+               while(p) {
+                       if(p == w) {
+                               if(prev) prev->next_waiting = w->next_waiting;
+                               else    outnet->tcp_wait_first=w->next_waiting;
+                               outnet->tcp_wait_last = prev;
+                               break;
+                       }
+                       prev = p;
+                       p=p->next_waiting;
+               }
+       } else {
+               /* it was in use */
+               struct pending_tcp* pend=(struct pending_tcp*)w->next_waiting;
+               comm_point_close(pend->c);
+               pend->query = NULL;
+               pend->next_free = outnet->tcp_free;
+               outnet->tcp_free = pend;
+       }
+       (void)(*w->cb)(NULL, w->cb_arg, NETEVENT_TIMEOUT, NULL);
+       free(w);
+       use_free_buffer(outnet);
+}
+
+void 
+pending_tcp_query(struct outside_network* outnet, ldns_buffer* packet, 
+       struct sockaddr_storage* addr, socklen_t addrlen, int timeout,
+       comm_point_callback_t* callback, void* callback_arg,
+       struct ub_randstate* rnd)
+{
+       struct pending_tcp* pend = outnet->tcp_free;
+       struct waiting_tcp* w;
+       struct timeval tv;
+       uint16_t id;
+       /* if no buffer is free allocate space to store query */
+       w = (struct waiting_tcp*)malloc(sizeof(struct waiting_tcp) 
+               + (pend?0:ldns_buffer_limit(packet)));
+       if(!w) {
+               /* callback user for the error */
+               (void)(*callback)(NULL, callback_arg, NETEVENT_CLOSED, NULL);
+               return;
+       }
+       if(!(w->timer = comm_timer_create(outnet->base, outnet_tcptimer, w))) {
+               free(w);
+               (void)(*callback)(NULL, callback_arg, NETEVENT_CLOSED, NULL);
+               return;
+       }
+       w->pkt = NULL;
+       w->pkt_len = ldns_buffer_limit(packet);
+       /* id uses lousy random() TODO use better and entropy */
+       id = ((unsigned)ub_random(rnd)>>8) & 0xffff;
+       LDNS_ID_SET(ldns_buffer_begin(packet), id);
+       memcpy(&w->addr, addr, addrlen);
+       w->addrlen = addrlen;
+       w->outnet = outnet;
+       w->cb = callback;
+       w->cb_arg = callback_arg;
+       tv.tv_sec = timeout;
+       tv.tv_usec = 0;
+       comm_timer_set(w->timer, &tv);
+       if(pend) {
+               /* we have a buffer available right now */
+               outnet_tcp_take_into_use(w, ldns_buffer_begin(packet));
+       } else {
+               /* queue up */
+               w->pkt = (uint8_t*)w + sizeof(struct waiting_tcp);
+               memmove(w->pkt, ldns_buffer_begin(packet), w->pkt_len);
+               w->next_waiting = NULL;
+               if(outnet->tcp_wait_last)
+                       outnet->tcp_wait_last->next_waiting = w;
+               else    outnet->tcp_wait_first = w;
+               outnet->tcp_wait_last = w;
+       }
+}
index 5aaecec581c1643d88340e9396f20b086b447fb9..1b15cd625aab3bd9dea057269234a969553e2328 100644 (file)
@@ -49,6 +49,8 @@
 struct pending;
 struct pending_timeout;
 struct ub_randstate;
+struct pending_tcp;
+struct waiting_tcp;
 
 /**
  * Send queries to outside servers and wait for answers from servers.
@@ -77,8 +79,24 @@ struct outside_network {
        /** number of udp6 ports */
        size_t num_udp6;
 
-       /** pending answers. sorted by id, addr */
+       /** pending udp answers. sorted by id, addr */
        rbtree_t *pending;
+
+       /**
+        * Array of tcp pending used for outgoing TCP connections.
+        * Each can be used to establish a TCP connection with a server.
+        * The file descriptors are -1 if its free, need to be opened for 
+        * the tcp connection. Can be used for ip4 and ip6.
+        */
+       struct pending_tcp **tcp_conns;
+       /** number of tcp communication points. */
+       size_t num_tcp;
+       /** list of tcp comm points that are free for use */
+       struct pending_tcp* tcp_free;
+       /** list of tcp queries waiting for a buffer */
+       struct waiting_tcp* tcp_wait_first;
+       /** last of waiting query list */
+       struct waiting_tcp* tcp_wait_last;
 };
 
 /**
@@ -105,6 +123,53 @@ struct pending {
        struct outside_network* outnet;
 };
 
+/**
+ * Pending TCP query to server.
+ */
+struct pending_tcp {
+       /** next in list of free tcp comm points, or NULL. */
+       struct pending_tcp* next_free;
+       /** the ID for the query; checked in reply */
+       uint16_t id;
+       /** tcp comm point it was sent on (and reply must come back on). */
+       struct comm_point* c;
+       /** the query being serviced, NULL if the pending_tcp is unused. */
+       struct waiting_tcp* query;
+};
+
+/**
+ * Query waiting for TCP buffer.
+ */
+struct waiting_tcp {
+       /** 
+        * next in waiting list.
+        * if pkt==0, this points to the pending_tcp structure.
+        */
+       struct waiting_tcp* next_waiting;
+       /** timeout event; timer keeps running whether the query is
+        * waiting for a buffer or the tcp reply is pending */
+       struct comm_timer* timer;
+       /** the outside network it is part of */
+       struct outside_network* outnet;
+       /** remote address. */
+       struct sockaddr_storage addr;
+       /** length of addr field in use. */
+       socklen_t addrlen;
+       /** 
+        * The query itself, the query packet to send.
+        * allocated after the waiting_tcp structure.
+        * set to NULL when the query is serviced and it part of pending_tcp.
+        * if this is NULL, the next_waiting points to the pending_tcp.
+        */
+       uint8_t* pkt;
+       /** length of query packet. */
+       size_t pkt_len;
+       /** callback for the timeout, error or reply to the message */
+       comm_point_callback_t* cb;
+       /** callback user argument */
+       void* cb_arg;
+};
+
 /**
  * Create outside_network structure with N udp ports.
  * @param base: the communication base to use for event handling.
@@ -117,11 +182,12 @@ struct pending {
  * @param do_ip6: service IP6.
  * @param port_base: if -1 system assigns ports, otherwise try to get
  *    the ports numbered from this starting number.
+ * @param num_tcp: number of outgoing tcp buffers to preallocate.
  * @return: the new structure (with no pending answers) or NULL on error.
  */
 struct outside_network* outside_network_create(struct comm_base* base,
        size_t bufsize, size_t num_ports, char** ifs, int num_ifs,
-       int do_ip4, int do_ip6, int port_base);
+       int do_ip4, int do_ip6, int port_base, size_t num_tcp);
 
 /**
  * Delete outside_network structure.
@@ -148,6 +214,27 @@ void pending_udp_query(struct outside_network* outnet, ldns_buffer* packet,
        comm_point_callback_t* callback, void* callback_arg,
        struct ub_randstate* rnd);
 
+/**
+ * Send TCP query. May wait for TCP buffer. Selects ID to be random, and 
+ * checks id.
+ * @param outnet: provides the event handling.
+ * @param packet: wireformat query to send to destination. copied from.
+ * @param addr: address to send to.
+ * @param addrlen: length of addr.
+ * @param timeout: in seconds from now.
+ *    Timer starts running now. Timer may expire if all buffers are used,
+ *    without any query been sent to the server yet.
+ * @param callback: function to call on error, timeout or reply.
+ *    The routine does not return an error, instead it calls the callback,
+ *    with an error code if an error happens.
+ * @param callback_arg: user argument for callback function.
+ * @param rnd: random state for generating ID.
+ */
+void pending_tcp_query(struct outside_network* outnet, ldns_buffer* packet, 
+       struct sockaddr_storage* addr, socklen_t addrlen, int timeout,
+       comm_point_callback_t* callback, void* callback_arg,
+       struct ub_randstate* rnd);
+
 /**
  * Delete pending answer.
  * @param outnet: outside network the pending query is part of.
index 9563c2304266b394283f30c40aad232eb5f9592c..9368bb9789031b2b55b17b894261296efa9e1e32 100644 (file)
@@ -633,7 +633,8 @@ struct outside_network*
 outside_network_create(struct comm_base* base, size_t bufsize, 
        size_t ATTR_UNUSED(num_ports), char** ATTR_UNUSED(ifs), 
        int ATTR_UNUSED(num_ifs), int ATTR_UNUSED(do_ip4), 
-       int ATTR_UNUSED(do_ip6), int ATTR_UNUSED(port_base))
+       int ATTR_UNUSED(do_ip6), int ATTR_UNUSED(port_base),
+       size_t ATTR_UNUSED(num_tcp))
 {
        struct outside_network* outnet =  calloc(1, 
                sizeof(struct outside_network));
index 113f9abba4d9aab559baab17a164e6371b757d4c..0128afb98d8115b26218cec7c3774acd48883b1e 100644 (file)
@@ -77,6 +77,7 @@ config_create()
        cfg->do_tcp = 1;
        cfg->outgoing_base_port = cfg->port + 1000;
        cfg->outgoing_num_ports = 16;
+       cfg->outgoing_num_tcp = 10;
        cfg->msg_cache_size = 4 * 1024 * 1024;
        cfg->msg_cache_slabs = 4;
        cfg->num_queries_per_thread = 1024;
index f4837de40907a7610258ce15031880d065b90515..47ed20a83ca718954a720f3374661eedfc6acf9e 100644 (file)
@@ -68,6 +68,8 @@ struct config_file {
        int outgoing_base_port;
        /** outgoing port range number of ports (per thread, per if) */
        int outgoing_num_ports;
+       /** number of outgoing tcp buffers per (per thread) */
+       size_t outgoing_num_tcp;
 
        /** size of the message cache */
        size_t msg_cache_size;
index 639ec7e03ad5bbe79bf23a0a68c97ccdf4feea10..f0cf4d80af7687bb76b38ef00ea93e36d491bc4a 100644 (file)
@@ -102,6 +102,7 @@ verbosity{COLON}    { LEXOUT(("v(%s) ", yytext)); return VAR_VERBOSITY;}
 port{COLON}            { LEXOUT(("v(%s) ", yytext)); return VAR_PORT;}
 outgoing-port{COLON}   { LEXOUT(("v(%s) ", yytext)); return VAR_OUTGOING_PORT;}
 outgoing-range{COLON}  { LEXOUT(("v(%s) ", yytext)); return VAR_OUTGOING_RANGE;}
+outgoing-num-tcp{COLON}        { LEXOUT(("v(%s) ", yytext)); return VAR_OUTGOING_NUM_TCP;}
 do-ip4{COLON}          { LEXOUT(("v(%s) ", yytext)); return VAR_DO_IP4;}
 do-ip6{COLON}          { LEXOUT(("v(%s) ", yytext)); return VAR_DO_IP6;}
 do-udp{COLON}          { LEXOUT(("v(%s) ", yytext)); return VAR_DO_UDP;}
index f483ecdac411f7dd5f45ab4f2c50c3113b29faa0..d57198b861fa7681543618b57fe24269de7a955c 100644 (file)
@@ -73,7 +73,7 @@ extern struct config_parser_state* cfg_parser;
 %token VAR_FORWARD_TO VAR_FORWARD_TO_PORT VAR_CHROOT
 %token VAR_USERNAME VAR_DIRECTORY VAR_LOGFILE VAR_PIDFILE
 %token VAR_MSG_CACHE_SIZE VAR_MSG_CACHE_SLABS VAR_NUM_QUERIES_PER_THREAD
-%token VAR_RRSET_CACHE_SIZE VAR_RRSET_CACHE_SLABS
+%token VAR_RRSET_CACHE_SIZE VAR_RRSET_CACHE_SLABS VAR_OUTGOING_NUM_TCP
 
 %%
 toplevelvars: /* empty */ | toplevelvars toplevelvar ;
@@ -97,7 +97,7 @@ content_server: server_num_threads | server_verbosity | server_port |
        server_username | server_directory | server_logfile | server_pidfile |
        server_msg_cache_size | server_msg_cache_slabs |
        server_num_queries_per_thread | server_rrset_cache_size | 
-       server_rrset_cache_slabs 
+       server_rrset_cache_slabs | server_outgoing_num_tcp
        ;
 server_num_threads: VAR_NUM_THREADS STRING 
        { 
@@ -157,6 +157,15 @@ server_outgoing_range: VAR_OUTGOING_RANGE STRING
                free($2);
        }
        ;
+server_outgoing_num_tcp: VAR_OUTGOING_NUM_TCP STRING
+       {
+               OUTYY(("P(server_outgoing_num_tcp:%s)\n", $2));
+               if(atoi($2) == 0 && strcmp($2, "0") != 0)
+                       yyerror("number expected");
+               else cfg_parser->cfg->outgoing_num_tcp = atoi($2);
+               free($2);
+       }
+       ;
 server_do_ip4: VAR_DO_IP4 STRING
        {
                OUTYY(("P(server_do_ip4:%s)\n", $2));
index c80f42f6874999b5159464bb65a89f01e4b77fa9..184998658d78232b0283721b77f732630b4ae4b1 100644 (file)
@@ -318,11 +318,13 @@ reclaim_tcp_handler(struct comm_point* c)
 {
        log_assert(c->type == comm_tcp);
        comm_point_close(c);
-       c->tcp_free = c->tcp_parent->tcp_free;
-       c->tcp_parent->tcp_free = c;
-       if(!c->tcp_free) {
-               /* re-enable listening on accept socket */
-               comm_point_start_listening(c->tcp_parent, -1, -1);
+       if(c->tcp_parent) {
+               c->tcp_free = c->tcp_parent->tcp_free;
+               c->tcp_parent->tcp_free = c;
+               if(!c->tcp_free) {
+                       /* re-enable listening on accept socket */
+                       comm_point_start_listening(c->tcp_parent, -1, -1);
+               }
        }
 }
 
@@ -336,8 +338,10 @@ tcp_callback_writer(struct comm_point* c)
                c->tcp_is_reading = 1;
        c->tcp_byte_count = 0;
        comm_point_stop_listening(c);
-       /* for listening socket */
-       reclaim_tcp_handler(c);
+       if(c->tcp_parent) /* for listening socket */
+               reclaim_tcp_handler(c);
+       else    /* its outgoing socket, start listening for reading */
+               comm_point_start_listening(c, -1, -1);
 }
 
 /** do the callback when reading is done */
@@ -421,7 +425,8 @@ comm_point_tcp_handle_read(int fd, struct comm_point* c, int short_ok)
        return 1;
 }
 
-/** Handle tcp writing callback. 
+/** 
+ * Handle tcp writing callback. 
  * @param fd: file descriptor of socket.
  * @param c: comm point to write buffer out of.
  * @return: 0 on error
@@ -433,6 +438,21 @@ comm_point_tcp_handle_write(int fd, struct comm_point* c)
        log_assert(c->type == comm_tcp);
        if(c->tcp_is_reading)
                return 0;
+       if(c->tcp_byte_count == 0 && c->tcp_check_nb_connect) {
+               /* check for pending error from nonblocking connect */
+               /* from Stevens, unix network programming, vol1, 3rd ed, p450*/
+               int error = 0;
+               socklen_t len = (socklen_t)sizeof(error);
+               if(getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len) < 0){
+                       error = errno; /* on solaris errno is error */
+               }
+               if(error == EINPROGRESS || error == EWOULDBLOCK)
+                       return 1; /* try again later */
+               if(error != 0) {
+                       log_err("tcp connect: %s", strerror(error));
+                       return 0;
+               }
+       }
 
        if(c->tcp_byte_count < sizeof(uint16_t)) {
                uint16_t len = htons(ldns_buffer_limit(c->buffer));
@@ -553,6 +573,7 @@ comm_point_create_udp(struct comm_base *base, int fd, ldns_buffer* buffer,
        c->tcp_do_close = 0;
        c->do_not_close = 0;
        c->tcp_do_toggle_rw = 0;
+       c->tcp_check_nb_connect = 0;
        c->callback = callback;
        c->cb_arg = callback_arg;
        evbits = EV_READ | EV_PERSIST;
@@ -607,6 +628,7 @@ comm_point_create_tcp_handler(struct comm_base *base,
        c->tcp_do_close = 0;
        c->do_not_close = 0;
        c->tcp_do_toggle_rw = 1;
+       c->tcp_check_nb_connect = 0;
        c->callback = callback;
        c->cb_arg = callback_arg;
        /* add to parent free list */
@@ -662,6 +684,7 @@ comm_point_create_tcp(struct comm_base *base, int fd, int num, size_t bufsize,
        c->tcp_do_close = 0;
        c->do_not_close = 0;
        c->tcp_do_toggle_rw = 0;
+       c->tcp_check_nb_connect = 0;
        c->callback = NULL;
        c->cb_arg = NULL;
        evbits = EV_READ | EV_PERSIST;
@@ -688,6 +711,44 @@ comm_point_create_tcp(struct comm_base *base, int fd, int num, size_t bufsize,
        return c;
 }
 
+struct comm_point* 
+comm_point_create_tcp_out(size_t bufsize,
+        comm_point_callback_t* callback, void* callback_arg)
+{
+       struct comm_point* c = (struct comm_point*)calloc(1,
+               sizeof(struct comm_point));
+       if(!c)
+               return NULL;
+       c->ev = (struct internal_event*)calloc(1,
+               sizeof(struct internal_event));
+       if(!c->ev) {
+               free(c);
+               return NULL;
+       }
+       c->fd = -1;
+       c->buffer = ldns_buffer_new(bufsize);
+       if(!c->buffer) {
+               free(c->ev);
+               free(c);
+               return NULL;
+       }
+       c->timeout = NULL;
+       c->tcp_is_reading = 0;
+       c->tcp_byte_count = 0;
+       c->tcp_parent = NULL;
+       c->max_tcp_count = 0;
+       c->tcp_handlers = NULL;
+       c->tcp_free = NULL;
+       c->type = comm_tcp;
+       c->tcp_do_close = 0;
+       c->do_not_close = 0;
+       c->tcp_do_toggle_rw = 1;
+       c->tcp_check_nb_connect = 1;
+       c->callback = callback;
+       c->cb_arg = callback_arg;
+       return c;
+}
+
 struct comm_point* 
 comm_point_create_local(struct comm_base *base, int fd, size_t bufsize,
         comm_point_callback_t* callback, void* callback_arg)
@@ -721,6 +782,7 @@ comm_point_create_local(struct comm_base *base, int fd, size_t bufsize,
        c->tcp_do_close = 0;
        c->do_not_close = 1;
        c->tcp_do_toggle_rw = 0;
+       c->tcp_check_nb_connect = 0;
        c->callback = callback;
        c->cb_arg = callback_arg;
        /* libevent stuff */
@@ -743,9 +805,10 @@ comm_point_close(struct comm_point* c)
 {
        if(!c)
                return;
-       if(event_del(&c->ev->ev) != 0) {
-               log_err("could not event_del on close");
-       }
+       if(c->fd != -1)
+               if(event_del(&c->ev->ev) != 0) {
+                       log_err("could not event_del on close");
+               }
        /* close fd after removing from event lists, or epoll.. is messed up */
        if(c->fd != -1 && !c->do_not_close)
                close(c->fd);
index 2f8295743fdd9c60452c9d7aa5d3d0219f80ef43..4e031d7cbea3887b4fe0dc58634d11a3cb92b74d 100644 (file)
@@ -155,6 +155,9 @@ struct comm_point {
            So that when that is done the callback is called. */
        int tcp_do_toggle_rw;
 
+       /** if set, checks for pending error from nonblocking connect() call.*/
+       int tcp_check_nb_connect;
+
        /** callback when done.
            tcp_accept does not get called back, is NULL then.
            If a timeout happens, callback with timeout=1 is called.
@@ -288,6 +291,16 @@ struct comm_point* comm_point_create_tcp(struct comm_base* base,
        int fd, int num, size_t bufsize, 
        comm_point_callback_t* callback, void* callback_arg);
 
+/**
+ * Create an outgoing TCP commpoint. No file descriptor is opened, left at -1.
+ * @param bufsize: size of buffer to create for handlers.
+ * @param callback: callback function pointer for the handler.
+ * @param callback_arg: will be passed to your callback function.
+ * @return: the commpoint or NULL on error.
+ */
+struct comm_point* comm_point_create_tcp_out(size_t bufsize, 
+       comm_point_callback_t* callback, void* callback_arg);
+
 /**
  * Create commpoint to listen to a local domain file descriptor.
  * @param base: in which base to alloc the commpoint.