]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
daemon: TCP connection timeouts with protolayers
authorOto Šťáva <oto.stava@nic.cz>
Wed, 3 Aug 2022 08:03:44 +0000 (10:03 +0200)
committerOto Šťáva <oto.stava@nic.cz>
Thu, 26 Jan 2023 11:56:07 +0000 (12:56 +0100)
daemon/io.c
daemon/session2.c
daemon/session2.h
daemon/worker.c

index c9fcc0eb2b2ab47c15a0c193d451ef0cd8bede0e..8a498174bfde934c960bd2e9d716ea7d0a7e5cae 100644 (file)
@@ -241,7 +241,6 @@ static int pl_tcp_sess_deinit(struct protolayer_manager *manager, struct protola
 static enum protolayer_cb_result pl_tcp_unwrap_timeout(
                struct protolayer_data *layer, struct protolayer_cb_ctx *ctx)
 {
-       /* TODO - connecting timeout? */
        struct session2 *s = ctx->manager->session;
 
        if (kr_fails_assert(!s->closing))
@@ -261,8 +260,7 @@ static enum protolayer_cb_result pl_tcp_unwrap_timeout(
                session2_timer_stop(s);
                session2_timer_start(s,
                                KR_RESOLVE_TIME_LIMIT / 2,
-                               KR_RESOLVE_TIME_LIMIT / 2,
-                               PROTOLAYER_UNWRAP);
+                               KR_RESOLVE_TIME_LIMIT / 2);
        } else {
                /* Normally it should not happen,
                 * but better to check if there anything in this list. */
@@ -279,9 +277,7 @@ static enum protolayer_cb_result pl_tcp_unwrap_timeout(
                if (idle_time < idle_in_timeout) {
                        idle_in_timeout -= idle_time;
                        session2_timer_stop(s);
-                       session2_timer_start(s,
-                                       idle_in_timeout, idle_in_timeout,
-                                       PROTOLAYER_UNWRAP);
+                       session2_timer_start(s, idle_in_timeout, idle_in_timeout);
                } else {
                        struct sockaddr *peer = session2_get_peer(s);
                        char *peer_str = kr_straddr(peer);
@@ -291,7 +287,7 @@ static enum protolayer_cb_result pl_tcp_unwrap_timeout(
                                worker_del_tcp_waiting(peer);
                                worker_del_tcp_connected(peer);
                        }
-                       session2_unwrap(s, protolayer_event_nd(PROTOLAYER_EVENT_CLOSE), NULL, NULL, NULL);
+                       session2_event(s, PROTOLAYER_EVENT_CLOSE, NULL);
                }
        }
 
@@ -553,8 +549,7 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
                                       uv_strerror(nread));
                }
                worker_end_tcp(s);
-               session2_unwrap(s, protolayer_event_nd(PROTOLAYER_EVENT_FORCE_CLOSE),
-                               NULL, NULL, NULL);
+               session2_event(s, PROTOLAYER_EVENT_FORCE_CLOSE, NULL);
                return;
        }
 
