struct proxy* next;
};
+/**
+ * An item that has to be TCP relayed.
+ * One item holds the bytes obtained by a single read on a relayed socket;
+ * items are kept in a singly linked list that is appended to at the end
+ * (tcp_relay_read) and consumed from the front (tcp_relay_write).
+ */
+struct tcp_send_list {
+ /** the data item; allocated copy of the bytes read, freed together
+  * with the list entry */
+ uint8_t* item;
+ /** size of item, in bytes */
+ size_t len;
+ /** time when the item can be transmitted on; tcp_relay_read sets it
+  * to the time of reading plus the delay */
+ struct timeval wait;
+ /** how much of the item has already been transmitted, in bytes; the
+  * entry is removed once this reaches len */
+ size_t done;
+ /** next in list, NULL for the last entry */
+ struct tcp_send_list* next;
+};
+
+/**
+ * List of TCP proxy fd pairs to TCP connect client to server.
+ * service_tcp_listen creates one entry per accepted client connection and
+ * gives it its own connection to the server.
+ */
+struct tcp_proxy {
+ /** the fd to listen for client query; the socket returned by accept */
+ int client_s;
+ /** the fd to listen for server answer; set to -1 by service_tcp_relay
+  * once the server side has been closed */
+ int server_s;
+
+ /** remote client address */
+ struct sockaddr_storage addr;
+ /** length of address */
+ socklen_t addr_len;
+ /** timeout on this entry; an absolute time that is moved forward when
+  * the entry is serviced, the entry is deleted after it has passed */
+ struct timeval timeout;
+
+ /** list of query items to send to server; first entry, NULL if empty */
+ struct tcp_send_list* querylist;
+ /** last in query list; NULL when the list is empty */
+ struct tcp_send_list* querylast;
+ /** list of answer items to send to client; first entry, NULL if empty */
+ struct tcp_send_list* answerlist;
+ /** last in answerlist; NULL when the list is empty */
+ struct tcp_send_list* answerlast;
+
+ /** next in list */
+ struct tcp_proxy* next;
+};
+
/** usage information for delayer */
void usage(char* argv[])
{
printf(" -b addr : bind to this address to listen.\n");
printf(" -p port : bind to this port (use 0 for random).\n");
printf(" -m mem : use this much memory for waiting queries.\n");
- printf(" -d delay: queries are delayed n milliseconds.\n");
+ printf(" -d delay: UDP queries are delayed n milliseconds.\n");
+ printf(" TCP is delayed twice (on send, on recv).\n");
printf(" -h : this help message\n");
exit(1);
}
static struct proxy*
find_create_proxy(struct sockaddr_storage* from, socklen_t from_len,
fd_set* rorig, int* max, struct proxy** proxies, int serv_ip6,
- struct timeval* now)
+ struct timeval* now, struct timeval* reuse_timeout)
{
struct proxy* p;
struct timeval t;
- struct timeval reuse_timeout;
for(p = *proxies; p; p = p->next) {
if(sockaddr_cmp(from, from_len, &p->addr, p->addr_len)==0)
return p;
}
/* possibly: reuse lapsed entries */
- reuse_timeout.tv_sec = 1;
- reuse_timeout.tv_usec = 0;
for(p = *proxies; p; p = p->next) {
if(p->numwait > p->numsent || p->numsent > p->numreturn)
continue;
t = *now;
dl_tv_subtract(&t, &p->lastuse);
- if(dl_tv_smaller(&t, &reuse_timeout))
+ if(dl_tv_smaller(&t, reuse_timeout))
continue;
/* yes! */
verbose(1, "reuse existing entry");
return p;
}
/* create new */
- p = calloc(1, sizeof(*p));
+ p = (struct proxy*)calloc(1, sizeof(*p));
if(!p) fatal_exit("out of memory");
p->s = socket(serv_ip6?AF_INET6:AF_INET, SOCK_DGRAM, 0);
if(p->s == -1) fatal_exit("socket: %s", strerror(errno));
service_recv(int s, struct ringbuf* ring, ldns_buffer* pkt,
fd_set* rorig, int* max, struct proxy** proxies,
struct sockaddr_storage* srv_addr, socklen_t srv_len,
- struct timeval* now, struct timeval* delay)
+ struct timeval* now, struct timeval* delay, struct timeval* reuse)
{
int i;
struct sockaddr_storage from;
ldns_buffer_set_limit(pkt, (size_t)len);
/* find its proxy element */
p = find_create_proxy(&from, from_len, rorig, max, proxies,
- addr_is_ip6(srv_addr, srv_len), now);
+ addr_is_ip6(srv_addr, srv_len), now, reuse);
if(!p) fatal_exit("error: cannot find or create proxy");
p->lastuse = *now;
ring_add(ring, pkt, now, delay, p);
}
}
+/**
+ * delete tcp proxy
+ * Frees the queued query and answer items, closes the client socket and,
+ * if it is still open, the server socket, then frees the entry itself.
+ * A NULL argument is ignored.
+ */
+static void
+tcp_proxy_delete(struct tcp_proxy* p)
+{
+    struct tcp_send_list* cur, *following;
+    if(p == NULL)
+        return;
+    log_addr(1, "delete tcp proxy", &p->addr, p->addr_len);
+    /* drop queries that were never forwarded */
+    for(cur = p->querylist; cur != NULL; cur = following) {
+        following = cur->next;
+        free(cur->item);
+        free(cur);
+    }
+    /* drop answers that were never forwarded */
+    for(cur = p->answerlist; cur != NULL; cur = following) {
+        following = cur->next;
+        free(cur->item);
+        free(cur);
+    }
+    close(p->client_s);
+    /* -1 marks a server socket that has been closed already */
+    if(p->server_s != -1)
+        close(p->server_s);
+    free(p);
+}
+
+/**
+ * accept new TCP connections, and set them up
+ * Tries to accept one connection on the (nonblocking) listen socket. For
+ * an accepted client a proxy entry is created, a nonblocking connect to
+ * the server is started, both fds are added to the read set and the entry
+ * is put at the front of the proxy list.
+ * @param s: listening TCP socket.
+ * @param rorig: master read fd set, the new fds are added to it.
+ * @param max: highest fd plus one for select, raised if necessary.
+ * @param proxies: head of the tcp proxy list, updated.
+ * @param srv_addr: address of the server to connect to.
+ * @param srv_len: length of srv_addr.
+ * @param now: current time.
+ * @param tcp_timeout: lifetime given to the new entry, relative to now.
+ */
+static void
+service_tcp_listen(int s, fd_set* rorig, int* max, struct tcp_proxy** proxies,
+    struct sockaddr_storage* srv_addr, socklen_t srv_len,
+    struct timeval* now, struct timeval* tcp_timeout)
+{
+    int newfd;
+    struct sockaddr_storage addr;
+    struct tcp_proxy* p;
+    /* accept() reads this value as the size of addr on entry, so it must
+     * be initialised; on return it holds the length of the peer address */
+    socklen_t addr_len = (socklen_t)sizeof(addr);
+    newfd = accept(s, (struct sockaddr*)&addr, &addr_len);
+    if(newfd == -1) {
+        /* nothing pending on the nonblocking socket, or interrupted */
+        if(errno == EAGAIN || errno == EINTR)
+            return;
+        fatal_exit("accept: %s", strerror(errno));
+    }
+    /* the reported length may exceed the buffer if the address was
+     * truncated; never copy more than was actually stored */
+    if(addr_len > (socklen_t)sizeof(addr))
+        addr_len = (socklen_t)sizeof(addr);
+    p = (struct tcp_proxy*)calloc(1, sizeof(*p));
+    if(!p) fatal_exit("out of memory");
+    memmove(&p->addr, &addr, addr_len);
+    p->addr_len = addr_len;
+    log_addr(1, "new tcp proxy", &p->addr, p->addr_len);
+    p->client_s = newfd;
+    p->server_s = socket(addr_is_ip6(srv_addr, srv_len)?AF_INET6:AF_INET,
+        SOCK_STREAM, 0);
+    if(p->server_s == -1)
+        fatal_exit("tcp socket: %s", strerror(errno));
+    fd_set_nonblock(p->client_s);
+    fd_set_nonblock(p->server_s);
+    if(connect(p->server_s, (struct sockaddr*)srv_addr, srv_len) == -1) {
+        /* EINPROGRESS is the normal result for a nonblocking connect */
+        if(errno != EINPROGRESS) {
+            log_err("tcp connect: %s", strerror(errno));
+            close(p->server_s);
+            close(p->client_s);
+            free(p);
+            return;
+        }
+    }
+    p->timeout = *now;
+    dl_tv_add(&p->timeout, tcp_timeout);
+
+    /* listen to client and server */
+    FD_SET(p->client_s, rorig);
+    FD_SET(p->server_s, rorig);
+    if(p->client_s+1 > *max)
+        *max = p->client_s+1;
+    if(p->server_s+1 > *max)
+        *max = p->server_s+1;
+
+    /* add into proxy list */
+    p->next = *proxies;
+    *proxies = p;
+}
+
+/**
+ * relay TCP, read a part
+ * Performs one read on the socket into the scratch buffer and appends a
+ * copy of the bytes to the send list, to be sent after the delay.
+ * @param s: socket to read from.
+ * @param first: start of the send list, set if the list was empty.
+ * @param last: end of the send list, set to the new item.
+ * @param now: current time.
+ * @param delay: added to now to get the send time of the new item.
+ * @param pkt: scratch buffer, its capacity bounds the read size.
+ * @return 0 if the connection is closed or on error (read failure, out
+ *	of memory); 1 otherwise, also when the read would block.
+ */
+static int
+tcp_relay_read(int s, struct tcp_send_list** first,
+    struct tcp_send_list** last, struct timeval* now,
+    struct timeval* delay, ldns_buffer* pkt)
+{
+    struct tcp_send_list* entry;
+    ssize_t got = read(s, ldns_buffer_begin(pkt), ldns_buffer_capacity(pkt));
+    if(got == 0) {
+        /* connection closed */
+        return 0;
+    }
+    if(got == -1) {
+        /* nothing to read right now is not a failure */
+        if(errno == EINTR || errno == EAGAIN)
+            return 1;
+        log_err("tcp read: %s", strerror(errno));
+        return 0;
+    }
+    entry = (struct tcp_send_list*)malloc(sizeof(*entry));
+    if(entry == NULL) {
+        log_err("out of memory");
+        return 0;
+    }
+    verbose(1, "read item len %d", (int)got);
+    entry->len = (size_t)got;
+    entry->item = memdup(ldns_buffer_begin(pkt), entry->len);
+    if(entry->item == NULL) {
+        free(entry);
+        log_err("out of memory");
+        return 0;
+    }
+    entry->done = 0;
+    entry->next = NULL;
+    /* earliest moment the item may be written out */
+    entry->wait = *now;
+    dl_tv_add(&entry->wait, delay);
+
+    /* append at the end of the list */
+    if(*first == NULL)
+        *first = entry;
+    else
+        (*last)->next = entry;
+    *last = entry;
+    return 1;
+}
+
+/**
+ * relay TCP, write a part
+ * Writes items from the front of the send list for as long as the front
+ * item is due and the socket takes it completely. Fully written items are
+ * removed from the list and freed.
+ * @param s: socket to write to.
+ * @param first: start of the send list, advanced past written items.
+ * @param last: end of the send list, set to NULL when the list empties.
+ * @param now: current time, compared against the wait time of the items.
+ * @return 0 on write error or when write returns 0; 1 otherwise (also for
+ *	a partial write, a write that would block, or an item not yet due).
+ */
+static int
+tcp_relay_write(int s, struct tcp_send_list** first,
+    struct tcp_send_list** last, struct timeval* now)
+{
+    ssize_t sent;
+    struct tcp_send_list* head;
+    while((head = *first) != NULL) {
+        /* is the item ready? */
+        if(!dl_tv_smaller(&head->wait, now))
+            return 1;
+        /* write the part that is still outstanding */
+        sent = write(s, head->item + head->done, head->len - head->done);
+        if(sent == -1) {
+            if(errno == EAGAIN || errno == EINTR)
+                return 1;
+            log_err("tcp write: %s", strerror(errno));
+            return 0;
+        }
+        if(sent == 0) {
+            /* closed */
+            return 0;
+        }
+        /* account it */
+        head->done += (size_t)sent;
+        verbose(1, "write item %d of %d", (int)head->done, (int)head->len);
+        if(head->done < head->len) {
+            /* partial write */
+            return 1;
+        }
+        /* item is complete, unlink and release it */
+        free(head->item);
+        *first = head->next;
+        if(*first == NULL)
+            *last = NULL;
+        free(head);
+    }
+    return 1;
+}
+
+/**
+ * perform TCP relaying
+ * Walks the tcp proxy list once. For every entry it reads from the client
+ * and server sockets that are flagged in rset, queues what was read with
+ * the delay, and tries to write the queued items to the other side.
+ * Entries are unlinked and deleted on a failed or closed client read, on
+ * a failed write, when the answers have been sent and the server side is
+ * closed, or when their timeout has passed.
+ * @param tcp_proxies: head of the proxy list; updated when entries are
+ *	removed.
+ * @param now: current time.
+ * @param delay: delay given to every item that is read.
+ * @param tcp_timeout: added to now; serviced entries get this as their
+ *	new timeout.
+ * @param pkt: scratch buffer used for reading.
+ * @param rset: fd set that is tested for readable sockets.
+ * @param rorig: master read fd set; fds of closed sockets are removed.
+ * @param worig: master write fd set; a socket is put in it while an item
+ *	for it is due but not yet written, and removed otherwise.
+ */
+static void
+service_tcp_relay(struct tcp_proxy** tcp_proxies, struct timeval* now,
+ struct timeval* delay, struct timeval* tcp_timeout, ldns_buffer* pkt,
+ fd_set* rset, fd_set* rorig, fd_set* worig)
+{
+ struct tcp_proxy* p, **prev;
+ struct timeval tout;
+ int delete_it;
+ p = *tcp_proxies;
+ prev = tcp_proxies;
+ tout = *now;
+ dl_tv_add(&tout, tcp_timeout);
+
+ while(p) {
+ delete_it = 0;
+ /* can we receive further queries? a read that fails or finds the
+  * client connection closed removes the whole entry */
+ if(!delete_it && FD_ISSET(p->client_s, rset)) {
+ p->timeout = tout;
+ log_addr(1, "read tcp query", &p->addr, p->addr_len);
+ if(!tcp_relay_read(p->client_s, &p->querylist,
+ &p->querylast, now, delay, pkt))
+ delete_it = 1;
+ }
+ /* can we receive further answers? a read that fails or finds the
+  * server connection closed only closes the server side (server_s
+  * becomes -1); the entry stays so queued answers can be sent */
+ if(!delete_it && p->server_s != -1 &&
+ FD_ISSET(p->server_s, rset)) {
+ p->timeout = tout;
+ log_addr(1, "read tcp answer", &p->addr, p->addr_len);
+ if(!tcp_relay_read(p->server_s, &p->answerlist,
+ &p->answerlast, now, delay, pkt)) {
+ close(p->server_s);
+ FD_CLR(p->server_s, worig);
+ FD_CLR(p->server_s, rorig);
+ p->server_s = -1;
+ }
+ }
+ /* can we send on further queries; afterwards the server socket is
+  * kept in worig only if the first remaining query is already due */
+ if(!delete_it && p->querylist && p->server_s != -1) {
+ p->timeout = tout;
+ if(dl_tv_smaller(&p->querylist->wait, now))
+ log_addr(1, "write tcp query",
+ &p->addr, p->addr_len);
+ if(!tcp_relay_write(p->server_s, &p->querylist,
+ &p->querylast, now))
+ delete_it = 1;
+ if(p->querylist && p->server_s != -1 &&
+ dl_tv_smaller(&p->querylist->wait, now))
+ FD_SET(p->server_s, worig);
+ else FD_CLR(p->server_s, worig);
+ }
+
+ /* can we send on further answers; once the answer list is empty and
+  * the server side is closed there is nothing left to relay, so the
+  * entry is removed */
+ if(!delete_it && p->answerlist) {
+ p->timeout = tout;
+ if(dl_tv_smaller(&p->answerlist->wait, now))
+ log_addr(1, "write tcp answer",
+ &p->addr, p->addr_len);
+ if(!tcp_relay_write(p->client_s, &p->answerlist,
+ &p->answerlast, now))
+ delete_it = 1;
+ if(p->answerlist && dl_tv_smaller(&p->answerlist->wait,
+ now))
+ FD_SET(p->client_s, worig);
+ else FD_CLR(p->client_s, worig);
+ if(!p->answerlist && p->server_s == -1)
+ delete_it = 1;
+ }
+
+ /* does this entry timeout? (unused too long) */
+ if(dl_tv_smaller(&p->timeout, now)) {
+ delete_it = 1;
+ }
+ if(delete_it) { /* unlink, stop watching its fds, free it */
+ struct tcp_proxy* np = p->next;
+ *prev = np;
+ FD_CLR(p->client_s, rorig);
+ FD_CLR(p->client_s, worig);
+ if(p->server_s != -1) {
+ FD_CLR(p->server_s, rorig);
+ FD_CLR(p->server_s, worig);
+ }
+ tcp_proxy_delete(p);
+ p = np;
+ continue;
+ }
+
+ prev = &p->next;
+ p = p->next;
+ }
+}
+
/** find waiting time */
static int
service_findwait(struct timeval* now, struct timeval* wait,
- struct ringbuf* ring)
+ struct ringbuf* ring, struct tcp_proxy* tcplist)
{
/* first item is the time to wait */
struct timeval* peek = ring_peek_time(ring);
+ struct timeval tcv;
+ int have_tcpval = 0;
+ struct tcp_proxy* p;
+
+ /* also for TCP list the first in sendlists is the time to wait */
+ for(p=tcplist; p; p=p->next) {
+ if(!have_tcpval)
+ tcv = p->timeout;
+ have_tcpval = 1;
+ if(dl_tv_smaller(&p->timeout, &tcv))
+ tcv = p->timeout;
+ if(p->querylist && dl_tv_smaller(&p->querylist->wait, &tcv))
+ tcv = p->querylist->wait;
+ if(p->answerlist && dl_tv_smaller(&p->answerlist->wait, &tcv))
+ tcv = p->answerlist->wait;
+ }
if(peek) {
+ /* peek can be unaligned */
+ /* use wait as a temp variable */
memmove(wait, peek, sizeof(*wait));
+ if(!have_tcpval)
+ tcv = *wait;
+ else if(dl_tv_smaller(wait, &tcv))
+ tcv = *wait;
+ have_tcpval = 1;
+ }
+ if(have_tcpval) {
+ *wait = tcv;
dl_tv_subtract(wait, now);
return 1;
}
}
}
+/**
+ * clear TCP proxy list
+ * Deletes every entry on the list with tcp_proxy_delete.
+ * @param p: first entry of the list, may be NULL.
+ */
+static void
+tcp_proxy_list_clear(struct tcp_proxy* p)
+{
+    struct tcp_proxy* upcoming;
+    for(; p != NULL; p = upcoming) {
+        /* fetch the successor before the entry is freed */
+        upcoming = p->next;
+        tcp_proxy_delete(p);
+    }
+}
+
/** delayer service loop */
static void
-service_loop(int udp_s, struct ringbuf* ring, struct timeval* delay,
- struct sockaddr_storage* srv_addr, socklen_t srv_len,
+service_loop(int udp_s, int listen_s, struct ringbuf* ring,
+ struct timeval* delay, struct timeval* reuse,
+ struct sockaddr_storage* srv_addr, socklen_t srv_len,
ldns_buffer* pkt)
{
fd_set rset, rorig;
+ fd_set wset, worig;
struct timeval now, wait;
int max, have_wait = 0;
struct proxy* proxies = NULL;
+ struct tcp_proxy* tcp_proxies = NULL;
+ struct timeval tcp_timeout;
+ tcp_timeout.tv_sec = 120;
+ tcp_timeout.tv_usec = 0;
#ifndef S_SPLINT_S
FD_ZERO(&rorig);
+ FD_ZERO(&worig);
FD_SET(udp_s, &rorig);
+ FD_SET(listen_s, &rorig);
#endif
max = udp_s + 1;
+ if(listen_s + 1 > max) max = listen_s + 1;
while(!do_quit) {
/* wait for events */
rset = rorig;
+ wset = worig;
if(have_wait)
verbose(1, "wait for %d.%6.6d",
(unsigned)wait.tv_sec, (unsigned)wait.tv_usec);
else verbose(1, "wait");
- if(select(max, &rset, NULL, NULL, have_wait?&wait:NULL) < 0) {
+ if(select(max, &rset, &wset, NULL, have_wait?&wait:NULL) < 0) {
if(errno == EAGAIN || errno == EINTR)
continue;
fatal_exit("select: %s", strerror(errno));
continue;
fatal_exit("gettimeofday: %s", strerror(errno));
}
- verbose(1, " ");
- verbose(1, "process at %u.%6.6u",
+ verbose(1, "process at %u.%6.6u\n",
(unsigned)now.tv_sec, (unsigned)now.tv_usec);
/* sendout delayed queries to master server (frees up buffer)*/
service_send(ring, &now, pkt, srv_addr, srv_len);
service_proxy(&rset, udp_s, proxies, pkt, &now);
/* see what can be received to start waiting */
service_recv(udp_s, ring, pkt, &rorig, &max, &proxies,
- srv_addr, srv_len, &now, delay);
+ srv_addr, srv_len, &now, delay, reuse);
+ /* see if there are new tcp connections */
+ service_tcp_listen(listen_s, &rorig, &max, &tcp_proxies,
+ srv_addr, srv_len, &now, &tcp_timeout);
+ /* service tcp connections */
+ service_tcp_relay(&tcp_proxies, &now, delay, &tcp_timeout,
+ pkt, &rset, &rorig, &worig);
/* see what next timeout is (if any) */
- have_wait = service_findwait(&now, &wait, ring);
+ have_wait = service_findwait(&now, &wait, ring, tcp_proxies);
}
proxy_list_clear(proxies);
+ tcp_proxy_list_clear(tcp_proxies);
}
/** delayer main service routine */
struct sockaddr_storage bind_addr, srv_addr;
socklen_t bind_len, srv_len;
struct ringbuf* ring = ring_create(memsize);
- struct timeval delay;
+ struct timeval delay, reuse;
ldns_buffer* pkt;
- int i, s;
+ int i, s, listen_s;
delay.tv_sec = delay_msec / 1000;
delay.tv_usec = (delay_msec % 1000)*1000;
+ reuse = delay; /* reuse is max(4*delay, 1 second) */
+ dl_tv_add(&reuse, &delay);
+ dl_tv_add(&reuse, &delay);
+ dl_tv_add(&reuse, &delay);
+ if(reuse.tv_sec == 0)
+ reuse.tv_sec = 1;
if(!extstrtoaddr(serv_str, &srv_addr, &srv_len)) {
printf("cannot parse forward address: %s\n", serv_str);
exit(1);
} else break;
}
fd_set_nonblock(s);
+ /* and TCP port */
+ if((listen_s = socket(str_is_ip6(bind_str)?AF_INET6:AF_INET,
+ SOCK_STREAM, 0)) == -1)
+ fatal_exit("tcp socket: %s", strerror(errno));
+#ifdef SO_REUSEADDR
+ if(1) {
+ int on = 1;
+ if(setsockopt(listen_s, SOL_SOCKET, SO_REUSEADDR, &on,
+ (socklen_t)sizeof(on)) < 0)
+ fatal_exit("setsockopt(.. SO_REUSEADDR ..) failed: %s",
+ strerror(errno));
+ }
+#endif
+ if(bind(listen_s, (struct sockaddr*)&bind_addr, bind_len) == -1)
+ fatal_exit("tcp bind: %s", strerror(errno));
+ if(listen(listen_s, 5) == -1)
+ fatal_exit("tcp listen: %s", strerror(errno));
+ fd_set_nonblock(listen_s);
printf("listening on port: %d\n", bindport);
/* process loop */
do_quit = 0;
- service_loop(s, ring, &delay, &srv_addr, srv_len, pkt);
+ service_loop(s, listen_s, ring, &delay, &reuse, &srv_addr, srv_len,
+ pkt);
/* cleanup */
verbose(1, "cleanup");
close(s);
+ close(listen_s);
ldns_buffer_free(pkt);
ring_delete(ring);
}
size_t memsize = 10*1024*1024;
int delay = 100;
- verbosity = 0;
+ verbosity = 1;
log_init(0, 0, 0);
log_ident_set("delayer");
srandom(time(NULL) ^ getpid());