]> git.ipfire.org Git - thirdparty/unbound.git/commitdiff
tcp input.
authorWouter Wijngaards <wouter@nlnetlabs.nl>
Wed, 7 Feb 2007 14:18:42 +0000 (14:18 +0000)
committerWouter Wijngaards <wouter@nlnetlabs.nl>
Wed, 7 Feb 2007 14:18:42 +0000 (14:18 +0000)
git-svn-id: file:///svn/unbound/trunk@75 be551aaa-1e26-0410-a405-d3ace91eadb9

doc/Changelog
services/outside_network.c
util/netevent.c
util/netevent.h

index f0d4b9811bf28d23e679846c049fb4f98f76349f..273f0dc8ad50ee3e15f975b852bd2d63e10e4c78 100644 (file)
@@ -6,6 +6,7 @@
        - set addrlen value when calling recvfrom.
        - comparison of addrs more portable.
        - LIBEVENT option for testbed to set libevent directory.
+       - work on tcp input.
 
 6 February 2007: Wouter
        - reviewed code and improved in places.
index 051df7a8ad9961398184193902e9738481ab9d2a..aefad487a1bcbe5f7e1120f61cb32d45e1c65b12 100644 (file)
 /** byte size of ip6 address */
 #define INET6_SIZE 16 
 
-/** send addr to logfile */
-static void
-log_addr(struct sockaddr_storage* addr, socklen_t addrlen)
-{
-       uint16_t port;
-       const char* family = "unknown";
-       char dest[100];
-       int af = (int)((struct sockaddr_in*)addr)->sin_family;
-       void* sinaddr = &((struct sockaddr_in*)addr)->sin_addr;
-       switch(af) {
-               case AF_INET: family="ip4"; break;
-               case AF_INET6: family="ip6"; 
-                       sinaddr = &((struct sockaddr_in6*)addr)->sin6_addr;
-                       break;
-               case AF_UNIX: family="unix"; break;
-               default: break;
-       }
-       if(inet_ntop(af, sinaddr, dest, (socklen_t)sizeof(dest)) == 0) {
-               strncpy(dest, "(inet_ntop error)", sizeof(dest));
-       }
-       port = ntohs(((struct sockaddr_in*)addr)->sin_port);
-       log_info("addr fam=%s port=%d dest=%s len=%d",
-               family, (int)port, dest, (int)addrlen);
-}
-
 /** compare function of pending rbtree */
 static int 
 pending_cmp(const void* key1, const void* key2)
index 69bf44d6e70ed855c26df6443e970406ffb56a48..bbead76e31771e69bdb05d1077c3ea41f34abb41 100644 (file)
@@ -44,6 +44,9 @@
 #include <errno.h>
 
 /* -------- Start of local definitions -------- */
+/** The TCP reading query timeout in seconds */
+#define TCP_READ_TIMEOUT 120 
+
 /* We define libevent structures here to hide the libevent stuff. */
 
 /* we use libevent */
@@ -243,24 +246,150 @@ comm_point_udp_callback(int fd, short event, void* arg)
        }
 }
 
+/** Use a new tcp handler for new query fd, set to read query. */
+static void
+setup_tcp_handler(struct comm_point* c, int fd) 
+{
+       log_assert(c->type == comm_tcp);
+       log_assert(c->fd == -1);
+       ldns_buffer_clear(c->buffer);
+       c->tcp_is_reading = 1;
+       c->tcp_byte_count = 0;
+       comm_point_start_listening(c, fd, TCP_READ_TIMEOUT);
+}
+
 static void 
-comm_point_tcp_accept_callback(int ATTR_UNUSED(fd), short ATTR_UNUSED(event), 
-       void* arg)
+comm_point_tcp_accept_callback(int fd, short event, void* arg)
 {
-       struct comm_point* c = (struct comm_point*)arg;
+       struct comm_point* c = (struct comm_point*)arg, *c_hdl;
+       struct comm_reply rep;
+       int new_fd;
        log_info("callback tcpaccept for %x", (int)c);
        log_assert(c->type == comm_tcp_accept);
-       /* TODO */
+       if(!(event & EV_READ)) {
+               log_info("ignoring tcp accept event %d", (int)event);
+               return;
+       }
+       /* accept incoming connection. */
+       rep.c = NULL;
+       rep.addrlen = (socklen_t)sizeof(rep.addr);
+       new_fd = accept(fd, (struct sockaddr*)&rep.addr, &rep.addrlen);
+       if(new_fd == -1) {
+               /* EINTR is signal interrupt. others are closed connection. */
+               if(errno != EINTR && errno != EWOULDBLOCK && 
+                       errno != ECONNABORTED && errno != EPROTO)
+                       log_err("accept failed: %s", strerror(errno));
+               return;
+       }
+       /* find free tcp handler. */
+       if(!c->tcp_free) {
+               log_err("accepted too many tcp, connections full, from:");
+               log_addr(&rep.addr, rep.addrlen);
+               close(new_fd);
+               return;
+       }
+       /* grab it */
+       c_hdl = c->tcp_free;
+       c->tcp_free = c_hdl->tcp_free;
+       if(!c->tcp_free) {
+               /* stop accepting incoming queries for now. */
+               comm_point_stop_listening(c);
+       }
+       /* addr is dropped. Not needed for tcp reply. */
+       setup_tcp_handler(c_hdl, new_fd);
+}
+
+/** Make tcp handler free for next assignment. */
+static void
+reclaim_tcp_handler(struct comm_point* c)
+{
+       log_assert(c->type == comm_tcp);
+       comm_point_close(c);
+       c->tcp_free = c->tcp_parent->tcp_free;
+       c->tcp_parent->tcp_free = c;
+       if(!c->tcp_free) {
+               /* re-enable listening on accept socket */
+               comm_point_start_listening(c->tcp_parent, -1, -1);
+       }
+}
+
+/** do the callback when reading is done */
+static void
+tcp_callback_reader(struct comm_point* c)
+{
+       log_assert(c->type == comm_tcp);
+       ldns_buffer_flip(c->buffer);
+       c->tcp_is_reading = 0;
+       c->tcp_byte_count = 0;
+       comm_point_stop_listening(c);
+       if( (*c->callback)(c, c->cb_arg, NETEVENT_NOERROR, NULL) ) {
+               /* setup to send reply */
+       }
 }
 
 static void 