@@ -688,9 +683,7 @@ static void _tcp_accept(uv_stream_t *master, int status, enum protolayer_grp grp
        if (uv_accept(master, (uv_stream_t *)client) != 0) {
                /* close session, close underlying uv handles and
                 * deallocate (or return to memory pool) memory. */
-               session2_unwrap(s,
-                               protolayer_event_nd(PROTOLAYER_EVENT_CLOSE),
-                               NULL, NULL, NULL);
+               session2_event(s, PROTOLAYER_EVENT_CLOSE, NULL);
                return;
        }
 
@@ -700,18 +693,14 @@ static void _tcp_accept(uv_stream_t *master, int status, enum protolayer_grp grp
        int sa_len = sizeof(struct sockaddr_in6);
        int ret = uv_tcp_getpeername(client, sa, &sa_len);
        if (ret || sa->sa_family == AF_UNSPEC) {
-               session2_unwrap(s,
-                               protolayer_event_nd(PROTOLAYER_EVENT_CLOSE),
-                               NULL, NULL, NULL);
+               session2_event(s, PROTOLAYER_EVENT_CLOSE, NULL);
                return;
        }
        sa = session2_get_sockname(s);
        sa_len = sizeof(struct sockaddr_in6);
        ret = uv_tcp_getsockname(client, sa, &sa_len);
        if (ret || sa->sa_family == AF_UNSPEC) {
-               session2_unwrap(s,
-                               protolayer_event_nd(PROTOLAYER_EVENT_CLOSE),
-                               NULL, NULL, NULL);
+               session2_event(s, PROTOLAYER_EVENT_CLOSE, NULL);
                return;
        }
 
@@ -776,7 +765,7 @@ static void _tcp_accept(uv_stream_t *master, int status, enum protolayer_grp grp
 //             }
 //     }
 //#endif
-       session2_timer_start(s, timeout, idle_in_timeout, PROTOLAYER_UNWRAP);
+       session2_timer_start(s, timeout, idle_in_timeout);
        io_start_read((uv_handle_t *)client);
 }
 
index 49dcecbda8aac4ad744124579eea84c2c695940f..9637b785f07f5d01dd0b7467066212620a5d449d 100644 (file)
@@ -206,9 +206,11 @@ static int protolayer_step(struct protolayer_cb_ctx *ctx)
                                result = cb(ldata, ctx);
                        else
                                ctx->action = PROTOLAYER_CB_ACTION_CONTINUE;
+
                        ldata->processed = true;
                } else {
-                       kr_assert(false && "Repeated protocol layer step");
+                       //kr_assert(false && "Repeated protocol layer step");
+                       kr_log_debug(PROTOLAYER, "Repeated protocol layer step\n");
                }
 
                if (kr_fails_assert(result == PROTOLAYER_CB_RESULT_MAGIC)) {
@@ -522,8 +524,8 @@ int wire_buf_movestart(struct wire_buf *wb)
        size_t len = wire_buf_data_length(wb);
        if (len)
                memmove(wb->buf, wire_buf_data(wb), len);
-       wb->end -= wb->start;
        wb->start = 0;
+       wb->end = len;
        return kr_ok();
 }
 
@@ -642,15 +644,11 @@ uv_handle_t *session2_get_handle(struct session2 *s)
 static void session2_on_timeout(uv_timer_t *timer)
 {
        struct session2 *s = timer->data;
-       protolayer_manager_submit(s->layers, s->timer_direction,
-                       protolayer_event_nd(PROTOLAYER_EVENT_TIMEOUT),
-                       NULL, NULL, NULL);
+       session2_event(s, PROTOLAYER_EVENT_TIMEOUT, NULL);
 }
 
-int session2_timer_start(struct session2 *s, uint64_t timeout, uint64_t repeat,
-                          enum protolayer_direction direction)
+int session2_timer_start(struct session2 *s, uint64_t timeout, uint64_t repeat)
 {
-       s->timer_direction = direction;
        return uv_timer_start(&s->timer, session2_on_timeout, timeout, repeat);
 }
 
@@ -1079,7 +1077,8 @@ struct event_ctx {
 
 static void session2_transport_io_event_finished(uv_handle_t *handle)
 {
-       struct event_ctx *ctx = handle->data;
+       struct session2 *s = handle->data;
+       struct event_ctx *ctx = s->data;
        if (ctx->cb)
                ctx->cb(kr_ok(), ctx->session, ctx->target, ctx->baton);
        free(ctx);
@@ -1100,7 +1099,7 @@ static int session2_handle_close(struct session2 *s, uv_handle_t *handle,
                                  struct event_ctx *ctx)
 {
        io_stop_read(handle);
-       handle->data = ctx;
+       s->data = ctx;
        uv_close(handle, session2_transport_io_event_finished);
 
        return kr_ok();
@@ -1145,11 +1144,20 @@ static int session2_transport_event(struct session2 *s,
                if (is_close_event)
                        return session2_handle_close(s, handle, ctx);
 
+               if (ctx->cb)
+                       ctx->cb(kr_ok(), s, target, baton);
+
+               free(ctx);
                return kr_ok();
 
-       case SESSION2_TRANSPORT_PARENT:
-               session2_wrap(s, protolayer_event(event), target,
+       case SESSION2_TRANSPORT_PARENT:;
+               int ret = session2_wrap(s, protolayer_event(event), target,
                                session2_transport_parent_event_finished, ctx);
+               if (ret < 0) {
+                       free(ctx);
+                       return ret;
+               }
+
                return kr_ok();
 
        default:
@@ -1169,7 +1177,5 @@ void session2_kill_ioreq(struct session2 *session, struct qr_task *task)
                return;
        session2_tasklist_del(session, task);
        if (session->transport.io.handle->type == UV_UDP)
-               session2_unwrap(session,
-                               protolayer_event_nd(PROTOLAYER_EVENT_CLOSE),
-                               NULL, NULL, NULL);
+               session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL);
 }
index efb40897af441c782f4522407d63333e2c37dd3a..fbbd2b194f95c17d641201b4f47f87af09b2c964 100644 (file)
@@ -145,7 +145,8 @@ typedef void (*protolayer_finished_cb)(int status, struct session2 *session,
                         * session - i.e. layers SHOULD NOT add
                         * any disconnection ceremony, if
                         * avoidable. */\
-       XX(TIMEOUT) /**< Signal that the session has timed out. */
+       XX(TIMEOUT) /**< Signal that the session has timed out. */\
+       XX(CONNECT) /**< Signal that a connection has been established. */
 
 /** Event type, to be interpreted by a layer. */
 enum protolayer_event_type {
@@ -506,21 +507,28 @@ struct session2 {
        struct protolayer_manager *layers; /**< Protocol layers of this session. */
        knot_mm_t pool;
 
-       uv_timer_t timer;
-       enum protolayer_direction timer_direction; /**< Timeout event direction. */
+       uv_timer_t timer; /**< For session-wide timeout events. */
 
-       trie_t *tasks; /**< list of tasks associated with given session. */
-       queue_t(struct qr_task *) waiting; /**< list of tasks waiting for sending to upstream. */
+       trie_t *tasks; /**< List of tasks associated with given session. */
+       queue_t(struct qr_task *) waiting; /**< List of tasks waiting for
+                                           * sending to upstream. */
 
        struct wire_buf wire_buf;
 
        uint64_t last_activity; /**< Time of last IO activity (if any occurs).
                                 * Otherwise session creation time. */
 
-       bool closing : 1;
-       bool throttled : 1;
-       bool outgoing : 1;
-       bool secure : 1; /**< Whether encryption takes place in this session.
+       void *data; /**< Pointer to arbitrary data for callbacks. */
+
+       bool outgoing : 1; /**< True: session's transport is towards an upstream
+                           * server. Otherwise, it is towards a client
+                           * connected to the resolver. */
+       bool closing : 1; /**< True: session is at the end of its lifecycle and
+                          * is going to close. */
+       bool connected : 1; /**< For connection-based sessions. True: connection
+                            * is established. */
+       bool throttled : 1; /**< True: session is being rate-limited. */
+       bool secure : 1; /**< True: encryption takes place in this session.
                          * Layers may use this to determine whether padding
                          * should be applied. */
 };
@@ -582,12 +590,11 @@ struct sockaddr *session2_get_sockname(struct session2 *s);
  * needed.
  *
  * May return `NULL` if no peer is set.  */
-uv_handle_t *session2_get_handle(struct session2 *s);
+KR_EXPORT uv_handle_t *session2_get_handle(struct session2 *s);
 
 /** Start the session timer. When the timer ends, a `_TIMEOUT` event is sent
- * in the specified `direction`. */
-int session2_timer_start(struct session2 *s, uint64_t timeout, uint64_t repeat,
-                          enum protolayer_direction direction);
+ * in the `_UNWRAP` direction. */
+int session2_timer_start(struct session2 *s, uint64_t timeout, uint64_t repeat);
 
 /** Restart the session timer without changing any of its parameters. */
 int session2_timer_restart(struct session2 *s);
@@ -663,6 +670,20 @@ int session2_unwrap(struct session2 *s, struct protolayer_payload payload,
 int session2_wrap(struct session2 *s, struct protolayer_payload payload,
                   const void *target, protolayer_finished_cb cb, void *baton);
 
+/** Convenience function to send the specified event to be processed in the
+ * `unwrap` direction. `data` may be `NULL`.
+ *
+ * See `session2_unwrap` for more information. */
+static inline int session2_event(struct session2 *s,
+                                 enum protolayer_event_type type, void *data)
+{
+       struct protolayer_event event = {
+               .type = type,
+               .data = { .ptr = data }
+       };
+       return session2_unwrap(s, protolayer_event(event), NULL, NULL, NULL);
+}
+
 /** Removes the specified request task from the session's tasklist. The session
  * must be outgoing. If the session is UDP, a signal to close is also sent to it. */
 void session2_kill_ioreq(struct session2 *session, struct qr_task *task);
index f946fc761fa7672861cb7ab41485755d0553f685..24fd8b3263b04484795271f2e6bd65134f50f140 100644 (file)
@@ -105,8 +105,6 @@ static int qr_task_send(struct qr_task *task, struct session2 *session,
 static int qr_task_finalize(struct qr_task *task, int state);
 static void qr_task_complete(struct qr_task *task);
 static int worker_add_tcp_waiting(const struct sockaddr* addr, struct session2 *session);
-static void on_tcp_connect_timeout(uv_timer_t *timer);
-static void on_udp_timeout(uv_timer_t *timer);
 static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *pkt);
 
 
@@ -128,9 +126,8 @@ static uv_handle_t *ioreq_spawn(int socktype, sa_family_t family,
        /* Create connection for iterative query */
        uv_handle_t *handle = malloc(socktype == SOCK_DGRAM
                                        ? sizeof(uv_udp_t) : sizeof(uv_tcp_t));
-       if (!handle) {
-               return NULL;
-       }
+       kr_require(handle);
+
        int ret = io_create(the_worker->loop, handle, socktype, family, grp, true);
        if (ret) {
                if (ret == UV_EMFILE) {
@@ -351,6 +348,7 @@ static struct request_ctx *request_create(struct session2 *session,
                req->qsource.comm_flags.tcp = session2_get_handle(session)->type == UV_TCP;
                req->qsource.comm_flags.tls = session->secure;
 //             req->qsource.comm_flags.http = session->has_http; /* TODO */
+               req->qsource.comm_flags.http = false;
 
                req->qsource.flags = req->qsource.comm_flags;
                if (proxy) {
@@ -590,7 +588,7 @@ int qr_task_on_send(struct qr_task *task, const uv_handle_t *handle, int status)
                if (kr_fails_assert(qry && task->transport))
                        return status;
                size_t timeout = task->transport->timeout;
-               int ret = session2_timer_start(s, timeout, 0, PROTOLAYER_UNWRAP);
+               int ret = session2_timer_start(s, timeout, 0);
                /* Start next step with timeout, fatal if can't start a timer. */
                if (ret != 0) {
                        subreq_finalize(task, &task->transport->address.ip, task->pktbuf);
@@ -615,6 +613,7 @@ int qr_task_on_send(struct qr_task *task, const uv_handle_t *handle, int status)
                                                peer_str, uv_strerror(status));
                        }
                        worker_end_tcp(s);
+                       session2_event(s, PROTOLAYER_EVENT_FORCE_CLOSE, NULL);
                        return status;
                }
 
@@ -633,24 +632,6 @@ int qr_task_on_send(struct qr_task *task, const uv_handle_t *handle, int status)
        return status;
 }
 
-static void on_send(uv_udp_send_t *req, int status)
-{
-       struct qr_task *task = req->data;
-       uv_handle_t *h = (uv_handle_t *)req->handle;
-       qr_task_on_send(task, h, status);
-       qr_task_unref(task);
-       free(req);
-}
-
-static void on_write(uv_write_t *req, int status)
-{
-       struct qr_task *task = req->data;
-       uv_handle_t *h = (uv_handle_t *)req->handle;
-       qr_task_on_send(task, h, status);
-       qr_task_unref(task);
-       free(req);
-}
-
 static void qr_task_wrap_finished(int status, struct session2 *session, const void *target, void *baton)
 {
        struct qr_task *task = baton;
@@ -933,9 +914,7 @@ static int send_waiting(struct session2 *session)
                        session2_waitinglist_finalize(session, KR_STATE_FAIL);
                        session2_tasklist_finalize(session, KR_STATE_FAIL);
                        worker_del_tcp_connected(peer);
-                       session2_unwrap(session,
-                                       protolayer_event_nd(PROTOLAYER_EVENT_CLOSE),
-                                       NULL, NULL, NULL);
+                       session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL);
                        break;
                }
                session2_waitinglist_pop(session, true);
@@ -978,9 +957,7 @@ static void on_connect(uv_connect_t *req, int status)
                }
                kr_assert(session2_tasklist_is_empty(session));
                session2_waitinglist_retry(session, false);
-               session2_unwrap(session,
-                               protolayer_event_nd(PROTOLAYER_EVENT_CLOSE),
-                               NULL, NULL, NULL);
+               session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL);
                return;
        }
 
@@ -997,9 +974,7 @@ static void on_connect(uv_connect_t *req, int status)
                }
                kr_assert(session2_tasklist_is_empty(session));
                session2_waitinglist_retry(session, false);
-               session2_unwrap(session,
-                               protolayer_event_nd(PROTOLAYER_EVENT_CLOSE),
-                               NULL, NULL, NULL);
+               session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL);
                return;
        }
 
@@ -1020,9 +995,7 @@ static void on_connect(uv_connect_t *req, int status)
                }
                kr_assert(session2_tasklist_is_empty(session));
                session2_waitinglist_retry(session, false);
-               session2_unwrap(session,
-                               protolayer_event_nd(PROTOLAYER_EVENT_CLOSE),
-                               NULL, NULL, NULL);
+               session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL);
                return;
        }
 
