From: Wouter Wijngaards Date: Wed, 7 Feb 2007 14:18:42 +0000 (+0000) Subject: tcp input. X-Git-Tag: release-0.0~50 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=fa56f5ece14c5fe46b4c46243870c02fed1a5c60;p=thirdparty%2Funbound.git tcp input. git-svn-id: file:///svn/unbound/trunk@75 be551aaa-1e26-0410-a405-d3ace91eadb9 --- diff --git a/doc/Changelog b/doc/Changelog index f0d4b9811..273f0dc8a 100644 --- a/doc/Changelog +++ b/doc/Changelog @@ -6,6 +6,7 @@ - set addrlen value when calling recvfrom. - comparison of addrs more portable. - LIBEVENT option for testbed to set libevent directory. + - work on tcp input. 6 February 2007: Wouter - reviewed code and improved in places. diff --git a/services/outside_network.c b/services/outside_network.c index 051df7a8a..aefad487a 100644 --- a/services/outside_network.c +++ b/services/outside_network.c @@ -59,31 +59,6 @@ /** byte size of ip6 address */ #define INET6_SIZE 16 -/** send addr to logfile */ -static void -log_addr(struct sockaddr_storage* addr, socklen_t addrlen) -{ - uint16_t port; - const char* family = "unknown"; - char dest[100]; - int af = (int)((struct sockaddr_in*)addr)->sin_family; - void* sinaddr = &((struct sockaddr_in*)addr)->sin_addr; - switch(af) { - case AF_INET: family="ip4"; break; - case AF_INET6: family="ip6"; - sinaddr = &((struct sockaddr_in6*)addr)->sin6_addr; - break; - case AF_UNIX: family="unix"; break; - default: break; - } - if(inet_ntop(af, sinaddr, dest, (socklen_t)sizeof(dest)) == 0) { - strncpy(dest, "(inet_ntop error)", sizeof(dest)); - } - port = ntohs(((struct sockaddr_in*)addr)->sin_port); - log_info("addr fam=%s port=%d dest=%s len=%d", - family, (int)port, dest, (int)addrlen); -} - /** compare function of pending rbtree */ static int pending_cmp(const void* key1, const void* key2) diff --git a/util/netevent.c b/util/netevent.c index 69bf44d6e..bbead76e3 100644 --- a/util/netevent.c +++ b/util/netevent.c @@ -44,6 +44,9 @@ #include /* -------- Start of local definitions -------- */ +/** The TCP reading query timeout in seconds */ +#define TCP_READ_TIMEOUT 120 + /* We define libevent structures here to hide the libevent stuff. */ /* we use libevent */ @@ -243,24 +246,150 @@ comm_point_udp_callback(int fd, short event, void* arg) } } +/** Use a new tcp handler for new query fd, set to read query. */ +static void +setup_tcp_handler(struct comm_point* c, int fd) +{ + log_assert(c->type == comm_tcp); + log_assert(c->fd == -1); + ldns_buffer_clear(c->buffer); + c->tcp_is_reading = 1; + c->tcp_byte_count = 0; + comm_point_start_listening(c, fd, TCP_READ_TIMEOUT); +} + static void -comm_point_tcp_accept_callback(int ATTR_UNUSED(fd), short ATTR_UNUSED(event), - void* arg) +comm_point_tcp_accept_callback(int fd, short event, void* arg) { - struct comm_point* c = (struct comm_point*)arg; + struct comm_point* c = (struct comm_point*)arg, *c_hdl; + struct comm_reply rep; + int new_fd; log_info("callback tcpaccept for %x", (int)c); log_assert(c->type == comm_tcp_accept); - /* TODO */ + if(!(event & EV_READ)) { + log_info("ignoring tcp accept event %d", (int)event); + return; + } + /* accept incoming connection. */ + rep.c = NULL; + rep.addrlen = (socklen_t)sizeof(rep.addr); + new_fd = accept(fd, (struct sockaddr*)&rep.addr, &rep.addrlen); + if(new_fd == -1) { + /* EINTR is signal interrupt. others are closed connection. */ + if(errno != EINTR && errno != EWOULDBLOCK && + errno != ECONNABORTED && errno != EPROTO) + log_err("accept failed: %s", strerror(errno)); + return; + } + /* find free tcp handler. */ + if(!c->tcp_free) { + log_err("accepted too many tcp, connections full, from:"); + log_addr(&rep.addr, rep.addrlen); + close(new_fd); + return; + } + /* grab it */ + c_hdl = c->tcp_free; + c->tcp_free = c_hdl->tcp_free; + if(!c->tcp_free) { + /* stop accepting incoming queries for now. */ + comm_point_stop_listening(c); + } + /* addr is dropped. Not needed for tcp reply. */ + setup_tcp_handler(c_hdl, new_fd); +} + +/** Make tcp handler free for next assignment. */ +static void +reclaim_tcp_handler(struct comm_point* c) +{ + log_assert(c->type == comm_tcp); + comm_point_close(c); + c->tcp_free = c->tcp_parent->tcp_free; + c->tcp_parent->tcp_free = c; + if(!c->tcp_free) { + /* re-enable listening on accept socket */ + comm_point_start_listening(c->tcp_parent, -1, -1); + } +} + +/** do the callback when reading is done */ +static void +tcp_callback_reader(struct comm_point* c) +{ + log_assert(c->type == comm_tcp); + ldns_buffer_flip(c->buffer); + c->tcp_is_reading = 0; + c->tcp_byte_count = 0; + comm_point_stop_listening(c); + if( (*c->callback)(c, c->cb_arg, NETEVENT_NOERROR, NULL) ) { + /* setup to send reply */ + } } static void -comm_point_tcp_handle_callback(int ATTR_UNUSED(fd), short ATTR_UNUSED(event), - void* arg) +comm_point_tcp_handle_callback(int fd, short event, void* arg) { struct comm_point* c = (struct comm_point*)arg; - log_info("callback tcpaccept for %x", (int)c); + ssize_t r; + log_info("callback tcphandle for %x", (int)c); log_assert(c->type == comm_tcp); - /* TODO */ + + if(!(event&EV_READ)) { + if(event&EV_TIMEOUT) { + verbose(VERB_DETAIL, "tcp read took to long drop"); + reclaim_tcp_handler(c); + return; + } + log_err("Ignored event %d for tcphdl.", event); + return; + } + + if(!c->tcp_is_reading) { + reclaim_tcp_handler(c); + return; + } + + if(c->tcp_byte_count < sizeof(uint16_t)) { + /* read length bytes */ + r = read(fd, ldns_buffer_at(c->buffer, c->tcp_byte_count), + sizeof(uint16_t)-c->tcp_byte_count); + if(r == 0) { + reclaim_tcp_handler(c); + return; + } else if(r == -1) { + if(errno != EINTR && errno != EAGAIN) + log_err("read (in tcp s): %s", strerror(errno)); + return; + } + c->tcp_byte_count += r; + if(c->tcp_byte_count != sizeof(uint16_t)) + return; + ldns_buffer_set_limit(c->buffer, + ldns_buffer_read_u16_at(c->buffer, 0)); + if(ldns_buffer_limit(c->buffer) < LDNS_HEADER_SIZE) { + verbose(VERB_DETAIL, "tcp: dropped bogus too short."); + reclaim_tcp_handler(c); + return; + } + log_info("Reading tcp query of length %d", + ldns_buffer_limit(c->buffer)); + } + + r = read(fd, ldns_buffer_current(c->buffer), + ldns_buffer_remaining(c->buffer)); + if(r == 0) { + reclaim_tcp_handler(c); + return; + } else if(r == -1) { + if(errno != EINTR && errno != EAGAIN) + log_err("read (in tcp r): %s", strerror(errno)); + return; + } + ldns_buffer_skip(c->buffer, r); + if(ldns_buffer_remaining(c->buffer) <= 0) { + tcp_callback_reader(c); + } } struct comm_point* @@ -284,7 +413,6 @@ comm_point_create_udp(struct comm_base *base, int fd, ldns_buffer* buffer, c->tcp_is_reading = 0; c->tcp_byte_count = 0; c->tcp_parent = NULL; - c->cur_tcp_count = 0; c->max_tcp_count = 0; c->tcp_handlers = NULL; c->tcp_free = NULL; @@ -323,11 +451,21 @@ comm_point_create_tcp_handler(struct comm_base *base, } c->fd = -1; c->buffer = ldns_buffer_new(bufsize); - c->timeout = NULL; + if(!c->buffer) { + free(c->ev); + free(c); + return NULL; + } + c->timeout = (struct timeval*)malloc(sizeof(struct timeval)); + if(!c->timeout) { + ldns_buffer_free(c->buffer); + free(c->ev); + free(c); + return NULL; + } c->tcp_is_reading = 0; c->tcp_byte_count = 0; c->tcp_parent = parent; - c->cur_tcp_count = 0; c->max_tcp_count = 0; c->tcp_handlers = NULL; c->tcp_free = NULL; @@ -376,7 +514,6 @@ comm_point_create_tcp(struct comm_base *base, int fd, int num, size_t bufsize, c->tcp_is_reading = 0; c->tcp_byte_count = 0; c->tcp_parent = NULL; - c->cur_tcp_count = 0; c->max_tcp_count = num; c->tcp_handlers = (struct comm_point**)calloc((size_t)num, sizeof(struct comm_point*)); @@ -467,6 +604,40 @@ comm_point_send_reply(struct comm_reply *repinfo) } } +void comm_point_stop_listening(struct comm_point* c) +{ + if(event_del(&c->ev->ev) != 0) { + log_err("event_del error to stoplisten"); + } +} + +void comm_point_start_listening(struct comm_point* c, int newfd, int sec) +{ + if(c->type == comm_tcp_accept && !c->tcp_free) { + /* no use to start listening no free slots. */ + return; + } + if(sec != -1 && sec != 0) { + if(!c->timeout) { + c->timeout = (struct timeval*)malloc(sizeof( + struct timeval)); + if(!c->timeout) { + log_err("cpsl: malloc failed. No net read."); + return; + } + } + c->timeout->tv_sec = sec; + c->timeout->tv_usec = 0; + } + if(c->fd != -1) + close(c->fd); + c->fd = newfd; + c->ev->ev.ev_fd = c->fd; + if(event_add(&c->ev->ev, sec==0?NULL:c->timeout) != 0) { + log_err("event_add failed. in cpsl."); + } +} + struct comm_timer* comm_timer_create(struct comm_base* base, void (*cb)(void*), void* cb_arg) { @@ -607,3 +778,27 @@ comm_signal_delete(struct comm_signal* comsig) } free(comsig); } + +void +log_addr(struct sockaddr_storage* addr, socklen_t addrlen) +{ + uint16_t port; + const char* family = "unknown"; + char dest[100]; + int af = (int)((struct sockaddr_in*)addr)->sin_family; + void* sinaddr = &((struct sockaddr_in*)addr)->sin_addr; + switch(af) { + case AF_INET: family="ip4"; break; + case AF_INET6: family="ip6"; + sinaddr = &((struct sockaddr_in6*)addr)->sin6_addr; + break; + case AF_UNIX: family="unix"; break; + default: break; + } + if(inet_ntop(af, sinaddr, dest, (socklen_t)sizeof(dest)) == 0) { + strncpy(dest, "(inet_ntop error)", sizeof(dest)); + } + port = ntohs(((struct sockaddr_in*)addr)->sin_port); + log_info("addr fam=%s port=%d dest=%s len=%d", + family, (int)port, dest, (int)addrlen); +} diff --git a/util/netevent.h b/util/netevent.h index f6e045398..4c0508e16 100644 --- a/util/netevent.h +++ b/util/netevent.h @@ -119,8 +119,6 @@ struct comm_point { struct comm_point* tcp_parent; /* -------- TCP Accept -------- */ - /** current number of TCP connections on this socket */ - int cur_tcp_count; /** the number of TCP handlers for this tcp-accept socket */ int max_tcp_count; /** malloced array of tcp handlers for a tcp-accept, @@ -322,6 +320,20 @@ void comm_point_send_reply(struct comm_reply* repinfo); int comm_point_send_udp_msg(struct comm_point* c, ldns_buffer* packet, struct sockaddr* addr, socklen_t addrlen); +/** + * Stop listening for input on the commpoint. No callbacks will happen. + * @param c: commpoint to disable. The fd is not closed. + */ +void comm_point_stop_listening(struct comm_point* c); + +/** + * Start listening again for input on the comm point. + * @param c: commpoint to enable again. + * @param newfd: new fd, or -1 to leave fd be. + * @param sec: timeout in seconds, or -1 for no (change to the) timeout. + */ +void comm_point_start_listening(struct comm_point* c, int newfd, int sec); + /** * create timer. Not active upon creation. * @param base: event handling base. @@ -383,4 +395,11 @@ int comm_signal_bind(struct comm_signal* comsig, int sig); */ void comm_signal_delete(struct comm_signal* comsig); +/** + * Prints the sockaddr in readable format with log_info. Debug helper. + * @param addr: the sockaddr to print. Can be ip4 or ip6. + * @param addrlen: length of addr. + */ +void log_addr(struct sockaddr_storage* addr, socklen_t addrlen); + #endif /* NET_EVENT_H */