/** the msec to wait for reconnect max after backoff */
#define DTIO_RECONNECT_TIMEOUT_MAX 1000
+struct stop_flush_info;
/** DTIO command channel commands */
enum {
/** DTIO command channel stop */
/** open the output channel */
static void dtio_open_output(struct dt_io_thread* dtio);
/** add output event for read and write */
-static void dtio_add_output_event_write(struct dt_io_thread* dtio);
+static int dtio_add_output_event_write(struct dt_io_thread* dtio);
/** start reconnection attempts */
static void dtio_reconnect_enable(struct dt_io_thread* dtio);
+/** stop from stop_flush event loop */
+static void dtio_stop_flush_exit(struct stop_flush_info* info);
struct dt_msg_queue*
dt_msg_queue_create(void)
dtio_open_output(dtio);
if(dtio->event) {
- dtio_add_output_event_write(dtio);
+ if(!dtio_add_output_event_write(dtio))
+ return;
/* nothing wrong so far, wait on the output event */
return;
}
ssize_t ret;
if(dtio->fd == -1)
return -1;
- if(dtio->check_nb_connect) {
- int connect_err = dtio_check_nb_connect(dtio);
- if(connect_err == -1) {
- return -1;
- } else if(connect_err == 0) {
- return 0;
- }
- }
ret = send(dtio->fd, (void*)buf, len, 0);
if(ret == -1) {
#ifndef USE_WINSOCK
uint32_t sendlen = htonl(dtio->cur_msg_len);
struct iovec iov[2];
ssize_t r;
- if(dtio->check_nb_connect) {
- int connect_err = dtio_check_nb_connect(dtio);
- if(connect_err == -1) {
- /* close the channel */
- dtio_del_output_event(dtio);
- dtio_close_output(dtio);
- return 0;
- } else if(connect_err == 0) {
- return 0;
- }
- }
iov[0].iov_base = ((uint8_t*)&sendlen)+dtio->cur_msg_len_done;
iov[0].iov_len = sizeof(sendlen)-dtio->cur_msg_len_done;
iov[1].iov_base = dtio->cur_msg;
}
/** add the output file descriptor event for listening, read only */
-static void dtio_add_output_event_read(struct dt_io_thread* dtio)
+static int dtio_add_output_event_read(struct dt_io_thread* dtio)
{
if(!dtio->event)
- return;
+ return 0;
if(dtio->event_added && !dtio->event_added_is_write)
- return;
+ return 1;
/* we have to (re-)register the event */
if(dtio->event_added)
ub_event_del(dtio->event);
dtio->event_added_is_write = 0;
/* close output and start reattempts to open it */
dtio_close_output(dtio);
- return;
+ return 0;
}
dtio->event_added = 1;
dtio->event_added_is_write = 0;
+ return 1;
}
/** add the output file descriptor event for listening, read and write */
-static void dtio_add_output_event_write(struct dt_io_thread* dtio)
+static int dtio_add_output_event_write(struct dt_io_thread* dtio)
{
if(!dtio->event)
- return;
+ return 0;
if(dtio->event_added && dtio->event_added_is_write)
- return;
+ return 1;
/* we have to (re-)register the event */
if(dtio->event_added)
ub_event_del(dtio->event);
dtio->event_added_is_write = 0;
/* close output and start reattempts to open it */
dtio_close_output(dtio);
- return;
+ return 0;
}
dtio->event_added = 1;
dtio->event_added_is_write = 1;
+ return 1;
}
/** put the dtio thread to sleep */
{
/* unregister the event polling for write, because there is
* nothing to be written */
- dtio_add_output_event_read(dtio);
+ (void)dtio_add_output_event_read(dtio);
+}
+
+/** enable the brief read condition */
+static int dtio_enable_brief_read(struct dt_io_thread* dtio)
+{
+ dtio->ssl_brief_read = 1;
+ if(dtio->stop_flush_event) {
+ ub_event_del(dtio->stop_flush_event);
+ ub_event_del_bits(dtio->stop_flush_event, UB_EV_WRITE);
+ if(ub_event_add(dtio->stop_flush_event, NULL) != 0) {
+ log_err("dnstap io, stop flush, could not ub_event_add");
+ return 0;
+ }
+ return 1;
+ }
+ return dtio_add_output_event_read(dtio);
+}
+
+/** disable the brief read condition */
+static int dtio_disable_brief_read(struct dt_io_thread* dtio)
+{
+ dtio->ssl_brief_read = 0;
+ if(dtio->stop_flush_event) {
+ ub_event_del(dtio->stop_flush_event);
+ ub_event_add_bits(dtio->stop_flush_event, UB_EV_WRITE);
+ if(ub_event_add(dtio->stop_flush_event, NULL) != 0) {
+ log_err("dnstap io, stop flush, could not ub_event_add");
+ return 0;
+ }
+ return 1;
+ }
+ return dtio_add_output_event_write(dtio);
+}
+
+/** perform ssl handshake, returns 1 if okay, 0 to stop */
+static int dtio_ssl_handshake(struct dt_io_thread* dtio,
+ struct stop_flush_info* info)
+{
+ int r;
+ if(dtio->ssl_brief_read) {
+ /* assume the brief read condition is satisfied,
+ * if we need more or again, we can set it again */
+ if(!dtio_disable_brief_read(dtio)) {
+ if(info) dtio_stop_flush_exit(info);
+ return 0;
+ }
+ }
+ if(dtio->ssl_handshake_done)
+ return 1;
+
+ ERR_clear_error();
+ r = SSL_do_handshake(dtio->ssl);
+ if(r != 1) {
+ int want = SSL_get_error(dtio->ssl, r);
+ if(want == SSL_ERROR_WANT_READ) {
+ /* we want to read on the connection */
+ if(!dtio_enable_brief_read(dtio)) {
+ if(info) dtio_stop_flush_exit(info);
+ return 0;
+ }
+ return 0;
+ } else if(want == SSL_ERROR_WANT_WRITE) {
+ /* we want to write on the connection */
+ return 0;
+ } else if(r == 0) {
+ /* closed */
+ if(info) dtio_stop_flush_exit(info);
+ dtio_del_output_event(dtio);
+ dtio_close_output(dtio);
+ return 0;
+ } else if(want == SSL_ERROR_SYSCALL) {
+ /* SYSCALL and errno==0 means closed uncleanly */
+ int silent = 0;
+#ifdef EPIPE
+ if(errno == EPIPE && verbosity < 2)
+ silent = 1; /* silence 'broken pipe' */
+#endif
+#ifdef ECONNRESET
+ if(errno == ECONNRESET && verbosity < 2)
+ silent = 1; /* silence reset by peer */
+#endif
+ if(errno == 0)
+ silent = 1;
+ if(!silent)
+ log_err("dnstap io, SSL_handshake syscall: %s",
+ strerror(errno));
+ /* closed */
+ if(info) dtio_stop_flush_exit(info);
+ dtio_del_output_event(dtio);
+ dtio_close_output(dtio);
+ return 0;
+ } else {
+ unsigned long err = ERR_get_error();
+ if(!squelch_err_ssl_handshake(err)) {
+ log_crypto_err_code("dnstap io, ssl handshake failed",
+ err);
+ verbose(VERB_OPS, "dnstap io, ssl handshake failed "
+ "from %s", dtio->ip_str);
+ }
+ /* closed */
+ if(info) dtio_stop_flush_exit(info);
+ dtio_del_output_event(dtio);
+ dtio_close_output(dtio);
+ return 0;
+ }
+
+ }
+ /* check peer verification */
+ dtio->ssl_handshake_done = 1;
+ return 1;
}
/** callback for the dnstap events, to write to the output */
struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
int i;
+ if(dtio->check_nb_connect) {
+ int connect_err = dtio_check_nb_connect(dtio);
+ if(connect_err == -1) {
+ /* close the channel */
+ dtio_del_output_event(dtio);
+ dtio_close_output(dtio);
+ return;
+ } else if(connect_err == 0) {
+ /* try again later */
+ return;
+ }
+ /* nonblocking connect check passed, continue */
+ }
+
+ if(dtio->ssl &&
+ (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
+ if(!dtio_ssl_handshake(dtio, NULL))
+ return;
+ }
+
if((bits&UB_EV_READ)) {
if(!dtio_check_close(dtio))
return;
} else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) {
verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup");
/* reregister event */
- dtio_add_output_event_write(dtio);
+ if(!dtio_add_output_event_write(dtio))
+ return;
return;
} else if(r == 1) {
verbose(VERB_ALGO, "dnstap io: cmd channel unknown command");
struct dt_io_thread* dtio = info->dtio;
if(info->want_to_exit_flush)
return;
+ if(dtio->check_nb_connect) {
+ /* we don't start the stop_flush if connect still
+ * in progress, but the check code is here, just in case */
+ int connect_err = dtio_check_nb_connect(dtio);
+ if(connect_err == -1) {
+ /* close the channel, exit the stop flush */
+ dtio_stop_flush_exit(info);
+ dtio_del_output_event(dtio);
+ dtio_close_output(dtio);
+ return;
+ } else if(connect_err == 0) {
+ /* try again later */
+ return;
+ }
+ /* nonblocking connect check passed, continue */
+ }
+ if(dtio->ssl &&
+ (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
+ if(!dtio_ssl_handshake(dtio, info))
+ return;
+ }
+
if((bits&UB_EV_READ)) {
if(!dtio_check_close(dtio)) {
if(dtio->fd == -1) {
* sent yet, so nothing to stop or flush */
return;
}
+ if(dtio->ssl && !dtio->ssl_handshake_done) {
+ /* no SSL connection has been established yet */
+ return;
+ }
memset(&info, 0, sizeof(info));
memset(&now, 0, sizeof(now));
{
dtio->ssl = outgoing_ssl_fd(dtio->ssl_ctx, dtio->fd);
if(!dtio->ssl) return 0;
+ dtio->ssl_handshake_done = 0;
+ dtio->ssl_brief_read = 0;
return 1;
}
dtio_setup_cmd(dtio);
dtio_setup_reconnect(dtio);
dtio_open_output(dtio);
- dtio_add_output_event_write(dtio);
+ if(!dtio_add_output_event_write(dtio))
+ return;
}
#ifndef THREADS_DISABLED