]> git.ipfire.org Git - thirdparty/unbound.git/commitdiff
write data and stop flush for dnstap io.
authorW.C.A. Wijngaards <wouter@nlnetlabs.nl>
Wed, 22 Jan 2020 14:20:48 +0000 (15:20 +0100)
committerW.C.A. Wijngaards <wouter@nlnetlabs.nl>
Wed, 22 Jan 2020 14:20:48 +0000 (15:20 +0100)
dnstap/dtstream.c

index 1f833f1f985381720abd5e42445cd8b4b512e804..9d7a465cd5a9bf332d5a9ca327fe3dfaa9dccba0 100644 (file)
@@ -292,15 +292,170 @@ static int dtio_find_msg(struct dt_io_thread* dtio)
        return 0;
 }
 
+/** close and stop the output file descriptor event */
+static void dtio_close_output(struct dt_io_thread* dtio)
+{
+       if(!dtio->event)
+               return;
+       ub_event_free(dtio->event);
+       dtio->event = NULL;
+#ifndef USE_WINSOCK
+       close(dtio->fd);
+#else
+       closesocket(dtio->fd);
+#endif
+       dtio->fd = -1;
+}
+
+/** write buffer to output.
+ * returns number of bytes written, 0 if nothing happened,
+ * try again later, or -1 if the channel is to be closed. */
+static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf,
+       size_t len)
+{
+       ssize_t ret;
+       if(dtio->fd == -1)
+               return -1;
+       ret = send(dtio->fd, buf, len, 0);
+       if(ret == -1) {
+#ifndef USE_WINSOCK
+               if(errno == EINTR || errno == EAGAIN)
+                       return 0;
+               log_err("dnstap io: failed send: %s", strerror(errno));
+#else
+               if(WSAGetLastError() == WSAEINPROGRESS)
+                       return 0;
+               if(WSAGetLastError() == WSAEWOULDBLOCK) {
+                       ub_winsock_tcp_wouldblock(dtio->event, UB_EV_WRITE);
+                       return 0;
+               }
+               log_err("dnstap io: failed send: %s",
+                       wsa_strerror(WSAGetLastError()));
+#endif
+               return -1;
+       }
+       return ret;
+}
+
+#ifdef HAVE_WRITEV
+/** write with writev, len and message, in one write, if possible.
+ * return true if message is done, false if incomplete */
+static int dtio_write_with_writev(struct dt_io_thread* dtio)
+{
+       uint32_t sendlen = htonl(dtio->cur_msg_len);
+       struct iovec iov[2];
+       ssize_t r;
+       iov[0].iov_base = ((uint8_t*)&sendlen)+dtio->cur_msg_len_done;
+       iov[0].iov_len = sizeof(sendlen)-dtio->cur_msg_len_done;
+       iov[1].iov_base = dtio->cur_msg;
+       iov[1].iov_len = dtio->cur_msg_len;
+       log_assert(iov[0].iov_len > 0);
+       r = writev(dtio->fd, iov, 2);
+       if(r == -1) {
+#ifndef USE_WINSOCK
+               if(errno == EINTR || errno == EAGAIN)
+                       return 0;
+               log_err("dnstap io: failed writev: %s", strerror(errno));
+#else
+               if(WSAGetLastError() == WSAEINPROGRESS)
+                       return 0;
+               if(WSAGetLastError() == WSAEWOULDBLOCK) {
+                       ub_winsock_tcp_wouldblock(dtio->event, UB_EV_WRITE);
+                       return 0;
+               }
+               log_err("dnstap io: failed writev: %s",
+                       wsa_strerror(WSAGetLastError()));
+#endif
+               return -1;
+       }
+       /* written r bytes */
+       dtio->cur_msg_len_done += r;
+       if(dtio->cur_msg_len_done < 4)
+               return 0;
+       if(dtio->cur_msg_len_done > 4) {
+               dtio->cur_msg_done = dtio->cur_msg_len_done-4;
+               dtio->cur_msg_len_done = 4;
+       }
+       if(dtio->cur_msg_done < dtio->cur_msg_len)
+               return 0;
+       return 1;
+}
+#endif /* HAVE_WRITEV */
+
+/** write more of the length, preceding the data frame.
+ * return true if message is done, false if incomplete. */
+static int dtio_write_more_of_len(struct dt_io_thread* dtio)
+{
+#ifndef HAVE_WRITEV
+       uint32_t sendlen = htonl(dtio->cur_msg_len);
+       int r;
+#endif
+       if(dtio->cur_msg_len_done >= 4)
+               return 1;
+#ifdef HAVE_WRITEV
+       /* we try writev for everything.*/
+       return dtio_write_with_writev(dtio);
+#else
+       r = dtio_write_buf(dtio,
+               ((uint8_t*)&sendlen)+dtio->cur_msg_len_done,
+               sizeof(sendlen)-dtio->cur_msg_len_done);
+       if(r == -1) {
+               /* close the channel */
+               dtio_close_output(dtio);
+               return 0;
+       } else if(r == 0) {
+               /* try again later */
+               return 0;
+       }
+       dtio->cur_msg_len_done += r;
+       if(dtio->cur_msg_len_done < 4)
+               return 0;
+       return 1;
+#endif /* HAVE_WRITEV */
+}
+
+/** write more of the data frame.
+ * return true if message is done, false if incomplete. */
+static int dtio_write_more_of_data(struct dt_io_thread* dtio)
+{
+       int r;
+       if(dtio->cur_msg_done >= dtio->cur_msg_len)
+               return 1;
+       r = dtio_write_buf(dtio,
+               ((uint8_t*)dtio->cur_msg)+dtio->cur_msg_done,
+               dtio->cur_msg_len - dtio->cur_msg_done);
+       if(r == -1) {
+               /* close the channel */
+               dtio_close_output(dtio);
+               return 0;
+       } else if(r == 0) {
+               /* try again later */
+               return 0;
+       }
+       dtio->cur_msg_done += r;
+       if(dtio->cur_msg_done < dtio->cur_msg_len)
+               return 0;
+       return 1;
+}
+
 /** write more of the current messsage. false if incomplete, true if
  * the message is done */