@@ -1034,9 +1007,7 @@ static void on_connect(uv_connect_t *req, int status)
                         * something gone wrong */
                        session2_waitinglist_finalize(session, KR_STATE_FAIL);
                        kr_assert(session2_tasklist_is_empty(session));
-                       session2_unwrap(session,
-                                       protolayer_event_nd(PROTOLAYER_EVENT_CLOSE),
-                                       NULL, NULL, NULL);
+                       session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL);
                        return;
                }
        }
@@ -1047,7 +1018,7 @@ static void on_connect(uv_connect_t *req, int status)
        }
 
        /* TODO */
-//     session->connected = true;
+       session->connected = true;
        session2_start_read(session);
 
        int ret = kr_ok();
@@ -1070,76 +1041,7 @@ static void on_connect(uv_connect_t *req, int status)
        }
 
        session2_timer_stop(session);
-       session2_timer_start(session,
-                           MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY,
-                           PROTOLAYER_UNWRAP);
-}
-
-static void on_tcp_connect_timeout(uv_timer_t *timer)
-{
-       struct session2 *session = timer->data;
-
-       uv_timer_stop(timer);
-       kr_require(the_worker);
-
-       kr_assert(session2_tasklist_is_empty(session));
-
-       struct sockaddr *peer = session2_get_peer(session);
-       worker_del_tcp_waiting(peer);
-
-       struct qr_task *task = session2_waitinglist_get(session);
-       if (!task) {
-               /* Normally shouldn't happen. */
-               const char *peer_str = kr_straddr(peer);
-               VERBOSE_MSG(NULL, "=> connection to '%s' failed (internal timeout), empty waitinglist\n",
-                           peer_str ? peer_str : "");
-               return;
-       }
-
-       struct kr_query *qry = task_get_last_pending_query(task);
-       if (kr_log_is_debug_qry(WORKER, qry)) {
-               const char *peer_str = kr_straddr(peer);
-               VERBOSE_MSG(qry, "=> connection to '%s' failed (internal timeout)\n",
-                           peer_str ? peer_str : "");
-       }
-
-       qry->server_selection.error(qry, task->transport, KR_SELECTION_TCP_CONNECT_TIMEOUT);
-
-       the_worker->stats.timeout += session2_waitinglist_get_len(session);
-       session2_waitinglist_retry(session, true);
-       kr_assert(session2_tasklist_is_empty(session));
-       /* uv_cancel() doesn't support uv_connect_t request,
-        * so that we can't cancel it.
-        * There still exists possibility of successful connection
-        * for this request.
-        * So connection callback (on_connect()) must check
-        * if connection is in the list of waiting connection.
-        * If no, most likely this is timed out connection even if
-        * it was successful. */
-}
-
-/* This is called when I/O timeouts */
-static void on_udp_timeout(uv_timer_t *timer)
-{
-       struct session2 *session = timer->data;
-       kr_assert(session2_get_handle(session)->data == session);
-       kr_assert(session2_tasklist_get_len(session) == 1);
-       kr_assert(session2_waitinglist_is_empty(session));
-
-       uv_timer_stop(timer);
-
-       struct qr_task *task = session2_tasklist_get_first(session);
-       if (!task)
-               return;
-
-       if (task->leading && task->pending_count > 0) {
-               struct kr_query *qry = array_tail(task->ctx->req.rplan.pending);
-               qry->server_selection.error(qry, task->transport, KR_SELECTION_QUERY_TIMEOUT);
-       }
-
-       task->timeouts += 1;
-       the_worker->stats.timeout += 1;
-       qr_task_step(task, NULL, NULL);
+       session2_timer_start(session, MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
 }
 
 static uv_handle_t *transmit(struct qr_task *task)
@@ -1174,9 +1076,7 @@ static uv_handle_t *transmit(struct qr_task *task)
                memcpy(peer, addr, kr_sockaddr_len(addr));
                if (qr_task_send(task, session, (struct sockaddr *)choice,
                                 task->pktbuf) != 0) {
-                       session2_unwrap(session,
-                                       protolayer_event_nd(PROTOLAYER_EVENT_CLOSE),
-                                       NULL, NULL, NULL);
+                       session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL);
                        ret = NULL;
                } else {
                        task->pending[task->pending_count] = session;
@@ -1388,9 +1288,7 @@ static int qr_task_finalize(struct qr_task *task, int state)
                         * (ie. task->leading is true) */
                        worker_task_unref(t);
                }
