/** number of messages to process in one output callback */
#define DTIO_MESSAGES_PER_CALLBACK 100
+/** the msec to wait for reconnect (if not immediate, the first attempt) */
+#define DTIO_RECONNECT_TIMEOUT_MIN 10
+/** the msec to wait for reconnect max after backoff */
+#define DTIO_RECONNECT_TIMEOUT_MAX 1000
+
/** DTIO command channel commands */
enum {
/** DTIO command channel stop */
DTIO_COMMAND_WAKEUP = 1
} dtio_channel_command;
+/** open the output channel */
+static void dtio_open_output(struct dt_io_thread* dtio);
+/** add output event for read and write */
+static void dtio_add_output_event_write(struct dt_io_thread* dtio);
+/** start reconnection attempts */
+static void dtio_reconnect_enable(struct dt_io_thread* dtio);
+
void* fstrm_create_control_frame_start(char* contenttype, size_t* len)
{
uint32_t* control;
return 0;
}
+/** callback for the dnstap reconnect, to start reconnecting to output */
+static void dtio_reconnect_timeout_cb(int ATTR_UNUSED(fd),
+ short ATTR_UNUSED(bits), void* arg)
+{
+ struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
+ dtio->reconnect_is_added = 0;
+ verbose(VERB_ALGO, "dnstap io: reconnect timer");
+
+ dtio_open_output(dtio);
+ if(dtio->event) {
+ dtio_add_output_event_write(dtio);
+ /* nothing wrong so far, wait on the output event */
+ return;
+ }
+ /* exponential backoff and retry on timer */
+ dtio_reconnect_enable(dtio);
+}
+
+/** attempt to reconnect to the output, after a timeout */
+static void dtio_reconnect_enable(struct dt_io_thread* dtio)
+{
+ struct timeval tv;
+ int msec;
+ if(dtio->reconnect_is_added)
+ return; /* already done */
+
+ /* exponential backoff, store the value for next timeout */
+ msec = dtio->reconnect_timeout;
+ if(msec == 0) {
+ dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MIN;
+ } else {
+ dtio->reconnect_timeout = msec*2;
+ if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MAX)
+ dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MAX;
+ }
+ verbose(VERB_ALGO, "dnstap io: set reconnect attempt after %d msec",
+ msec);
+
+ /* setup wait timer */
+ memset(&tv, 0, sizeof(tv));
+ tv.tv_sec = msec/1000;
+ tv.tv_usec = (msec%1000)*1000;
+ if(ub_timer_add(dtio->reconnect_timer, dtio->event_base,
+ &dtio_reconnect_timeout_cb, dtio, &tv) != 0) {
+ log_err("dnstap io: could not reconnect ev timer add");
+ return;
+ }
+ dtio->reconnect_is_added = 1;
+}
+
+/** remove dtio reconnect timer */
+static void dtio_reconnect_del(struct dt_io_thread* dtio)
+{
+ if(!dtio->reconnect_is_added)
+ return;
+ ub_timer_del(dtio->reconnect_timer);
+ dtio->reconnect_is_added = 0;
+}
+
+/** clear the reconnect exponential backoff timer.
+ * We have successfully connected so we can try again with short timeouts. */
+static void dtio_reconnect_clear(struct dt_io_thread* dtio)
+{
+ dtio->reconnect_timeout = 0;
+ dtio_reconnect_del(dtio);
+}
+
/** delete the current message in the dtio, and reset counters */
static void dtio_cur_msg_free(struct dt_io_thread* dtio)
{
if(dtio->cur_msg) {
dtio_cur_msg_free(dtio);
}
+ dtio_reconnect_enable(dtio);
}
/** check for pending nonblocking connect errors,
}
verbose(VERB_ALGO, "dnstap io: connected to \"%s\"", dtio->socket_path);
+ dtio_reconnect_clear(dtio);
dtio->check_nb_connect = 0;
return 1; /* everything okay */
}
}
}
+/** setup the reconnect event for dnstap io */
+static void dtio_setup_reconnect(struct dt_io_thread* dtio)
+{
+ dtio_reconnect_clear(dtio);
+ dtio->reconnect_timer = ub_event_new(dtio->event_base, -1,
+ UB_EV_TIMEOUT, &dtio_reconnect_timeout_cb, dtio);
+ if(!dtio->reconnect_timer) {
+ fatal_exit("dnstap io: out of memory");
+ }
+}
+
/**
* structure to keep track of information during stop flush
*/
_close(dtio->commandpipe[0]);
#endif
dtio->commandpipe[0] = -1;
+ dtio_reconnect_del(dtio);
+ ub_event_free(dtio->reconnect_timer);
dtio_cur_msg_free(dtio);
ub_event_base_free(dtio->event_base);
}
closesocket(dtio->fd);
#endif
dtio->fd = -1;
+ dtio_reconnect_enable(dtio);
return;
}
dtio->check_nb_connect = 1;
UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb,
dtio);
if(!ev) {
+ log_err("dnstap io: out of memory");
#ifndef USE_WINSOCK
close(dtio->fd);
#else
closesocket(dtio->fd);
#endif
dtio->fd = -1;
- log_err("dnstap io: out of memory");
+ dtio_reconnect_enable(dtio);
return;
}
dtio->event = ev;
/* setup protocol control message to start */
if(!dtio_control_start_send(dtio)) {
+ log_err("dnstap io: out of memory");
ub_event_free(dtio->event);
dtio->event = NULL;
#ifndef USE_WINSOCK
closesocket(dtio->fd);
#endif
dtio->fd = -1;
- log_err("dnstap io: out of memory");
+ dtio_reconnect_enable(dtio);
return;
}
}
struct timeval now;
/* setup */
+ verbose(VERB_ALGO, "start dnstap io thread");
dtio_setup_base(dtio, &secs, &now);
dtio_setup_cmd(dtio);
+ dtio_setup_reconnect(dtio);
dtio_open_output(dtio);
dtio_add_output_event_write(dtio);
- verbose(VERB_ALGO, "start dnstap io thread");
/* run */
if(ub_event_base_dispatch(dtio->event_base) < 0) {