#include <errno.h>
/* -------- Start of local definitions -------- */
+/** The TCP reading query timeout in seconds */
+#define TCP_READ_TIMEOUT 120
+
/* We define libevent structures here to hide the libevent stuff. */
/* we use libevent */
}
}
+/** Use a new tcp handler for new query fd, set to read query. */
+static void
+setup_tcp_handler(struct comm_point* c, int fd)
+{
+ log_assert(c->type == comm_tcp);
+ log_assert(c->fd == -1);
+ ldns_buffer_clear(c->buffer);
+ c->tcp_is_reading = 1;
+ c->tcp_byte_count = 0;
+ comm_point_start_listening(c, fd, TCP_READ_TIMEOUT);
+}
+
static void
-comm_point_tcp_accept_callback(int ATTR_UNUSED(fd), short ATTR_UNUSED(event),
- void* arg)
+comm_point_tcp_accept_callback(int fd, short event, void* arg)
{
- struct comm_point* c = (struct comm_point*)arg;
+ struct comm_point* c = (struct comm_point*)arg, *c_hdl;
+ struct comm_reply rep;
+ int new_fd;
log_info("callback tcpaccept for %x", (int)c);
log_assert(c->type == comm_tcp_accept);
- /* TODO */
+ if(!(event & EV_READ)) {
+ log_info("ignoring tcp accept event %d", (int)event);
+ return;
+ }
+ /* accept incoming connection. */
+ rep.c = NULL;
+ rep.addrlen = (socklen_t)sizeof(rep.addr);
+ new_fd = accept(fd, (struct sockaddr*)&rep.addr, &rep.addrlen);
+ if(new_fd == -1) {
+ /* EINTR is signal interrupt. others are closed connection. */
+ if(errno != EINTR && errno != EWOULDBLOCK &&
+ errno != ECONNABORTED && errno != EPROTO)
+ log_err("accept failed: %s", strerror(errno));
+ return;
+ }
+ /* find free tcp handler. */
+ if(!c->tcp_free) {
+ log_err("accepted too many tcp, connections full, from:");
+ log_addr(&rep.addr, rep.addrlen);
+ close(new_fd);
+ return;
+ }
+ /* grab it */
+ c_hdl = c->tcp_free;
+ c->tcp_free = c_hdl->tcp_free;
+ if(!c->tcp_free) {
+ /* stop accepting incoming queries for now. */
+ comm_point_stop_listening(c);
+ }
+ /* addr is dropped. Not needed for tcp reply. */
+ setup_tcp_handler(c_hdl, new_fd);
+}
+
+/** Make tcp handler free for next assignment. */
+static void
+reclaim_tcp_handler(struct comm_point* c)
+{
+ log_assert(c->type == comm_tcp);
+ comm_point_close(c);
+ c->tcp_free = c->tcp_parent->tcp_free;
+ c->tcp_parent->tcp_free = c;
+ if(!c->tcp_free) {
+ /* re-enable listening on accept socket */
+ comm_point_start_listening(c->tcp_parent, -1, -1);
+ }
+}
+
+/** do the callback when reading is done */
+static void
+tcp_callback_reader(struct comm_point* c)
+{
+ log_assert(c->type == comm_tcp);
+ ldns_buffer_flip(c->buffer);
+ c->tcp_is_reading = 0;
+ c->tcp_byte_count = 0;
+ comm_point_stop_listening(c);
+ if( (*c->callback)(c, c->cb_arg, NETEVENT_NOERROR, NULL) ) {
+ /* setup to send reply */
+ }
}
static void
-comm_point_tcp_handle_callback(int ATTR_UNUSED(fd), short ATTR_UNUSED(event),
- void* arg)
+comm_point_tcp_handle_callback(int fd, short event, void* arg)
{
struct comm_point* c = (struct comm_point*)arg;
- log_info("callback tcpaccept for %x", (int)c);
+ ssize_t r;
+ log_info("callback tcphandle for %x", (int)c);
log_assert(c->type == comm_tcp);
- /* TODO */
+
+ if(!(event&EV_READ)) {
+ if(event&EV_TIMEOUT) {
+ verbose(VERB_DETAIL, "tcp read took to long drop");
+ reclaim_tcp_handler(c);
+ return;
+ }
+ log_err("Ignored event %d for tcphdl.", event);
+ return;
+ }
+
+ if(!c->tcp_is_reading) {
+ reclaim_tcp_handler(c);
+ return;
+ }
+
+ if(c->tcp_byte_count < sizeof(uint16_t)) {
+ /* read length bytes */
+ r = read(fd, ldns_buffer_at(c->buffer, c->tcp_byte_count),
+ sizeof(uint16_t)-c->tcp_byte_count);
+ if(r == 0) {
+ reclaim_tcp_handler(c);
+ return;
+ } else if(r == -1) {
+ if(errno != EINTR && errno != EAGAIN)
+ log_err("read (in tcp s): %s", strerror(errno));
+ return;
+ }
+ c->tcp_byte_count += r;
+ if(c->tcp_byte_count != sizeof(uint16_t))
+ return;
+ ldns_buffer_set_limit(c->buffer,
+ ldns_buffer_read_u16_at(c->buffer, 0));
+ if(ldns_buffer_limit(c->buffer) < LDNS_HEADER_SIZE) {
+ verbose(VERB_DETAIL, "tcp: dropped bogus too short.");
+ reclaim_tcp_handler(c);
+ return;
+ }
+ log_info("Reading tcp query of length %d",
+ ldns_buffer_limit(c->buffer));
+ }
+
+ r = read(fd, ldns_buffer_current(c->buffer),
+ ldns_buffer_remaining(c->buffer));
+ if(r == 0) {
+ reclaim_tcp_handler(c);
+ return;
+ } else if(r == -1) {
+ if(errno != EINTR && errno != EAGAIN)
+ log_err("read (in tcp r): %s", strerror(errno));
+ return;
+ }
+ ldns_buffer_skip(c->buffer, r);
+ if(ldns_buffer_remaining(c->buffer) <= 0) {
+ tcp_callback_reader(c);
+ }
}
struct comm_point*
c->tcp_is_reading = 0;
c->tcp_byte_count = 0;
c->tcp_parent = NULL;
- c->cur_tcp_count = 0;
c->max_tcp_count = 0;
c->tcp_handlers = NULL;
c->tcp_free = NULL;
}
c->fd = -1;
c->buffer = ldns_buffer_new(bufsize);
- c->timeout = NULL;
+ if(!c->buffer) {
+ free(c->ev);
+ free(c);
+ return NULL;
+ }
+ c->timeout = (struct timeval*)malloc(sizeof(struct timeval));
+ if(!c->timeout) {
+ ldns_buffer_free(c->buffer);
+ free(c->ev);
+ free(c);
+ return NULL;
+ }
c->tcp_is_reading = 0;
c->tcp_byte_count = 0;
c->tcp_parent = parent;
- c->cur_tcp_count = 0;
c->max_tcp_count = 0;
c->tcp_handlers = NULL;
c->tcp_free = NULL;
c->tcp_is_reading = 0;
c->tcp_byte_count = 0;
c->tcp_parent = NULL;
- c->cur_tcp_count = 0;
c->max_tcp_count = num;
c->tcp_handlers = (struct comm_point**)calloc((size_t)num,
sizeof(struct comm_point*));
}
}
+void comm_point_stop_listening(struct comm_point* c)
+{
+ if(event_del(&c->ev->ev) != 0) {
+ log_err("event_del error to stoplisten");
+ }
+}
+
+void comm_point_start_listening(struct comm_point* c, int newfd, int sec)
+{
+ if(c->type == comm_tcp_accept && !c->tcp_free) {
+ /* no use to start listening no free slots. */
+ return;
+ }
+ if(sec != -1 && sec != 0) {
+ if(!c->timeout) {
+ c->timeout = (struct timeval*)malloc(sizeof(
+ struct timeval));
+ if(!c->timeout) {
+ log_err("cpsl: malloc failed. No net read.");
+ return;
+ }
+ }
+ c->timeout->tv_sec = sec;
+ c->timeout->tv_usec = 0;
+ }
+ if(c->fd != -1)
+ close(c->fd);
+ c->fd = newfd;
+ c->ev->ev.ev_fd = c->fd;
+ if(event_add(&c->ev->ev, sec==0?NULL:c->timeout) != 0) {
+ log_err("event_add failed. in cpsl.");
+ }
+}
+
struct comm_timer*
comm_timer_create(struct comm_base* base, void (*cb)(void*), void* cb_arg)
{
}
free(comsig);
}
+
+void
+log_addr(struct sockaddr_storage* addr, socklen_t addrlen)
+{
+ uint16_t port;
+ const char* family = "unknown";
+ char dest[100];
+ int af = (int)((struct sockaddr_in*)addr)->sin_family;
+ void* sinaddr = &((struct sockaddr_in*)addr)->sin_addr;
+ switch(af) {
+ case AF_INET: family="ip4"; break;
+ case AF_INET6: family="ip6";
+ sinaddr = &((struct sockaddr_in6*)addr)->sin6_addr;
+ break;
+ case AF_UNIX: family="unix"; break;
+ default: break;
+ }
+ if(inet_ntop(af, sinaddr, dest, (socklen_t)sizeof(dest)) == 0) {
+ strncpy(dest, "(inet_ntop error)", sizeof(dest));
+ }
+ port = ntohs(((struct sockaddr_in*)addr)->sin_port);
+ log_info("addr fam=%s port=%d dest=%s len=%d",
+ family, (int)port, dest, (int)addrlen);
+}