git.ipfire.org Git - thirdparty/unbound.git/commitdiff
dnstap io, reconnect attempts with exponential backoff to once per second.
author W.C.A. Wijngaards <wouter@nlnetlabs.nl>
Thu, 23 Jan 2020 14:11:08 +0000 (15:11 +0100)
committer W.C.A. Wijngaards <wouter@nlnetlabs.nl>
Thu, 23 Jan 2020 14:11:08 +0000 (15:11 +0100)
dnstap/dtstream.c
dnstap/dtstream.h

index bbf202c57ad20068ae73378e342138ceb701ae33..f3d50a0275a286c82d7b4631eab4cef94265fd38 100644 (file)
 
 /** number of messages to process in one output callback */
 #define DTIO_MESSAGES_PER_CALLBACK 100
+/** the msec to wait for reconnect (if not immediate, the first attempt).
+ * The first reconnect timer after the backoff is cleared uses a wait of 0;
+ * this value is the wait stored for the attempt after that, and it is
+ * doubled for every further attempt. */
+#define DTIO_RECONNECT_TIMEOUT_MIN 10
+/** the msec to wait for reconnect max after backoff; the doubled wait
+ * time is capped at this value */
+#define DTIO_RECONNECT_TIMEOUT_MAX 1000
+
 /** DTIO command channel commands */
 enum {
        /** DTIO command channel stop */
@@ -60,6 +65,13 @@ enum {
        DTIO_COMMAND_WAKEUP = 1
 } dtio_channel_command;
 
+/** open the output channel */
+static void dtio_open_output(struct dt_io_thread* dtio);
+/** add output event for read and write */
+static void dtio_add_output_event_write(struct dt_io_thread* dtio);
+/** start reconnection attempts: adds the reconnect timer, with exponential
+ * backoff of the wait time; does nothing if the timer is already added */
+static void dtio_reconnect_enable(struct dt_io_thread* dtio);
+
 void* fstrm_create_control_frame_start(char* contenttype, size_t* len)
 {
        uint32_t* control;
@@ -359,6 +371,73 @@ static int dtio_find_msg(struct dt_io_thread* dtio)
        return 0;
 }
 
+/** callback for the dnstap reconnect, to start reconnecting to output.
+ * Marks the reconnect timer as no longer added, tries to open the output
+ * and, if that did not leave an output event in dtio->event, asks for
+ * another reconnect attempt on the timer.
+ * @param fd: unused.
+ * @param bits: unused.
+ * @param arg: the struct dt_io_thread that was passed as user argument. */
+static void dtio_reconnect_timeout_cb(int ATTR_UNUSED(fd),
+       short ATTR_UNUSED(bits), void* arg)
+{
+       struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
+       dtio->reconnect_is_added = 0; /* so a new timer can be added below */
+       verbose(VERB_ALGO, "dnstap io: reconnect timer");
+
+       dtio_open_output(dtio);
+       if(dtio->event) {
+               dtio_add_output_event_write(dtio);
+               /* nothing wrong so far, wait on the output event */
+               return;
+       }
+       /* no output event was set up by the open attempt;
+        * exponential backoff and retry on timer. This call returns
+        * without doing anything if the reconnect timer was already
+        * added in the meantime. */
+       dtio_reconnect_enable(dtio);
+}
+
+/** attempt to reconnect to the output, after a timeout.
+ * Adds the reconnect timer using the wait time currently stored in
+ * dtio->reconnect_timeout, and stores the wait time for the next attempt:
+ * a stored value of 0 becomes DTIO_RECONNECT_TIMEOUT_MIN, any other value
+ * is doubled and capped at DTIO_RECONNECT_TIMEOUT_MAX.
+ * Does nothing if the reconnect timer is already added.
+ * If the timer cannot be added, an error is logged and reconnect_is_added
+ * stays 0.
+ * @param dtio: the dnstap io thread structure. */
+static void dtio_reconnect_enable(struct dt_io_thread* dtio)
+{
+       struct timeval tv;
+       int msec;
+       if(dtio->reconnect_is_added)
+               return; /* already done */
+
+       /* exponential backoff, store the value for next timeout;
+        * msec keeps the wait time that is used for this attempt */
+       msec = dtio->reconnect_timeout;
+       if(msec == 0) {
+               dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MIN;
+       } else {
+               dtio->reconnect_timeout = msec*2;
+               if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MAX)
+                       dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MAX;
+       }
+       verbose(VERB_ALGO, "dnstap io: set reconnect attempt after %d msec",
+               msec);
+
+       /* setup wait timer; msec is split into whole seconds and the
+        * remainder in microseconds */
+       memset(&tv, 0, sizeof(tv));
+       tv.tv_sec = msec/1000;
+       tv.tv_usec = (msec%1000)*1000;
+       if(ub_timer_add(dtio->reconnect_timer, dtio->event_base,
+               &dtio_reconnect_timeout_cb, dtio, &tv) != 0) {
+               log_err("dnstap io: could not reconnect ev timer add");
+               return; /* timer is not added, the flag stays 0 */
+       }
+       dtio->reconnect_is_added = 1;
+}
+
+/** remove dtio reconnect timer.
+ * Deletes the timer only when reconnect_is_added is set, and clears that
+ * flag afterwards. The timer event itself is not freed here.
+ * @param dtio: the dnstap io thread structure. */
+static void dtio_reconnect_del(struct dt_io_thread* dtio)
+{
+       if(!dtio->reconnect_is_added)
+               return; /* timer is not added, nothing to remove */
+       ub_timer_del(dtio->reconnect_timer);
+       dtio->reconnect_is_added = 0;
+}
+
+/** clear the reconnect exponential backoff timer.
+ * We have successfully connected so we can try again with short timeouts.
+ * Resets the stored wait time to 0 and removes the reconnect timer if it
+ * is currently added.
+ * @param dtio: the dnstap io thread structure. */
+static void dtio_reconnect_clear(struct dt_io_thread* dtio)
+{
+       dtio->reconnect_timeout = 0; /* 0 restarts the backoff sequence */
+       dtio_reconnect_del(dtio);
+}
+
 /** delete the current message in the dtio, and reset counters */
 static void dtio_cur_msg_free(struct dt_io_thread* dtio)
 {
@@ -399,6 +478,7 @@ static void dtio_close_output(struct dt_io_thread* dtio)
        if(dtio->cur_msg) {
                dtio_cur_msg_free(dtio);
        }
+       dtio_reconnect_enable(dtio);
 }
 
 /** check for pending nonblocking connect errors,
@@ -443,6 +523,7 @@ static int dtio_check_nb_connect(struct dt_io_thread* dtio)
        }
 
        verbose(VERB_ALGO, "dnstap io: connected to \"%s\"", dtio->socket_path);
+       dtio_reconnect_clear(dtio);
        dtio->check_nb_connect = 0;
        return 1; /* everything okay */
 }
