]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
Unify handling of TCP timeouts for input and output connections; avoid redundant...
authorGrigorii Demidov <grigorii.demidov@nic.cz>
Thu, 11 Oct 2018 15:30:10 +0000 (17:30 +0200)
committerVladimír Čunát <vladimir.cunat@nic.cz>
Fri, 12 Oct 2018 15:36:47 +0000 (17:36 +0200)
daemon/io.c
daemon/io.h
daemon/session.c
daemon/session.h
daemon/worker.c
daemon/worker.h

index 496d43a9d7f9cadf741c7b3ed842161aa712f42e..30d3723ce7076d80b29d6cc7651cc90c8f0a1a5e 100644 (file)
@@ -140,16 +140,45 @@ int udp_bindfd(uv_udp_t *handle, int fd)
        return udp_bind_finalize((uv_handle_t *)handle);
 }
 
-static void tcp_timeout_trigger(uv_timer_t *timer)
+void tcp_timeout_trigger(uv_timer_t *timer)
 {
        struct session *s = timer->data;
 
-       assert(!session_flags(s)->outgoing);
+       assert(!session_flags(s)->closing);
+       assert(session_waitinglist_is_empty(s));
+
+       struct worker_ctx *worker = timer->loop->data;
+
+       if (!session_tasklist_is_empty(s)) {
+               int finalized = session_tasklist_finalize_expired(s);
+               worker->stats.timeout += finalized;
+               /* session_tasklist_finalize_expired() may call worker_task_finalize().
+                * If session is a source session and there were IO errors,
+                * worker_task_finalize() can finalize all tasks and close session. */
+               if (session_flags(s)->closing) {
+                       return;
+               }
+
+       }
        if (!session_tasklist_is_empty(s)) {
-               uv_timer_again(timer);
-       } else if (!session_flags(s)->closing) {
                uv_timer_stop(timer);
-               session_close(s);
+               session_timer_start(s, tcp_timeout_trigger,
+                                   KR_RESOLVE_TIME_LIMIT / 2,
+                                   KR_RESOLVE_TIME_LIMIT / 2);
+       } else {
+               const struct engine *engine = worker->engine;
+               const struct network *net = &engine->net;
+               uint64_t idle_in_timeout = net->tcp.in_idle_timeout;
+               uint64_t last_activity = session_last_input_activity(s);
+               uint64_t idle_time = kr_now() - last_activity;
+               if (idle_time < idle_in_timeout) {
+                       idle_in_timeout -= idle_time;
+                       uv_timer_stop(timer);
+                       session_timer_start(s, tcp_timeout_trigger,
+                                           idle_in_timeout, idle_in_timeout);
+               } else {
+                       session_close(s);
+               }
        }
 }
 
@@ -206,14 +235,6 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
        if (ret < 0) {
                /* An error has occurred, close the session. */
                worker_end_tcp(s);
-       } else if (ret > 0 && !session_flags(s)->closing) {
-               /* Connection spawned at least one request
-                * or
-                * valid answer has been received from upstream.
-                * Reset deadline for next query.
-                * https://tools.ietf.org/html/rfc7766#section-6.2.3
-                */
-               session_timer_restart(s);
        }
        session_wirebuf_compress(s);
        mp_flush(worker->pkt_pool.ctx);
index 1b5e5791d32506b46ff2abeea99aed0b317d417b..c81b1c996fba295d6bdcef3abd4cfd249a9e3913 100644 (file)
@@ -31,6 +31,7 @@ int tcp_bind(uv_tcp_t *handle, struct sockaddr *addr);
 int tcp_bind_tls(uv_tcp_t *handle, struct sockaddr *addr);
 int tcp_bindfd(uv_tcp_t *handle, int fd);
 int tcp_bindfd_tls(uv_tcp_t *handle, int fd);
