From: Wouter Wijngaards Date: Wed, 7 Feb 2007 15:44:19 +0000 (+0000) Subject: tcp queries get answers. X-Git-Tag: release-0.0~49 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=b9f0fb076f820f87e4c8398cc4cb23d0a5774cbf;p=thirdparty%2Funbound.git tcp queries get answers. git-svn-id: file:///svn/unbound/trunk@76 be551aaa-1e26-0410-a405-d3ace91eadb9 --- diff --git a/util/netevent.c b/util/netevent.c index bbead76e3..6824afbd2 100644 --- a/util/netevent.c +++ b/util/netevent.c @@ -44,8 +44,8 @@ #include /* -------- Start of local definitions -------- */ -/** The TCP reading query timeout in seconds */ -#define TCP_READ_TIMEOUT 120 +/** The TCP reading or writing query timeout in seconds */ +#define TCP_QUERY_TIMEOUT 120 /* We define libevent structures here to hide the libevent stuff. */ @@ -255,7 +255,7 @@ setup_tcp_handler(struct comm_point* c, int fd) ldns_buffer_clear(c->buffer); c->tcp_is_reading = 1; c->tcp_byte_count = 0; - comm_point_start_listening(c, fd, TCP_READ_TIMEOUT); + comm_point_start_listening(c, fd, TCP_QUERY_TIMEOUT); } static void @@ -313,64 +313,74 @@ reclaim_tcp_handler(struct comm_point* c) } } +/** do the callback when writing is done */ +static void +tcp_callback_writer(struct comm_point* c) +{ + log_assert(c->type == comm_tcp); + ldns_buffer_clear(c->buffer); + c->tcp_is_reading = 1; + c->tcp_byte_count = 0; + comm_point_stop_listening(c); + /* for listening socket */ + reclaim_tcp_handler(c); +} + /** do the callback when reading is done */ static void tcp_callback_reader(struct comm_point* c) { + struct comm_reply rep; log_assert(c->type == comm_tcp); ldns_buffer_flip(c->buffer); c->tcp_is_reading = 0; c->tcp_byte_count = 0; comm_point_stop_listening(c); - if( (*c->callback)(c, c->cb_arg, NETEVENT_NOERROR, NULL) ) { - /* setup to send reply */ + rep.c = c; + rep.addrlen = 0; + if( (*c->callback)(c, c->cb_arg, NETEVENT_NOERROR, &rep) ) { + comm_point_start_listening(c, -1, TCP_QUERY_TIMEOUT); } } -static void -comm_point_tcp_handle_callback(int fd, short event, void* arg) +/** Handle tcp reading callback. + * @param fd: file descriptor of socket. + * @param c: comm point to read from into buffer. + * @return: 0 on error + */ +static int +comm_point_tcp_handle_read(int fd, struct comm_point* c) { - struct comm_point* c = (struct comm_point*)arg; ssize_t r; - log_info("callback tcphandle for %x", (int)c); log_assert(c->type == comm_tcp); - - if(!(event&EV_READ)) { - if(event&EV_TIMEOUT) { - verbose(VERB_DETAIL, "tcp read took to long drop"); - reclaim_tcp_handler(c); - return; - } - log_err("Ignored event %d for tcphdl.", event); - return; - } - - if(!c->tcp_is_reading) { - reclaim_tcp_handler(c); - return; - } + if(!c->tcp_is_reading) + return 0; if(c->tcp_byte_count < sizeof(uint16_t)) { /* read length bytes */ r = read(fd, ldns_buffer_at(c->buffer, c->tcp_byte_count), sizeof(uint16_t)-c->tcp_byte_count); - if(r == 0) { - reclaim_tcp_handler(c); - return; - } else if(r == -1) { - if(errno != EINTR && errno != EAGAIN) - log_err("read (in tcp s): %s", strerror(errno)); - return; + if(r == 0) + return 0; + else if(r == -1) { + if(errno == EINTR || errno == EAGAIN) + return 1; + log_err("read (in tcp s): %s", strerror(errno)); + return 0; } c->tcp_byte_count += r; if(c->tcp_byte_count != sizeof(uint16_t)) - return; + return 1; + if(ldns_buffer_read_u16_at(c->buffer, 0) > + ldns_buffer_capacity(c->buffer)) { + verbose(VERB_DETAIL, "tcp: dropped larger than buffer"); + return 0; + } ldns_buffer_set_limit(c->buffer, ldns_buffer_read_u16_at(c->buffer, 0)); if(ldns_buffer_limit(c->buffer) < LDNS_HEADER_SIZE) { verbose(VERB_DETAIL, "tcp: dropped bogus too short."); - reclaim_tcp_handler(c); - return; + return 0; } log_info("Reading tcp query of length %d", ldns_buffer_limit(c->buffer)); @@ -379,17 +389,97 @@ comm_point_tcp_handle_callback(int fd, short event, void* arg) r = read(fd, ldns_buffer_current(c->buffer), ldns_buffer_remaining(c->buffer)); if(r == 0) { - reclaim_tcp_handler(c); - return; + return 0; } else if(r == -1) { - if(errno != EINTR && errno != EAGAIN) - log_err("read (in tcp r): %s", strerror(errno)); - return; + if(errno == EINTR || errno == EAGAIN) + return 1; + log_err("read (in tcp r): %s", strerror(errno)); + return 0; } ldns_buffer_skip(c->buffer, r); if(ldns_buffer_remaining(c->buffer) <= 0) { tcp_callback_reader(c); } + return 1; +} + +/** Handle tcp writing callback. + * @param fd: file descriptor of socket. + * @param c: comm point to write buffer out of. + * @return: 0 on error + */ +static int +comm_point_tcp_handle_write(int fd, struct comm_point* c) +{ + ssize_t r; + log_assert(c->type == comm_tcp); + if(c->tcp_is_reading) + return 0; + + if(c->tcp_byte_count < sizeof(uint16_t)) { + uint16_t len = htons(ldns_buffer_limit(c->buffer)); + r = write(fd, &len, sizeof(uint16_t)-c->tcp_byte_count); + if(r == -1) { + if(errno == EINTR || errno == EAGAIN) + return 1; + log_err("tcp write(s): %s", strerror(errno)); + return 0; + } + c->tcp_byte_count += r; + if(c->tcp_byte_count != sizeof(uint16_t)) + return 1; + ldns_buffer_set_position(c->buffer, 0); + } + r = write(fd, ldns_buffer_current(c->buffer), + ldns_buffer_remaining(c->buffer)); + if(r == -1) { + if(errno == EINTR || errno == EAGAIN) + return 1; + log_err("tcp write(w): %s", strerror(errno)); + return 0; + } + ldns_buffer_skip(c->buffer, r); + + if(ldns_buffer_remaining(c->buffer) == 0) { + tcp_callback_writer(c); + } + + return 1; +} + +static void +comm_point_tcp_handle_callback(int fd, short event, void* arg) +{ + struct comm_point* c = (struct comm_point*)arg; + log_assert(c->type == comm_tcp); + + if(event&EV_READ) { + if(!comm_point_tcp_handle_read(fd, c)) { + reclaim_tcp_handler(c); + if(!c->tcp_do_close) + (void)(*c->callback)(c, c->cb_arg, + NETEVENT_CLOSED, NULL); + } + return; + } + if(event&EV_WRITE) { + if(!comm_point_tcp_handle_write(fd, c)) { + reclaim_tcp_handler(c); + if(!c->tcp_do_close) + (void)(*c->callback)(c, c->cb_arg, + NETEVENT_CLOSED, NULL); + } + return; + } + if(event&EV_TIMEOUT) { + verbose(VERB_DETAIL, "tcp took too long, dropped"); + reclaim_tcp_handler(c); + if(!c->tcp_do_close) + (void)(*c->callback)(c, c->cb_arg, + NETEVENT_TIMEOUT, NULL); + return; + } + log_err("Ignored event %d for tcphdl.", event); } struct comm_point* @@ -599,13 +689,13 @@ comm_point_send_reply(struct comm_reply *repinfo) comm_point_send_udp_msg(repinfo->c, repinfo->c->buffer, (struct sockaddr*)&repinfo->addr, repinfo->addrlen); } else { - log_info("tcp reply"); - /* TODO */ + comm_point_start_listening(repinfo->c, -1, TCP_QUERY_TIMEOUT); } } void comm_point_stop_listening(struct comm_point* c) { + log_info("comm point stop listening %x", (int)c); if(event_del(&c->ev->ev) != 0) { log_err("event_del error to stoplisten"); } @@ -613,6 +703,7 @@ void comm_point_stop_listening(struct comm_point* c) void comm_point_start_listening(struct comm_point* c, int newfd, int sec) { + log_info("comm point start listening %x", (int)c); if(c->type == comm_tcp_accept && !c->tcp_free) { /* no use to start listening no free slots. */ return; @@ -626,13 +717,23 @@ void comm_point_start_listening(struct comm_point* c, int newfd, int sec) return; } } +#ifndef S_SPLINT_S /* splint fails on struct timeval. */ c->timeout->tv_sec = sec; c->timeout->tv_usec = 0; +#endif /* S_SPLINT_S */ + } + if(c->type == comm_tcp) { + c->ev->ev.ev_events &= ~(EV_READ|EV_WRITE); + if(c->tcp_is_reading) + c->ev->ev.ev_events |= EV_READ; + else c->ev->ev.ev_events |= EV_WRITE; + } + if(newfd != -1) { + if(c->fd != -1) + close(c->fd); + c->fd = newfd; + c->ev->ev.ev_fd = c->fd; } - if(c->fd != -1) - close(c->fd); - c->fd = newfd; - c->ev->ev.ev_fd = c->fd; if(event_add(&c->ev->ev, sec==0?NULL:c->timeout) != 0) { log_err("event_add failed. in cpsl."); }