return 1;
}
+/** check if the output fd has been closed */
+static void dtio_check_close(struct dt_io_thread* dtio)
+{
+ /* we don't want to read any packets, but if there are we can
+ * discard the input (ignore it), which is okay for a framestream,
+ * and also, the read call can return that the stream has been
+ * closed by the other side. */
+ ssize_t r;
+ uint8_t buf[1024];
+ if(dtio->fd == -1) return;
+ while(1) {
+ r = recv(dtio->fd, buf, sizeof(buf), 0);
+ if(r == -1) {
+#ifndef USE_WINSOCK
+ if(errno == EINTR || errno == EAGAIN)
+ return; /* try later */
+#else
+ if(WSAGetLastError() == WSAEINPROGRESS) {
+ return; /* try later */
+ } else if(WSAGetLastError() == WSAEWOULDBLOCK) {
+ ub_winsock_tcp_wouldblock(dtio->event, UB_EV_READ);
+ return; /* try later */
+ }
+#endif
+ log_err("dnstap io: output recv: %s", strerror(errno));
+ /* and close below */
+ break;
+ }
+ if(r == 0) {
+ verbose(VERB_ALGO, "dnstap io: output closed by the other side");
+ /* and close below */
+ break;
+ }
+ /* something was received, ignore it */
+ }
+ /* the other end has been closed */
+ /* close the channel */
+ dtio_del_output_event(dtio);
+ dtio_close_output(dtio);
+}
+
/** callback for the dnstap events, to write to the output */
-static void dtio_output_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits),
- void* arg)
+static void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg)
{
struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
+
+ if((bits&UB_EV_READ)) {
+ dtio_check_close(dtio);
+ return;
+ }
/* see if there are messages that need writing */
if(!dtio->cur_msg) {
dtio_stop_flush_exit(info);
}
-static void dtio_stop_ev_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits),
- void* arg)
+static void dtio_stop_ev_cb(int ATTR_UNUSED(fd), short bits, void* arg)
{
struct stop_flush_info* info = (struct stop_flush_info*)arg;
struct dt_io_thread* dtio = info->dtio;
if(info->want_to_exit_flush)
return;
+ if((bits&UB_EV_READ)) {
+ dtio_check_close(dtio);
+ if(dtio->fd == -1) {
+ verbose(VERB_ALGO, "dnstap io: "
+ "stop flush: output closed");
+ dtio_stop_flush_exit(info);
+ }
+ return;
+ }
/* write remainder of last frame */
if(dtio->cur_msg) {
if(dtio->cur_msg_done < dtio->cur_msg_len) {