#include <errno.h>
/* -------- Start of local definitions -------- */
-/** The TCP reading query timeout in seconds */
-#define TCP_READ_TIMEOUT 120
+/** The TCP reading or writing query timeout in seconds */
+#define TCP_QUERY_TIMEOUT 120
/* We define libevent structures here to hide the libevent stuff. */
ldns_buffer_clear(c->buffer);
c->tcp_is_reading = 1;
c->tcp_byte_count = 0;
- comm_point_start_listening(c, fd, TCP_READ_TIMEOUT);
+ comm_point_start_listening(c, fd, TCP_QUERY_TIMEOUT);
}
static void
}
}
+/** do the callback when writing is done */
+static void
+tcp_callback_writer(struct comm_point* c)
+{
+ log_assert(c->type == comm_tcp);
+ ldns_buffer_clear(c->buffer);
+ c->tcp_is_reading = 1;
+ c->tcp_byte_count = 0;
+ comm_point_stop_listening(c);
+ /* for listening socket */
+ reclaim_tcp_handler(c);
+}
+
/** do the callback when reading is done */
static void
tcp_callback_reader(struct comm_point* c)
{
+ struct comm_reply rep;
log_assert(c->type == comm_tcp);
ldns_buffer_flip(c->buffer);
c->tcp_is_reading = 0;
c->tcp_byte_count = 0;
comm_point_stop_listening(c);
- if( (*c->callback)(c, c->cb_arg, NETEVENT_NOERROR, NULL) ) {
- /* setup to send reply */
+ rep.c = c;
+ rep.addrlen = 0;
+ if( (*c->callback)(c, c->cb_arg, NETEVENT_NOERROR, &rep) ) {
+ comm_point_start_listening(c, -1, TCP_QUERY_TIMEOUT);
}
}
-static void
-comm_point_tcp_handle_callback(int fd, short event, void* arg)
+/** Handle tcp reading callback.
+ * @param fd: file descriptor of socket.
+ * @param c: comm point to read from into buffer.
+ * @return: 0 on error
+ */
+static int
+comm_point_tcp_handle_read(int fd, struct comm_point* c)
{
- struct comm_point* c = (struct comm_point*)arg;
ssize_t r;
- log_info("callback tcphandle for %x", (int)c);
log_assert(c->type == comm_tcp);
-
- if(!(event&EV_READ)) {
- if(event&EV_TIMEOUT) {
- verbose(VERB_DETAIL, "tcp read took to long drop");
- reclaim_tcp_handler(c);
- return;
- }
- log_err("Ignored event %d for tcphdl.", event);
- return;
- }
-
- if(!c->tcp_is_reading) {
- reclaim_tcp_handler(c);
- return;
- }
+ if(!c->tcp_is_reading)
+ return 0;
if(c->tcp_byte_count < sizeof(uint16_t)) {
/* read length bytes */
r = read(fd, ldns_buffer_at(c->buffer, c->tcp_byte_count),
sizeof(uint16_t)-c->tcp_byte_count);
- if(r == 0) {
- reclaim_tcp_handler(c);
- return;
- } else if(r == -1) {
- if(errno != EINTR && errno != EAGAIN)
- log_err("read (in tcp s): %s", strerror(errno));
- return;
+ if(r == 0)
+ return 0;
+ else if(r == -1) {
+ if(errno == EINTR || errno == EAGAIN)
+ return 1;
+ log_err("read (in tcp s): %s", strerror(errno));
+ return 0;
}
c->tcp_byte_count += r;
if(c->tcp_byte_count != sizeof(uint16_t))
- return;
+ return 1;
+ if(ldns_buffer_read_u16_at(c->buffer, 0) >
+ ldns_buffer_capacity(c->buffer)) {
+ verbose(VERB_DETAIL, "tcp: dropped larger than buffer");
+ return 0;
+ }
ldns_buffer_set_limit(c->buffer,
ldns_buffer_read_u16_at(c->buffer, 0));
if(ldns_buffer_limit(c->buffer) < LDNS_HEADER_SIZE) {
verbose(VERB_DETAIL, "tcp: dropped bogus too short.");
- reclaim_tcp_handler(c);
- return;
+ return 0;
}
log_info("Reading tcp query of length %d",
ldns_buffer_limit(c->buffer));
r = read(fd, ldns_buffer_current(c->buffer),
ldns_buffer_remaining(c->buffer));
if(r == 0) {
- reclaim_tcp_handler(c);
- return;
+ return 0;
} else if(r == -1) {
- if(errno != EINTR && errno != EAGAIN)
- log_err("read (in tcp r): %s", strerror(errno));
- return;
+ if(errno == EINTR || errno == EAGAIN)
+ return 1;
+ log_err("read (in tcp r): %s", strerror(errno));
+ return 0;
}
ldns_buffer_skip(c->buffer, r);
if(ldns_buffer_remaining(c->buffer) <= 0) {
tcp_callback_reader(c);
}
+ return 1;
+}
+
+/** Handle tcp writing callback.
+ * @param fd: file descriptor of socket.
+ * @param c: comm point to write buffer out of.
+ * @return: 0 on error
+ */
+static int
+comm_point_tcp_handle_write(int fd, struct comm_point* c)
+{
+ ssize_t r;
+ log_assert(c->type == comm_tcp);
+ if(c->tcp_is_reading)
+ return 0;
+
+ if(c->tcp_byte_count < sizeof(uint16_t)) {
+ uint16_t len = htons(ldns_buffer_limit(c->buffer));
+ r = write(fd, &len, sizeof(uint16_t)-c->tcp_byte_count);
+ if(r == -1) {
+ if(errno == EINTR || errno == EAGAIN)
+ return 1;
+ log_err("tcp write(s): %s", strerror(errno));
+ return 0;
+ }
+ c->tcp_byte_count += r;
+ if(c->tcp_byte_count != sizeof(uint16_t))
+ return 1;
+ ldns_buffer_set_position(c->buffer, 0);
+ }
+ r = write(fd, ldns_buffer_current(c->buffer),
+ ldns_buffer_remaining(c->buffer));
+ if(r == -1) {
+ if(errno == EINTR || errno == EAGAIN)
+ return 1;
+ log_err("tcp write(w): %s", strerror(errno));
+ return 0;
+ }
+ ldns_buffer_skip(c->buffer, r);
+
+ if(ldns_buffer_remaining(c->buffer) == 0) {
+ tcp_callback_writer(c);
+ }
+
+ return 1;
+}
+
+static void
+comm_point_tcp_handle_callback(int fd, short event, void* arg)
+{
+ struct comm_point* c = (struct comm_point*)arg;
+ log_assert(c->type == comm_tcp);
+
+ if(event&EV_READ) {
+ if(!comm_point_tcp_handle_read(fd, c)) {
+ reclaim_tcp_handler(c);
+ if(!c->tcp_do_close)
+ (void)(*c->callback)(c, c->cb_arg,
+ NETEVENT_CLOSED, NULL);
+ }
+ return;
+ }
+ if(event&EV_WRITE) {
+ if(!comm_point_tcp_handle_write(fd, c)) {
+ reclaim_tcp_handler(c);
+ if(!c->tcp_do_close)
+ (void)(*c->callback)(c, c->cb_arg,
+ NETEVENT_CLOSED, NULL);
+ }
+ return;
+ }
+ if(event&EV_TIMEOUT) {
+ verbose(VERB_DETAIL, "tcp took too long, dropped");
+ reclaim_tcp_handler(c);
+ if(!c->tcp_do_close)
+ (void)(*c->callback)(c, c->cb_arg,
+ NETEVENT_TIMEOUT, NULL);
+ return;
+ }
+ log_err("Ignored event %d for tcphdl.", event);
}
struct comm_point*
comm_point_send_udp_msg(repinfo->c, repinfo->c->buffer,
(struct sockaddr*)&repinfo->addr, repinfo->addrlen);
} else {
- log_info("tcp reply");
- /* TODO */
+ comm_point_start_listening(repinfo->c, -1, TCP_QUERY_TIMEOUT);
}
}
void comm_point_stop_listening(struct comm_point* c)
{
+ log_info("comm point stop listening %x", (int)c);
if(event_del(&c->ev->ev) != 0) {
log_err("event_del error to stoplisten");
}
void comm_point_start_listening(struct comm_point* c, int newfd, int sec)
{
+ log_info("comm point start listening %x", (int)c);
if(c->type == comm_tcp_accept && !c->tcp_free) {
/* no use to start listening no free slots. */
return;
return;
}
}
+#ifndef S_SPLINT_S /* splint fails on struct timeval. */
c->timeout->tv_sec = sec;
c->timeout->tv_usec = 0;
+#endif /* S_SPLINT_S */
+ }
+ if(c->type == comm_tcp) {
+ c->ev->ev.ev_events &= ~(EV_READ|EV_WRITE);
+ if(c->tcp_is_reading)
+ c->ev->ev.ev_events |= EV_READ;
+ else c->ev->ev.ev_events |= EV_WRITE;
+ }
+ if(newfd != -1) {
+ if(c->fd != -1)
+ close(c->fd);
+ c->fd = newfd;
+ c->ev->ev.ev_fd = c->fd;
}
- if(c->fd != -1)
- close(c->fd);
- c->fd = newfd;
- c->ev->ev.ev_fd = c->fd;
if(event_add(&c->ev->ev, sec==0?NULL:c->timeout) != 0) {
log_err("event_add failed. in cpsl.");
}