-comm_point_tcp_handle_callback(int ATTR_UNUSED(fd), short ATTR_UNUSED(event), 
-       void* arg)
+comm_point_tcp_handle_callback(int fd, short event, void* arg)
 {
        struct comm_point* c = (struct comm_point*)arg;
-       log_info("callback tcpaccept for %x", (int)c);
+       ssize_t r;
+       log_info("callback tcphandle for %x", (int)c);
        log_assert(c->type == comm_tcp);
-       /* TODO */
+
+       if(!(event&EV_READ)) {
+               if(event&EV_TIMEOUT) {
+                       verbose(VERB_DETAIL, "tcp read took to long drop");
+                       reclaim_tcp_handler(c);
+                       return;
+               }
+               log_err("Ignored event %d for tcphdl.", event);
+               return;
+       }
+
+       if(!c->tcp_is_reading) {
+               reclaim_tcp_handler(c);
+               return;
+       }
+
+       if(c->tcp_byte_count < sizeof(uint16_t)) {
+               /* read length bytes */
+               r = read(fd, ldns_buffer_at(c->buffer, c->tcp_byte_count), 
+                       sizeof(uint16_t)-c->tcp_byte_count);
+               if(r == 0) {
+                       reclaim_tcp_handler(c);
+                       return;
+               } else if(r == -1) {
+                       if(errno != EINTR && errno != EAGAIN)
+                               log_err("read (in tcp s): %s", strerror(errno));
+                       return;
+               } 
+               c->tcp_byte_count += r;
+               if(c->tcp_byte_count != sizeof(uint16_t))
+                       return;
+               ldns_buffer_set_limit(c->buffer, 
+                       ldns_buffer_read_u16_at(c->buffer, 0));
+               if(ldns_buffer_limit(c->buffer) < LDNS_HEADER_SIZE) {
+                       verbose(VERB_DETAIL, "tcp: dropped bogus too short.");
+                       reclaim_tcp_handler(c);
+                       return;
+               }
+               log_info("Reading tcp query of length %d", 
+                       ldns_buffer_limit(c->buffer));
+       }
+
+       r = read(fd, ldns_buffer_current(c->buffer), 
+               ldns_buffer_remaining(c->buffer));
+       if(r == 0) {
+               reclaim_tcp_handler(c);
+               return;
+       } else if(r == -1) {
+               if(errno != EINTR && errno != EAGAIN)
+                       log_err("read (in tcp r): %s", strerror(errno));
+               return;
+       }
+       ldns_buffer_skip(c->buffer, r);
+       if(ldns_buffer_remaining(c->buffer) <= 0) {
+               tcp_callback_reader(c);
+       }
 }
 
 struct comm_point* 
@@ -284,7 +413,6 @@ comm_point_create_udp(struct comm_base *base, int fd, ldns_buffer* buffer,
        c->tcp_is_reading = 0;
        c->tcp_byte_count = 0;
        c->tcp_parent = NULL;
-       c->cur_tcp_count = 0;
        c->max_tcp_count = 0;
        c->tcp_handlers = NULL;
        c->tcp_free = NULL;
@@ -323,11 +451,21 @@ comm_point_create_tcp_handler(struct comm_base *base,
        }
        c->fd = -1;
        c->buffer = ldns_buffer_new(bufsize);
-       c->timeout = NULL;
+       if(!c->buffer) {
+               free(c->ev);
+               free(c);
+               return NULL;
+       }
+       c->timeout = (struct timeval*)malloc(sizeof(struct timeval));
+       if(!c->timeout) {
+               ldns_buffer_free(c->buffer);
+               free(c->ev);
+               free(c);
+               return NULL;
+       }
        c->tcp_is_reading = 0;
        c->tcp_byte_count = 0;
        c->tcp_parent = parent;
-       c->cur_tcp_count = 0;
        c->max_tcp_count = 0;
        c->tcp_handlers = NULL;
        c->tcp_free = NULL;
@@ -376,7 +514,6 @@ comm_point_create_tcp(struct comm_base *base, int fd, int num, size_t bufsize,
        c->tcp_is_reading = 0;
        c->tcp_byte_count = 0;
        c->tcp_parent = NULL;
-       c->cur_tcp_count = 0;
        c->max_tcp_count = num;
        c->tcp_handlers = (struct comm_point**)calloc((size_t)num,
                sizeof(struct comm_point*));
@@ -467,6 +604,40 @@ comm_point_send_reply(struct comm_reply *repinfo)
        }
 }
 
