]> git.ipfire.org Git - thirdparty/unbound.git/commitdiff
Delayer util.
authorWouter Wijngaards <wouter@nlnetlabs.nl>
Mon, 25 Feb 2008 15:35:23 +0000 (15:35 +0000)
committerWouter Wijngaards <wouter@nlnetlabs.nl>
Mon, 25 Feb 2008 15:35:23 +0000 (15:35 +0000)
git-svn-id: file:///svn/unbound/trunk@989 be551aaa-1e26-0410-a405-d3ace91eadb9

doc/Changelog
testcode/delayer.c

index 47d47805b1df59ae1502efe1e27ba35cd16a2265..07379a8ca9bcefd7b035f73f3388474a0730773d 100644 (file)
@@ -1,3 +1,6 @@
+23 February 2008: Wouter
+       - delay utility works. Gets decent throughput too (>20000).
+
 22 February 2008: Wouter
        - +2% for recursions, if identical queries (except for destination
          and query ID) in the reply list, avoid re-encoding the answer.
index 688f2810562b72c7a1335ac610a9cfab7aa8b58d..c1138593cf49860d44fe197dbe16ae6889dea6c8 100644 (file)
@@ -45,6 +45,9 @@
 #include "util/config_file.h"
 #include <signal.h>
 
+/** max number of packets read from one socket per select() round */
+#define TRIES_PER_SELECT 100
+
 /**
  * The ring buffer
  */
@@ -59,19 +62,87 @@ struct ringbuf {
        size_t high;
 };
 
+/**
+ * List of per-client proxy sockets: queries go out to the server on them, replies come back.
+ */
+struct proxy {
+       /** the fd used to send queries to the server and to receive its replies */
+       int s;
+       /** time of last activity on this entry (query queued or sent, reply seen) */
+       struct timeval lastuse;
+       /** address of the client this entry currently serves */
+       struct sockaddr_storage addr;
+       /** length of addr */
+       socklen_t addr_len;
+       /** number of queries received from the client and handed to the ring (in total) */
+       size_t numwait;
+       /** number of queries sent to server (in total) */
+       size_t numsent;
+       /** number of answers returned to client (in total) */
+       size_t numreturn;
+       /** how many times this entry was reassigned to another client */
+       size_t numreuse;
+       /** next in proxylist */
+       struct proxy* next;
+};
+
 /** usage information for delayer */
 void usage(char* argv[])
 {
        printf("usage: %s [options]\n", argv[0]);
        printf("        -f addr : use addr, forward to that server, @port.\n");
        printf("        -b addr : bind to this address to listen.\n");
-       printf("        -p port : bind to this port (0 for random).\n");
+       printf("        -p port : bind to this port (use 0 for random).\n");
        printf("        -m mem  : use this much memory for waiting queries.\n");
        printf("        -d delay: queries are delayed n milliseconds.\n");
        printf("        -h      : this help message\n");
        exit(1);
 }
 
+/** timeval compare: returns 1 if t1 < t2, otherwise 0 */
+static int
+dl_tv_smaller(struct timeval* t1, const struct timeval* t2) 
+{
+#ifndef S_SPLINT_S
+       if(t1->tv_sec < t2->tv_sec)
+               return 1;
+       if(t1->tv_sec == t2->tv_sec && /* same second: microseconds decide */
+               t1->tv_usec < t2->tv_usec)
+               return 1;
+#endif
+       return 0;
+}
+
+/** timeval add, t1 += t2. NOTE(review): '>' below leaves tv_usec == 1000000 uncarried */
+static void
+dl_tv_add(struct timeval* t1, const struct timeval* t2) 
+{
+#ifndef S_SPLINT_S
+       t1->tv_sec += t2->tv_sec;
+       t1->tv_usec += t2->tv_usec;
+       while(t1->tv_usec > 1000000) { /* carry whole seconds out of the usec field */
+               t1->tv_usec -= 1000000;
+               t1->tv_sec++;
+       }
+#endif
+}
+
+/** timeval subtract, t1 -= t2, borrowing one second when t1 has fewer usec than t2 */
+static void
+dl_tv_subtract(struct timeval* t1, const struct timeval* t2) 
+{
+#ifndef S_SPLINT_S
+       t1->tv_sec -= t2->tv_sec;
+       if(t1->tv_usec >= t2->tv_usec) {
+               t1->tv_usec -= t2->tv_usec;
+       } else { /* borrow a second so the usec result stays non-negative */
+               t1->tv_sec--;
+               t1->tv_usec = 1000000-(t2->tv_usec-t1->tv_usec);
+       }
+#endif
+}
+
+
 /** create new ring buffer */
 static struct ringbuf*
 ring_create(size_t sz)
