]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
daemon: protolayer timeout and stream fixes
authorOto Šťáva <oto.stava@nic.cz>
Fri, 5 Aug 2022 06:22:37 +0000 (08:22 +0200)
committerOto Šťáva <oto.stava@nic.cz>
Thu, 26 Jan 2023 11:56:07 +0000 (12:56 +0100)
Incoming TCP sessions (from clients) were not marked as `connected`,
causing the wrong timeout function to be called. This may have resulted
in assertion failures in some cases when using TCP.

Some error codes were not wrapped in `kr_error` - this probably did not
cause any real issues currently, but for the sake of consistency this is
now fixed.

Stream wire buffers were not being processed in a loop, which could
potentially cause input data to be discarded or processed with a delay.
This is now fixed.

daemon/io.c
daemon/session2.h
daemon/worker.c

index 8f6dec93377da04f7a40a6f71ae55809a97195a2..e66bac9fb780e9b350dbed00e8bf25e5805adb6d 100644 (file)
@@ -287,8 +287,7 @@ static enum protolayer_cb_result pl_tcp_unwrap(struct protolayer_data *layer, st
                                                kr_straddr(peer));
                        }
                        worker_end_tcp(s);
-                       session2_event(s, PROTOLAYER_EVENT_CLOSE, NULL);
-                       return protolayer_break(ctx, ECONNRESET);
+                       return protolayer_break(ctx, kr_error(ECONNRESET));
                }
 
                ssize_t trimmed = proxy_process_header(&tcp->proxy, data, data_len);
@@ -305,11 +304,10 @@ static enum protolayer_cb_result pl_tcp_unwrap(struct protolayer_data *layer, st
                                }
                        }
                        worker_end_tcp(s);
-                       session2_event(s, PROTOLAYER_EVENT_CLOSE, NULL);
-                       return protolayer_break(ctx, ECONNRESET);
+                       return protolayer_break(ctx, kr_error(ECONNRESET));
                } else if (trimmed == 0) {
                        session2_event(s, PROTOLAYER_EVENT_CLOSE, NULL);
-                       return protolayer_break(ctx, ECONNRESET);
+                       return protolayer_break(ctx, kr_error(ECONNRESET));
                }
 
                if (tcp->proxy.command != PROXY2_CMD_LOCAL && tcp->proxy.family != AF_UNSPEC) {
@@ -696,6 +694,7 @@ static void _tcp_accept(uv_stream_t *master, int status, enum protolayer_grp grp
 //             }
 //     }
 //#endif
+       session2_event(s, PROTOLAYER_EVENT_CONNECT, NULL);
        session2_timer_start(s, timeout, idle_in_timeout);
        io_start_read((uv_handle_t *)client);
 }
index d527ae5484d7fd00ac3a78dccd08ff4105112328..ec5f40c1aa5d4ca1e84e62574c9adf53dea36aec 100644 (file)
@@ -147,7 +147,8 @@ typedef void (*protolayer_finished_cb)(int status, struct session2 *session,
                         * any disconnection ceremony, if
                         * avoidable. */\
        XX(TIMEOUT) /**< Signal that the session has timed out. */\
-       XX(CONNECT) /**< Signal that a connection has been established. */
+       XX(CONNECT) /**< Signal that a connection has been established. */\
+       XX(DISCONNECT) /**< Signal that a connection has ended. */
 
 /** Event type, to be interpreted by a layer. */
 enum protolayer_event_type {
index 0705baf6daa1a05953b6842eebe2d93755149d4a..4374f65e3ae05aee059a701de348b18dc794e3d2 100644 (file)
@@ -613,7 +613,6 @@ int qr_task_on_send(struct qr_task *task, const uv_handle_t *handle, int status)
                                                peer_str, uv_strerror(status));
                        }
                        worker_end_tcp(s);
-                       session2_event(s, PROTOLAYER_EVENT_FORCE_CLOSE, NULL);
                        return status;
                }
 
