From 5979bc061f09cfe9e48c40b39023df733d9ee42a Mon Sep 17 00:00:00 2001 From: Wouter Wijngaards Date: Tue, 26 Feb 2008 13:04:05 +0000 Subject: [PATCH] delayer in TCP. git-svn-id: file:///svn/unbound/trunk@990 be551aaa-1e26-0410-a405-d3ace91eadb9 --- doc/Changelog | 7 +- testcode/delayer.c | 423 ++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 408 insertions(+), 22 deletions(-) diff --git a/doc/Changelog b/doc/Changelog index 07379a8ca..77bc86410 100644 --- a/doc/Changelog +++ b/doc/Changelog @@ -1,4 +1,9 @@ -23 February 2008: Wouter +26 February 2008: Wouter + - delay utility delays TCP as well. If the server that is forwarded + to has a TCP error, the delay utility closes the connection. + - delay does REUSE_ADDR, and can handle a server that closes its end. + +25 February 2008: Wouter - delay utility works. Gets decent thoughput too (>20000). 22 February 2008: Wouter diff --git a/testcode/delayer.c b/testcode/delayer.c index c1138593c..77fb33476 100644 --- a/testcode/delayer.c +++ b/testcode/delayer.c @@ -86,6 +86,51 @@ struct proxy { struct proxy* next; }; +/** + * An item that has to be TCP relayed + */ +struct tcp_send_list { + /** the data item */ + uint8_t* item; + /** size of item */ + size_t len; + /** time when the item can be transmitted on */ + struct timeval wait; + /** how much of the item has already been transmitted */ + size_t done; + /** next in list */ + struct tcp_send_list* next; +}; + +/** + * List of TCP proxy fd pairs to TCP connect client to server + */ +struct tcp_proxy { + /** the fd to listen for client query */ + int client_s; + /** the fd to listen for server answer */ + int server_s; + + /** remote client address */ + struct sockaddr_storage addr; + /** length of address */ + socklen_t addr_len; + /** timeout on this entry */ + struct timeval timeout; + + /** list of query items to send to server */ + struct tcp_send_list* querylist; + /** last in query list */ + struct tcp_send_list* querylast; + /** list of answer items to send to client */ + struct tcp_send_list* answerlist; + /** last in answerlist */ + struct tcp_send_list* answerlast; + + /** next in list */ + struct tcp_proxy* next; +}; + /** usage information for delayer */ void usage(char* argv[]) { @@ -94,7 +139,8 @@ void usage(char* argv[]) printf(" -b addr : bind to this address to listen.\n"); printf(" -p port : bind to this port (use 0 for random).\n"); printf(" -m mem : use this much memory for waiting queries.\n"); - printf(" -d delay: queries are delayed n milliseconds.\n"); + printf(" -d delay: UDP queries are delayed n milliseconds.\n"); + printf(" TCP is delayed twice (on send, on recv).\n"); printf(" -h : this help message\n"); exit(1); } @@ -372,24 +418,21 @@ service_proxy(fd_set* rset, int retsock, struct proxy* proxies, static struct proxy* find_create_proxy(struct sockaddr_storage* from, socklen_t from_len, fd_set* rorig, int* max, struct proxy** proxies, int serv_ip6, - struct timeval* now) + struct timeval* now, struct timeval* reuse_timeout) { struct proxy* p; struct timeval t; - struct timeval reuse_timeout; for(p = *proxies; p; p = p->next) { if(sockaddr_cmp(from, from_len, &p->addr, p->addr_len)==0) return p; } /* possibly: reuse lapsed entries */ - reuse_timeout.tv_sec = 1; - reuse_timeout.tv_usec = 0; for(p = *proxies; p; p = p->next) { if(p->numwait > p->numsent || p->numsent > p->numreturn) continue; t = *now; dl_tv_subtract(&t, &p->lastuse); - if(dl_tv_smaller(&t, &reuse_timeout)) + if(dl_tv_smaller(&t, reuse_timeout)) continue; /* yes! */ verbose(1, "reuse existing entry"); @@ -399,7 +442,7 @@ find_create_proxy(struct sockaddr_storage* from, socklen_t from_len, return p; } /* create new */ - p = calloc(1, sizeof(*p)); + p = (struct proxy*)calloc(1, sizeof(*p)); if(!p) fatal_exit("out of memory"); p->s = socket(serv_ip6?AF_INET6:AF_INET, SOCK_DGRAM, 0); if(p->s == -1) fatal_exit("socket: %s", strerror(errno)); @@ -419,7 +462,7 @@ static void service_recv(int s, struct ringbuf* ring, ldns_buffer* pkt, fd_set* rorig, int* max, struct proxy** proxies, struct sockaddr_storage* srv_addr, socklen_t srv_len, - struct timeval* now, struct timeval* delay) + struct timeval* now, struct timeval* delay, struct timeval* reuse) { int i; struct sockaddr_storage from; @@ -439,7 +482,7 @@ service_recv(int s, struct ringbuf* ring, ldns_buffer* pkt, ldns_buffer_set_limit(pkt, (size_t)len); /* find its proxy element */ p = find_create_proxy(&from, from_len, rorig, max, proxies, - addr_is_ip6(srv_addr, srv_len), now); + addr_is_ip6(srv_addr, srv_len), now, reuse); if(!p) fatal_exit("error: cannot find or create proxy"); p->lastuse = *now; ring_add(ring, pkt, now, delay, p); @@ -448,15 +491,299 @@ service_recv(int s, struct ringbuf* ring, ldns_buffer* pkt, } } +/** delete tcp proxy */ +static void +tcp_proxy_delete(struct tcp_proxy* p) +{ + struct tcp_send_list* s, *sn; + if(!p) + return; + log_addr(1, "delete tcp proxy", &p->addr, p->addr_len); + s = p->querylist; + while(s) { + sn = s->next; + free(s->item); + free(s); + s = sn; + } + s = p->answerlist; + while(s) { + sn = s->next; + free(s->item); + free(s); + s = sn; + } + close(p->client_s); + if(p->server_s != -1) + close(p->server_s); + free(p); +} + +/** accept new TCP connections, and set them up */ +static void +service_tcp_listen(int s, fd_set* rorig, int* max, struct tcp_proxy** proxies, + struct sockaddr_storage* srv_addr, socklen_t srv_len, + struct timeval* now, struct timeval* tcp_timeout) +{ + int newfd; + struct sockaddr_storage addr; + struct tcp_proxy* p; + socklen_t addr_len; + newfd = accept(s, (struct sockaddr*)&addr, &addr_len); + if(newfd == -1) { + if(errno == EAGAIN || errno == EINTR) + return; + fatal_exit("accept: %s", strerror(errno)); + } + p = (struct tcp_proxy*)calloc(1, sizeof(*p)); + if(!p) fatal_exit("out of memory"); + memmove(&p->addr, &addr, addr_len); + p->addr_len = addr_len; + log_addr(1, "new tcp proxy", &p->addr, p->addr_len); + p->client_s = newfd; + p->server_s = socket(addr_is_ip6(srv_addr, srv_len)?AF_INET6:AF_INET, + SOCK_STREAM, 0); + if(p->server_s == -1) + fatal_exit("tcp socket: %s", strerror(errno)); + fd_set_nonblock(p->client_s); + fd_set_nonblock(p->server_s); + if(connect(p->server_s, (struct sockaddr*)srv_addr, srv_len) == -1) { + if(errno != EINPROGRESS) { + log_err("tcp connect: %s", strerror(errno)); + close(p->server_s); + close(p->client_s); + free(p); + return; + } + } + p->timeout = *now; + dl_tv_add(&p->timeout, tcp_timeout); + + /* listen to client and server */ + FD_SET(p->client_s, rorig); + FD_SET(p->server_s, rorig); + if(p->client_s+1 > *max) + *max = p->client_s+1; + if(p->server_s+1 > *max) + *max = p->server_s+1; + + /* add into proxy list */ + p->next = *proxies; + *proxies = p; +} + +/** relay TCP, read a part */ +static int +tcp_relay_read(int s, struct tcp_send_list** first, + struct tcp_send_list** last, struct timeval* now, + struct timeval* delay, ldns_buffer* pkt) +{ + struct tcp_send_list* item; + ssize_t r = read(s, ldns_buffer_begin(pkt), ldns_buffer_capacity(pkt)); + if(r == -1) { + if(errno == EINTR || errno == EAGAIN) + return 1; + log_err("tcp read: %s", strerror(errno)); + return 0; + } else if(r == 0) { + /* connection closed */ + return 0; + } + item = (struct tcp_send_list*)malloc(sizeof(*item)); + if(!item) { + log_err("out of memory"); + return 0; + } + verbose(1, "read item len %d", (int)r); + item->len = (size_t)r; + item->item = memdup(ldns_buffer_begin(pkt), item->len); + if(!item->item) { + free(item); + log_err("out of memory"); + return 0; + } + item->done = 0; + item->wait = *now; + dl_tv_add(&item->wait, delay); + item->next = NULL; + + /* link in */ + if(*first) { + (*last)->next = item; + } else { + *first = item; + } + *last = item; + return 1; +} + +/** relay TCP, write a part */ +static int +tcp_relay_write(int s, struct tcp_send_list** first, + struct tcp_send_list** last, struct timeval* now) +{ + ssize_t r; + struct tcp_send_list* p; + while(*first) { + p = *first; + /* is the item ready? */ + if(!dl_tv_smaller(&p->wait, now)) + return 1; + /* write it */ + r = write(s, p->item + p->done, p->len - p->done); + if(r == -1) { + if(errno == EAGAIN || errno == EINTR) + return 1; + log_err("tcp write: %s", strerror(errno)); + return 0; + } else if(r == 0) { + /* closed */ + return 0; + } + /* account it */ + p->done += (size_t)r; + verbose(1, "write item %d of %d", (int)p->done, (int)p->len); + if(p->done >= p->len) { + free(p->item); + *first = p->next; + if(!*first) + *last = NULL; + free(p); + } else { + /* partial write */ + return 1; + } + } + return 1; +} + +/** perform TCP relaying */ +static void +service_tcp_relay(struct tcp_proxy** tcp_proxies, struct timeval* now, + struct timeval* delay, struct timeval* tcp_timeout, ldns_buffer* pkt, + fd_set* rset, fd_set* rorig, fd_set* worig) +{ + struct tcp_proxy* p, **prev; + struct timeval tout; + int delete_it; + p = *tcp_proxies; + prev = tcp_proxies; + tout = *now; + dl_tv_add(&tout, tcp_timeout); + + while(p) { + delete_it = 0; + /* can we receive further queries? */ + if(!delete_it && FD_ISSET(p->client_s, rset)) { + p->timeout = tout; + log_addr(1, "read tcp query", &p->addr, p->addr_len); + if(!tcp_relay_read(p->client_s, &p->querylist, + &p->querylast, now, delay, pkt)) + delete_it = 1; + } + /* can we receive further answers? */ + if(!delete_it && p->server_s != -1 && + FD_ISSET(p->server_s, rset)) { + p->timeout = tout; + log_addr(1, "read tcp answer", &p->addr, p->addr_len); + if(!tcp_relay_read(p->server_s, &p->answerlist, + &p->answerlast, now, delay, pkt)) { + close(p->server_s); + FD_CLR(p->server_s, worig); + FD_CLR(p->server_s, rorig); + p->server_s = -1; + } + } + /* can we send on further queries */ + if(!delete_it && p->querylist && p->server_s != -1) { + p->timeout = tout; + if(dl_tv_smaller(&p->querylist->wait, now)) + log_addr(1, "write tcp query", + &p->addr, p->addr_len); + if(!tcp_relay_write(p->server_s, &p->querylist, + &p->querylast, now)) + delete_it = 1; + if(p->querylist && p->server_s != -1 && + dl_tv_smaller(&p->querylist->wait, now)) + FD_SET(p->server_s, worig); + else FD_CLR(p->server_s, worig); + } + + /* can we send on further answers */ + if(!delete_it && p->answerlist) { + p->timeout = tout; + if(dl_tv_smaller(&p->answerlist->wait, now)) + log_addr(1, "write tcp answer", + &p->addr, p->addr_len); + if(!tcp_relay_write(p->client_s, &p->answerlist, + &p->answerlast, now)) + delete_it = 1; + if(p->answerlist && dl_tv_smaller(&p->answerlist->wait, + now)) + FD_SET(p->client_s, worig); + else FD_CLR(p->client_s, worig); + if(!p->answerlist && p->server_s == -1) + delete_it = 1; + } + + /* does this entry timeout? (unused too long) */ + if(dl_tv_smaller(&p->timeout, now)) { + delete_it = 1; + } + if(delete_it) { + struct tcp_proxy* np = p->next; + *prev = np; + FD_CLR(p->client_s, rorig); + FD_CLR(p->client_s, worig); + if(p->server_s != -1) { + FD_CLR(p->server_s, rorig); + FD_CLR(p->server_s, worig); + } + tcp_proxy_delete(p); + p = np; + continue; + } + + prev = &p->next; + p = p->next; + } +} + /** find waiting time */ static int service_findwait(struct timeval* now, struct timeval* wait, - struct ringbuf* ring) + struct ringbuf* ring, struct tcp_proxy* tcplist) { /* first item is the time to wait */ struct timeval* peek = ring_peek_time(ring); + struct timeval tcv; + int have_tcpval = 0; + struct tcp_proxy* p; + + /* also for TCP list the first in sendlists is the time to wait */ + for(p=tcplist; p; p=p->next) { + if(!have_tcpval) + tcv = p->timeout; + have_tcpval = 1; + if(dl_tv_smaller(&p->timeout, &tcv)) + tcv = p->timeout; + if(p->querylist && dl_tv_smaller(&p->querylist->wait, &tcv)) + tcv = p->querylist->wait; + if(p->answerlist && dl_tv_smaller(&p->answerlist->wait, &tcv)) + tcv = p->answerlist->wait; + } if(peek) { + /* peek can be unaligned */ + /* use wait as a temp variable */ memmove(wait, peek, sizeof(*wait)); + if(!have_tcpval) + tcv = *wait; + else if(dl_tv_smaller(wait, &tcv)) + tcv = *wait; + have_tcpval = 1; + } + if(have_tcpval) { + *wait = tcv; dl_tv_subtract(wait, now); return 1; } @@ -495,29 +822,51 @@ proxy_list_clear(struct proxy* p) } } +/** clear TCP proxy list */ +static void +tcp_proxy_list_clear(struct tcp_proxy* p) +{ + struct tcp_proxy* np; + while(p) { + np = p->next; + tcp_proxy_delete(p); + p = np; + } +} + /** delayer service loop */ static void -service_loop(int udp_s, struct ringbuf* ring, struct timeval* delay, - struct sockaddr_storage* srv_addr, socklen_t srv_len, +service_loop(int udp_s, int listen_s, struct ringbuf* ring, + struct timeval* delay, struct timeval* reuse, + struct sockaddr_storage* srv_addr, socklen_t srv_len, ldns_buffer* pkt) { fd_set rset, rorig; + fd_set wset, worig; struct timeval now, wait; int max, have_wait = 0; struct proxy* proxies = NULL; + struct tcp_proxy* tcp_proxies = NULL; + struct timeval tcp_timeout; + tcp_timeout.tv_sec = 120; + tcp_timeout.tv_usec = 0; #ifndef S_SPLINT_S FD_ZERO(&rorig); + FD_ZERO(&worig); FD_SET(udp_s, &rorig); + FD_SET(listen_s, &rorig); #endif max = udp_s + 1; + if(listen_s + 1 > max) max = listen_s + 1; while(!do_quit) { /* wait for events */ rset = rorig; + wset = worig; if(have_wait) verbose(1, "wait for %d.%6.6d", (unsigned)wait.tv_sec, (unsigned)wait.tv_usec); else verbose(1, "wait"); - if(select(max, &rset, NULL, NULL, have_wait?&wait:NULL) < 0) { + if(select(max, &rset, &wset, NULL, have_wait?&wait:NULL) < 0) { if(errno == EAGAIN || errno == EINTR) continue; fatal_exit("select: %s", strerror(errno)); @@ -528,8 +877,7 @@ service_loop(int udp_s, struct ringbuf* ring, struct timeval* delay, continue; fatal_exit("gettimeofday: %s", strerror(errno)); } - verbose(1, " "); - verbose(1, "process at %u.%6.6u", + verbose(1, "process at %u.%6.6u\n", (unsigned)now.tv_sec, (unsigned)now.tv_usec); /* sendout delayed queries to master server (frees up buffer)*/ service_send(ring, &now, pkt, srv_addr, srv_len); @@ -537,11 +885,18 @@ service_loop(int udp_s, struct ringbuf* ring, struct timeval* delay, service_proxy(&rset, udp_s, proxies, pkt, &now); /* see what can be received to start waiting */ service_recv(udp_s, ring, pkt, &rorig, &max, &proxies, - srv_addr, srv_len, &now, delay); + srv_addr, srv_len, &now, delay, reuse); + /* see if there are new tcp connections */ + service_tcp_listen(listen_s, &rorig, &max, &tcp_proxies, + srv_addr, srv_len, &now, &tcp_timeout); + /* service tcp connections */ + service_tcp_relay(&tcp_proxies, &now, delay, &tcp_timeout, + pkt, &rset, &rorig, &worig); /* see what next timeout is (if any) */ - have_wait = service_findwait(&now, &wait, ring); + have_wait = service_findwait(&now, &wait, ring, tcp_proxies); } proxy_list_clear(proxies); + tcp_proxy_list_clear(tcp_proxies); } /** delayer main service routine */ @@ -552,11 +907,17 @@ service(char* bind_str, int bindport, char* serv_str, size_t memsize, struct sockaddr_storage bind_addr, srv_addr; socklen_t bind_len, srv_len; struct ringbuf* ring = ring_create(memsize); - struct timeval delay; + struct timeval delay, reuse; ldns_buffer* pkt; - int i, s; + int i, s, listen_s; delay.tv_sec = delay_msec / 1000; delay.tv_usec = (delay_msec % 1000)*1000; + reuse = delay; /* reuse is max(4*delay, 1 second) */ + dl_tv_add(&reuse, &delay); + dl_tv_add(&reuse, &delay); + dl_tv_add(&reuse, &delay); + if(reuse.tv_sec == 0) + reuse.tv_sec = 1; if(!extstrtoaddr(serv_str, &srv_addr, &srv_len)) { printf("cannot parse forward address: %s\n", serv_str); exit(1); @@ -592,15 +953,35 @@ service(char* bind_str, int bindport, char* serv_str, size_t memsize, } else break; } fd_set_nonblock(s); + /* and TCP port */ + if((listen_s = socket(str_is_ip6(bind_str)?AF_INET6:AF_INET, + SOCK_STREAM, 0)) == -1) + fatal_exit("tcp socket: %s", strerror(errno)); +#ifdef SO_REUSEADDR + if(1) { + int on = 1; + if(setsockopt(listen_s, SOL_SOCKET, SO_REUSEADDR, &on, + (socklen_t)sizeof(on)) < 0) + fatal_exit("setsockopt(.. SO_REUSEADDR ..) failed: %s", + strerror(errno)); + } +#endif + if(bind(listen_s, (struct sockaddr*)&bind_addr, bind_len) == -1) + fatal_exit("tcp bind: %s", strerror(errno)); + if(listen(listen_s, 5) == -1) + fatal_exit("tcp listen: %s", strerror(errno)); + fd_set_nonblock(listen_s); printf("listening on port: %d\n", bindport); /* process loop */ do_quit = 0; - service_loop(s, ring, &delay, &srv_addr, srv_len, pkt); + service_loop(s, listen_s, ring, &delay, &reuse, &srv_addr, srv_len, + pkt); /* cleanup */ verbose(1, "cleanup"); close(s); + close(listen_s); ldns_buffer_free(pkt); ring_delete(ring); } @@ -620,7 +1001,7 @@ int main(int argc, char** argv) size_t memsize = 10*1024*1024; int delay = 100; - verbosity = 0; + verbosity = 1; log_init(0, 0, 0); log_ident_set("delayer"); srandom(time(NULL) ^ getpid()); -- 2.47.2