@@ -95,6 +166,127 @@ ring_delete(struct ringbuf* r)
        free(r);
 }
 
+/** add pkt to ringbuffer, due at now+delay, tagged with p; dropped with a warning if no room */
+static void
+ring_add(struct ringbuf* r, ldns_buffer* pkt, struct timeval* now, 
+       struct timeval* delay, struct proxy* p)
+{
+       /* entry layout: timeval -- proxy* -- 16bit length -- message bytes */
+       uint16_t len = (uint16_t)ldns_buffer_limit(pkt);
+       struct timeval when;
+       size_t needed;
+       uint8_t* where = NULL;
+       log_assert(ldns_buffer_limit(pkt) <= 65535);
+       needed = sizeof(when) + sizeof(p) + sizeof(len) + len;
+       /* reserve 'needed' bytes in the ringbuffer, 'where' points at them */
+       if(r->low < r->high) {
+               /* used part is in the middle */
+               if(r->size - r->high >= needed) {
+                       where = r->buf + r->high;
+                       r->high += needed;
+               } else if(r->low > needed) {
+                       /* wrap around ringbuffer */
+                       /* make sure r->low == r->high means empty */
+                       /* so r->low == r->high cannot be used to signify
+                        * a completely full ringbuf */
+                       if(r->size - r->high > sizeof(when)+sizeof(p)) {
+                               /* zeroed time+proxy at the old high mark: ring_pop wraps there */
+                               memset(r->buf+r->high, 0, 
+                                       sizeof(when)+sizeof(p));
+                       }
+                       where = r->buf;
+                       r->high = needed;
+               } else {
+                       /* no room at the end nor at the start: drop message */
+                       log_warn("warning: mem full, dropped message");
+                       return;
+               }
+       } else {
+               /* empty: restart at the beginning of the buffer */
+               if(r->high == r->low) {
+                       where = r->buf;
+                       r->low = 0;
+                       r->high = needed;
+               /* unused part is in the middle */
+               /* so ringbuffer has wrapped around */
+               } else if(r->low - r->high > needed) {
+                       where = r->buf + r->high;
+                       r->high += needed;
+               } else {
+                       log_warn("warning: mem full, dropped message");
+                       return;
+               }
+       }
+       when = *now;
+       dl_tv_add(&when, delay);
+       /* copy the entry fields to the reserved space, in layout order */
+       log_assert(where != NULL);
+       memmove(where, &when, sizeof(when));
+       memmove(where+sizeof(when), &p, sizeof(p));
+       memmove(where+sizeof(when)+sizeof(p), &len, sizeof(len));
+       memmove(where+sizeof(when)+sizeof(p)+sizeof(len), 
+               ldns_buffer_begin(pkt), len);
+}
+
+/** returns true if the ringbuffer holds no entries (low mark equals high mark) */
+static int
+ring_empty(struct ringbuf* r)
+{
+       return (r->low == r->high);
+}
+
+/** peek at the due time of the next entry to pop; NULL if ring is empty (points into ring) */
+static struct timeval*
+ring_peek_time(struct ringbuf* r)
+{
+       if(ring_empty(r))
+               return NULL;
+       return (struct timeval*)&r->buf[r->low];
+}
+
+/** pop the next entry into pkt, tv and p; returns 0 if the ring is empty, otherwise 1 */
+static int
+ring_pop(struct ringbuf* r, ldns_buffer* pkt, struct timeval* tv, 
+       struct proxy** p)
+{
+       /* entry layout: timeval -- proxy* -- 16bit length -- message bytes */
+       uint16_t len;
+       uint8_t* where = NULL;
+       size_t done;
+       if(r->low == r->high)
+               return 0;
+       where = r->buf + r->low;
+       memmove(tv, where, sizeof(*tv));
+       memmove(p, where+sizeof(*tv), sizeof(*p));
+       memmove(&len, where+sizeof(*tv)+sizeof(*p), sizeof(len));
+       memmove(ldns_buffer_begin(pkt), 
+               where+sizeof(*tv)+sizeof(*p)+sizeof(len), len);
+       ldns_buffer_set_limit(pkt, (size_t)len);
+       done = sizeof(*tv)+sizeof(*p)+sizeof(len)+len;
+       /* move lowmark past the entry that was just copied out */
+       if(r->low < r->high) {
+               /* used part in middle */
+               log_assert(r->high - r->low >= done);
+               r->low += done;
+       } else {
+               /* unused part in middle */
+               log_assert(r->size - r->low >= done);
+               r->low += done;
+               if(r->size - r->low > sizeof(*tv)+sizeof(*p)) {
+                       /* a NULL proxy pointer here is the zeroed end marker: wrap */
+                       struct proxy* pz;
+                       memmove(&pz, r->buf+r->low+sizeof(*tv), sizeof(pz));
+                       if(pz == NULL)
+                               r->low = 0;
+               } else r->low = 0;
+       }
+       if(r->low == r->high) {
+               r->low = 0; /* reset if empty */
+               r->high = 0;
+       }
+       return 1;
+}
+       
 /** signal handler global info */
 static volatile int do_quit = 0;
 