@@ -632,10 +631,11 @@ int qr_task_on_send(struct qr_task *task, const uv_handle_t *handle, int status)
        return status;
 }
 
-static void qr_task_wrap_finished(int status, struct session2 *session, const void *target, void *baton)
+static void qr_task_wrap_finished(int status, struct session2 *session,
+                                  const void *target, void *baton)
 {
        struct qr_task *task = baton;
-       qr_task_on_send(task, NULL, status);
+       qr_task_on_send(task, session2_get_handle(session), status);
        qr_task_unref(task);
        wire_buf_reset(&session->wire_buf);
 }
@@ -945,8 +945,8 @@ static void on_connect(uv_connect_t *req, int status)
         * If no, most likely this is timed out connection
         * which was removed from waiting list by
         * on_tcp_connect_timeout() callback. */
-       struct session2 *s = worker_find_tcp_waiting(peer);
-       if (!s || s != session) {
+       struct session2 *found_session = worker_find_tcp_waiting(peer);
+       if (!found_session || found_session != session) {
                /* session isn't on the waiting list.
                 * it's timed out session. */
                if (log_debug) {
@@ -961,8 +961,8 @@ static void on_connect(uv_connect_t *req, int status)
                return;
        }
 
-       s = worker_find_tcp_connected(peer);
-       if (s) {
+       found_session = worker_find_tcp_connected(peer);
+       if (found_session) {
                /* session already in the connected list.
                 * Something went wrong, it can be due to races when kresd has tried
                 * to reconnect to upstream after unsuccessful attempt. */
@@ -1018,7 +1018,7 @@ static void on_connect(uv_connect_t *req, int status)
        }
 
        /* TODO */
-       session->connected = true;
+       session2_event(session, PROTOLAYER_EVENT_CONNECT, NULL);
        session2_start_read(session);
 
        int ret = kr_ok();
@@ -1520,7 +1520,7 @@ static int qr_task_step(struct qr_task *task,
 
        /* Close pending I/O requests */
        subreq_finalize(task, packet_source, packet);
-       if ((kr_now() - worker_task_creation_time(task)) >= KR_RESOLVE_TIME_LIMIT) {
+       if ((kr_now() - task->creation_time) >= KR_RESOLVE_TIME_LIMIT) {
                struct kr_request *req = worker_task_request(task);
                if (!kr_fails_assert(req))
                        kr_query_inform_timeout(req, req->current_query);
@@ -1800,7 +1800,7 @@ int worker_end_tcp(struct session2 *session)
 
        worker_del_tcp_waiting(peer);
        worker_del_tcp_connected(peer);
-       session->connected = false;
+       session2_event(session, PROTOLAYER_EVENT_DISCONNECT, NULL);
 
        while (!session2_waitinglist_is_empty(session)) {
                struct qr_task *task = session2_waitinglist_pop(session, false);
@@ -1841,6 +1841,8 @@ int worker_end_tcp(struct session2 *session)
                worker_task_unref(task);
        }
 
+       session2_event(session, PROTOLAYER_EVENT_DISCONNECT, NULL);
+       session2_event(session, PROTOLAYER_EVENT_FORCE_CLOSE, NULL);
        return kr_ok();
 
 //     session_flags(session)->connected = false;
@@ -2288,7 +2290,8 @@ static bool pl_dns_stream_connection_timeout(struct session2 *session)
                            peer_str ? peer_str : "");
        }
 
-       qry->server_selection.error(qry, task->transport, KR_SELECTION_TCP_CONNECT_TIMEOUT);
+       if (qry)
+               qry->server_selection.error(qry, task->transport, KR_SELECTION_TCP_CONNECT_TIMEOUT);
 
        the_worker->stats.timeout += session2_waitinglist_get_len(session);
        session2_waitinglist_retry(session, true);
@@ -2311,13 +2314,35 @@ static bool pl_dns_stream_event_unwrap(enum protolayer_event_type event,
                                        struct protolayer_data *layer)
 {
        struct session2 *session = manager->session;
-       if (session->closing || event != PROTOLAYER_EVENT_TIMEOUT)
+       if (session->closing)
+               return true;
+
+       if (event == PROTOLAYER_EVENT_TIMEOUT) {
+               if (session->connected)
+                       return pl_dns_stream_resolution_timeout(manager->session);
+               else
+                       return pl_dns_stream_connection_timeout(manager->session);
+       } else if (event == PROTOLAYER_EVENT_CONNECT) {
+               session->connected = true;
+               return true;
+       } else if (event == PROTOLAYER_EVENT_DISCONNECT) {
+               session->connected = false;
                return true;
+       }
 
-       if (session->connected)
-               return pl_dns_stream_resolution_timeout(manager->session);
-       else
-               return pl_dns_stream_connection_timeout(manager->session);
+       return true;
+}
+
+static knot_pkt_t *produce_stream_packet(struct wire_buf *wb)
+{
+       uint16_t pkt_len = knot_wire_read_u16(wire_buf_data(wb));
+       if (wire_buf_data_length(wb) < pkt_len + sizeof(uint16_t))
+               return NULL;
+
+       wire_buf_trim(wb, sizeof(uint16_t));
+       knot_pkt_t *pkt = produce_packet(wire_buf_data(wb), pkt_len);
+       wire_buf_trim(wb, pkt_len);
+       return pkt;
 }
 
 static enum protolayer_cb_result pl_dns_stream_unwrap(
@@ -2328,34 +2353,33 @@ static enum protolayer_cb_result pl_dns_stream_unwrap(
                return protolayer_break(ctx, kr_error(EINVAL));
        }
 
+       struct session2 *session = ctx->manager->session;
        struct pl_dns_stream_sess_data *stream = protolayer_sess_data(layer);
-
-       if (stream->single && stream->produced) {
-               if (kr_log_is_debug(WORKER, NULL)) {
-                       kr_log_debug(WORKER, "Unexpected extra data from %s\n",
-                                   kr_straddr(ctx->comm.src_addr));
-               }
-               return protolayer_break(ctx, KNOT_EMALF);
-       }
-
        struct wire_buf *wb = ctx->payload.wire_buf;
-       if (wire_buf_data_length(wb) < sizeof(uint16_t))
-               return protolayer_break(ctx, KNOT_EMALF);
 
-       uint16_t pkt_len = knot_wire_read_u16(wire_buf_data(wb));
-       if (wire_buf_data_length(wb) < pkt_len + sizeof(uint16_t))
-               return protolayer_wait(ctx);
+       knot_pkt_t *pkt;
+       while ((pkt = produce_stream_packet(wb))) {
+               if (stream->single && stream->produced) {
+                       if (kr_log_is_debug(WORKER, NULL)) {
+                               kr_log_debug(WORKER, "Unexpected extra data from %s\n",
+                                               kr_straddr(ctx->comm.src_addr));
+                       }
+                       worker_end_tcp(session);
+                       return protolayer_break(ctx, KNOT_EMALF);
+               }
 
-       wire_buf_trim(wb, sizeof(uint16_t));
-       knot_pkt_t *pkt = produce_packet(wire_buf_data(wb), pkt_len);
-       wire_buf_trim(wb, pkt_len);
-       stream->produced = true;
-       if (!pkt)
-               return protolayer_break(ctx, KNOT_EMALF);
+               stream->produced = true;
+               if (!pkt)
+                       return protolayer_break(ctx, KNOT_EMALF);
 
-       int ret = worker_submit(ctx->manager->session, &ctx->comm, NULL, NULL, pkt);
+               int ret = worker_submit(session, &ctx->comm, NULL, NULL, pkt);
+               if (ret) {
+                       worker_end_tcp(session);
+                       return protolayer_break(ctx, ret);
+               }
+       }
        wire_buf_movestart(wb);
-       return protolayer_break(ctx, ret);
+       return protolayer_break(ctx, kr_ok());
 }
 
 struct sized_iovs {