]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
daemon/session2: put event handling into separate functions
authorOto Šťáva <oto.stava@nic.cz>
Thu, 4 Aug 2022 09:15:13 +0000 (11:15 +0200)
committerOto Šťáva <oto.stava@nic.cz>
Thu, 26 Jan 2023 11:56:07 +0000 (12:56 +0100)
Originally, events were simply just another payload for the regular data
callbacks. That could, however, lead to problems with the control flow
of the layer iterations, e.g. when a buffer causes a layer to `_WAIT`,
but the next payload is an event instead of another buffer. This
separation should fix that problem.

daemon/io.c
daemon/session2.c
daemon/session2.h
daemon/worker.c

index 8a498174bfde934c960bd2e9d716ea7d0a7e5cae..8f6dec93377da04f7a40a6f71ae55809a97195a2 100644 (file)
@@ -147,11 +147,6 @@ static int pl_udp_iter_init(struct protolayer_manager *manager, struct protolaye
 
 static enum protolayer_cb_result pl_udp_unwrap(struct protolayer_data *layer, struct protolayer_cb_ctx *ctx)
 {
-       if (ctx->payload.type == PROTOLAYER_PAYLOAD_EVENT) {
-               /* events should not happen in UDP (currently) */
-               return protolayer_continue(ctx);
-       }
-
        ctx->payload = protolayer_as_buffer(&ctx->payload);
        if (kr_fails_assert(ctx->payload.type == PROTOLAYER_PAYLOAD_BUFFER)) {
                /* unsupported payload */
@@ -238,76 +233,12 @@ static int pl_tcp_sess_deinit(struct protolayer_manager *manager, struct protola
        return 0;
 }
 
-static enum protolayer_cb_result pl_tcp_unwrap_timeout(
-               struct protolayer_data *layer, struct protolayer_cb_ctx *ctx)
-{
-       struct session2 *s = ctx->manager->session;
-
-       if (kr_fails_assert(!s->closing))
-               return protolayer_continue(ctx);
-
-       if (!session2_tasklist_is_empty(s)) {
-               int finalized = session2_tasklist_finalize_expired(s);
-               the_worker->stats.timeout += finalized;
-               /* session2_tasklist_finalize_expired() may call worker_task_finalize().
-                * If session is a source session and there were IO errors,
-                * worker_task_finalize() can finalize all tasks and close session. */
-               if (s->closing)
-                       return protolayer_continue(ctx);
-       }
-
-       if (!session2_tasklist_is_empty(s)) {
-               session2_timer_stop(s);
-               session2_timer_start(s,
-                               KR_RESOLVE_TIME_LIMIT / 2,
-                               KR_RESOLVE_TIME_LIMIT / 2);
-       } else {
-               /* Normally it should not happen,
-                * but better to check if there anything in this list. */
-               while (!session2_waitinglist_is_empty(s)) {
-                       struct qr_task *t = session2_waitinglist_pop(s, false);
-                       worker_task_finalize(t, KR_STATE_FAIL);
-                       worker_task_unref(t);
-                       the_worker->stats.timeout += 1;
-                       if (s->closing)
-                               return protolayer_continue(ctx);
-               }
-               uint64_t idle_in_timeout = the_network->tcp.in_idle_timeout;
-               uint64_t idle_time = kr_now() - s->last_activity;
-               if (idle_time < idle_in_timeout) {
-                       idle_in_timeout -= idle_time;
-                       session2_timer_stop(s);
-                       session2_timer_start(s, idle_in_timeout, idle_in_timeout);
-               } else {
-                       struct sockaddr *peer = session2_get_peer(s);
-                       char *peer_str = kr_straddr(peer);
-                       kr_log_debug(IO, "=> closing connection to '%s'\n",
-                                      peer_str ? peer_str : "");
-                       if (s->outgoing) {
-                               worker_del_tcp_waiting(peer);
-                               worker_del_tcp_connected(peer);
-                       }
-                       session2_event(s, PROTOLAYER_EVENT_CLOSE, NULL);
-               }
-       }
-
-       return protolayer_continue(ctx);
-}
-
 static enum protolayer_cb_result pl_tcp_unwrap(struct protolayer_data *layer, struct protolayer_cb_ctx *ctx)
 {
        struct session2 *s = ctx->manager->session;
        struct pl_tcp_sess_data *tcp = protolayer_sess_data(layer);
        struct sockaddr *peer = session2_get_peer(s);
 
-       if (ctx->payload.type == PROTOLAYER_PAYLOAD_EVENT) {
-               if (ctx->payload.event.type == PROTOLAYER_EVENT_TIMEOUT)
-                       return pl_tcp_unwrap_timeout(layer, ctx);
-
-               /* pass thru */
-               return protolayer_continue(ctx);
-       }
-
        if (ctx->payload.type == PROTOLAYER_PAYLOAD_BUFFER) {
                const char *buf = ctx->payload.buffer.buf;
                const size_t len = ctx->payload.buffer.len;
@@ -356,8 +287,8 @@ static enum protolayer_cb_result pl_tcp_unwrap(struct protolayer_data *layer, st
                                                kr_straddr(peer));
                        }
                        worker_end_tcp(s);
-                       ctx->payload = protolayer_event_nd(PROTOLAYER_EVENT_CLOSE);
-                       return protolayer_push(ctx);
+                       session2_event(s, PROTOLAYER_EVENT_CLOSE, NULL);
+                       return protolayer_break(ctx, ECONNRESET);
                }
 
                ssize_t trimmed = proxy_process_header(&tcp->proxy, data, data_len);
@@ -374,11 +305,11 @@ static enum protolayer_cb_result pl_tcp_unwrap(struct protolayer_data *layer, st
                                }
                        }
                        worker_end_tcp(s);
-                       ctx->payload = protolayer_event_nd(PROTOLAYER_EVENT_CLOSE);
-                       return protolayer_push(ctx);
+                       session2_event(s, PROTOLAYER_EVENT_CLOSE, NULL);
+                       return protolayer_break(ctx, ECONNRESET);
                } else if (trimmed == 0) {
-                       ctx->payload = protolayer_event_nd(PROTOLAYER_EVENT_CLOSE);
-                       return protolayer_push(ctx);
+                       session2_event(s, PROTOLAYER_EVENT_CLOSE, NULL);
+                       return protolayer_break(ctx, ECONNRESET);
                }
 
                if (tcp->proxy.command != PROXY2_CMD_LOCAL && tcp->proxy.family != AF_UNSPEC) {
index 9637b785f07f5d01dd0b7467066212620a5d449d..81ba5ce610b89fa68984c8194b28cec22a7e7cf8 100644 (file)
@@ -22,9 +22,8 @@ static inline int session2_transport_push(struct session2 *s,
                                           const void *target,
                                           protolayer_finished_cb cb, void *baton);
 static int session2_transport_event(struct session2 *s,
-                                    struct protolayer_event event,
-                                    const void *target,
-                                    protolayer_finished_cb cb, void *baton);
+                                    enum protolayer_event_type event,
+                                    void *baton);
 
 struct protolayer_globals protolayer_globals[PROTOLAYER_PROTOCOL_COUNT] = {0};
 
@@ -162,14 +161,6 @@ static int protolayer_cb_ctx_finish(struct protolayer_cb_ctx *ctx, int ret,
                ctx->finished_cb(ret, session, ctx->finished_cb_target,
                                ctx->finished_cb_baton);
 
-       /* events bounce back from unwrap to wrap */
-       bool bounce_back = (ctx->direction == PROTOLAYER_UNWRAP
-                       && ret == PROTOLAYER_RET_NORMAL
-                       && !ctx->status
-                       && ctx->payload.type == PROTOLAYER_PAYLOAD_EVENT);
-       if (bounce_back)
-               session2_wrap(session, ctx->payload, NULL, NULL, NULL);
-
        free(ctx);
 
        return ret;
@@ -277,14 +268,9 @@ static int protolayer_manager_submit(
 
        if (kr_log_is_debug(PROTOLAYER, NULL)) {
                const char *sess_dir = manager->session->outgoing ? "out" : "in";
-               const char *event_name = (payload.type == PROTOLAYER_PAYLOAD_EVENT)
-                       ? protolayer_event_names[payload.event.type]
-                       : "";
-               const char *event_space = (payload.type == PROTOLAYER_PAYLOAD_EVENT) ? " " : "";
-               kr_log_debug(PROTOLAYER, "[%s] %s%s%s submitted to grp '%s' in %s direction\n",
+               kr_log_debug(PROTOLAYER, "[%s] %s submitted to grp '%s' in %s direction\n",
                                sess_dir,
                                protolayer_payload_names[payload.type],
-                               event_space, event_name,
                                protolayer_grp_names[manager->grp],
                                (direction == PROTOLAYER_UNWRAP) ? "unwrap" : "wrap");
        }
@@ -446,10 +432,6 @@ enum protolayer_cb_result protolayer_push(struct protolayer_cb_ctx *ctx)
                ret = session2_transport_pushv(session,
                                ctx->payload.iovec.iov, ctx->payload.iovec.cnt,
                                ctx->target, protolayer_push_finished, ctx);
-       } else if (ctx->payload.type == PROTOLAYER_PAYLOAD_EVENT) {
-               ret = session2_transport_event(session,
-                               ctx->payload.event,
-                               ctx->target, protolayer_push_finished, ctx);
        } else {
                kr_assert(false && "Invalid payload type");
                ret = kr_error(EINVAL);
@@ -886,6 +868,50 @@ int session2_wrap(struct session2 *s, struct protolayer_payload payload,
                        payload, target, cb, baton);
 }
 
+static void session2_event_wrap(struct session2 *s, enum protolayer_event_type event, void *baton)
+{
+       bool cont;
+       struct protolayer_manager *m = s->layers;
+       for (ssize_t i = m->num_layers - 1; i >= 0; i--) {
+               struct protolayer_data *data = protolayer_manager_get(m, i);
+               struct protolayer_globals *globals = &protolayer_globals[data->protocol];
+               if (globals->event_wrap)
+                       cont = globals->event_wrap(event, &baton, m, data);
+               else
+                       cont = true;
+
+               if (!cont)
+                       return;
+       }
+
+       session2_transport_event(s, event, baton);
+}
+
+void session2_event(struct session2 *s, enum protolayer_event_type event, void *baton)
+{
+       bool cont;
+       struct protolayer_manager *m = s->layers;
+       for (ssize_t i = 0; i < m->num_layers; i++) {
+               struct protolayer_data *data = protolayer_manager_get(m, i);
+               struct protolayer_globals *globals = &protolayer_globals[data->protocol];
+               if (globals->event_unwrap)
+                       cont = globals->event_unwrap(event, &baton, m, data);
+               else
+                       cont = true;
+
+               if (!cont)
+                       return;
+       }
+
+       /* Immediately bounce back in the `wrap` direction.
+        *
+        * TODO: This might be undesirable for cases with sub-sessions - the
+        * current idea is for the layers managing sub-sessions to just return
+        * `false` on `event_unwrap`, but a more "automatic" mechanism may be
+        * added when this is relevant, to make it less error-prone. */
+       session2_event_wrap(s, event, baton);
+}
+
 
 struct parent_pushv_ctx {
        struct session2 *session;
@@ -1068,101 +1094,47 @@ static inline int session2_transport_push(struct session2 *s,
                        session2_transport_single_push_finished, ctx);
 }
 
-struct event_ctx {
-       struct session2 *session;
-       protolayer_finished_cb cb;
-       void *baton;
-       const void *target;
-};
-
-static void session2_transport_io_event_finished(uv_handle_t *handle)
-{
-       struct session2 *s = handle->data;
-       struct event_ctx *ctx = s->data;
-       if (ctx->cb)
-               ctx->cb(kr_ok(), ctx->session, ctx->target, ctx->baton);
-       free(ctx);
-}
-
-static void session2_transport_parent_event_finished(int status,
-                                                     struct session2 *session,
-                                                     const void *target,
-                                                     void *baton)
-{
-       struct event_ctx *ctx = baton;
-       if (ctx->cb)
-               ctx->cb(status, ctx->session, target, ctx->baton);
-       free(ctx);
-}
-
-static int session2_handle_close(struct session2 *s, uv_handle_t *handle,
-                                 struct event_ctx *ctx)
+static int session2_handle_close(struct session2 *s, uv_handle_t *handle)
 {
        io_stop_read(handle);
-       s->data = ctx;
-       uv_close(handle, session2_transport_io_event_finished);
+       uv_close(handle, NULL);
 
        return kr_ok();
 }
 
 static int session2_transport_event(struct session2 *s,
-                                    struct protolayer_event event,
-                                    const void *target,
-                                    protolayer_finished_cb cb, void *baton)
+                                    enum protolayer_event_type event,
+                                    void *baton)
 {
-       if (s->closing) {
-               if (cb)
-                       cb(kr_error(ESTALE), s, target, baton);
+       if (s->closing)
                return kr_ok();
-       }
 
-       bool is_close_event = (event.type == PROTOLAYER_EVENT_CLOSE ||
-                       event.type == PROTOLAYER_EVENT_FORCE_CLOSE);
+       bool is_close_event = (event == PROTOLAYER_EVENT_CLOSE ||
+                       event == PROTOLAYER_EVENT_FORCE_CLOSE);
        if (is_close_event) {
                kr_require(session2_is_empty(s));
                session2_timer_stop(s);
                s->closing = true;
        }
 
-       struct event_ctx *ctx = malloc(sizeof(*ctx));
-       kr_require(ctx);
-       *ctx = (struct event_ctx){
-               .session = s,
-               .cb = cb,
-               .baton = baton,
-               .target = target
-       };
-
        switch (s->transport.type) {
        case SESSION2_TRANSPORT_IO:;
                uv_handle_t *handle = s->transport.io.handle;
                if (kr_fails_assert(handle)) {
-                       free(ctx);
                        return kr_error(EINVAL);
                }
 
                if (is_close_event)
-                       return session2_handle_close(s, handle, ctx);
+                       return session2_handle_close(s, handle);
 
-               if (ctx->cb)
-                       ctx->cb(kr_ok(), s, target, baton);
-
-               free(ctx);
                return kr_ok();
 
        case SESSION2_TRANSPORT_PARENT:;
-               int ret = session2_wrap(s, protolayer_event(event), target,
-                               session2_transport_parent_event_finished, ctx);
-               if (ret < 0) {
-                       free(ctx);
-                       return ret;
-               }
-
+               session2_event_wrap(s, event, baton);
                return kr_ok();
 
        default:
                kr_assert(false && "Invalid transport");
-               free(ctx);
                return kr_error(EINVAL);
        }
 }
index fbbd2b194f95c17d641201b4f47f87af09b2c964..d527ae5484d7fd00ac3a78dccd08ff4105112328 100644 (file)
@@ -137,6 +137,7 @@ enum protolayer_ret {
 typedef void (*protolayer_finished_cb)(int status, struct session2 *session,
                                        const void *target, void *baton);
 
+
 #define PROTOLAYER_EVENT_MAP(XX) \
        XX(CLOSE) /**< Signal to gracefully close the session -
                   * i.e. layers add their standard disconnection
@@ -159,20 +160,10 @@ enum protolayer_event_type {
 
 extern char *protolayer_event_names[];
 
-/** Event, with optional auxiliary data. */
-struct protolayer_event {
-       enum protolayer_event_type type;
-       union {
-               void *ptr;
-               char raw[sizeof(void *)];
-       } data; /**< Optional data supplied with the event.
-                * May be used by a layer. */
-};
 
 #define PROTOLAYER_PAYLOAD_MAP(XX) \
        XX(BUFFER, "Buffer") \
        XX(IOVEC, "IOVec") \
-       XX(EVENT, "Event") \
        XX(WIRE_BUF, "Wire buffer")
 
 /** Defines whether the data for a `struct protolayer_cb_ctx` is represented
@@ -206,9 +197,6 @@ struct protolayer_payload {
                        int cnt;
                } iovec;
 
-               /** Only valid if `type` is `_EVENT`. */
-               struct protolayer_event event;
-
                /** Only valid if `type` is `_WIRE_BUF`. */
                struct wire_buf *wire_buf;
        };
@@ -267,26 +255,6 @@ static inline struct protolayer_payload protolayer_iovec(
        };
 }
 
-/** Convenience function to get an event-type payload. */
-static inline struct protolayer_payload protolayer_event(struct protolayer_event event)
-{
-       return (struct protolayer_payload){
-               .type = PROTOLAYER_PAYLOAD_EVENT,
-               .event = event
-       };
-}
-
-/** Convenience function to get an event-type payload without auxiliary data. */
-static inline struct protolayer_payload protolayer_event_nd(enum protolayer_event_type event)
-{
-       return (struct protolayer_payload){
-               .type = PROTOLAYER_PAYLOAD_EVENT,
-               .event = {
-                       .type = event
-               }
-       };
-}
-
 /** Convenience function to get a wire-buf-type payload. */
 static inline struct protolayer_payload protolayer_wire_buf(struct wire_buf *wire_buf)
 {
@@ -333,8 +301,26 @@ enum protolayer_cb_result {
        PROTOLAYER_CB_RESULT_MAGIC = 0x364F392E,
 };
 
+/** Function type for `wrap` and `unwrap` callbacks of layers. Return value
+ * determines the flow of iteration; see the enum docs for more info. */
 typedef enum protolayer_cb_result (*protolayer_cb)(
                struct protolayer_data *layer, struct protolayer_cb_ctx *ctx);
+
+/** Function type for `event_wrap` and `event_unwrap` callbacks of layers.
+ * `baton` always points to some memory; it may be modified accommodate for
+ * the behaviour of the next layer in the sequence.
+ *
+ * When `true` is returned, iteration proceeds as normal. When `false` is
+ * returned, iteration stops. */
+typedef bool (*protolayer_event_cb)(enum protolayer_event_type event,
+                                    void **baton,
+                                    struct protolayer_manager *manager,
+                                    struct protolayer_data *layer);
+
+/** Function type for (de)initialization callbacks of layers.
+ *
+ * Returning 0 means success, other return values mean error and halt the
+ * initialization. */
 typedef int (*protolayer_data_cb)(struct protolayer_manager *manager,
                                   struct protolayer_data *layer);
 
@@ -373,12 +359,16 @@ struct protolayer_globals {
        protolayer_data_cb iter_deinit; /**< Called at the end of a layer
                                         * sequence to deinitialize
                                         * layer-specific iteration data. */
+
        protolayer_cb unwrap; /**< Strips the buffer of protocol-specific
                               * data. E.g. a HTTP layer removes HTTP
                               * status and headers. */
        protolayer_cb wrap;   /**< Wraps the buffer into protocol-specific
                               * data. E.g. a HTTP layer adds HTTP status
                               * and headers. */
+
+       protolayer_event_cb event_unwrap; /**< Processes events in the unwrap order. */
+       protolayer_event_cb event_wrap; /**< Processes events in the wrap order. */
 };
 
 /** Global data about layered protocols. Indexed by `enum protolayer_protocol`. */
@@ -456,22 +446,22 @@ int wire_buf_movestart(struct wire_buf *wb);
 /** Resets the valid bytes of the buffer to zero, as well as the error flag. */
 int wire_buf_reset(struct wire_buf *wb);
 
-static void *wire_buf_data(const struct wire_buf *wb)
+static inline void *wire_buf_data(const struct wire_buf *wb)
 {
        return &wb->buf[wb->start];
 }
 
-static size_t wire_buf_data_length(const struct wire_buf *wb)
+static inline size_t wire_buf_data_length(const struct wire_buf *wb)
 {
        return wb->end - wb->start;
 }
 
-static void *wire_buf_free_space(const struct wire_buf *wb)
+static inline void *wire_buf_free_space(const struct wire_buf *wb)
 {
        return &wb->buf[wb->end];
 }
 
-static size_t wire_buf_free_space_length(const struct wire_buf *wb)
+static inline size_t wire_buf_free_space_length(const struct wire_buf *wb)
 {
        return wb->size - wb->end;
 }
@@ -670,19 +660,10 @@ int session2_unwrap(struct session2 *s, struct protolayer_payload payload,
 int session2_wrap(struct session2 *s, struct protolayer_payload payload,
                   const void *target, protolayer_finished_cb cb, void *baton);
 
-/** Convenience function to send the specified event to be processed in the
- * `unwrap` direction. `data` may be `NULL`.
- *
- * See `session2_unwrap` for more information. */
-static inline int session2_event(struct session2 *s,
-                                 enum protolayer_event_type type, void *data)
-{
-       struct protolayer_event event = {
-               .type = type,
-               .data = { .ptr = data }
-       };
-       return session2_unwrap(s, protolayer_event(event), NULL, NULL, NULL);
-}
+/** Sends an event to be synchronously processed by the protocol layers of the
+ * specified session. The layers are first iterated through in the `_UNWRAP`
+ * direction, then bounced back in the `_WRAP` direction. */
+void session2_event(struct session2 *s, enum protolayer_event_type type, void *baton);
 
 /** Removes the specified request task from the session's tasklist. The session
  * must be outgoing. If the session is UDP, a signal to close is also sent to it. */
index 24fd8b3263b04484795271f2e6bd65134f50f140..1e150de99cea683871d0106f77f1ebc0ec96b3c5 100644 (file)
@@ -2080,19 +2080,24 @@ static inline knot_pkt_t *produce_packet_dgram(char *buf, size_t buf_len)
        return knot_pkt_new(buf, buf_len, &the_worker->pkt_pool);
 }
 
-static enum protolayer_cb_result pl_dns_dgram_unwrap_timeout(
-               struct protolayer_data *layer, struct protolayer_cb_ctx *ctx)
+static bool pl_dns_dgram_event_unwrap(enum protolayer_event_type event,
+                                      void **baton,
+                                      struct protolayer_manager *manager,
+                                      struct protolayer_data *layer)
 {
-       struct session2 *session = ctx->manager->session;
-       kr_assert(session2_get_handle(session)->data == session);
-       kr_assert(session2_tasklist_get_len(session) == 1);
-       kr_assert(session2_waitinglist_is_empty(session));
+       if (event != PROTOLAYER_EVENT_TIMEOUT)
+               return true;
+
+       struct session2 *session = manager->session;
+       if (session2_tasklist_get_len(session) != 1 ||
+                       !session2_waitinglist_is_empty(session))
+               return true;
 
        session2_timer_stop(session);
 
        struct qr_task *task = session2_tasklist_get_first(session);
        if (!task)
-               return protolayer_continue(ctx);
+               return true;
 
        if (task->leading && task->pending_count > 0) {
                struct kr_query *qry = array_tail(task->ctx->req.rplan.pending);
@@ -2103,21 +2108,12 @@ static enum protolayer_cb_result pl_dns_dgram_unwrap_timeout(
        the_worker->stats.timeout += 1;
        qr_task_step(task, NULL, NULL);
 
-       return protolayer_continue(ctx);
+       return true;
 }
 
 static enum protolayer_cb_result pl_dns_dgram_unwrap(
                struct protolayer_data *layer, struct protolayer_cb_ctx *ctx)
 {
-
-       if (ctx->payload.type == PROTOLAYER_PAYLOAD_EVENT) {
-               if (ctx->payload.event.type == PROTOLAYER_EVENT_TIMEOUT)
-                       return pl_dns_dgram_unwrap_timeout(layer, ctx);
-
-               /* pass thru */
-               return protolayer_continue(ctx);
-       }
-
        struct session2 *session = ctx->manager->session;
 
        if (ctx->payload.type == PROTOLAYER_PAYLOAD_IOVEC) {
@@ -2214,14 +2210,61 @@ static int pl_dns_stream_iter_deinit(struct protolayer_manager *manager,
        return kr_ok();
 }
 
-static enum protolayer_cb_result pl_dns_stream_unwrap_timeout(
-               struct protolayer_data *layer, struct protolayer_cb_ctx *ctx)
+static bool pl_dns_stream_resolution_timeout(struct session2 *s)
 {
-       struct session2 *session = ctx->manager->session;
-       if (session->connected || session->closing)
-               return protolayer_continue(ctx);
+       if (kr_fails_assert(!s->closing))
+               return true;
+
+       if (!session2_tasklist_is_empty(s)) {
+               int finalized = session2_tasklist_finalize_expired(s);
+               the_worker->stats.timeout += finalized;
+               /* session2_tasklist_finalize_expired() may call worker_task_finalize().
+                * If session is a source session and there were IO errors,
+                * worker_task_finalize() can finalize all tasks and close session. */
+               if (s->closing)
+                       return true;
+       }
+
+       if (!session2_tasklist_is_empty(s)) {
+               session2_timer_stop(s);
+               session2_timer_start(s,
+                               KR_RESOLVE_TIME_LIMIT / 2,
+                               KR_RESOLVE_TIME_LIMIT / 2);
+       } else {
+               /* Normally it should not happen,
+                * but better to check if there anything in this list. */
+               while (!session2_waitinglist_is_empty(s)) {
+                       struct qr_task *t = session2_waitinglist_pop(s, false);
+                       worker_task_finalize(t, KR_STATE_FAIL);
+                       worker_task_unref(t);
+                       the_worker->stats.timeout += 1;
+                       if (s->closing)
+                               return true;
+               }
+               uint64_t idle_in_timeout = the_network->tcp.in_idle_timeout;
+               uint64_t idle_time = kr_now() - s->last_activity;
+               if (idle_time < idle_in_timeout) {
+                       idle_in_timeout -= idle_time;
+                       session2_timer_stop(s);
+                       session2_timer_start(s, idle_in_timeout, idle_in_timeout);
+               } else {
+                       struct sockaddr *peer = session2_get_peer(s);
+                       char *peer_str = kr_straddr(peer);
+                       kr_log_debug(IO, "=> closing connection to '%s'\n",
+                                      peer_str ? peer_str : "");
+                       if (s->outgoing) {
+                               worker_del_tcp_waiting(peer);
+                               worker_del_tcp_connected(peer);
+                       }
+                       session2_event(s, PROTOLAYER_EVENT_CLOSE, NULL);
+               }
+       }
+
+       return true;
+}
 
-       /* Connection timeout */
+static bool pl_dns_stream_connection_timeout(struct session2 *session)
+{
        session2_timer_stop(session);
 
        kr_assert(session2_tasklist_is_empty(session));
@@ -2235,7 +2278,7 @@ static enum protolayer_cb_result pl_dns_stream_unwrap_timeout(
                const char *peer_str = kr_straddr(peer);
                VERBOSE_MSG(NULL, "=> connection to '%s' failed (internal timeout), empty waitinglist\n",
                            peer_str ? peer_str : "");
-               return protolayer_continue(ctx);
+               return true;
        }
 
        struct kr_query *qry = task_get_last_pending_query(task);
@@ -2259,20 +2302,27 @@ static enum protolayer_cb_result pl_dns_stream_unwrap_timeout(
         * If no, most likely this is timed out connection even if
         * it was successful. */
 
-       return protolayer_continue(ctx);
+       return true;
 }
 
-static enum protolayer_cb_result pl_dns_stream_unwrap(
-               struct protolayer_data *layer, struct protolayer_cb_ctx *ctx)
+static bool pl_dns_stream_event_unwrap(enum protolayer_event_type event,
+                                       void **baton,
+                                       struct protolayer_manager *manager,
+                                       struct protolayer_data *layer)
 {
-       if (ctx->payload.type == PROTOLAYER_PAYLOAD_EVENT) {
-               if (ctx->payload.event.type == PROTOLAYER_EVENT_TIMEOUT)
-                       return pl_dns_stream_unwrap_timeout(layer, ctx);
+       struct session2 *session = manager->session;
+       if (session->closing || event != PROTOLAYER_EVENT_TIMEOUT)
+               return true;
 
-               /* pass thru */
-               return protolayer_continue(ctx);
-       }
+       if (session->connected)
+               return pl_dns_stream_resolution_timeout(manager->session);
+       else
+               return pl_dns_stream_connection_timeout(manager->session);
+}
 
+static enum protolayer_cb_result pl_dns_stream_unwrap(
+               struct protolayer_data *layer, struct protolayer_cb_ctx *ctx)
+{
        if (kr_fails_assert(ctx->payload.type == PROTOLAYER_PAYLOAD_WIRE_BUF)) {
                /* DNS stream only works with a wire buffer */
                return protolayer_break(ctx, kr_error(EINVAL));
@@ -2312,11 +2362,6 @@ struct sized_iovs {
 static enum protolayer_cb_result pl_dns_stream_wrap(
                struct protolayer_data *layer, struct protolayer_cb_ctx *ctx)
 {
-       if (ctx->payload.type == PROTOLAYER_PAYLOAD_EVENT) {
-               /* pass thru */
-               return protolayer_continue(ctx);
-       }
-
        struct pl_dns_stream_iter_data *stream = protolayer_iter_data(layer);
        struct session2 *s = ctx->manager->session;
 
@@ -2388,6 +2433,7 @@ int worker_init(void)
        /* DNS protocol layers */
        protolayer_globals[PROTOLAYER_DNS_DGRAM] = (struct protolayer_globals){
                .unwrap = pl_dns_dgram_unwrap,
+               .event_unwrap = pl_dns_dgram_event_unwrap
        };
        const struct protolayer_globals stream_common = {
                .sess_size = sizeof(struct pl_dns_stream_sess_data),
@@ -2396,7 +2442,8 @@ int worker_init(void)
                .iter_init = pl_dns_stream_iter_init,
                .iter_deinit = pl_dns_stream_iter_deinit,
                .unwrap = pl_dns_stream_unwrap,
-               .wrap = pl_dns_stream_wrap
+               .wrap = pl_dns_stream_wrap,
+               .event_unwrap = pl_dns_stream_event_unwrap
        };
        protolayer_globals[PROTOLAYER_DNS_MSTREAM] = stream_common;
        protolayer_globals[PROTOLAYER_DNS_MSTREAM].sess_init = pl_dns_mstream_sess_init;