@@ -105,6 +297,253 @@ static RETSIGTYPE delayer_sigh(int sig)
        do_quit = 1;
 }
 
+/** send queued packets whose due time is before now to the server, via their proxy socket */
+static void
+service_send(struct ringbuf* ring, struct timeval* now, ldns_buffer* pkt,
+       struct sockaddr_storage* srv_addr, socklen_t srv_len)
+{
+       struct proxy* p;
+       struct timeval tv;
+       ssize_t sent;
+       while(!ring_empty(ring) && 
+               dl_tv_smaller(ring_peek_time(ring), now)) {
+               /* this item needs to be sent out */
+               if(!ring_pop(ring, pkt, &tv, &p))
+                       fatal_exit("ringbuf error: pop failed");
+               verbose(1, "send out query %d.%6.6d", 
+                       (unsigned)tv.tv_sec, (unsigned)tv.tv_usec);
+               log_addr(1, "from client", &p->addr, p->addr_len);
+               /* send it; errors are logged and the packet is not retried */
+               sent = sendto(p->s, ldns_buffer_begin(pkt), 
+                       ldns_buffer_limit(pkt), 0, 
+                       (struct sockaddr*)srv_addr, srv_len);
+               if(sent == -1) {
+                       log_err("sendto: %s", strerror(errno));
+               } else if(sent != (ssize_t)ldns_buffer_limit(pkt)) {
+                       log_err("sendto: partial send");
+               }
+               p->lastuse = *now;
+               p->numsent++;
+       }
+}
+
+/** relay up to TRIES_PER_SELECT replies read from p->s to the client, sent via retsock */
+static void
+do_proxy(struct proxy* p, int retsock, ldns_buffer* pkt)
+{
+       int i;
+       ssize_t r;
+       for(i=0; i<TRIES_PER_SELECT; i++) {
+               r = recv(p->s, ldns_buffer_begin(pkt), 
+                       ldns_buffer_capacity(pkt), 0);
+               if(r == -1) {
+                       if(errno == EAGAIN || errno == EINTR)
+                               return; /* nothing (more) to read right now */
+                       log_err("recv: %s", strerror(errno));
+                       return;
+               }
+               ldns_buffer_set_limit(pkt, (size_t)r);
+               log_addr(1, "return reply to client", &p->addr, p->addr_len);
+               /* send reply back to the real client; counted even if sendto fails */
+               p->numreturn++;
+               r = sendto(retsock, ldns_buffer_begin(pkt), (size_t)r, 0,
+                       (struct sockaddr*)&p->addr, p->addr_len);
+               if(r == -1) {
+                       log_err("sendto: %s", strerror(errno));
+               }
+       }
+}
+
+/** for every proxy whose socket is set in rset, return its server replies to its client */
+static void
+service_proxy(fd_set* rset, int retsock, struct proxy* proxies, 
+       ldns_buffer* pkt, struct timeval* now)
+{
+       struct proxy* p;
+       for(p = proxies; p; p = p->next) {
+               if(FD_ISSET(p->s, rset)) {
+                       p->lastuse = *now;
+                       do_proxy(p, retsock, pkt);
+               }
+       }
+}
+
+/** find the proxy for this remote client, else reuse an idle entry, else create a new one */
+static struct proxy*
+find_create_proxy(struct sockaddr_storage* from, socklen_t from_len,
+       fd_set* rorig, int* max, struct proxy** proxies, int serv_ip6,
+       struct timeval* now)
+{
+       struct proxy* p;
+       struct timeval t;
+       struct timeval reuse_timeout;
+       for(p = *proxies; p; p = p->next) {
+               if(sockaddr_cmp(from, from_len, &p->addr, p->addr_len)==0)
+                       return p;
+       }
+       /* possibly: reuse entries with nothing outstanding, unused for reuse_timeout */
+       reuse_timeout.tv_sec = 1;
+       reuse_timeout.tv_usec = 0;
+       for(p = *proxies; p; p = p->next) {
+               if(p->numwait > p->numsent || p->numsent > p->numreturn)
+                       continue;
+               t = *now;
+               dl_tv_subtract(&t, &p->lastuse);
+               if(dl_tv_smaller(&t, &reuse_timeout))
+                       continue;
+               /* yes! hand this entry, socket included, to the new client */
+               verbose(1, "reuse existing entry");
+               memmove(&p->addr, from, from_len);
+               p->addr_len = from_len;
+               p->numreuse++;
+               return p;
+       }
+       /* create new entry with its own datagram socket, put at the head of the list */
+       p = calloc(1, sizeof(*p));
+       if(!p) fatal_exit("out of memory");
+       p->s = socket(serv_ip6?AF_INET6:AF_INET, SOCK_DGRAM, 0);
+       if(p->s == -1) fatal_exit("socket: %s", strerror(errno));
+       fd_set_nonblock(p->s);
+       memmove(&p->addr, from, from_len);
+       p->addr_len = from_len;
+       p->next = *proxies;
+       *proxies = p;
+       FD_SET(p->s, rorig);
+       if(p->s+1 > *max)
+               *max = p->s+1;
+       return p;
+}
+
+/** read up to TRIES_PER_SELECT client packets from s and add them to the ring */
+static void
+service_recv(int s, struct ringbuf* ring, ldns_buffer* pkt, 
+       fd_set* rorig, int* max, struct proxy** proxies,
+       struct sockaddr_storage* srv_addr, socklen_t srv_len, 
+       struct timeval* now, struct timeval* delay)
+{
+       int i;
+       struct sockaddr_storage from;
+       socklen_t from_len;
+       ssize_t len;
+       struct proxy* p;
+       for(i=0; i<TRIES_PER_SELECT; i++) {
+               from_len = (socklen_t)sizeof(from);
+               len = recvfrom(s, ldns_buffer_begin(pkt),
+                       ldns_buffer_capacity(pkt), 0,
+                       (struct sockaddr*)&from, &from_len);
+               if(len < 0) {
+                       if(errno == EAGAIN || errno == EINTR)
+                               return;
+                       fatal_exit("recvfrom: %s", strerror(errno));
+               }
+               ldns_buffer_set_limit(pkt, (size_t)len);
+               /* find its proxy element; the address family follows the server address */
+               p = find_create_proxy(&from, from_len, rorig, max, proxies,
+                       addr_is_ip6(srv_addr, srv_len), now);
+               if(!p) fatal_exit("error: cannot find or create proxy");
+               p->lastuse = *now;
+               ring_add(ring, pkt, now, delay, p);
+               p->numwait++;
+               log_addr(1, "recv from client", &p->addr, p->addr_len);
+       }
+}
+
+/** set wait to the time from now until the next ring entry is due; returns 0 if ring empty */
+static int
+service_findwait(struct timeval* now, struct timeval* wait, 
+       struct ringbuf* ring)
+{
+       /* due time of the first item, minus now, is the time to wait */
+       struct timeval* peek = ring_peek_time(ring);
+       if(peek) {
+               memmove(wait, peek, sizeof(*wait));
+               dl_tv_subtract(wait, now);
+               return 1;
+       }
+       /* nothing queued: caller can block without a timeout */
+       return 0;
+}
+
+/** print the counters of every proxy in the list, close its socket and free it */
+static void
+proxy_list_clear(struct proxy* p)
+{
+       char from[109];
+       struct proxy* np;
+       int i=0, port;
+       while(p) {
+               np = p->next; /* fetch next first, p is freed below */
+               port = (int)ntohs(((struct sockaddr_in*)&p->addr)->sin_port);
+               if(addr_is_ip6(&p->addr, p->addr_len)) {
+                       if(inet_ntop(AF_INET6, 
+                               &((struct sockaddr_in6*)&p->addr)->sin6_addr,
+                               from, (socklen_t)sizeof(from)) == 0)
+                               strncpy(from, "err", sizeof(from));
+               } else {
+                       if(inet_ntop(AF_INET, 
+                               &((struct sockaddr_in*)&p->addr)->sin_addr,
+                               from, (socklen_t)sizeof(from)) == 0)
+                               strncpy(from, "err", sizeof(from));
+               }
+               printf("client[%d]: last %s@%d of %d : %u in, %u out, "
+                       "%u returned\n", i++, from, port, (int)p->numreuse+1,
+                       (unsigned)p->numwait, (unsigned)p->numsent, 
+                       (unsigned)p->numreturn);
+               close(p->s);
+               free(p);
+               p = np;
+       }
+}
+
+/** delayer service loop: select, send due queries, relay replies, queue new queries */
+static void
+service_loop(int udp_s, struct ringbuf* ring, struct timeval* delay,
+       struct sockaddr_storage* srv_addr, socklen_t srv_len,
+       ldns_buffer* pkt)
+{
+       fd_set rset, rorig;
+       struct timeval now, wait;
+       int max, have_wait = 0;
+       struct proxy* proxies = NULL;
+#ifndef S_SPLINT_S
+       FD_ZERO(&rorig);
+       FD_SET(udp_s, &rorig);
+#endif
+       max = udp_s + 1;
+       while(!do_quit) {
+               /* wait for events, on a fresh copy of the master fd set */
+               rset = rorig;
+               if(have_wait)
+                       verbose(1, "wait for %d.%6.6d",
+                       (unsigned)wait.tv_sec, (unsigned)wait.tv_usec);
+               else    verbose(1, "wait");
+               if(select(max, &rset, NULL, NULL, have_wait?&wait:NULL) < 0) {
+                       if(errno == EAGAIN || errno == EINTR)
+                               continue;
+                       fatal_exit("select: %s", strerror(errno));
+               }
+               /* get current time; one timestamp is used for the whole round */
+               if(gettimeofday(&now, NULL) < 0) {
+                       if(errno == EAGAIN || errno == EINTR)
+                               continue;
+                       fatal_exit("gettimeofday: %s", strerror(errno));
+               }
+               verbose(1, " ");
+               verbose(1, "process at %u.%6.6u", 
+                       (unsigned)now.tv_sec, (unsigned)now.tv_usec);
+               /* sendout delayed queries to master server (frees up buffer)*/
+               service_send(ring, &now, pkt, srv_addr, srv_len);
+               /* proxy return replies */
+               service_proxy(&rset, udp_s, proxies, pkt, &now);
+               /* see what can be received to start waiting */
+               service_recv(udp_s, ring, pkt, &rorig, &max, &proxies,
+                       srv_addr, srv_len, &now, delay);
+               /* see what next timeout is (if any) */
+               have_wait = service_findwait(&now, &wait, ring);
+       }
+       proxy_list_clear(proxies);
+}
+
 /** delayer main service routine */
 static void
 service(char* bind_str, int bindport, char* serv_str, size_t memsize, 
@@ -114,19 +553,17 @@ service(char* bind_str, int bindport, char* serv_str, size_t memsize,
        socklen_t bind_len, srv_len;
        struct ringbuf* ring = ring_create(memsize);
        struct timeval delay;
-       int s;
+       ldns_buffer* pkt;
+       int i, s;
        delay.tv_sec = delay_msec / 1000;
        delay.tv_usec = (delay_msec % 1000)*1000;
-       if(bindport == 0)
-               bindport = 1024 + random()%64000;
-       if(!ipstrtoaddr(bind_str, bindport, &bind_addr, &bind_len)) {
-               printf("cannot parse listen address: %s\n", bind_str);
-               exit(1);
-       }
        if(!extstrtoaddr(serv_str, &srv_addr, &srv_len)) {
                printf("cannot parse forward address: %s\n", serv_str);
                exit(1);
        }
+       pkt = ldns_buffer_new(65535);
+       if(!pkt)
+               fatal_exit("out of memory");
        if( signal(SIGINT, delayer_sigh) == SIG_ERR ||
                signal(SIGTERM, delayer_sigh) == SIG_ERR ||
                signal(SIGHUP, delayer_sigh) == SIG_ERR ||
@@ -134,23 +571,37 @@ service(char* bind_str, int bindport, char* serv_str, size_t memsize,
                signal(SIGALRM, delayer_sigh) == SIG_ERR)
                fatal_exit("could not bind to signal");
        /* bind UDP port */
-       if((s = socket(addr_is_ip6(&bind_addr, bind_len)?AF_INET6:AF_INET,
+       if((s = socket(str_is_ip6(bind_str)?AF_INET6:AF_INET,
                SOCK_DGRAM, 0)) == -1)
                fatal_exit("socket: %s", strerror(errno));
-       if(bind(s, (struct sockaddr*)&bind_addr, bind_len) == -1)
-               fatal_exit("bind: %s", strerror(errno));
+       i=0;
+       if(bindport == 0) {
+               bindport = 1024 + random()%64000;
+               i = 100;
+       }
+       while(1) {
+               if(!ipstrtoaddr(bind_str, bindport, &bind_addr, &bind_len)) {
+                       printf("cannot parse listen address: %s\n", bind_str);
+                       exit(1);
+               }
+               if(bind(s, (struct sockaddr*)&bind_addr, bind_len) == -1) {
+                       log_err("bind: %s", strerror(errno));
+                       if(i--==0)
+                               fatal_exit("cannot bind any port");
+                       bindport = 1024 + random()%64000;
+               } else break;
+       }
+       fd_set_nonblock(s);
        printf("listening on port: %d\n", bindport);
 
        /* process loop */
-       /* wait for events */
-       while(!do_quit) {
-               /* get current time */
-               /* sendout delayed queries to master server (frees up buffer)*/
-               /* see what can be received to start waiting */
-               /* see what next timeout is (if any) */
-       }
+       do_quit = 0;
+       service_loop(s, ring, &delay, &srv_addr, srv_len, pkt);
+
        /* cleanup */
+       verbose(1, "cleanup");
        close(s);
+       ldns_buffer_free(pkt);
        ring_delete(ring);
 }
 
@@ -169,7 +620,11 @@ int main(int argc, char** argv)
        size_t memsize = 10*1024*1024;
        int delay = 100;
 
+       verbosity = 0;
+       log_init(0, 0, 0);
+       log_ident_set("delayer");
        srandom(time(NULL) ^ getpid());
+       if(argc == 1) usage(argv);
        while( (c=getopt(argc, argv, "b:d:f:hm:p:")) != -1) {
                switch(c) {
                        case 'b':
@@ -209,9 +664,8 @@ int main(int argc, char** argv)
        if(argc != 0)
                usage(argv);
 
-       printf("bind to %s @ %d and forward to %s\n"
-               "after %d msec (buffer %d)\n", 
-               bindto, bindport, server, delay, (int)memsize);
+       printf("bind to %s @ %d and forward to %s after %d msec\n", 
+               bindto, bindport, server, delay);
        service(bindto, bindport, server, memsize, delay);
        return 0;
 }