-static int dtio_write_more(int fd, struct dt_io_thread* dtio)
+static int dtio_write_more(struct dt_io_thread* dtio)
 {
-       return 0;
+       if(dtio->cur_msg_len_done < 4) {
+               if(!dtio_write_more_of_len(dtio))
+                       return 0;
+       }
+       if(dtio->cur_msg_done < dtio->cur_msg_len) {
+               if(!dtio_write_more_of_data(dtio))
+                       return 0;
+       }
+       return 1;
 }
 
 /** callback for the dnstap events, to write to the output */
-static void dtio_output_cb(int fd, short ATTR_UNUSED(bits), void* arg)
+static void dtio_output_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits),
+       void* arg)
 {
        struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
        
@@ -312,7 +467,7 @@ static void dtio_output_cb(int fd, short ATTR_UNUSED(bits), void* arg)
 
        /* write it */
        if(dtio->cur_msg_done < dtio->cur_msg_len) {
-               if(!dtio_write_more(fd, dtio))
+               if(!dtio_write_more(dtio))
                        return;
        }
 
@@ -334,15 +489,25 @@ static void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg)
                return;
        r = read(fd, &cmd, sizeof(cmd));
        if(r == -1) {
+#ifndef USE_WINSOCK
                if(errno == EINTR || errno == EAGAIN)
                        return; /* ignore this */
                log_err("dnstap io: failed to read: %s", strerror(errno));
+#else
+               if(WSAGetLastError() == WSAEINPROGRESS)
+                       return;
+               if(WSAGetLastError() == WSAEWOULDBLOCK)
+                       return;
+               log_err("dnstap io: failed to read: %s",
+                       wsa_strerror(WSAGetLastError()));
+#endif
                /* and then fall through to quit the thread */
-       }
-       if(r == 0) {
+       } else if(r == 0) {
                verbose(VERB_ALGO, "dnstap io: cmd channel closed");
        } else if(r == 1 && cmd == 0) {
                verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit");
+       } else if(r == 1) {
+               verbose(VERB_ALGO, "dnstap io: cmd channel unknown command");
        }
        dtio->want_to_exit = 1;
        if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base)
