]> git.ipfire.org Git - thirdparty/unbound.git/commitdiff
delayer in TCP.
authorWouter Wijngaards <wouter@nlnetlabs.nl>
Tue, 26 Feb 2008 13:04:05 +0000 (13:04 +0000)
committerWouter Wijngaards <wouter@nlnetlabs.nl>
Tue, 26 Feb 2008 13:04:05 +0000 (13:04 +0000)
git-svn-id: file:///svn/unbound/trunk@990 be551aaa-1e26-0410-a405-d3ace91eadb9

doc/Changelog
testcode/delayer.c

index 07379a8ca9bcefd7b035f73f3388474a0730773d..77bc864109958a0c8914741c205dd548e0d1865a 100644 (file)
@@ -1,4 +1,9 @@
-23 February 2008: Wouter
+26 February 2008: Wouter
+       - delay utility delays TCP as well. If the server that is forwarded 
+         to has a TCP error, the delay utility closes the connection.
+       - delay does REUSE_ADDR, and can handle a server that closes its end.
+
+25 February 2008: Wouter
        - delay utility works. Gets decent thoughput too (>20000).
 
 22 February 2008: Wouter
index c1138593cf49860d44fe197dbe16ae6889dea6c8..77fb33476fcafca29f703a15d0bd1715c9ea186c 100644 (file)
@@ -86,6 +86,51 @@ struct proxy {
        struct proxy* next;
 };
 
+/**
+ * An item that has to be TCP relayed
+ */
+struct tcp_send_list {
+       /** the data item */
+       uint8_t* item;
+       /** size of item */
+       size_t len;
+       /** time when the item can be transmitted on */
+       struct timeval wait;
+       /** how much of the item has already been transmitted */
+       size_t done;
+       /** next in list */
+       struct tcp_send_list* next;
+};
+
+/**
+ * List of TCP proxy fd pairs to TCP connect client to server 
+ */
+struct tcp_proxy {
+       /** the fd to listen for client query */
+       int client_s;
+       /** the fd to listen for server answer */
+       int server_s;
+
+       /** remote client address */
+       struct sockaddr_storage addr;
+       /** length of address */
+       socklen_t addr_len;
+       /** timeout on this entry */
+       struct timeval timeout;
+
+       /** list of query items to send to server */
+       struct tcp_send_list* querylist;
+       /** last in query list */
+       struct tcp_send_list* querylast;
+       /** list of answer items to send to client */
+       struct tcp_send_list* answerlist;
+       /** last in answerlist */
+       struct tcp_send_list* answerlast;
+
+       /** next in list */
+       struct tcp_proxy* next;
+};
+
 /** usage information for delayer */
 void usage(char* argv[])
 {
@@ -94,7 +139,8 @@ void usage(char* argv[])
        printf("        -b addr : bind to this address to listen.\n");
        printf("        -p port : bind to this port (use 0 for random).\n");
        printf("        -m mem  : use this much memory for waiting queries.\n");
-       printf("        -d delay: queries are delayed n milliseconds.\n");
+       printf("        -d delay: UDP queries are delayed n milliseconds.\n");
+       printf("                  TCP is delayed twice (on send, on recv).\n");
        printf("        -h      : this help message\n");
        exit(1);
 }