@@ -822,6 +903,17 @@ static void dtio_setup_cmd(struct dt_io_thread* dtio)
        }
 }
 
+/** setup the reconnect event for dnstap io.
+ * Resets the backoff state and creates the timeout event that is stored
+ * in dtio->reconnect_timer, with dtio_reconnect_timeout_cb as callback.
+ * Exits fatally if the event cannot be created.
+ * NOTE(review): dtio_reconnect_clear is called before the timer event
+ * exists; it only touches the timer when reconnect_is_added is nonzero,
+ * so this presumably relies on dtio being zero-initialized - confirm
+ * against the code that allocates the struct.
+ * @param dtio: the dnstap io thread structure, event_base must be set. */
+static void dtio_setup_reconnect(struct dt_io_thread* dtio)
+{
+       dtio_reconnect_clear(dtio);
+       dtio->reconnect_timer = ub_event_new(dtio->event_base, -1,
+               UB_EV_TIMEOUT, &dtio_reconnect_timeout_cb, dtio);
+       if(!dtio->reconnect_timer) {
+               fatal_exit("dnstap io: out of memory");
+       }
+}
+
 /**
  * structure to keep track of information during stop flush
  */
@@ -1032,6 +1124,8 @@ static void dtio_desetup(struct dt_io_thread* dtio)
        _close(dtio->commandpipe[0]);
 #endif
        dtio->commandpipe[0] = -1;