@@ -387,26 +552,205 @@ static void dtio_del_output_event(struct dt_io_thread* dtio)
        dtio->event_added = 0;
 }
 
-/** close and stop the output file descriptor event */
-static void dtio_close_output(struct dt_io_thread* dtio)
+/**
+ * structure to keep track of information during stop flush
+ */
+struct stop_flush_info {
+       /** the event base during stop flush */
+       struct ub_event_base* base;
+       /** did we already want to exit this stop-flush event base */
+       int want_to_exit_flush;
+       /** has the timer fired */
+       int timer_done;
+       /** the dtio */
+       struct dt_io_thread* dtio;
+       /** the stop control frame */
+       void* stop_frame;
+       /** length of the stop frame */
+       size_t stop_frame_len;
+       /** how much we have done of the stop frame */
+       size_t stop_frame_done;
+};
+
+/** exit the stop flush base */
+static void dtio_stop_flush_exit(struct stop_flush_info* info)
 {
-       if(!dtio->event)
+       if(info->want_to_exit_flush)
                return;
-       ub_event_free(dtio->event);
-       dtio->event = NULL;
-       close(dtio->fd);
-       dtio->fd = -1;
+       info->want_to_exit_flush = 1;
+       if(ub_event_base_loopexit(info->base) != 0) {
+               log_err("dnstap io: could not loopexit");
+       }
+}
+
+/** send the stop control,
+ * return true if completed the frame. */
+static int dtio_control_stop_send(struct stop_flush_info* info)
+{
+       struct dt_io_thread* dtio = info->dtio;
+       int r;
+       if(info->stop_frame_done >= info->stop_frame_len)
+               return 1;
+       r = dtio_write_buf(dtio, ((uint8_t*)info->stop_frame) +
+               info->stop_frame_done, info->stop_frame_len -
+               info->stop_frame_done);
+       if(r == -1) {
+               verbose(VERB_ALGO, "dnstap io: stop flush: output closed");
+               dtio_stop_flush_exit(info);
+               return 0;
+       }
+       if(r == 0) {
+               /* try again later, or timeout */
+               return 0;
+       }
+       info->stop_frame_done += r;
+       if(info->stop_frame_done < info->stop_frame_len)
+               return 0; /* not done yet */
+       return 1;
+}
+
+static void dtio_stop_timer_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits),
+       void* arg)
+{
+       struct stop_flush_info* info = (struct stop_flush_info*)arg;
+       if(info->want_to_exit_flush)
+               return;
+       verbose(VERB_ALGO, "dnstap io: stop flush timer expired, stop flush");
+       info->timer_done = 1;
+       dtio_stop_flush_exit(info);
+}
+
+static void dtio_stop_ev_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits),
+       void* arg)
+{
+       struct stop_flush_info* info = (struct stop_flush_info*)arg;
+       struct dt_io_thread* dtio = info->dtio;
+       if(info->want_to_exit_flush)
+               return;
+       /* write remainder of last frame */
+       if(dtio->cur_msg) {
+               if(dtio->cur_msg_done < dtio->cur_msg_len) {
+                       if(!dtio_write_more(dtio)) {
+                               if(dtio->fd == -1) {
+                                       verbose(VERB_ALGO, "dnstap io: "
+                                               "stop flush: output closed");
+                                       dtio_stop_flush_exit(info);
+                               }
+                               return;
+                       }
+               }
+               verbose(VERB_ALGO, "dnstap io: stop flush completed "
+                       "last frame");
+               free(dtio->cur_msg);
+               dtio->cur_msg = NULL;
+               dtio->cur_msg_len = 0;
+               dtio->cur_msg_done = 0;
+               dtio->cur_msg_len_done = 0;
+       }
+       /* write stop frame */
+       if(info->stop_frame_done < info->stop_frame_len) {
+               if(!dtio_control_stop_send(info))
+                       return;
+               verbose(VERB_ALGO, "dnstap io: stop flush completed "
+                       "stop control frame");
+       }
+       /* when last frame and stop frame are sent, exit */
+       dtio_stop_flush_exit(info);
+}
+
+/** flush at end, last packet and stop control */
+static void dtio_control_stop_flush(struct dt_io_thread* dtio)
+{
+       /* briefly attempt to flush the previous packet to the output,
+        * this could be a partial packet, or even the start control frame */
+       time_t secs = 0;
+       struct timeval now;
+       struct stop_flush_info info;
+       struct timeval tv;
+       struct ub_event* timer, *stopev;
+       memset(&info, 0, sizeof(info));
+       memset(&now, 0, sizeof(now));
+       info.base = ub_default_event_base(0, &secs, &now);
+       if(!info.base) {
+               log_err("dnstap io: malloc failure");
+               return;
+       }
+       timer = ub_event_new(info.base, -1, UB_EV_TIMEOUT,
+               &dtio_stop_timer_cb, &info);
+       if(!timer) {
+               log_err("dnstap io: malloc failure");
+               ub_event_base_free(info.base);
+               return;
+       }
+       memset(&tv, 0, sizeof(tv));
+       tv.tv_sec = 2;
+       if(ub_timer_add(timer, info.base, &dtio_stop_timer_cb, &info,
+               &tv) != 0) {
+               log_err("dnstap io: cannot event_timer_add");
+               ub_event_free(timer);
+               ub_event_base_free(info.base);
+               return;
+       }
+       stopev = ub_event_new(info.base, dtio->fd, UB_EV_READ |
+               UB_EV_WRITE | UB_EV_PERSIST, &dtio_stop_ev_cb, &info);
+       if(!stopev) {
+               log_err("dnstap io: malloc failure");
+               ub_event_del(timer);
+               ub_event_free(timer);
+               ub_event_base_free(info.base);
+               return;
+       }
+       if(ub_event_add(stopev, NULL) != 0) {
+               log_err("dnstap io: cannot event_add");
+               ub_event_free(stopev);
+               ub_event_del(timer);
+               ub_event_free(timer);
+               ub_event_base_free(info.base);
+               return;
+       }
+       info.stop_frame = fstrm_create_control_frame_stop(
+               &info.stop_frame_len);
+       if(!info.stop_frame) {
+               log_err("dnstap io: malloc failure");
+               ub_event_del(stopev);
+               ub_event_free(stopev);
+               ub_event_del(timer);
+               ub_event_free(timer);
+               ub_event_base_free(info.base);
+               return;
+       }
+
+       /* wait briefly, or until finished */
+       verbose(VERB_ALGO, "dnstap io: stop flush started");
+       if(ub_event_base_dispatch(info.base) < 0) {
+               log_err("dnstap io: dispatch flush failed, errno is %s",
+                       strerror(errno));
+       }
+       verbose(VERB_ALGO, "dnstap io: stop flush ended");
+       free(info.stop_frame);
+       ub_event_del(stopev);
+       ub_event_free(stopev);
+       ub_event_del(timer);
+       ub_event_free(timer);
+       ub_event_base_free(info.base);
 }
 
 /** perform desetup and free stuff when the dnstap io thread exits */
 static void dtio_desetup(struct dt_io_thread* dtio)
 {
+       dtio_control_stop_flush(dtio);
        dtio_del_output_event(dtio);
        dtio_close_output(dtio);
        ub_event_del(dtio->command_event);
        ub_event_free(dtio->command_event);
+#ifndef USE_WINSOCK
        close(dtio->commandpipe[0]);
+#else
+       _close(dtio->commandpipe[0]);
+#endif
        dtio->commandpipe[0] = -1;
+       free(dtio->cur_msg);
+       dtio->cur_msg = NULL;
        ub_event_base_free(dtio->event_base);
 }
 