+void tcp_timeout_trigger(uv_timer_t *timer);
 
 /** Initialize the handle, incl. ->data = struct session * instance.
  * \param type = SOCK_*
index 8ea88b9051a51f068d4189a943c3b8019d0a948e..c1a4d6f2f999f2d7bc9fcc225cbf29020b2f69ab 100644 (file)
  * that exists between remote counterpart and a local socket.
  */
 struct session {
-       struct session_flags sflags; /**< miscellaneous flags. */
-       union inaddr peer;           /**< address of peer; is not set for client's UDP sessions. */
-       uv_handle_t *handle;         /**< libuv handle for IO operations. */
-       uv_timer_t timeout;          /**< libuv handle for timer. */
+       struct session_flags sflags;  /**< miscellaneous flags. */
+       union inaddr peer;            /**< address of peer; is not set for client's UDP sessions. */
+       uv_handle_t *handle;          /**< libuv handle for IO operations. */
+       uv_timer_t timeout;           /**< libuv handle for timer. */
 
-       struct tls_ctx_t *tls_ctx;   /**< server side tls-related data. */
+       struct tls_ctx_t *tls_ctx;    /**< server side tls-related data. */
        struct tls_client_ctx_t *tls_client_ctx; /**< client side tls-related data. */
 
-       trie_t *tasks;               /**< list of tasks assotiated with given session. */
+       trie_t *tasks;                /**< list of tasks associated with given session. */
        queue_t(struct qr_task *) waiting;  /**< list of tasks waiting for sending to upstream. */
 
-       uint8_t *wire_buf;           /**< Buffer for DNS message. */
-       ssize_t wire_buf_size;       /**< Buffer size. */
-       ssize_t wire_buf_start_idx;  /**< Data start offset in wire_buf. */
-       ssize_t wire_buf_end_idx;    /**< Data end offset in wire_buf. */
+       uint8_t *wire_buf;            /**< Buffer for DNS message. */
+       ssize_t wire_buf_size;        /**< Buffer size. */
+       ssize_t wire_buf_start_idx;   /**< Data start offset in wire_buf. */
+       ssize_t wire_buf_end_idx;     /**< Data end offset in wire_buf. */
+       uint64_t last_input_activity; /**< Either creation time or time of peer's last activity */
 };
 
 static void on_session_close(uv_handle_t *handle)
@@ -160,7 +161,7 @@ int session_tasklist_add(struct session *session, struct qr_task *task)
                worker_task_ref(task);
        } else if (*v != task) {
                assert(false);
-               return kr_error(ENOMEM);
+               return kr_error(EINVAL);
        }
        return kr_ok();
 }
@@ -216,9 +217,10 @@ struct qr_task* session_tasklist_del_msgid(const struct session *session, uint16
        trie_val_t val;
        int res = trie_del(t, key, key_len, &val);
        if (res == kr_ok()) {
-               ret = val;
-               assert(worker_task_numrefs(ret) > 1);
-               worker_task_unref(ret);
+               if (worker_task_numrefs(val) > 1) {
+                       ret = val;
+               }
+               worker_task_unref(val);
        }
        return ret;
 }
@@ -322,6 +324,7 @@ struct session *session_new(uv_handle_t *handle)
        session->handle = handle;
        handle->data = session;
        session->timeout.data = session;
+       session_touch(session);
 
        return session;
 }
@@ -366,12 +369,11 @@ void session_waitinglist_retry(struct session *session, bool increase_timeout_cn
 {
        while (!session_waitinglist_is_empty(session)) {
                struct qr_task *task = session_waitinglist_pop(session, false);
-               assert(worker_task_numrefs(task) > 1);
                if (increase_timeout_cnt) {
                        worker_task_timeout_inc(task);
                }
-               worker_task_unref(task);
                worker_task_step(task, NULL, NULL);
+               worker_task_unref(task);
        }
 }
 
@@ -379,13 +381,7 @@ void session_waitinglist_finalize(struct session *session, int status)
 {
        while (!session_waitinglist_is_empty(session)) {
                struct qr_task *t = session_waitinglist_pop(session, false);
-               if (session->sflags.outgoing) {
-                       worker_task_finalize(t, status);
-               } else {
-                       struct request_ctx *ctx = worker_task_get_request(t);
-                       assert(worker_request_get_source_session(ctx) == session);
-                       worker_request_set_source_session(ctx, NULL);
-               }
+               worker_task_finalize(t, status);
                worker_task_unref(t);
        }
 }
@@ -395,21 +391,62 @@ void session_tasklist_finalize(struct session *session, int status)
        while (session_tasklist_get_len(session) > 0) {
                struct qr_task *t = session_tasklist_del_first(session, false);
                assert(worker_task_numrefs(t) > 0);
-               if (session->sflags.outgoing) {
-                       worker_task_finalize(t, status);
-               } else {
-                       struct request_ctx *ctx = worker_task_get_request(t);
-                       assert(worker_request_get_source_session(ctx) == session);
-                       worker_request_set_source_session(ctx, NULL);
-               }
+               worker_task_finalize(t, status);
                worker_task_unref(t);
        }
 }
 