+void comm_point_stop_listening(struct comm_point* c)
+{
+       if(event_del(&c->ev->ev) != 0) {
+               log_err("event_del error to stoplisten");
+       }
+}
+
+void comm_point_start_listening(struct comm_point* c, int newfd, int sec)
+{
+       if(c->type == comm_tcp_accept && !c->tcp_free) {
+               /* no use to start listening no free slots. */
+               return;
+       }
+       if(sec != -1 && sec != 0) {
+               if(!c->timeout) {
+                       c->timeout = (struct timeval*)malloc(sizeof(
+                               struct timeval));
+                       if(!c->timeout) {
+                               log_err("cpsl: malloc failed. No net read.");
+                               return;
+                       }
+               }
+               c->timeout->tv_sec = sec;
+               c->timeout->tv_usec = 0;
+       }
+       if(c->fd != -1)
+               close(c->fd);
+       c->fd = newfd;
+       c->ev->ev.ev_fd = c->fd;
+       if(event_add(&c->ev->ev, sec==0?NULL:c->timeout) != 0) {
+               log_err("event_add failed. in cpsl.");
+       }
+}
+
 struct comm_timer* 
 comm_timer_create(struct comm_base* base, void (*cb)(void*), void* cb_arg)
 {
@@ -607,3 +778,27 @@ comm_signal_delete(struct comm_signal* comsig)
        }
        free(comsig);
 }
+
+void
+log_addr(struct sockaddr_storage* addr, socklen_t addrlen)
+{
+       uint16_t port;
+       const char* family = "unknown";
+       char dest[100];
+       int af = (int)((struct sockaddr_in*)addr)->sin_family;
+       void* sinaddr = &((struct sockaddr_in*)addr)->sin_addr;
+       switch(af) {
+               case AF_INET: family="ip4"; break;
+               case AF_INET6: family="ip6"; 
+                       sinaddr = &((struct sockaddr_in6*)addr)->sin6_addr;
+                       break;
+               case AF_UNIX: family="unix"; break;
+               default: break;
+       }
+       if(inet_ntop(af, sinaddr, dest, (socklen_t)sizeof(dest)) == 0) {
+               strncpy(dest, "(inet_ntop error)", sizeof(dest));
+       }
+       port = ntohs(((struct sockaddr_in*)addr)->sin_port);
+       log_info("addr fam=%s port=%d dest=%s len=%d",
+               family, (int)port, dest, (int)addrlen);
+}
index f6e045398e9ab3ba986e94ae61fdc7b1ab7df1b2..4c0508e16ae7231c165cc42fc9a638924d37fa72 100644 (file)
@@ -119,8 +119,6 @@ struct comm_point {
        struct comm_point* tcp_parent;
 
        /* -------- TCP Accept -------- */
-       /** current number of TCP connections on this socket */
-       int cur_tcp_count;
        /** the number of TCP handlers for this tcp-accept socket */
        int max_tcp_count;
        /** malloced array of tcp handlers for a tcp-accept, 
@@ -322,6 +320,20 @@ void comm_point_send_reply(struct comm_reply* repinfo);
 int comm_point_send_udp_msg(struct comm_point* c, ldns_buffer* packet,
        struct sockaddr* addr, socklen_t addrlen);
 
+/**
+ * Stop listening for input on the commpoint. No callbacks will happen.
+ * @param c: commpoint to disable. The fd is not closed.
+ */
+void comm_point_stop_listening(struct comm_point* c);
+
+/**
+ * Start listening again for input on the comm point.
+ * @param c: commpoint to enable again.
+ * @param newfd: new fd, or -1 to leave fd be.
+ * @param sec: timeout in seconds, or -1 for no (change to the) timeout.
+ */
+void comm_point_start_listening(struct comm_point* c, int newfd, int sec);
+
 /**
  * create timer. Not active upon creation.
  * @param base: event handling base.
@@ -383,4 +395,11 @@ int comm_signal_bind(struct comm_signal* comsig, int sig);
  */
 void comm_signal_delete(struct comm_signal* comsig);
 
+/**
+ * Prints the sockaddr in readable format with log_info. Debug helper.
+ * @param addr: the sockaddr to print. Can be ip4 or ip6.
+ * @param addrlen: length of addr.
+ */
+void log_addr(struct sockaddr_storage* addr, socklen_t addrlen);
+
 #endif /* NET_EVENT_H */