]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
daemon: handle IO error when processing wire buffer
authorTomas Krizek <tomas.krizek@nic.cz>
Thu, 3 Sep 2020 11:10:04 +0000 (13:10 +0200)
committerTomas Krizek <tomas.krizek@nic.cz>
Wed, 23 Sep 2020 10:08:18 +0000 (12:08 +0200)
This fixes the following assert:
daemon/worker.c:1157: qr_task_finalize: Assertion `!session_flags(source_session)->closing' failed.

Scenario which leads to the above error:
1. We're using a stateful protocol.
2. Enough data arrive in a single tcp_recv() call to put at least
   two queries into the session's wire buffer.
3. session_wirebuf_process() calls worker_submit() which calls
   qr_task_step().
4. In the qr_task_step() the query state changes to KR_STATE_DONE,
   then qr_task_finalize() is called.
5. qr_task_send() is called, but it fails. This is where
   qr_task_finalize() closes the session, but used to return no error.
6. When the next query is processed in session_wirebuf_process(),
   steps 3 and 4 are followed and qr_task_finalize() is called.
7. Since the session is already closed, the assert is triggered.

Debugging this was a lot of fun... All hail the rr debugger!

daemon/session.c
daemon/worker.c

index 606b1399262bd2d4dc3813a920fa42ceaf61a77e..d606fa451f91f5cd77c67e7cae15bd22de8f439e 100644 (file)
@@ -707,29 +707,35 @@ void session_unpoison(struct session *session)
 int session_wirebuf_process(struct session *session, const struct sockaddr *peer)
 {
        int ret = 0;
-       if (session->wire_buf_start_idx == session->wire_buf_end_idx) {
+       if (session->wire_buf_start_idx == session->wire_buf_end_idx)
                return ret;
-       }
+
        size_t wirebuf_data_size = session->wire_buf_end_idx - session->wire_buf_start_idx;
-       uint32_t max_iterations = (wirebuf_data_size / (KNOT_WIRE_HEADER_SIZE + KNOT_WIRE_QUESTION_MIN_SIZE)) + 1;
+       uint32_t max_iterations = (wirebuf_data_size /
+               (KNOT_WIRE_HEADER_SIZE + KNOT_WIRE_QUESTION_MIN_SIZE)) + 1;
        knot_pkt_t *query = NULL;
+
        while (((query = session_produce_packet(session, &the_worker->pkt_pool)) != NULL) &&
               (ret < max_iterations)) {
                assert (!session_wirebuf_error(session));
                int res = worker_submit(session, peer, query);
-               if (res != kr_error(EILSEQ)) {
-                       /* Packet has been successfully parsed. */
+               /* Errors from worker_submit() are intetionally *not* handled in order to
+                * ensure the entire wire buffer is processed. */
+               if (res == kr_ok())
                        ret += 1;
-               }
                if (session_discard_packet(session, query) < 0) {
                        /* Packet data isn't stored in memory as expected.
-                          something went wrong, normally should not happen. */
+                        * something went wrong, normally should not happen. */
                        break;
                }
        }
-       if (session_wirebuf_error(session)) {
+
+       /* worker_submit() may cause the session to close (e.g. due to IO
+        * write error when the packet triggers an immediate answer). This is
+        * an error state, as well as any wirebuf error. */
+       if (session->sflags.closing || session_wirebuf_error(session))
                ret = -1;
-       }
+
        return ret;
 }
 
index 011e852e70c47ac1ba348106076d5d8306a4b81c..e7c7fc83346f9e8906d8531df45601d46fdb1f75 100644 (file)
@@ -1154,7 +1154,7 @@ static int qr_task_finalize(struct qr_task *task, int state)
 {
        assert(task && task->leading == false);
        if (task->finished) {
-               return 0;
+               return kr_ok();
        }
        struct request_ctx *ctx = task->ctx;
        struct session *source_session = ctx->source.session;
@@ -1163,16 +1163,17 @@ static int qr_task_finalize(struct qr_task *task, int state)
        task->finished = true;
        if (source_session == NULL) {
                (void) qr_task_on_send(task, NULL, kr_error(EIO));
-               return state == KR_STATE_DONE ? 0 : kr_error(EIO);
+               return state == KR_STATE_DONE ? kr_ok() : kr_error(EIO);
        }
 
+       if (session_flags(source_session)->closing ||
+           ctx->source.addr.ip.sa_family == AF_UNSPEC)
+               return kr_error(EINVAL);
+
        /* Reference task as the callback handler can close it */
        qr_task_ref(task);
 
        /* Send back answer */
-       assert(!session_flags(source_session)->closing);
-       assert(ctx->source.addr.ip.sa_family != AF_UNSPEC);
-
        int ret;
        const uv_handle_t *src_handle = session_get_handle(source_session);
        if (src_handle->type != UV_UDP && src_handle->type != UV_TCP) {
@@ -1207,7 +1208,9 @@ static int qr_task_finalize(struct qr_task *task, int state)
 
        qr_task_unref(task);
 
-       return state == KR_STATE_DONE ? 0 : kr_error(EIO);
+       if (ret != kr_ok() || state != KR_STATE_DONE)
+               return kr_error(EIO);
+       return kr_ok();
 }
 
 static int udp_task_step(struct qr_task *task,