-               session2_unwrap(source_session,
-                               protolayer_event_nd(PROTOLAYER_EVENT_CLOSE),
-                               NULL, NULL, NULL);
+               session2_event(source_session, PROTOLAYER_EVENT_CLOSE, NULL);
        }
 
        qr_task_unref(task);
@@ -1459,9 +1357,7 @@ static int tcp_task_existing_connection(struct session2 *session, struct qr_task
                 * close connection to upstream. */
                session2_tasklist_finalize(session, KR_STATE_FAIL);
                worker_del_tcp_connected(session2_get_peer(session));
-               session2_unwrap(session,
-                               protolayer_event_nd(PROTOLAYER_EVENT_CLOSE),
-                               NULL, NULL, NULL);
+               session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL);
                return kr_error(EINVAL);
        }
 
@@ -1471,6 +1367,7 @@ static int tcp_task_existing_connection(struct session2 *session, struct qr_task
 static int tcp_task_make_connection(struct qr_task *task, const struct sockaddr *addr)
 {
        /* Check if there must be TLS */
+       /* TODO: tls */
 //     struct tls_client_ctx *tls_ctx = NULL;
 //     tls_client_param_t *entry = tls_client_param_get(
 //                     the_network->tls_client_params, addr);
@@ -1515,9 +1412,7 @@ static int tcp_task_make_connection(struct qr_task *task, const struct sockaddr
        int ret = worker_add_tcp_waiting(addr, session);
        if (ret < 0) {
                free(conn);
-               session2_unwrap(session,
-                               protolayer_event_nd(PROTOLAYER_EVENT_CLOSE),
-                               NULL, NULL, NULL);
+               session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL);
                return kr_error(EINVAL);
        }
 
@@ -1527,14 +1422,11 @@ static int tcp_task_make_connection(struct qr_task *task, const struct sockaddr
        memcpy(peer, addr, kr_sockaddr_len(addr));
 
        /*  Start watchdog to catch eventual connection timeout. */
-       ret = session2_timer_start(session,
-                                 KR_CONN_RTT_MAX, 0, PROTOLAYER_UNWRAP);
+       ret = session2_timer_start(session, KR_CONN_RTT_MAX, 0);
        if (ret != 0) {
                worker_del_tcp_waiting(addr);
                free(conn);
-               session2_unwrap(session,
-                               protolayer_event_nd(PROTOLAYER_EVENT_CLOSE),
-                               NULL, NULL, NULL);
+               session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL);
                return kr_error(EINVAL);
        }
 
@@ -1550,9 +1442,7 @@ static int tcp_task_make_connection(struct qr_task *task, const struct sockaddr
                session2_timer_stop(session);
                worker_del_tcp_waiting(addr);
                free(conn);
-               session2_unwrap(session,
-                               protolayer_event_nd(PROTOLAYER_EVENT_CLOSE),
-                               NULL, NULL, NULL);
+               session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL);
                qry->server_selection.error(qry, task->transport, KR_SELECTION_TCP_CONNECT_FAILED);
                return kr_error(EAGAIN);
        }
@@ -1564,9 +1454,7 @@ static int tcp_task_make_connection(struct qr_task *task, const struct sockaddr
                session2_timer_stop(session);
                worker_del_tcp_waiting(addr);
                free(conn);
-               session2_unwrap(session,
-                               protolayer_event_nd(PROTOLAYER_EVENT_CLOSE),
-                               NULL, NULL, NULL);
+               session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL);
                return kr_error(EINVAL);
        }
 