+       dtio_reconnect_del(dtio);
+       ub_event_free(dtio->reconnect_timer);
        dtio_cur_msg_free(dtio);
        ub_event_base_free(dtio->event_base);
 }
@@ -1096,6 +1190,7 @@ static void dtio_open_output(struct dt_io_thread* dtio)
                closesocket(dtio->fd);
 #endif
                dtio->fd = -1;
+               dtio_reconnect_enable(dtio);
                return;
        }
        dtio->check_nb_connect = 1;
@@ -1105,19 +1200,21 @@ static void dtio_open_output(struct dt_io_thread* dtio)
                UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb,
                dtio);
        if(!ev) {
+               log_err("dnstap io: out of memory");
 #ifndef USE_WINSOCK
                close(dtio->fd);
 #else
                closesocket(dtio->fd);
 #endif
                dtio->fd = -1;
-               log_err("dnstap io: out of memory");
+               dtio_reconnect_enable(dtio);
                return;
        }
        dtio->event = ev;
 
        /* setup protocol control message to start */
        if(!dtio_control_start_send(dtio)) {
+               log_err("dnstap io: out of memory");
                ub_event_free(dtio->event);
                dtio->event = NULL;
 #ifndef USE_WINSOCK
@@ -1126,7 +1223,7 @@ static void dtio_open_output(struct dt_io_thread* dtio)
                closesocket(dtio->fd);
 #endif
                dtio->fd = -1;
-               log_err("dnstap io: out of memory");
+               dtio_reconnect_enable(dtio);
                return;
        }
 }
@@ -1139,11 +1236,12 @@ static void* dnstap_io(void* arg)
        struct timeval now;
 
        /* setup */
+       verbose(VERB_ALGO, "start dnstap io thread");
        dtio_setup_base(dtio, &secs, &now);
        dtio_setup_cmd(dtio);
+       dtio_setup_reconnect(dtio);
        dtio_open_output(dtio);
        dtio_add_output_event_write(dtio);
-       verbose(VERB_ALGO, "start dnstap io thread");
 
        /* run */
        if(ub_event_base_dispatch(dtio->event_base) < 0) {
index 77bda6ae0bf3cb14cf761d93870998a48255b691..2f15465277ca0048e67d64ebe8ab0cacad9bd7e6 100644 (file)
@@ -102,6 +102,7 @@ struct dt_io_thread {
        struct dt_io_list_item* io_list_iter;
        /** thread id, of the io thread */
        ub_thread_type tid;
+
        /** file descriptor that the thread writes to */
        int fd;
        /** event structure that the thread uses */
@@ -112,6 +113,7 @@ struct dt_io_thread {
        int event_added_is_write;
        /** check for nonblocking connect errors on fd */
        int check_nb_connect;
+
        /** the buffer that currently getting written, or NULL if no
         * (partial) message written now */
        void* cur_msg;
@@ -131,6 +133,14 @@ struct dt_io_thread {
        /** the io thread wants to exit */
        int want_to_exit;
 
+       /** the timer event for connection retries */
+       void* reconnect_timer;
+       /** if the reconnect timer is added to the event base */
+       int reconnect_is_added;
+       /** the current reconnection timeout, it is increased with
+        * exponential backoff, in msec */
+       int reconnect_timeout;
+
        /** If the log server is connected to over unix domain sockets,
         * eg. a file is named that is created to log onto. */
        int upstream_is_unix;