-void session_tasks_finalize(struct session *session, int status)
+int session_tasklist_finalize_expired(struct session *session)
 {
-       session_waitinglist_finalize(session, status);
-       session_tasklist_finalize(session, status);
+       int ret = 0;
+       queue_t(struct qr_task *) q;
+       uint64_t now = kr_now();
+       trie_t *t = session->tasks;
+       trie_it_t *it;
+       queue_init(q);
+       for (it = trie_it_begin(t); !trie_it_finished(it); trie_it_next(it)) {
+               trie_val_t *v = trie_it_val(it);
+               struct qr_task *task = (struct qr_task *)*v;
+               if ((now - worker_task_creation_time(task)) >= KR_RESOLVE_TIME_LIMIT) {
+                       queue_push(q, task);
+                       worker_task_ref(task);
+               }
+       }
+       trie_it_free(it);
+
+       struct qr_task *task = NULL;
+       uint16_t msg_id = 0;
+       char *key = (char *)&task;
+       int32_t keylen = sizeof(struct qr_task *);
+       if (session->sflags.outgoing) {
+               key = (char *)&msg_id;
+               keylen = sizeof(msg_id);
+       }
+       while (queue_len(q) > 0) {
+               task = queue_head(q);
+               if (session->sflags.outgoing) {
+                       knot_pkt_t *pktbuf = worker_task_get_pktbuf(task);
+                       msg_id = knot_wire_get_id(pktbuf->wire);
+               }
+               int res = trie_del(t, key, keylen, NULL);
+               if (!worker_task_finished(task)) {
+                       /* task->pending_count must be zero,
+                        * but there can be followers,
+                        * so run worker_task_subreq_finalize() to ensure retrying
+                        * for all the followers. */
+                       worker_task_subreq_finalize(task);
+                       worker_task_finalize(task, KR_STATE_FAIL);
+               }
+               if (res == KNOT_EOK) {
+                       worker_task_unref(task);
+               }
+               queue_pop(q);
+               worker_task_unref(task);
+               ++ret;
+       }
+
+       queue_deinit(q);
+       return ret;
 }
 
 int session_timer_start(struct session *session, uv_timer_cb cb,
@@ -673,55 +710,30 @@ int session_wirebuf_process(struct session *session)
        return ret;
 }
 