@@ -1752,7 +1640,7 @@ int worker_submit(struct session2 *session, struct comm_info *comm,
 
        struct http_ctx *http_ctx = NULL;
 #if ENABLE_DOH2
-       /* TODO: devise a way to do this... don't know yet */
+       /* TODO: http. Devise a way to do this... don't know yet */
 //     http_ctx = session_http_get_server_ctx(session);
 //
 //     /* Badly formed query when using DoH leads to a Bad Request */
@@ -1912,6 +1800,7 @@ int worker_end_tcp(struct session2 *session)
 
        worker_del_tcp_waiting(peer);
        worker_del_tcp_connected(peer);
+       session->connected = false;
 
        while (!session2_waitinglist_is_empty(session)) {
                struct qr_task *task = session2_waitinglist_pop(session, false);
@@ -2136,7 +2025,8 @@ void worker_task_pkt_set_msgid(struct qr_task *task, uint16_t msgid)
        knot_pkt_t *pktbuf = worker_task_get_pktbuf(task);
        knot_wire_set_id(pktbuf->wire, msgid);
        struct kr_query *q = task_get_last_pending_query(task);
-       q->id = msgid;
+       if (q)
+               q->id = msgid;
 }
 
 uint64_t worker_task_creation_time(struct qr_task *task)
@@ -2190,16 +2080,46 @@ static inline knot_pkt_t *produce_packet_dgram(char *buf, size_t buf_len)
        return knot_pkt_new(buf, buf_len, &the_worker->pkt_pool);
 }
 