@@ -372,24 +418,21 @@ service_proxy(fd_set* rset, int retsock, struct proxy* proxies,
 static struct proxy*
 find_create_proxy(struct sockaddr_storage* from, socklen_t from_len,
        fd_set* rorig, int* max, struct proxy** proxies, int serv_ip6,
-       struct timeval* now)
+       struct timeval* now, struct timeval* reuse_timeout)
 {
        struct proxy* p;
        struct timeval t;
-       struct timeval reuse_timeout;
        for(p = *proxies; p; p = p->next) {
                if(sockaddr_cmp(from, from_len, &p->addr, p->addr_len)==0)
                        return p;
        }
        /* possibly: reuse lapsed entries */
-       reuse_timeout.tv_sec = 1;
-       reuse_timeout.tv_usec = 0;
        for(p = *proxies; p; p = p->next) {
                if(p->numwait > p->numsent || p->numsent > p->numreturn)
                        continue;
                t = *now;
                dl_tv_subtract(&t, &p->lastuse);
-               if(dl_tv_smaller(&t, &reuse_timeout))
+               if(dl_tv_smaller(&t, reuse_timeout))
                        continue;
                /* yes! */
                verbose(1, "reuse existing entry");
@@ -399,7 +442,7 @@ find_create_proxy(struct sockaddr_storage* from, socklen_t from_len,
                return p;
        }
        /* create new */
-       p = calloc(1, sizeof(*p));
+       p = (struct proxy*)calloc(1, sizeof(*p));
        if(!p) fatal_exit("out of memory");
        p->s = socket(serv_ip6?AF_INET6:AF_INET, SOCK_DGRAM, 0);
        if(p->s == -1) fatal_exit("socket: %s", strerror(errno));
@@ -419,7 +462,7 @@ static void
 service_recv(int s, struct ringbuf* ring, ldns_buffer* pkt, 
        fd_set* rorig, int* max, struct proxy** proxies,
        struct sockaddr_storage* srv_addr, socklen_t srv_len, 
-       struct timeval* now, struct timeval* delay)
+       struct timeval* now, struct timeval* delay, struct timeval* reuse)
 {
        int i;
        struct sockaddr_storage from;
@@ -439,7 +482,7 @@ service_recv(int s, struct ringbuf* ring, ldns_buffer* pkt,
                ldns_buffer_set_limit(pkt, (size_t)len);
                /* find its proxy element */
                p = find_create_proxy(&from, from_len, rorig, max, proxies,
-                       addr_is_ip6(srv_addr, srv_len), now);
+                       addr_is_ip6(srv_addr, srv_len), now, reuse);
                if(!p) fatal_exit("error: cannot find or create proxy");
                p->lastuse = *now;
                ring_add(ring, pkt, now, delay, p);
@@ -448,15 +491,299 @@ service_recv(int s, struct ringbuf* ring, ldns_buffer* pkt,
        }
 }
 
+/** delete tcp proxy */
+static void
+tcp_proxy_delete(struct tcp_proxy* p)
+{
+       struct tcp_send_list* s, *sn;
+       if(!p)
+               return;
+       log_addr(1, "delete tcp proxy", &p->addr, p->addr_len);
+       s = p->querylist;
+       while(s) {
+               sn = s->next;
+               free(s->item);
+               free(s);
+               s = sn;
+       }
+       s = p->answerlist;
+       while(s) {
+               sn = s->next;
+               free(s->item);
+               free(s);
+               s = sn;
+       }
+       close(p->client_s);
+       if(p->server_s != -1)
+               close(p->server_s);
+       free(p);
+}
+
+/** accept new TCP connections, and set them up */
+static void
+service_tcp_listen(int s, fd_set* rorig, int* max, struct tcp_proxy** proxies,
+       struct sockaddr_storage* srv_addr, socklen_t srv_len, 
+       struct timeval* now, struct timeval* tcp_timeout)
+{
+       int newfd;
+       struct sockaddr_storage addr;
+       struct tcp_proxy* p;
+       socklen_t addr_len;
+       newfd = accept(s, (struct sockaddr*)&addr, &addr_len);
+       if(newfd == -1) {
+               if(errno == EAGAIN || errno == EINTR)
+                       return;
+               fatal_exit("accept: %s", strerror(errno));
+       }
+       p = (struct tcp_proxy*)calloc(1, sizeof(*p));
+       if(!p) fatal_exit("out of memory");
+       memmove(&p->addr, &addr, addr_len);
+       p->addr_len = addr_len;
+       log_addr(1, "new tcp proxy", &p->addr, p->addr_len);
+       p->client_s = newfd;
+       p->server_s = socket(addr_is_ip6(srv_addr, srv_len)?AF_INET6:AF_INET,
+               SOCK_STREAM, 0);
+       if(p->server_s == -1)
+               fatal_exit("tcp socket: %s", strerror(errno));
+       fd_set_nonblock(p->client_s);
+       fd_set_nonblock(p->server_s);
+       if(connect(p->server_s, (struct sockaddr*)srv_addr, srv_len) == -1) {
+               if(errno != EINPROGRESS) {
+                       log_err("tcp connect: %s", strerror(errno));
+                       close(p->server_s);
+                       close(p->client_s);
+                       free(p);
+                       return;
+               }
+       }
+       p->timeout = *now;
+       dl_tv_add(&p->timeout, tcp_timeout);
+
+       /* listen to client and server */
+       FD_SET(p->client_s, rorig);
+       FD_SET(p->server_s, rorig);
+       if(p->client_s+1 > *max)
+               *max = p->client_s+1;
+       if(p->server_s+1 > *max)
+               *max = p->server_s+1;
+
+       /* add into proxy list */
+       p->next = *proxies;
+       *proxies = p;
+}
+
+/** relay TCP, read a part */
+static int
+tcp_relay_read(int s, struct tcp_send_list** first, 
+       struct tcp_send_list** last, struct timeval* now, 
+       struct timeval* delay, ldns_buffer* pkt)
+{
+       struct tcp_send_list* item;
+       ssize_t r = read(s, ldns_buffer_begin(pkt), ldns_buffer_capacity(pkt));
+       if(r == -1) {
+               if(errno == EINTR || errno == EAGAIN)
+                       return 1;
+               log_err("tcp read: %s", strerror(errno));
+               return 0;
+       } else if(r == 0) {
+               /* connection closed */
+               return 0;
+       }
+       item = (struct tcp_send_list*)malloc(sizeof(*item));
+       if(!item) {
+               log_err("out of memory");
+               return 0;
+       }
+       verbose(1, "read item len %d", (int)r);
+       item->len = (size_t)r;
+       item->item = memdup(ldns_buffer_begin(pkt), item->len);
+       if(!item->item) {
+               free(item);
+               log_err("out of memory");
+               return 0;
+       }
+       item->done = 0;
+       item->wait = *now;
+       dl_tv_add(&item->wait, delay);
+       item->next = NULL;
+       
+       /* link in */
+       if(*first) {
+               (*last)->next = item;
+       } else {
+               *first = item;
+       }
+       *last = item;
+       return 1;
+}
+
+/** relay TCP, write a part */
+static int
+tcp_relay_write(int s, struct tcp_send_list** first, 
+       struct tcp_send_list** last, struct timeval* now)
+{
+       ssize_t r;
+       struct tcp_send_list* p;
+       while(*first) {
+               p = *first;
+               /* is the item ready? */
+               if(!dl_tv_smaller(&p->wait, now))
+                       return 1;
+               /* write it */
+               r = write(s, p->item + p->done, p->len - p->done);
+               if(r == -1) {
+                       if(errno == EAGAIN || errno == EINTR)
+                               return 1;
+                       log_err("tcp write: %s", strerror(errno));
+                       return 0;
+               } else if(r == 0) {
+                       /* closed */
+                       return 0;
+               }
+               /* account it */
+               p->done += (size_t)r;
+               verbose(1, "write item %d of %d", (int)p->done, (int)p->len);
+               if(p->done >= p->len) {
+                       free(p->item);
+                       *first = p->next;
+                       if(!*first)
+                               *last = NULL;
+                       free(p);
+               } else {
+                       /* partial write */
+                       return 1;
+               }
+       }
+       return 1;
+}
+
+/** perform TCP relaying */
+static void
+service_tcp_relay(struct tcp_proxy** tcp_proxies, struct timeval* now,
+       struct timeval* delay, struct timeval* tcp_timeout, ldns_buffer* pkt,
+       fd_set* rset, fd_set* rorig, fd_set* worig)
+{
+       struct tcp_proxy* p, **prev;
+       struct timeval tout;
+       int delete_it;
+       p = *tcp_proxies;
+       prev = tcp_proxies;
+       tout = *now;
+       dl_tv_add(&tout, tcp_timeout);
+
+       while(p) {
+               delete_it = 0;
+               /* can we receive further queries? */
+               if(!delete_it && FD_ISSET(p->client_s, rset)) {
+                       p->timeout = tout;
+                       log_addr(1, "read tcp query", &p->addr, p->addr_len);
+                       if(!tcp_relay_read(p->client_s, &p->querylist, 
+                               &p->querylast, now, delay, pkt))
+                               delete_it = 1;
+               }
+               /* can we receive further answers? */
+               if(!delete_it && p->server_s != -1 &&
+                       FD_ISSET(p->server_s, rset)) {
+                       p->timeout = tout;
+                       log_addr(1, "read tcp answer", &p->addr, p->addr_len);
+                       if(!tcp_relay_read(p->server_s, &p->answerlist, 
+                               &p->answerlast, now, delay, pkt)) {
+                               close(p->server_s);
+                               FD_CLR(p->server_s, worig);
+                               FD_CLR(p->server_s, rorig);
+                               p->server_s = -1;
+                       }
+               }
+               /* can we send on further queries */
+               if(!delete_it && p->querylist && p->server_s != -1) {
+                       p->timeout = tout;
+                       if(dl_tv_smaller(&p->querylist->wait, now))
+                               log_addr(1, "write tcp query", 
+                                       &p->addr, p->addr_len);
+                       if(!tcp_relay_write(p->server_s, &p->querylist, 
+                               &p->querylast, now))
+                               delete_it = 1;
+                       if(p->querylist && p->server_s != -1 &&
+                               dl_tv_smaller(&p->querylist->wait, now))
+                               FD_SET(p->server_s, worig);
+                       else    FD_CLR(p->server_s, worig);
+               }
+
+               /* can we send on further answers */
+               if(!delete_it && p->answerlist) {
+                       p->timeout = tout;
+                       if(dl_tv_smaller(&p->answerlist->wait, now))
+                               log_addr(1, "write tcp answer", 
+                                       &p->addr, p->addr_len);
+                       if(!tcp_relay_write(p->client_s, &p->answerlist, 
+                               &p->answerlast, now))
+                               delete_it = 1;
+                       if(p->answerlist && dl_tv_smaller(&p->answerlist->wait,
+                               now))
+                               FD_SET(p->client_s, worig);
+                       else    FD_CLR(p->client_s, worig);
+                       if(!p->answerlist && p->server_s == -1)
+                               delete_it = 1;
+               }
+
+               /* does this entry timeout? (unused too long) */
+               if(dl_tv_smaller(&p->timeout, now)) {
+                       delete_it = 1;
+               }
+               if(delete_it) {
+                       struct tcp_proxy* np = p->next;
+                       *prev = np;
+                       FD_CLR(p->client_s, rorig);
+                       FD_CLR(p->client_s, worig);
+                       if(p->server_s != -1) {
+                               FD_CLR(p->server_s, rorig);
+                               FD_CLR(p->server_s, worig);
+                       }
+                       tcp_proxy_delete(p);
+                       p = np;
+                       continue;
+               }
+
+               prev = &p->next;
+               p = p->next;
+       }
+}
+
 /** find waiting time */
 static int
 service_findwait(struct timeval* now, struct timeval* wait, 
-       struct ringbuf* ring)
+       struct ringbuf* ring, struct tcp_proxy* tcplist)
 {
        /* first item is the time to wait */
        struct timeval* peek = ring_peek_time(ring);
+       struct timeval tcv;
+       int have_tcpval = 0;
+       struct tcp_proxy* p;
+
+       /* also for TCP list the first in sendlists is the time to wait */
+       for(p=tcplist; p; p=p->next) {
+               if(!have_tcpval)
+                       tcv = p->timeout;
+               have_tcpval = 1;
+               if(dl_tv_smaller(&p->timeout, &tcv))
+                       tcv = p->timeout;
+               if(p->querylist && dl_tv_smaller(&p->querylist->wait, &tcv))
+                       tcv = p->querylist->wait;
+               if(p->answerlist && dl_tv_smaller(&p->answerlist->wait, &tcv))
+                       tcv = p->answerlist->wait;
+       }
        if(peek) {
+               /* peek can be unaligned */
+               /* use wait as a temp variable */
                memmove(wait, peek, sizeof(*wait));
+               if(!have_tcpval)
+                       tcv = *wait;
+               else if(dl_tv_smaller(wait, &tcv))
+                       tcv = *wait;
+               have_tcpval = 1;
+       }
+       if(have_tcpval) {
+               *wait = tcv;
                dl_tv_subtract(wait, now);
                return 1;
        }
@@ -495,29 +822,51 @@ proxy_list_clear(struct proxy* p)
        }
 }
 