-static void on_session_idle_timeout(uv_timer_t *timer)
+void session_kill_ioreq(struct session *s, struct qr_task *task)
 {
-       struct session *s = timer->data;
-       assert(s);
-       uv_timer_stop(timer);
-       if (s->sflags.closing) {
+       if (!s) {
                return;
        }
-       /* session was not in use during timer timeout
-        * remove it from connection list and close
-        */
-       assert(session_is_empty(s));
-       session_close(s);
-}
-
-void session_kill_ioreq(struct session *s, struct qr_task *task)
-{
-       assert(s && s->sflags.outgoing && s->handle);
+       assert(s->sflags.outgoing && s->handle);
        if (s->sflags.closing) {
                return;
        }
+       session_tasklist_del(s, task);
        if (s->handle->type == UV_UDP) {
-               uv_timer_stop(&s->timeout);
-               session_tasklist_del(s, task);
                assert(session_tasklist_is_empty(s));
                session_close(s);
                return;
        }
-       /* TCP-specific code now. */
-       if (s->handle->type != UV_TCP) abort();
-
-       int res = 0;
-
-       const struct sockaddr *peer = &s->peer.ip;
-       if (peer->sa_family != AF_UNSPEC && session_is_empty(s) && !s->sflags.closing) {
-               assert(peer->sa_family == AF_INET || peer->sa_family == AF_INET6);
-               res = 1;
-               if (s->sflags.connected) {
-                       /* This is outbound TCP connection which can be reused.
-                       * Close it after timeout */
-                       s->timeout.data = s;
-                       uv_timer_stop(&s->timeout);
-                       res = uv_timer_start(&s->timeout, on_session_idle_timeout,
-                                            KR_CONN_RTT_MAX, 0);
-               }
-       }
+}
 
-       if (res != 0) {
-               /* if any errors, close the session immediately */
-               session_close(s);
-       }
+/** Update timestamp */
+void session_touch(struct session *s)
+{
+       s->last_input_activity = kr_now();
+}
+
+uint64_t session_last_input_activity(struct session *s)
+{
+       return s->last_input_activity;
 }
index c0f68039a6e93ecd0c1f4a4269ef9c6e1bb239fb..5855be0a198b752e0ca6e1616005307c62eb14a8 100644 (file)
@@ -80,12 +80,12 @@ struct qr_task* session_tasklist_del_msgid(const struct session *session, uint16
 struct qr_task* session_tasklist_find_msgid(const struct session *session, uint16_t msg_id);
 /** Finalize all tasks in the list. */
 void session_tasklist_finalize(struct session *session, int status);
+/** Finalize all expired tasks in the list. */
+int session_tasklist_finalize_expired(struct session *session);
 
 /** Both of task lists (associated & waiting). */
 /** Check if empty. */
 bool session_is_empty(const struct session *session);
-/** Finalize all tasks. */
-void session_tasks_finalize(struct session *session, int status);
 /** Get pointer to session flags */
 struct session_flags *session_flags(struct session *session);
 /** Get peer address. */
@@ -141,3 +141,6 @@ knot_pkt_t *session_produce_packet(struct session *session, knot_mm_t *mm);
 int session_discard_packet(struct session *session, const knot_pkt_t *pkt);
 
 void session_kill_ioreq(struct session *s, struct qr_task *task);
+/** Update timestamp */
+void session_touch(struct session *s);
+uint64_t session_last_input_activity(struct session *s);
index fccfc274816eff6abbddae508a46b134721d357f..389ac801e93c9744473caae5e974a23143c0a4cb 100644 (file)
@@ -89,6 +89,7 @@ struct qr_task
        uint32_t refs;
        bool finished : 1;
        bool leading  : 1;
+       uint64_t creation_time;
 };
 
 
@@ -97,8 +98,6 @@ struct qr_task
        do { ++(task)->refs; } while(0)
 #define qr_task_unref(task) \
        do { if (task && --(task)->refs == 0) { qr_task_free(task); } } while (0)
-#define qr_valid_handle(task, checked) \
-       (!uv_is_closing((checked)) || (task)->ctx->source.session->handle == (checked))
 
 /** @internal get key for tcp session
  *  @note kr_straddr() return pointer to static string
@@ -124,7 +123,6 @@ static int worker_del_tcp_waiting(struct worker_ctx *worker,
 static struct session* worker_find_tcp_waiting(struct worker_ctx *worker,
                                               const struct sockaddr *addr);
 static void on_tcp_connect_timeout(uv_timer_t *timer);
-static void on_tcp_watchdog_timeout(uv_timer_t *timer);
 
 /** @internal Get singleton worker. */
 static inline struct worker_ctx *get_worker(void)
@@ -442,6 +440,7 @@ static struct qr_task *qr_task_create(struct request_ctx *ctx)
        ctx->task = task;
        /* Make the primary reference to task. */
        qr_task_ref(task);
+       task->creation_time = kr_now();
        ctx->worker->stats.concurrent += 1;
        return task;
 }
@@ -453,24 +452,8 @@ static void qr_task_free(struct qr_task *task)
 
        assert(ctx);
 
-       /* Process outbound session. */
-       struct session *s = ctx->source.session;
        struct worker_ctx *worker = ctx->worker;
 