-static enum protolayer_cb_result pl_dns_dgram_unwrap(
+static enum protolayer_cb_result pl_dns_dgram_unwrap_timeout(
                struct protolayer_data *layer, struct protolayer_cb_ctx *ctx)
 {
        struct session2 *session = ctx->manager->session;
+       kr_assert(session2_get_handle(session)->data == session);
+       kr_assert(session2_tasklist_get_len(session) == 1);
+       kr_assert(session2_waitinglist_is_empty(session));
+
+       session2_timer_stop(session);
+
+       struct qr_task *task = session2_tasklist_get_first(session);
+       if (!task)
+               return protolayer_continue(ctx);
+
+       if (task->leading && task->pending_count > 0) {
+               struct kr_query *qry = array_tail(task->ctx->req.rplan.pending);
+               qry->server_selection.error(qry, task->transport, KR_SELECTION_QUERY_TIMEOUT);
+       }
+
+       task->timeouts += 1;
+       the_worker->stats.timeout += 1;
+       qr_task_step(task, NULL, NULL);
+
+       return protolayer_continue(ctx);
+}
+
+static enum protolayer_cb_result pl_dns_dgram_unwrap(
+               struct protolayer_data *layer, struct protolayer_cb_ctx *ctx)
+{
 
        if (ctx->payload.type == PROTOLAYER_PAYLOAD_EVENT) {
+               if (ctx->payload.event.type == PROTOLAYER_EVENT_TIMEOUT)
+                       return pl_dns_dgram_unwrap_timeout(layer, ctx);
+
                /* pass thru */
                return protolayer_continue(ctx);
        }
 
+       struct session2 *session = ctx->manager->session;
+
        if (ctx->payload.type == PROTOLAYER_PAYLOAD_IOVEC) {
                int ret = kr_ok();
                for (int i = 0; i < ctx->payload.iovec.cnt; i++) {
@@ -2294,10 +2214,61 @@ static int pl_dns_stream_iter_deinit(struct protolayer_manager *manager,
        return kr_ok();
 }
 
+static enum protolayer_cb_result pl_dns_stream_unwrap_timeout(
+               struct protolayer_data *layer, struct protolayer_cb_ctx *ctx)
+{
+       struct session2 *session = ctx->manager->session;
+       if (session->connected || session->closing)
+               return protolayer_continue(ctx);
+
+       /* Connection timeout */
+       session2_timer_stop(session);
+
+       kr_assert(session2_tasklist_is_empty(session));
+
+       struct sockaddr *peer = session2_get_peer(session);
+       worker_del_tcp_waiting(peer);
+
+       struct qr_task *task = session2_waitinglist_get(session);
+       if (!task) {
+               /* Normally shouldn't happen. */
+               const char *peer_str = kr_straddr(peer);
+               VERBOSE_MSG(NULL, "=> connection to '%s' failed (internal timeout), empty waitinglist\n",
+                           peer_str ? peer_str : "");
+               return protolayer_continue(ctx);
+       }
+
+       struct kr_query *qry = task_get_last_pending_query(task);
+       if (kr_log_is_debug_qry(WORKER, qry)) {
+               const char *peer_str = kr_straddr(peer);
+               VERBOSE_MSG(qry, "=> connection to '%s' failed (internal timeout)\n",
+                           peer_str ? peer_str : "");
+       }
+
+       qry->server_selection.error(qry, task->transport, KR_SELECTION_TCP_CONNECT_TIMEOUT);
+
+       the_worker->stats.timeout += session2_waitinglist_get_len(session);
+       session2_waitinglist_retry(session, true);
+       kr_assert(session2_tasklist_is_empty(session));
+       /* uv_cancel() doesn't support uv_connect_t request,
+        * so that we can't cancel it.
+        * There still exists possibility of successful connection
+        * for this request.
+        * So connection callback (on_connect()) must check
+        * if connection is in the list of waiting connection.
+        * If no, most likely this is timed out connection even if
+        * it was successful. */
+
+       return protolayer_continue(ctx);
+}
+
 static enum protolayer_cb_result pl_dns_stream_unwrap(
                struct protolayer_data *layer, struct protolayer_cb_ctx *ctx)
 {
        if (ctx->payload.type == PROTOLAYER_PAYLOAD_EVENT) {
+               if (ctx->payload.event.type == PROTOLAYER_EVENT_TIMEOUT)
+                       return pl_dns_stream_unwrap_timeout(layer, ctx);
+
                /* pass thru */
                return protolayer_continue(ctx);
        }