+/** clear TCP proxy list */
+static void
+tcp_proxy_list_clear(struct tcp_proxy* p)
+{
+       struct tcp_proxy* np;
+       while(p) {
+               np = p->next;
+               tcp_proxy_delete(p);
+               p = np;
+       }
+}
+
 /** delayer service loop */
 static void
-service_loop(int udp_s, struct ringbuf* ring, struct timeval* delay,
-       struct sockaddr_storage* srv_addr, socklen_t srv_len,
+service_loop(int udp_s, int listen_s, struct ringbuf* ring, 
+       struct timeval* delay, struct timeval* reuse,
+       struct sockaddr_storage* srv_addr, socklen_t srv_len, 
        ldns_buffer* pkt)
 {
        fd_set rset, rorig;
+       fd_set wset, worig;
        struct timeval now, wait;
        int max, have_wait = 0;
        struct proxy* proxies = NULL;
+       struct tcp_proxy* tcp_proxies = NULL;
+       struct timeval tcp_timeout;
+       tcp_timeout.tv_sec = 120;
+       tcp_timeout.tv_usec = 0;
 #ifndef S_SPLINT_S
        FD_ZERO(&rorig);
+       FD_ZERO(&worig);
        FD_SET(udp_s, &rorig);
+       FD_SET(listen_s, &rorig);
 #endif
        max = udp_s + 1;
+       if(listen_s + 1 > max) max = listen_s + 1;
        while(!do_quit) {
                /* wait for events */
                rset = rorig;
+               wset = worig;
                if(have_wait)
                        verbose(1, "wait for %d.%6.6d",
                        (unsigned)wait.tv_sec, (unsigned)wait.tv_usec);
                else    verbose(1, "wait");
-               if(select(max, &rset, NULL, NULL, have_wait?&wait:NULL) < 0) {
+               if(select(max, &rset, &wset, NULL, have_wait?&wait:NULL) < 0) {
                        if(errno == EAGAIN || errno == EINTR)
                                continue;
                        fatal_exit("select: %s", strerror(errno));
@@ -528,8 +877,7 @@ service_loop(int udp_s, struct ringbuf* ring, struct timeval* delay,
                                continue;
                        fatal_exit("gettimeofday: %s", strerror(errno));
                }
-               verbose(1, " ");
-               verbose(1, "process at %u.%6.6u", 
+               verbose(1, "process at %u.%6.6u\n", 
                        (unsigned)now.tv_sec, (unsigned)now.tv_usec);
                /* sendout delayed queries to master server (frees up buffer)*/
                service_send(ring, &now, pkt, srv_addr, srv_len);
@@ -537,11 +885,18 @@ service_loop(int udp_s, struct ringbuf* ring, struct timeval* delay,
                service_proxy(&rset, udp_s, proxies, pkt, &now);
                /* see what can be received to start waiting */
                service_recv(udp_s, ring, pkt, &rorig, &max, &proxies,
-                       srv_addr, srv_len, &now, delay);
+                       srv_addr, srv_len, &now, delay, reuse);
+               /* see if there are new tcp connections */
+               service_tcp_listen(listen_s, &rorig, &max, &tcp_proxies,
+                       srv_addr, srv_len, &now, &tcp_timeout);
+               /* service tcp connections */
+               service_tcp_relay(&tcp_proxies, &now, delay, &tcp_timeout, 
+                       pkt, &rset, &rorig, &worig);
                /* see what next timeout is (if any) */
-               have_wait = service_findwait(&now, &wait, ring);
+               have_wait = service_findwait(&now, &wait, ring, tcp_proxies);
        }
        proxy_list_clear(proxies);
+       tcp_proxy_list_clear(tcp_proxies);
 }
 
 /** delayer main service routine */
@@ -552,11 +907,17 @@ service(char* bind_str, int bindport, char* serv_str, size_t memsize,
        struct sockaddr_storage bind_addr, srv_addr;
        socklen_t bind_len, srv_len;
        struct ringbuf* ring = ring_create(memsize);
-       struct timeval delay;
+       struct timeval delay, reuse;
        ldns_buffer* pkt;
-       int i, s;
+       int i, s, listen_s;
        delay.tv_sec = delay_msec / 1000;
        delay.tv_usec = (delay_msec % 1000)*1000;
+       reuse = delay; /* reuse is max(4*delay, 1 second) */
+       dl_tv_add(&reuse, &delay);
+       dl_tv_add(&reuse, &delay);
+       dl_tv_add(&reuse, &delay);
+       if(reuse.tv_sec == 0)
+               reuse.tv_sec = 1;
        if(!extstrtoaddr(serv_str, &srv_addr, &srv_len)) {
                printf("cannot parse forward address: %s\n", serv_str);
                exit(1);
@@ -592,15 +953,35 @@ service(char* bind_str, int bindport, char* serv_str, size_t memsize,
                } else break;
        }
        fd_set_nonblock(s);
+       /* and TCP port */
+       if((listen_s = socket(str_is_ip6(bind_str)?AF_INET6:AF_INET,
+               SOCK_STREAM, 0)) == -1)
+               fatal_exit("tcp socket: %s", strerror(errno));
+#ifdef SO_REUSEADDR
+       if(1) {
+               int on = 1;
+               if(setsockopt(listen_s, SOL_SOCKET, SO_REUSEADDR, &on,
+                       (socklen_t)sizeof(on)) < 0)
+                       fatal_exit("setsockopt(.. SO_REUSEADDR ..) failed: %s",
+                               strerror(errno));
+       }
+#endif
+       if(bind(listen_s, (struct sockaddr*)&bind_addr, bind_len) == -1)
+               fatal_exit("tcp bind: %s", strerror(errno));
+       if(listen(listen_s, 5) == -1)
+               fatal_exit("tcp listen: %s", strerror(errno));
+       fd_set_nonblock(listen_s);
        printf("listening on port: %d\n", bindport);
 
        /* process loop */
        do_quit = 0;
-       service_loop(s, ring, &delay, &srv_addr, srv_len, pkt);
+       service_loop(s, listen_s, ring, &delay, &reuse, &srv_addr, srv_len, 
+               pkt);
 
        /* cleanup */
        verbose(1, "cleanup");
        close(s);
+       close(listen_s);
        ldns_buffer_free(pkt);
        ring_delete(ring);
 }
@@ -620,7 +1001,7 @@ int main(int argc, char** argv)
        size_t memsize = 10*1024*1024;
        int delay = 100;
 
-       verbosity = 0;
+       verbosity = 1;
        log_init(0, 0, 0);
        log_ident_set("delayer");
        srandom(time(NULL) ^ getpid());