-       /* Process source session. */
-       if (s && session_tasklist_get_len(s) < worker->tcp_pipeline_max/2 &&
-           !session_flags(s)->closing && session_flags(s)->throttled) {
-               uv_handle_t *handle = session_get_handle(s);
-               /* Start reading again if the session is throttled and
-                * the number of outgoing requests is below watermark. */
-               if (handle) {
-                       io_start_read(handle);
-                       session_flags(s)->throttled = false;
-               }
-       }
-
-       task->ctx = NULL;
-
        if (ctx->task == NULL) {
                request_free(ctx);
        }
@@ -515,6 +498,7 @@ static void qr_task_complete(struct qr_task *task)
        struct session *s = ctx->source.session;
        if (s) {
                assert(!session_flags(s)->outgoing && session_waitinglist_is_empty(s));
+               ctx->source.session = NULL;
                session_tasklist_del(s, task);
        }
 
@@ -771,7 +755,7 @@ static int session_tls_hs_cb(struct session *session, int status)
                session_close(session);
        } else {
                session_timer_stop(session);
-               session_timer_start(session, on_tcp_watchdog_timeout,
+               session_timer_start(session, tcp_timeout_trigger,
                                    MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
        }
        return kr_ok();
@@ -810,8 +794,6 @@ static void on_connect(uv_connect_t *req, int status)
                return;
        }
 
-       session_timer_stop(session);
-
        if (status != 0) {
                worker_del_tcp_waiting(worker, peer);
                assert(session_tasklist_is_empty(session));
@@ -850,7 +832,8 @@ static void on_connect(uv_connect_t *req, int status)
                struct tls_client_ctx_t *tls_ctx = session_tls_get_client_ctx(session);
                ret = tls_client_connect_start(tls_ctx, session, session_tls_hs_cb);
                if (ret == kr_error(EAGAIN)) {
-                       session_timer_start(session, on_tcp_watchdog_timeout,
+                       session_timer_stop(session);
+                       session_timer_start(session, tcp_timeout_trigger,
                                            MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
                        return;
                }
@@ -861,15 +844,16 @@ static void on_connect(uv_connect_t *req, int status)
                struct qr_task *t = session_waitinglist_get(session);
                ret = qr_task_send(t, session, NULL, NULL);
                if (ret != 0) {
-                       assert(session_tasklist_is_empty(session));
                        worker_del_tcp_connected(worker, peer);
                        session_waitinglist_finalize(session, KR_STATE_FAIL);
+                       session_tasklist_finalize(session, KR_STATE_FAIL);
                        session_close(session);
                        return;
                }
                session_waitinglist_pop(session, true);
        }
-       session_timer_start(session, on_tcp_watchdog_timeout,
+       session_timer_stop(session);
+       session_timer_start(session, tcp_timeout_trigger,
                            MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
 }
 
@@ -903,30 +887,6 @@ static void on_tcp_connect_timeout(uv_timer_t *timer)
        session_close(session);
 }
 
-static void on_tcp_watchdog_timeout(uv_timer_t *timer)
-{
-       struct session *session = timer->data;
-
-       assert(session_flags(session)->outgoing);
-       assert(!session_flags(session)->closing);
-
-       struct worker_ctx *worker =  timer->loop->data;
-       struct sockaddr *peer = session_get_peer(session);
-
-       uv_timer_stop(timer);
-
-       if (session_flags(session)->has_tls) {
-               worker_del_tcp_waiting(worker, peer);
-       }
-
-       worker_del_tcp_connected(worker, peer);
-       worker->stats.timeout += session_waitinglist_get_len(session);
-       session_waitinglist_finalize(session, KR_STATE_FAIL);
-       worker->stats.timeout += session_tasklist_get_len(session);
-       session_tasklist_finalize(session, KR_STATE_FAIL);
-       session_close(session);
-}
-
 /* This is called when I/O timeouts */
 static void on_udp_timeout(uv_timer_t *timer)
 {
@@ -1013,6 +973,9 @@ static void on_retransmit(uv_timer_t *req)
 
 static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *pkt)
 {
+       if (!task || task->finished) {
+               return;
+       }
        /* Close pending timer */
        ioreq_kill_pending(task);
        /* Clear from outgoing table. */
@@ -1121,10 +1084,6 @@ static int qr_task_finalize(struct qr_task *task, int state)
                        worker_task_unref(t);
                }
                session_close(source_session);
-       } else if (session_get_handle(source_session)->type == UV_TCP) {
-               /* Don't try to close source session at least
-                * retry_interval_for_timeout_timer milliseconds */
-               session_timer_restart(source_session);
        }
 
        qr_task_unref(task);
@@ -1230,9 +1189,11 @@ static int qr_task_step(struct qr_task *task,
                }
        } else {
                assert (sock_type == SOCK_STREAM);
+               assert(task->pending_count == 0);
                const struct sockaddr *addr =
                        packet_source ? packet_source : task->addrlist;
                if (addr->sa_family == AF_UNSPEC) {
+                       /* task->pending_count is zero, but there can be followers */
                        subreq_finalize(task, packet_source, packet);
                        return qr_task_finalize(task, KR_STATE_FAIL);
                }
@@ -1260,7 +1221,6 @@ static int qr_task_step(struct qr_task *task,
                                return qr_task_finalize(task, KR_STATE_FAIL);
                        }
 
-                       session_timer_stop(session);
                        while (!session_waitinglist_is_empty(session)) {
                                struct qr_task *t = session_waitinglist_get(session);
                                ret = qr_task_send(t, session, NULL, NULL);
@@ -1286,18 +1246,6 @@ static int qr_task_step(struct qr_task *task,
                                session_close(session);
                                return qr_task_finalize(task, KR_STATE_FAIL);
                        }
-                       ret = session_timer_start(session, on_tcp_watchdog_timeout,
-                                                 MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
-                       if (ret < 0) {
-                               session_tasklist_finalize(session, KR_STATE_FAIL);
-                               subreq_finalize(task, packet_source, packet);
-                               session_close(session);
-                               return qr_task_finalize(task, KR_STATE_FAIL);
-                       }
-
-                       assert(task->pending_count == 0);
-                       task->pending[task->pending_count] = session_get_handle(session);
-                       task->pending_count += 1;
                } else {
                        /* Make connection */
                        uv_connect_t *conn = malloc(sizeof(uv_connect_t));
@@ -1465,6 +1413,9 @@ int worker_submit(struct session *session, knot_pkt_t *query)
        }
        assert(uv_is_closing(session_get_handle(session)) == false);
 
+       /* Packet was successfully parsed.
+        * Task was created (found). */
+       session_touch(session);
        /* Consume input and produce next message */
        return qr_task_step(task, addr, query);
 }
@@ -1578,7 +1529,6 @@ int worker_end_tcp(struct session *session)
                tls_set_hs_state(&tls_ctx->c, TLS_HS_NOT_STARTED);
        }
 
-       assert(session_tasklist_get_len(session) >= session_waitinglist_get_len(session));
        while (!session_waitinglist_is_empty(session)) {
                struct qr_task *task = session_waitinglist_pop(session, false);
                assert(task->refs > 1);
@@ -1750,6 +1700,20 @@ void worker_task_pkt_set_msgid(struct qr_task *task, uint16_t msgid)
        q->id = msgid;
 }
 
+uint64_t worker_task_creation_time(struct qr_task *task)
+{
+       return task->creation_time;
+}
+
+void worker_task_subreq_finalize(struct qr_task *task)
+{
+       subreq_finalize(task, NULL, NULL);
+}
+
+bool worker_task_finished(struct qr_task *task)
+{
+       return task->finished;
+}
 /** Reserve worker buffers */
 static int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen)
 {
index 8e838ce49c1b596c303442c1913991817c22bcad..3d9ade8bc48d40af8413f24a940aefb8c3d4ba86 100644 (file)
@@ -102,6 +102,9 @@ void worker_request_set_source_session(struct request_ctx *, struct session *ses
 
 uint16_t worker_task_pkt_get_msgid(struct qr_task *task);
 void worker_task_pkt_set_msgid(struct qr_task *task, uint16_t msgid);
+uint64_t worker_task_creation_time(struct qr_task *task);
+void worker_task_subreq_finalize(struct qr_task *task);
+bool worker_task_finished(struct qr_task *task);
 
 /** @cond internal */