From 8ad2db6e4af6b2537be3e4e2ec7d12a5943c614a Mon Sep 17 00:00:00 2001 From: Tomas Krizek Date: Thu, 3 Sep 2020 13:10:04 +0200 Subject: [PATCH] daemon: handle IO error when processing wire buffer This fixes the following assert: daemon/worker.c:1157: qr_task_finalize: Assertion `!session_flags(source_session)->closing' failed. Scenario which leads to the above error: 1. We're using a stateful protocol. 2. Enough data arrive in a single tcp_recv() call to put at least two queries into the session's wire buffer. 3. session_wirebuf_process() calls worker_submit() which calls qr_task_step(). 4. In the qr_task_step() the query state changes to KR_STATE_DONE, then qr_task_finalize() is called. 5. qr_task_send() is called, but it fails. This is where qr_task_finalize() closes the session, but used to return no error. 6. When the next query is processed in session_wirebuf_process(), steps 3 and 4 are followed and qr_task_finalize() is called. 7. Since the session is already closed, the assert is triggered. Debugging this was a lot of fun... All hail the rr debugger! --- daemon/session.c | 24 +++++++++++++++--------- daemon/worker.c | 15 +++++++++------ 2 files changed, 24 insertions(+), 15 deletions(-) diff --git a/daemon/session.c b/daemon/session.c index 606b13992..d606fa451 100644 --- a/daemon/session.c +++ b/daemon/session.c @@ -707,29 +707,35 @@ void session_unpoison(struct session *session) int session_wirebuf_process(struct session *session, const struct sockaddr *peer) { int ret = 0; - if (session->wire_buf_start_idx == session->wire_buf_end_idx) { + if (session->wire_buf_start_idx == session->wire_buf_end_idx) return ret; - } + size_t wirebuf_data_size = session->wire_buf_end_idx - session->wire_buf_start_idx; - uint32_t max_iterations = (wirebuf_data_size / (KNOT_WIRE_HEADER_SIZE + KNOT_WIRE_QUESTION_MIN_SIZE)) + 1; + uint32_t max_iterations = (wirebuf_data_size / + (KNOT_WIRE_HEADER_SIZE + KNOT_WIRE_QUESTION_MIN_SIZE)) + 1; knot_pkt_t *query = NULL; + while (((query = session_produce_packet(session, &the_worker->pkt_pool)) != NULL) && (ret < max_iterations)) { assert (!session_wirebuf_error(session)); int res = worker_submit(session, peer, query); - if (res != kr_error(EILSEQ)) { - /* Packet has been successfully parsed. */ + /* Errors from worker_submit() are intetionally *not* handled in order to + * ensure the entire wire buffer is processed. */ + if (res == kr_ok()) ret += 1; - } if (session_discard_packet(session, query) < 0) { /* Packet data isn't stored in memory as expected. - something went wrong, normally should not happen. */ + * something went wrong, normally should not happen. */ break; } } - if (session_wirebuf_error(session)) { + + /* worker_submit() may cause the session to close (e.g. due to IO + * write error when the packet triggers an immediate answer). This is + * an error state, as well as any wirebuf error. */ + if (session->sflags.closing || session_wirebuf_error(session)) ret = -1; - } + return ret; } diff --git a/daemon/worker.c b/daemon/worker.c index 011e852e7..e7c7fc833 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -1154,7 +1154,7 @@ static int qr_task_finalize(struct qr_task *task, int state) { assert(task && task->leading == false); if (task->finished) { - return 0; + return kr_ok(); } struct request_ctx *ctx = task->ctx; struct session *source_session = ctx->source.session; @@ -1163,16 +1163,17 @@ static int qr_task_finalize(struct qr_task *task, int state) task->finished = true; if (source_session == NULL) { (void) qr_task_on_send(task, NULL, kr_error(EIO)); - return state == KR_STATE_DONE ? 0 : kr_error(EIO); + return state == KR_STATE_DONE ? kr_ok() : kr_error(EIO); } + if (session_flags(source_session)->closing || + ctx->source.addr.ip.sa_family == AF_UNSPEC) + return kr_error(EINVAL); + /* Reference task as the callback handler can close it */ qr_task_ref(task); /* Send back answer */ - assert(!session_flags(source_session)->closing); - assert(ctx->source.addr.ip.sa_family != AF_UNSPEC); - int ret; const uv_handle_t *src_handle = session_get_handle(source_session); if (src_handle->type != UV_UDP && src_handle->type != UV_TCP) { @@ -1207,7 +1208,9 @@ static int qr_task_finalize(struct qr_task *task, int state) qr_task_unref(task); - return state == KR_STATE_DONE ? 0 : kr_error(EIO); + if (ret != kr_ok() || state != KR_STATE_DONE) + return kr_error(EIO); + return kr_ok(); } static int udp_task_step(struct qr_task *task, -- 2.47.2