From: W.C.A. Wijngaards Date: Wed, 22 Jan 2020 14:20:48 +0000 (+0100) Subject: write data and stop flush for dnstap io. X-Git-Tag: 1.11.0rc1~120^2~100 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=a21ac9838deef24581d51c1a62b4194182712a38;p=thirdparty%2Funbound.git write data and stop flush for dnstap io. --- diff --git a/dnstap/dtstream.c b/dnstap/dtstream.c index 1f833f1f9..9d7a465cd 100644 --- a/dnstap/dtstream.c +++ b/dnstap/dtstream.c @@ -292,15 +292,170 @@ static int dtio_find_msg(struct dt_io_thread* dtio) return 0; } +/** close and stop the output file descriptor event */ +static void dtio_close_output(struct dt_io_thread* dtio) +{ + if(!dtio->event) + return; + ub_event_free(dtio->event); + dtio->event = NULL; +#ifndef USE_WINSOCK + close(dtio->fd); +#else + closesocket(dtio->fd); +#endif + dtio->fd = -1; +} + +/** write buffer to output. + * returns number of bytes written, 0 if nothing happened, + * try again later, or -1 if the channel is to be closed. */ +static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf, + size_t len) +{ + ssize_t ret; + if(dtio->fd == -1) + return -1; + ret = send(dtio->fd, buf, len, 0); + if(ret == -1) { +#ifndef USE_WINSOCK + if(errno == EINTR || errno == EAGAIN) + return 0; + log_err("dnstap io: failed send: %s", strerror(errno)); +#else + if(WSAGetLastError() == WSAEINPROGRESS) + return 0; + if(WSAGetLastError() == WSAEWOULDBLOCK) { + ub_winsock_tcp_wouldblock(dtio->event, UB_EV_WRITE); + return 0; + } + log_err("dnstap io: failed send: %s", + wsa_strerror(WSAGetLastError())); +#endif + return -1; + } + return ret; +} + +#ifdef HAVE_WRITEV +/** write with writev, len and message, in one write, if possible. + * return true if message is done, false if incomplete */ +static int dtio_write_with_writev(struct dt_io_thread* dtio) +{ + uint32_t sendlen = htonl(dtio->cur_msg_len); + struct iovec iov[2]; + ssize_t r; + iov[0].iov_base = ((uint8_t*)&sendlen)+dtio->cur_msg_len_done; + iov[0].iov_len = sizeof(sendlen)-dtio->cur_msg_len_done; + iov[1].iov_base = dtio->cur_msg; + iov[1].iov_len = dtio->cur_msg_len; + log_assert(iov[0].iov_len > 0); + r = writev(dtio->fd, iov, 2); + if(r == -1) { +#ifndef USE_WINSOCK + if(errno == EINTR || errno == EAGAIN) + return 0; + log_err("dnstap io: failed writev: %s", strerror(errno)); +#else + if(WSAGetLastError() == WSAEINPROGRESS) + return 0; + if(WSAGetLastError() == WSAEWOULDBLOCK) { + ub_winsock_tcp_wouldblock(dtio->event, UB_EV_WRITE); + return 0; + } + log_err("dnstap io: failed writev: %s", + wsa_strerror(WSAGetLastError())); +#endif + return -1; + } + /* written r bytes */ + dtio->cur_msg_len_done += r; + if(dtio->cur_msg_len_done < 4) + return 0; + if(dtio->cur_msg_len_done > 4) { + dtio->cur_msg_done = dtio->cur_msg_len_done-4; + dtio->cur_msg_len_done = 4; + } + if(dtio->cur_msg_done < dtio->cur_msg_len) + return 0; + return 1; +} +#endif /* HAVE_WRITEV */ + +/** write more of the length, preceding the data frame. + * return true if message is done, false if incomplete. */ +static int dtio_write_more_of_len(struct dt_io_thread* dtio) +{ +#ifndef HAVE_WRITEV + uint32_t sendlen = htonl(dtio->cur_msg_len); + int r; +#endif + if(dtio->cur_msg_len_done >= 4) + return 1; +#ifdef HAVE_WRITEV + /* we try writev for everything.*/ + return dtio_write_with_writev(dtio); +#else + r = dtio_write_buf(dtio, + ((uint8_t*)&sendlen)+dtio->cur_msg_len_done, + sizeof(sendlen)-dtio->cur_msg_len_done); + if(r == -1) { + /* close the channel */ + dtio_close_output(dtio); + return 0; + } else if(r == 0) { + /* try again later */ + return 0; + } + dtio->cur_msg_len_done += r; + if(dtio->cur_msg_len_done < 4) + return 0; + return 1; +#endif /* HAVE_WRITEV */ +} + +/** write more of the data frame. + * return true if message is done, false if incomplete. */ +static int dtio_write_more_of_data(struct dt_io_thread* dtio) +{ + int r; + if(dtio->cur_msg_done >= dtio->cur_msg_len) + return 1; + r = dtio_write_buf(dtio, + ((uint8_t*)dtio->cur_msg)+dtio->cur_msg_done, + dtio->cur_msg_len - dtio->cur_msg_done); + if(r == -1) { + /* close the channel */ + dtio_close_output(dtio); + return 0; + } else if(r == 0) { + /* try again later */ + return 0; + } + dtio->cur_msg_done += r; + if(dtio->cur_msg_done < dtio->cur_msg_len) + return 0; + return 1; +} + /** write more of the current messsage. false if incomplete, true if * the message is done */ -static int dtio_write_more(int fd, struct dt_io_thread* dtio) +static int dtio_write_more(struct dt_io_thread* dtio) { - return 0; + if(dtio->cur_msg_len_done < 4) { + if(!dtio_write_more_of_len(dtio)) + return 0; + } + if(dtio->cur_msg_done < dtio->cur_msg_len) { + if(!dtio_write_more_of_data(dtio)) + return 0; + } + return 1; } /** callback for the dnstap events, to write to the output */ -static void dtio_output_cb(int fd, short ATTR_UNUSED(bits), void* arg) +static void dtio_output_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits), + void* arg) { struct dt_io_thread* dtio = (struct dt_io_thread*)arg; @@ -312,7 +467,7 @@ static void dtio_output_cb(int fd, short ATTR_UNUSED(bits), void* arg) /* write it */ if(dtio->cur_msg_done < dtio->cur_msg_len) { - if(!dtio_write_more(fd, dtio)) + if(!dtio_write_more(dtio)) return; } @@ -334,15 +489,25 @@ static void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg) return; r = read(fd, &cmd, sizeof(cmd)); if(r == -1) { +#ifndef USE_WINSOCK if(errno == EINTR || errno == EAGAIN) return; /* ignore this */ log_err("dnstap io: failed to read: %s", strerror(errno)); +#else + if(WSAGetLastError() == WSAEINPROGRESS) + return; + if(WSAGetLastError() == WSAEWOULDBLOCK) + return; + log_err("dnstap io: failed to read: %s", + wsa_strerror(WSAGetLastError())); +#endif /* and then fall through to quit the thread */ - } - if(r == 0) { + } else if(r == 0) { verbose(VERB_ALGO, "dnstap io: cmd channel closed"); } else if(r == 1 && cmd == 0) { verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit"); + } else if(r == 1) { + verbose(VERB_ALGO, "dnstap io: cmd channel unknown command"); } dtio->want_to_exit = 1; if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base) @@ -387,26 +552,205 @@ static void dtio_del_output_event(struct dt_io_thread* dtio) dtio->event_added = 0; } -/** close and stop the output file descriptor event */ -static void dtio_close_output(struct dt_io_thread* dtio) +/** + * structure to keep track of information during stop flush + */ +struct stop_flush_info { + /** the event base during stop flush */ + struct ub_event_base* base; + /** did we already want to exit this stop-flush event base */ + int want_to_exit_flush; + /** has the timer fired */ + int timer_done; + /** the dtio */ + struct dt_io_thread* dtio; + /** the stop control frame */ + void* stop_frame; + /** length of the stop frame */ + size_t stop_frame_len; + /** how much we have done of the stop frame */ + size_t stop_frame_done; +}; + +/** exit the stop flush base */ +static void dtio_stop_flush_exit(struct stop_flush_info* info) { - if(!dtio->event) + if(info->want_to_exit_flush) return; - ub_event_free(dtio->event); - dtio->event = NULL; - close(dtio->fd); - dtio->fd = -1; + info->want_to_exit_flush = 1; + if(ub_event_base_loopexit(info->base) != 0) { + log_err("dnstap io: could not loopexit"); + } +} + +/** send the stop control, + * return true if completed the frame. */ +static int dtio_control_stop_send(struct stop_flush_info* info) +{ + struct dt_io_thread* dtio = info->dtio; + int r; + if(info->stop_frame_done >= info->stop_frame_len) + return 1; + r = dtio_write_buf(dtio, ((uint8_t*)info->stop_frame) + + info->stop_frame_done, info->stop_frame_len - + info->stop_frame_done); + if(r == -1) { + verbose(VERB_ALGO, "dnstap io: stop flush: output closed"); + dtio_stop_flush_exit(info); + return 0; + } + if(r == 0) { + /* try again later, or timeout */ + return 0; + } + info->stop_frame_done += r; + if(info->stop_frame_done < info->stop_frame_len) + return 0; /* not done yet */ + return 1; +} + +static void dtio_stop_timer_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits), + void* arg) +{ + struct stop_flush_info* info = (struct stop_flush_info*)arg; + if(info->want_to_exit_flush) + return; + verbose(VERB_ALGO, "dnstap io: stop flush timer expired, stop flush"); + info->timer_done = 1; + dtio_stop_flush_exit(info); +} + +static void dtio_stop_ev_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits), + void* arg) +{ + struct stop_flush_info* info = (struct stop_flush_info*)arg; + struct dt_io_thread* dtio = info->dtio; + if(info->want_to_exit_flush) + return; + /* write remainder of last frame */ + if(dtio->cur_msg) { + if(dtio->cur_msg_done < dtio->cur_msg_len) { + if(!dtio_write_more(dtio)) { + if(dtio->fd == -1) { + verbose(VERB_ALGO, "dnstap io: " + "stop flush: output closed"); + dtio_stop_flush_exit(info); + } + return; + } + } + verbose(VERB_ALGO, "dnstap io: stop flush completed " + "last frame"); + free(dtio->cur_msg); + dtio->cur_msg = NULL; + dtio->cur_msg_len = 0; + dtio->cur_msg_done = 0; + dtio->cur_msg_len_done = 0; + } + /* write stop frame */ + if(info->stop_frame_done < info->stop_frame_len) { + if(!dtio_control_stop_send(info)) + return; + verbose(VERB_ALGO, "dnstap io: stop flush completed " + "stop control frame"); + } + /* when last frame and stop frame are sent, exit */ + dtio_stop_flush_exit(info); +} + +/** flush at end, last packet and stop control */ +static void dtio_control_stop_flush(struct dt_io_thread* dtio) +{ + /* briefly attempt to flush the previous packet to the output, + * this could be a partial packet, or even the start control frame */ + time_t secs = 0; + struct timeval now; + struct stop_flush_info info; + struct timeval tv; + struct ub_event* timer, *stopev; + memset(&info, 0, sizeof(info)); + memset(&now, 0, sizeof(now)); + info.base = ub_default_event_base(0, &secs, &now); + if(!info.base) { + log_err("dnstap io: malloc failure"); + return; + } + timer = ub_event_new(info.base, -1, UB_EV_TIMEOUT, + &dtio_stop_timer_cb, &info); + if(!timer) { + log_err("dnstap io: malloc failure"); + ub_event_base_free(info.base); + return; + } + memset(&tv, 0, sizeof(tv)); + tv.tv_sec = 2; + if(ub_timer_add(timer, info.base, &dtio_stop_timer_cb, &info, + &tv) != 0) { + log_err("dnstap io: cannot event_timer_add"); + ub_event_free(timer); + ub_event_base_free(info.base); + return; + } + stopev = ub_event_new(info.base, dtio->fd, UB_EV_READ | + UB_EV_WRITE | UB_EV_PERSIST, &dtio_stop_ev_cb, &info); + if(!stopev) { + log_err("dnstap io: malloc failure"); + ub_event_del(timer); + ub_event_free(timer); + ub_event_base_free(info.base); + return; + } + if(ub_event_add(stopev, NULL) != 0) { + log_err("dnstap io: cannot event_add"); + ub_event_free(stopev); + ub_event_del(timer); + ub_event_free(timer); + ub_event_base_free(info.base); + return; + } + info.stop_frame = fstrm_create_control_frame_stop( + &info.stop_frame_len); + if(!info.stop_frame) { + log_err("dnstap io: malloc failure"); + ub_event_del(stopev); + ub_event_free(stopev); + ub_event_del(timer); + ub_event_free(timer); + ub_event_base_free(info.base); + return; + } + + /* wait briefly, or until finished */ + verbose(VERB_ALGO, "dnstap io: stop flush started"); + if(ub_event_base_dispatch(info.base) < 0) { + log_err("dnstap io: dispatch flush failed, errno is %s", + strerror(errno)); + } + verbose(VERB_ALGO, "dnstap io: stop flush ended"); + free(info.stop_frame); + ub_event_del(stopev); + ub_event_free(stopev); + ub_event_del(timer); + ub_event_free(timer); + ub_event_base_free(info.base); } /** perform desetup and free stuff when the dnstap io thread exits */ static void dtio_desetup(struct dt_io_thread* dtio) { + dtio_control_stop_flush(dtio); dtio_del_output_event(dtio); dtio_close_output(dtio); ub_event_del(dtio->command_event); ub_event_free(dtio->command_event); +#ifndef USE_WINSOCK close(dtio->commandpipe[0]); +#else + _close(dtio->commandpipe[0]); +#endif dtio->commandpipe[0] = -1; + free(dtio->cur_msg); + dtio->cur_msg = NULL; ub_event_base_free(dtio->event_base); } @@ -436,8 +780,13 @@ static void dtio_open_output(struct dt_io_thread* dtio) struct sockaddr_un s; dtio->fd = socket(AF_LOCAL, SOCK_STREAM, SOCK_CLOEXEC); if(dtio->fd == -1) { +#ifndef USE_WINSOCK log_err("dnstap io: failed to create socket: %s", strerror(errno)); +#else + log_err("dnstap io: failed to create socket: %s", + wsa_strerror(WSAGetLastError())); +#endif return; } memset(&s, 0, sizeof(s)); @@ -450,8 +799,17 @@ static void dtio_open_output(struct dt_io_thread* dtio) (void)strlcpy(s.sun_path, dtio->socket_path, sizeof(s.sun_path)); if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s)) == -1) { +#ifndef USE_WINSOCK log_err("dnstap io: failed to connect: %s", strerror(errno)); +#else + log_err("dnstap io: failed to connect: %s", + wsa_strerror(WSAGetLastError())); +#endif +#ifndef USE_WINSOCK close(dtio->fd); +#else + closesocket(dtio->fd); +#endif dtio->fd = -1; return; } @@ -462,7 +820,11 @@ static void dtio_open_output(struct dt_io_thread* dtio) UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb, dtio); if(!ev) { +#ifndef USE_WINSOCK close(dtio->fd); +#else + closesocket(dtio->fd); +#endif dtio->fd = -1; log_err("dnstap io: out of memory"); return; @@ -473,7 +835,11 @@ static void dtio_open_output(struct dt_io_thread* dtio) if(!dtio_control_start_send(dtio)) { ub_event_free(dtio->event); dtio->event = NULL; +#ifndef USE_WINSOCK close(dtio->fd); +#else + closesocket(dtio->fd); +#endif dtio->fd = -1; log_err("dnstap io: out of memory"); return; @@ -521,10 +887,18 @@ static void* dnstap_io(void* arg) int dt_io_thread_start(struct dt_io_thread* dtio) { /* set up the thread, can fail */ +#ifndef USE_WINSOCK if(pipe(dtio->commandpipe) == -1) { log_err("failed to create pipe: %s", strerror(errno)); return 0; } +#else + if(_pipe(dtio->commandpipe, 4096, _O_BINARY) == -1) { + log_err("failed to create _pipe: %s", + wsa_strerror(WSAGetLastError())); + return 0; + } +#endif /* start the thread */ ub_thread_create(&dtio->tid, dnstap_io, dtio); @@ -540,15 +914,28 @@ void dt_io_thread_stop(struct dt_io_thread* dtio) while(1) { ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd)); if(r == -1) { +#ifndef USE_WINSOCK if(errno == EINTR || errno == EAGAIN) continue; log_err("dnstap io stop: write: %s", strerror(errno)); +#else + if(WSAGetLastError() == WSAEINPROGRESS) + continue; + if(WSAGetLastError() == WSAEWOULDBLOCK) + continue; + log_err("dnstap io stop: write: %s", + wsa_strerror(WSAGetLastError())); +#endif break; } break; } +#ifndef USE_WINSOCK close(dtio->commandpipe[1]); +#else + _close(dtio->commandpipe[1]); +#endif dtio->commandpipe[1] = -1; ub_thread_join(dtio->tid); }