@@ -436,8 +780,13 @@ static void dtio_open_output(struct dt_io_thread* dtio)
        struct sockaddr_un s;
        dtio->fd = socket(AF_LOCAL, SOCK_STREAM, SOCK_CLOEXEC);
        if(dtio->fd == -1) {
+#ifndef USE_WINSOCK
                log_err("dnstap io: failed to create socket: %s",
                        strerror(errno));
+#else
+               log_err("dnstap io: failed to create socket: %s",
+                       wsa_strerror(WSAGetLastError()));
+#endif
                return;
        }
        memset(&s, 0, sizeof(s));
@@ -450,8 +799,17 @@ static void dtio_open_output(struct dt_io_thread* dtio)
         (void)strlcpy(s.sun_path, dtio->socket_path, sizeof(s.sun_path));
        if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s))
                == -1) {
+#ifndef USE_WINSOCK
                log_err("dnstap io: failed to connect: %s", strerror(errno));
+#else
+               log_err("dnstap io: failed to connect: %s",
+                       wsa_strerror(WSAGetLastError()));
+#endif
+#ifndef USE_WINSOCK
                close(dtio->fd);
+#else
+               closesocket(dtio->fd);
+#endif
                dtio->fd = -1;
                return;
        }
@@ -462,7 +820,11 @@ static void dtio_open_output(struct dt_io_thread* dtio)
                UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb,
                dtio);
        if(!ev) {
+#ifndef USE_WINSOCK
                close(dtio->fd);
+#else
+               closesocket(dtio->fd);
+#endif
                dtio->fd = -1;
                log_err("dnstap io: out of memory");
                return;
@@ -473,7 +835,11 @@ static void dtio_open_output(struct dt_io_thread* dtio)
        if(!dtio_control_start_send(dtio)) {
                ub_event_free(dtio->event);
                dtio->event = NULL;
+#ifndef USE_WINSOCK
                close(dtio->fd);
+#else
+               closesocket(dtio->fd);
+#endif
                dtio->fd = -1;
                log_err("dnstap io: out of memory");
                return;
@@ -521,10 +887,18 @@ static void* dnstap_io(void* arg)
 int dt_io_thread_start(struct dt_io_thread* dtio)
 {
        /* set up the thread, can fail */
+#ifndef USE_WINSOCK
        if(pipe(dtio->commandpipe) == -1) {
                log_err("failed to create pipe: %s", strerror(errno));
                return 0;
        }
+#else
+       if(_pipe(dtio->commandpipe, 4096, _O_BINARY) == -1) {
+               log_err("failed to create _pipe: %s",
+                       wsa_strerror(WSAGetLastError()));
+               return 0;
+       }
+#endif
 
        /* start the thread */
        ub_thread_create(&dtio->tid, dnstap_io, dtio);
@@ -540,15 +914,28 @@ void dt_io_thread_stop(struct dt_io_thread* dtio)
        while(1) {
                ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
                if(r == -1) {
+#ifndef USE_WINSOCK
                        if(errno == EINTR || errno == EAGAIN)
                                continue;
                        log_err("dnstap io stop: write: %s", strerror(errno));
+#else
+                       if(WSAGetLastError() == WSAEINPROGRESS)
+                               continue;
+                       if(WSAGetLastError() == WSAEWOULDBLOCK)
+                               continue;
+                       log_err("dnstap io stop: write: %s",
+                               wsa_strerror(WSAGetLastError()));
+#endif
                        break;
                }
                break;
        }
 
+#ifndef USE_WINSOCK
        close(dtio->commandpipe[1]);
+#else
+       _close(dtio->commandpipe[1]);
+#endif
        dtio->commandpipe[1] = -1;
        ub_thread_join(dtio->tid);
 }