return 0;
}
+/** close and stop the output file descriptor event */
+static void dtio_close_output(struct dt_io_thread* dtio)
+{
+ if(!dtio->event)
+ return;
+ ub_event_free(dtio->event);
+ dtio->event = NULL;
+#ifndef USE_WINSOCK
+ close(dtio->fd);
+#else
+ closesocket(dtio->fd);
+#endif
+ dtio->fd = -1;
+}
+
+/** write buffer to output.
+ * returns number of bytes written, 0 if nothing happened,
+ * try again later, or -1 if the channel is to be closed. */
+static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf,
+ size_t len)
+{
+ ssize_t ret;
+ if(dtio->fd == -1)
+ return -1;
+ ret = send(dtio->fd, buf, len, 0);
+ if(ret == -1) {
+#ifndef USE_WINSOCK
+ if(errno == EINTR || errno == EAGAIN)
+ return 0;
+ log_err("dnstap io: failed send: %s", strerror(errno));
+#else
+ if(WSAGetLastError() == WSAEINPROGRESS)
+ return 0;
+ if(WSAGetLastError() == WSAEWOULDBLOCK) {
+ ub_winsock_tcp_wouldblock(dtio->event, UB_EV_WRITE);
+ return 0;
+ }
+ log_err("dnstap io: failed send: %s",
+ wsa_strerror(WSAGetLastError()));
+#endif
+ return -1;
+ }
+ return ret;
+}
+
+#ifdef HAVE_WRITEV
+/** write with writev, len and message, in one write, if possible.
+ * return true if message is done, false if incomplete */
+static int dtio_write_with_writev(struct dt_io_thread* dtio)
+{
+ uint32_t sendlen = htonl(dtio->cur_msg_len);
+ struct iovec iov[2];
+ ssize_t r;
+ iov[0].iov_base = ((uint8_t*)&sendlen)+dtio->cur_msg_len_done;
+ iov[0].iov_len = sizeof(sendlen)-dtio->cur_msg_len_done;
+ iov[1].iov_base = dtio->cur_msg;
+ iov[1].iov_len = dtio->cur_msg_len;
+ log_assert(iov[0].iov_len > 0);
+ r = writev(dtio->fd, iov, 2);
+ if(r == -1) {
+#ifndef USE_WINSOCK
+ if(errno == EINTR || errno == EAGAIN)
+ return 0;
+ log_err("dnstap io: failed writev: %s", strerror(errno));
+#else
+ if(WSAGetLastError() == WSAEINPROGRESS)
+ return 0;
+ if(WSAGetLastError() == WSAEWOULDBLOCK) {
+ ub_winsock_tcp_wouldblock(dtio->event, UB_EV_WRITE);
+ return 0;
+ }
+ log_err("dnstap io: failed writev: %s",
+ wsa_strerror(WSAGetLastError()));
+#endif
+ return -1;
+ }
+ /* written r bytes */
+ dtio->cur_msg_len_done += r;
+ if(dtio->cur_msg_len_done < 4)
+ return 0;
+ if(dtio->cur_msg_len_done > 4) {
+ dtio->cur_msg_done = dtio->cur_msg_len_done-4;
+ dtio->cur_msg_len_done = 4;
+ }
+ if(dtio->cur_msg_done < dtio->cur_msg_len)
+ return 0;
+ return 1;
+}
+#endif /* HAVE_WRITEV */
+
+/** write more of the length, preceding the data frame.
+ * return true if message is done, false if incomplete. */
+static int dtio_write_more_of_len(struct dt_io_thread* dtio)
+{
+#ifndef HAVE_WRITEV
+ uint32_t sendlen = htonl(dtio->cur_msg_len);
+ int r;
+#endif
+ if(dtio->cur_msg_len_done >= 4)
+ return 1;
+#ifdef HAVE_WRITEV
+ /* we try writev for everything.*/
+ return dtio_write_with_writev(dtio);
+#else
+ r = dtio_write_buf(dtio,
+ ((uint8_t*)&sendlen)+dtio->cur_msg_len_done,
+ sizeof(sendlen)-dtio->cur_msg_len_done);
+ if(r == -1) {
+ /* close the channel */
+ dtio_close_output(dtio);
+ return 0;
+ } else if(r == 0) {
+ /* try again later */
+ return 0;
+ }
+ dtio->cur_msg_len_done += r;
+ if(dtio->cur_msg_len_done < 4)
+ return 0;
+ return 1;
+#endif /* HAVE_WRITEV */
+}
+
+/** write more of the data frame.
+ * return true if message is done, false if incomplete. */
+static int dtio_write_more_of_data(struct dt_io_thread* dtio)
+{
+ int r;
+ if(dtio->cur_msg_done >= dtio->cur_msg_len)
+ return 1;
+ r = dtio_write_buf(dtio,
+ ((uint8_t*)dtio->cur_msg)+dtio->cur_msg_done,
+ dtio->cur_msg_len - dtio->cur_msg_done);
+ if(r == -1) {
+ /* close the channel */
+ dtio_close_output(dtio);
+ return 0;
+ } else if(r == 0) {
+ /* try again later */
+ return 0;
+ }
+ dtio->cur_msg_done += r;
+ if(dtio->cur_msg_done < dtio->cur_msg_len)
+ return 0;
+ return 1;
+}
+
/** write more of the current messsage. false if incomplete, true if
* the message is done */
-static int dtio_write_more(int fd, struct dt_io_thread* dtio)
+static int dtio_write_more(struct dt_io_thread* dtio)
{
- return 0;
+ if(dtio->cur_msg_len_done < 4) {
+ if(!dtio_write_more_of_len(dtio))
+ return 0;
+ }
+ if(dtio->cur_msg_done < dtio->cur_msg_len) {
+ if(!dtio_write_more_of_data(dtio))
+ return 0;
+ }
+ return 1;
}
/** callback for the dnstap events, to write to the output */
-static void dtio_output_cb(int fd, short ATTR_UNUSED(bits), void* arg)
+static void dtio_output_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits),
+ void* arg)
{
struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
/* write it */
if(dtio->cur_msg_done < dtio->cur_msg_len) {
- if(!dtio_write_more(fd, dtio))
+ if(!dtio_write_more(dtio))
return;
}
return;
r = read(fd, &cmd, sizeof(cmd));
if(r == -1) {
+#ifndef USE_WINSOCK
if(errno == EINTR || errno == EAGAIN)
return; /* ignore this */
log_err("dnstap io: failed to read: %s", strerror(errno));
+#else
+ if(WSAGetLastError() == WSAEINPROGRESS)
+ return;
+ if(WSAGetLastError() == WSAEWOULDBLOCK)
+ return;
+ log_err("dnstap io: failed to read: %s",
+ wsa_strerror(WSAGetLastError()));
+#endif
/* and then fall through to quit the thread */
- }
- if(r == 0) {
+ } else if(r == 0) {
verbose(VERB_ALGO, "dnstap io: cmd channel closed");
} else if(r == 1 && cmd == 0) {
verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit");
+ } else if(r == 1) {
+ verbose(VERB_ALGO, "dnstap io: cmd channel unknown command");
}
dtio->want_to_exit = 1;
if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base)
dtio->event_added = 0;
}
-/** close and stop the output file descriptor event */
-static void dtio_close_output(struct dt_io_thread* dtio)
+/**
+ * structure to keep track of information during stop flush
+ */
+struct stop_flush_info {
+ /** the event base during stop flush */
+ struct ub_event_base* base;
+ /** did we already want to exit this stop-flush event base */
+ int want_to_exit_flush;
+ /** has the timer fired */
+ int timer_done;
+ /** the dtio */
+ struct dt_io_thread* dtio;
+ /** the stop control frame */
+ void* stop_frame;
+ /** length of the stop frame */
+ size_t stop_frame_len;
+ /** how much we have done of the stop frame */
+ size_t stop_frame_done;
+};
+
+/** exit the stop flush base */
+static void dtio_stop_flush_exit(struct stop_flush_info* info)
{
- if(!dtio->event)
+ if(info->want_to_exit_flush)
return;
- ub_event_free(dtio->event);
- dtio->event = NULL;
- close(dtio->fd);
- dtio->fd = -1;
+ info->want_to_exit_flush = 1;
+ if(ub_event_base_loopexit(info->base) != 0) {
+ log_err("dnstap io: could not loopexit");
+ }
+}
+
+/** send the stop control,
+ * return true if completed the frame. */
+static int dtio_control_stop_send(struct stop_flush_info* info)
+{
+ struct dt_io_thread* dtio = info->dtio;
+ int r;
+ if(info->stop_frame_done >= info->stop_frame_len)
+ return 1;
+ r = dtio_write_buf(dtio, ((uint8_t*)info->stop_frame) +
+ info->stop_frame_done, info->stop_frame_len -
+ info->stop_frame_done);
+ if(r == -1) {
+ verbose(VERB_ALGO, "dnstap io: stop flush: output closed");
+ dtio_stop_flush_exit(info);
+ return 0;
+ }
+ if(r == 0) {
+ /* try again later, or timeout */
+ return 0;
+ }
+ info->stop_frame_done += r;
+ if(info->stop_frame_done < info->stop_frame_len)
+ return 0; /* not done yet */
+ return 1;
+}
+
+static void dtio_stop_timer_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits),
+ void* arg)
+{
+ struct stop_flush_info* info = (struct stop_flush_info*)arg;
+ if(info->want_to_exit_flush)
+ return;
+ verbose(VERB_ALGO, "dnstap io: stop flush timer expired, stop flush");
+ info->timer_done = 1;
+ dtio_stop_flush_exit(info);
+}
+
+static void dtio_stop_ev_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits),
+ void* arg)
+{
+ struct stop_flush_info* info = (struct stop_flush_info*)arg;
+ struct dt_io_thread* dtio = info->dtio;
+ if(info->want_to_exit_flush)
+ return;
+ /* write remainder of last frame */
+ if(dtio->cur_msg) {
+ if(dtio->cur_msg_done < dtio->cur_msg_len) {
+ if(!dtio_write_more(dtio)) {
+ if(dtio->fd == -1) {
+ verbose(VERB_ALGO, "dnstap io: "
+ "stop flush: output closed");
+ dtio_stop_flush_exit(info);
+ }
+ return;
+ }
+ }
+ verbose(VERB_ALGO, "dnstap io: stop flush completed "
+ "last frame");
+ free(dtio->cur_msg);
+ dtio->cur_msg = NULL;
+ dtio->cur_msg_len = 0;
+ dtio->cur_msg_done = 0;
+ dtio->cur_msg_len_done = 0;
+ }
+ /* write stop frame */
+ if(info->stop_frame_done < info->stop_frame_len) {
+ if(!dtio_control_stop_send(info))
+ return;
+ verbose(VERB_ALGO, "dnstap io: stop flush completed "
+ "stop control frame");
+ }
+ /* when last frame and stop frame are sent, exit */
+ dtio_stop_flush_exit(info);
+}
+
+/** flush at end, last packet and stop control */
+static void dtio_control_stop_flush(struct dt_io_thread* dtio)
+{
+ /* briefly attempt to flush the previous packet to the output,
+ * this could be a partial packet, or even the start control frame */
+ time_t secs = 0;
+ struct timeval now;
+ struct stop_flush_info info;
+ struct timeval tv;
+ struct ub_event* timer, *stopev;
+ memset(&info, 0, sizeof(info));
+ memset(&now, 0, sizeof(now));
+ info.base = ub_default_event_base(0, &secs, &now);
+ if(!info.base) {
+ log_err("dnstap io: malloc failure");
+ return;
+ }
+ timer = ub_event_new(info.base, -1, UB_EV_TIMEOUT,
+ &dtio_stop_timer_cb, &info);
+ if(!timer) {
+ log_err("dnstap io: malloc failure");
+ ub_event_base_free(info.base);
+ return;
+ }
+ memset(&tv, 0, sizeof(tv));
+ tv.tv_sec = 2;
+ if(ub_timer_add(timer, info.base, &dtio_stop_timer_cb, &info,
+ &tv) != 0) {
+ log_err("dnstap io: cannot event_timer_add");
+ ub_event_free(timer);
+ ub_event_base_free(info.base);
+ return;
+ }
+ stopev = ub_event_new(info.base, dtio->fd, UB_EV_READ |
+ UB_EV_WRITE | UB_EV_PERSIST, &dtio_stop_ev_cb, &info);
+ if(!stopev) {
+ log_err("dnstap io: malloc failure");
+ ub_event_del(timer);
+ ub_event_free(timer);
+ ub_event_base_free(info.base);
+ return;
+ }
+ if(ub_event_add(stopev, NULL) != 0) {
+ log_err("dnstap io: cannot event_add");
+ ub_event_free(stopev);
+ ub_event_del(timer);
+ ub_event_free(timer);
+ ub_event_base_free(info.base);
+ return;
+ }
+ info.stop_frame = fstrm_create_control_frame_stop(
+ &info.stop_frame_len);
+ if(!info.stop_frame) {
+ log_err("dnstap io: malloc failure");
+ ub_event_del(stopev);
+ ub_event_free(stopev);
+ ub_event_del(timer);
+ ub_event_free(timer);
+ ub_event_base_free(info.base);
+ return;
+ }
+
+ /* wait briefly, or until finished */
+ verbose(VERB_ALGO, "dnstap io: stop flush started");
+ if(ub_event_base_dispatch(info.base) < 0) {
+ log_err("dnstap io: dispatch flush failed, errno is %s",
+ strerror(errno));
+ }
+ verbose(VERB_ALGO, "dnstap io: stop flush ended");
+ free(info.stop_frame);
+ ub_event_del(stopev);
+ ub_event_free(stopev);
+ ub_event_del(timer);
+ ub_event_free(timer);
+ ub_event_base_free(info.base);
}
/** perform desetup and free stuff when the dnstap io thread exits */
static void dtio_desetup(struct dt_io_thread* dtio)
{
+ dtio_control_stop_flush(dtio);
dtio_del_output_event(dtio);
dtio_close_output(dtio);
ub_event_del(dtio->command_event);
ub_event_free(dtio->command_event);
+#ifndef USE_WINSOCK
close(dtio->commandpipe[0]);
+#else
+ _close(dtio->commandpipe[0]);
+#endif
dtio->commandpipe[0] = -1;
+ free(dtio->cur_msg);
+ dtio->cur_msg = NULL;
ub_event_base_free(dtio->event_base);
}
struct sockaddr_un s;
dtio->fd = socket(AF_LOCAL, SOCK_STREAM, SOCK_CLOEXEC);
if(dtio->fd == -1) {
+#ifndef USE_WINSOCK
log_err("dnstap io: failed to create socket: %s",
strerror(errno));
+#else
+ log_err("dnstap io: failed to create socket: %s",
+ wsa_strerror(WSAGetLastError()));
+#endif
return;
}
memset(&s, 0, sizeof(s));
(void)strlcpy(s.sun_path, dtio->socket_path, sizeof(s.sun_path));
if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s))
== -1) {
+#ifndef USE_WINSOCK
log_err("dnstap io: failed to connect: %s", strerror(errno));
+#else
+ log_err("dnstap io: failed to connect: %s",
+ wsa_strerror(WSAGetLastError()));
+#endif
+#ifndef USE_WINSOCK
close(dtio->fd);
+#else
+ closesocket(dtio->fd);
+#endif
dtio->fd = -1;
return;
}
UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb,
dtio);
if(!ev) {
+#ifndef USE_WINSOCK
close(dtio->fd);
+#else
+ closesocket(dtio->fd);
+#endif
dtio->fd = -1;
log_err("dnstap io: out of memory");
return;
if(!dtio_control_start_send(dtio)) {
ub_event_free(dtio->event);
dtio->event = NULL;
+#ifndef USE_WINSOCK
close(dtio->fd);
+#else
+ closesocket(dtio->fd);
+#endif
dtio->fd = -1;
log_err("dnstap io: out of memory");
return;
int dt_io_thread_start(struct dt_io_thread* dtio)
{
/* set up the thread, can fail */
+#ifndef USE_WINSOCK
if(pipe(dtio->commandpipe) == -1) {
log_err("failed to create pipe: %s", strerror(errno));
return 0;
}
+#else
+ if(_pipe(dtio->commandpipe, 4096, _O_BINARY) == -1) {
+ log_err("failed to create _pipe: %s",
+ wsa_strerror(WSAGetLastError()));
+ return 0;
+ }
+#endif
/* start the thread */
ub_thread_create(&dtio->tid, dnstap_io, dtio);
while(1) {
ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
if(r == -1) {
+#ifndef USE_WINSOCK
if(errno == EINTR || errno == EAGAIN)
continue;
log_err("dnstap io stop: write: %s", strerror(errno));
+#else
+ if(WSAGetLastError() == WSAEINPROGRESS)
+ continue;
+ if(WSAGetLastError() == WSAEWOULDBLOCK)
+ continue;
+ log_err("dnstap io stop: write: %s",
+ wsa_strerror(WSAGetLastError()));
+#endif
break;
}
break;
}
+#ifndef USE_WINSOCK
close(dtio->commandpipe[1]);
+#else
+ _close(dtio->commandpipe[1]);
+#endif
dtio->commandpipe[1] = -1;
ub_thread_join(dtio->tid);
}