]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
daemon: bugfixes
authorgrid <grigorii.demidov@nic.cz>
Mon, 27 Nov 2017 23:04:32 +0000 (00:04 +0100)
committerPetr Špaček <petr.spacek@nic.cz>
Mon, 8 Jan 2018 11:01:00 +0000 (12:01 +0100)
daemon/io.c
daemon/worker.c

index efce48d67c00a2e74b2378af6e7f2f5a2a694fcb..22185ea3adf87d3354b2ddcaabc5173a4f634d99 100644 (file)
@@ -219,6 +219,9 @@ int udp_bindfd(uv_udp_t *handle, int fd)
 static void tcp_timeout_trigger(uv_timer_t *timer)
 {
        struct session *session = timer->data;
+       struct worker_ctx *worker = timer->loop->data;
+
+       assert(session->outgoing == false);
        if (session->tasks.len > 0) {
                uv_timer_again(timer);
        } else {
@@ -258,7 +261,7 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
                }
        /* Connection spawned at least one request, reset its deadline for next query.
         * https://tools.ietf.org/html/rfc7766#section-6.2.3 */
-       } else if (ret > 0 && !s->outgoing) {
+       } else if (ret > 0 && !s->outgoing && !s->closing) {
                uv_timer_again(&s->timeout);
        }
        mp_flush(worker->pkt_pool.ctx);
index 39fe93be8dd1816212c0128d0b8969e521fe03ca..614ad006a48b4de05ed21cc3798163e2f44efb26 100644 (file)
@@ -352,10 +352,10 @@ static void session_close(struct session *session)
                return;
        }
 
-       if (session->buffering != NULL) {
+       if (!session->outgoing && session->buffering != NULL) {
                qr_task_complete(session->buffering);
-               session->buffering = NULL;
        }
+       session->buffering = NULL;
 
        uv_handle_t *handle = session->handle;
        io_stop_read(handle);
@@ -702,19 +702,15 @@ static void qr_task_free(struct qr_task *task)
        struct worker_ctx *worker = ctx->worker;
 
        /* Process source session. */
-       if (source_session) {
-               /* Walk the session task list and remove itself. */
-               assert(source_session->outgoing == false);
-               session_del_tasks(source_session, task);
+       if (source_session &&
+           source_session->tasks.len < worker->tcp_pipeline_max/2 &&
+           !source_session->closing && source_session->throttled) {
+               uv_handle_t *handle = source_session->handle;
                /* Start reading again if the session is throttled and
                 * the number of outgoing requests is below watermark. */
-               uv_handle_t *handle = source_session->handle;
-               if (handle && source_session->tasks.len < worker->tcp_pipeline_max/2) {
-                       if (!uv_is_closing(handle) && source_session->throttled) {
-                               assert(source_session->closing == false);
-                               io_start_read(handle);
-                               source_session->throttled = false;
-                       }
+               if (handle) {
+                       io_start_read(handle);
+                       source_session->throttled = false;
                }
        }
 
@@ -749,7 +745,7 @@ static int qr_task_register(struct qr_task *task, struct session *session)
         * when resuming reading. This is NYI.  */
        if (session->tasks.len >= task->ctx->worker->tcp_pipeline_max) {
                uv_handle_t *handle = session->handle;
-               if (handle && !session->throttled && !uv_is_closing(handle)) {
+               if (handle && !session->throttled && !session->closing) {
                        io_stop_read(handle);
                        session->throttled = true;
                }
@@ -770,6 +766,12 @@ static void qr_task_complete(struct qr_task *task)
        if (task->on_complete) {
                task->on_complete(worker, &ctx->req, task->baton);
        }
+       struct session *source_session = ctx->source.session;
+       if (source_session) {
+               assert(source_session->outgoing == false &&
+                      source_session->waiting.len == 0);
+               session_del_tasks(source_session, task);
+       }
        /* Release primary reference to task. */
        request_del_tasks(ctx, task);
 }
@@ -777,7 +779,6 @@ static void qr_task_complete(struct qr_task *task)
 /* This is called when we send subrequest / answer */
 static int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status)
 {
-       assert(handle == NULL || uv_is_closing(handle) == false);
        if (task->finished) {
                assert(task->leading == false);
                qr_task_complete(task);
@@ -785,53 +786,68 @@ static int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status
                        return status;
                }
                struct session* session = handle->data;
+               assert(session);
                if (!session->outgoing ||
                    session->waiting.len == 0) {
                        return status;
                }
        }
 
-
-       if (status == 0 && handle) {
+       if (handle) {
                struct session* session = handle->data;
+               if (!session->outgoing && task->ctx->source.session) {
+                       assert (task->ctx->source.session->handle == handle);
+               }
                if (handle->type == UV_TCP && session->outgoing &&
                    session->waiting.len > 0) {
                        session_del_waiting(session, task);
+                       if (session->closing) {
+                               return status;
+                       }
+                       if (status != 0) {
+                               /* Add to the end for retry */
+                               session_add_waiting(session, task);
+                       }
                        if (session->waiting.len > 0) {
                                struct qr_task *t = session->waiting.at[0];
                                int ret = qr_task_send(t, (uv_handle_t *)handle,
                                                       &session->peer.ip, t->pktbuf);
-                               if (ret != kr_ok()) {
+                               if (ret == kr_ok()) {
+                                       uv_timer_t *timer = &session->timeout;
+                                       uv_timer_stop(timer);
+                                       session->timeout.data = session;
+                                       timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0);
+                               } else {
                                        uv_timer_t *timer = &session->timeout;
                                        uv_timer_stop(timer);
                                        while (session->waiting.len > 0) {
-                                               struct qr_task *task = session->waiting.at[0];
+                                               struct qr_task *t = session->waiting.at[0];
                                                if (session->outgoing) {
-                                                       qr_task_finalize(task, KR_STATE_FAIL);
+                                                       qr_task_finalize(t, KR_STATE_FAIL);
                                                } else {
-                                                       assert(task->ctx->source.session == session);
-                                                       task->ctx->source.session = NULL;
+                                                       assert(t->ctx->source.session == session);
+                                                       t->ctx->source.session = NULL;
                                                }
                                                array_del(session->waiting, 0);
-                                               qr_task_unref(task);
-                                               session_del_tasks(session, task);
+                                               session_del_tasks(session, t);
+                                               qr_task_unref(t);
                                        }
                                        while (session->tasks.len > 0) {
-                                               struct qr_task *task = session->tasks.at[0];
+                                               struct qr_task *t = session->tasks.at[0];
                                                if (session->outgoing) {
-                                                       qr_task_finalize(task, KR_STATE_FAIL);
+                                                       qr_task_finalize(t, KR_STATE_FAIL);
                                                } else {
-                                                       assert(task->ctx->source.session == session);
-                                                       task->ctx->source.session = NULL;
+                                                       assert(t->ctx->source.session == session);
+                                                       t->ctx->source.session = NULL;
                                                }
-                                               session_del_tasks(session, task);
+                                               session_del_tasks(session, t);
                                        }
                                        session_close(session);
                                        return status;
                                }
                        }
                }
-               if (!uv_is_closing(handle)) {
+               if (!session->closing) {
                        io_start_read(handle); /* Start reading new query */
                }
        }
@@ -845,9 +861,7 @@ static void on_send(uv_udp_send_t *req, int status)
        struct worker_ctx *worker = loop->data;
        assert(worker == get_worker());
        struct qr_task *task = req->data;
-       if (qr_valid_handle(task, handle)) {
-               qr_task_on_send(task, handle, status);
-       }
+       qr_task_on_send(task, handle, status);
        qr_task_unref(task);
        iorequest_release(worker, req);
 }
@@ -859,9 +873,7 @@ static void on_write(uv_write_t *req, int status)
        struct worker_ctx *worker = loop->data;
        assert(worker == get_worker());
        struct qr_task *task = req->data;
-       if (qr_valid_handle(task, handle)) {
-               qr_task_on_send(task, handle, status);
-       }
+       qr_task_on_send(task, handle, status);
        qr_task_unref(task);
        iorequest_release(worker, req);
 }
@@ -977,11 +989,9 @@ static int session_next_waiting_send(struct session *session)
        if (session->waiting.len > 0) {
                struct qr_task *task = session->waiting.at[0];
                ret = qr_task_send(task, session->handle, &peer->ip, task->pktbuf);
-               if (ret == kr_ok()) {
-                       session->timeout.data = session;
-                       timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0);
-               }
        }
+       session->timeout.data = session;
+       timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0);
        return ret;
 }
 
@@ -1167,16 +1177,18 @@ static void on_tcp_watchdog_timeout(uv_timer_t *timer)
        uv_timer_stop(timer);
        struct worker_ctx *worker = get_worker();
 
-       worker_del_tcp_connected(worker, &session->peer.ip);
+       if (session->outgoing) {
+               worker_del_tcp_connected(worker, &session->peer.ip);
 
-       while (session->waiting.len > 0) {
-               struct qr_task *task = session->waiting.at[0];
-               task->timeouts += 1;
-               worker->stats.timeout += 1;
-               array_del(session->waiting, 0);
-               qr_task_unref(task);
-               session_del_tasks(session, task);
-               qr_task_finalize(task, KR_STATE_FAIL);
+               while (session->waiting.len > 0) {
+                       struct qr_task *task = session->waiting.at[0];
+                       task->timeouts += 1;
+                       worker->stats.timeout += 1;
+                       array_del(session->waiting, 0);
+                       session_del_tasks(session, task);
+                       qr_task_finalize(task, KR_STATE_FAIL);
+                       qr_task_unref(task);
+               }
        }
 
        while (session->tasks.len > 0) {
@@ -1185,8 +1197,8 @@ static void on_tcp_watchdog_timeout(uv_timer_t *timer)
                worker->stats.timeout += 1;
                assert(task->refs > 1);
                array_del(session->tasks, 0);
-               qr_task_unref(task);
                qr_task_finalize(task, KR_STATE_FAIL);
+               qr_task_unref(task);
        }
 
        session_close(session);
@@ -1229,7 +1241,7 @@ static void on_udp_timeout(uv_timer_t *timer)
 static void on_session_idle_timeout(uv_timer_t *timer)
 {
        struct session *s = timer->data;
-       assert(s && s->outgoing);
+       assert(s);
        uv_timer_stop(timer);
        if (s->closing) {
                return;
@@ -1246,6 +1258,9 @@ static uv_handle_t *retransmit(struct qr_task *task)
        uv_handle_t *ret = NULL;
        if (task && task->addrlist && task->addrlist_count > 0) {
                struct sockaddr_in6 *choice = &((struct sockaddr_in6 *)task->addrlist)[task->addrlist_turn];
+               if (!choice) {
+                       return ret;
+               }
                ret = ioreq_spawn(task, SOCK_DGRAM, choice->sin6_family);
                if (ret &&
                    qr_task_send(task, ret, (struct sockaddr *)choice,
@@ -1310,6 +1325,7 @@ static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_
                        qry->secret = leader_qry->secret;
                        leader_qry->secret = 0; /* Next will be already decoded */
                }
+               struct session *follower_source_session = follower->ctx->source.session;
                qr_task_step(follower, packet_source, pkt);
                qr_task_unref(follower);
        }
@@ -1358,7 +1374,7 @@ static int qr_task_finalize(struct qr_task *task, int state)
        if (ctx->source.session != NULL) {
                uv_handle_t *handle = ctx->source.session->handle;
                assert(ctx->source.session->closing == false);
-               assert(handle->data == ctx->source.session);
+               assert(handle && handle->data == ctx->source.session);
                assert(ctx->source.addr.ip.sa_family != AF_UNSPEC);
                (void) qr_task_send(task, handle,
                                    (struct sockaddr *)&ctx->source.addr,
@@ -1455,6 +1471,7 @@ static int qr_task_step(struct qr_task *task,
                 */
                subreq_lead(task);
                struct session *session = handle->data;
+               assert(session->handle->type == UV_UDP);
                ret = timer_start(session, on_retransmit, timeout, 0);
                /* Start next step with timeout, fatal if can't start a timer. */
                if (ret != 0) {
@@ -1585,6 +1602,7 @@ static int qr_task_step(struct qr_task *task,
                                        session_del_waiting(session, task);
                                        worker_del_tcp_waiting(ctx->worker, addr);
                                        iorequest_release(ctx->worker, conn);
+                                       subreq_finalize(task, packet_source, packet);
                                        return qr_task_step(task, NULL, NULL);
                                }
                                tls_client_ctx_set_params(tls_ctx, entry);
@@ -1614,10 +1632,12 @@ static int qr_task_step(struct qr_task *task,
 
                        if (uv_tcp_connect(conn, (uv_tcp_t *)client,
                                           addr , on_connect) != 0) {
+                               uv_timer_stop(&session->timeout);
                                session_del_tasks(session, task);
                                session_del_waiting(session, task);
                                worker_del_tcp_waiting(ctx->worker, addr);
                                iorequest_release(ctx->worker, conn);
+                               subreq_finalize(task, packet_source, packet);
                                return qr_task_step(task, NULL, NULL);
                        }
                }
@@ -1840,7 +1860,7 @@ int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle,
        }
        /* Connection error or forced disconnect */
        struct session *session = handle->data;
-       assert(session);
+       assert(session && session->handle == (uv_handle_t *)handle && handle->type == UV_TCP);
        if (session->closing) {
                return kr_ok();
        }
@@ -1856,7 +1876,6 @@ int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle,
                                  addr_str, sizeof(addr_str));
                        VERBOSE_MSG(NULL, "=> connection to '%s' closed by peer\n", addr_str);
                }
-
                uv_timer_t *timer = &session->timeout;
                uv_timer_stop(timer);
                struct sockaddr *peer = &session->peer.ip;
@@ -1869,6 +1888,11 @@ int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle,
                                                TLS_HS_NOT_STARTED);
                }
 
+               if (session->outgoing && session->buffering) {
+                       session->buffering = NULL;
+               }
+
+               assert(session->tasks.len >= session->waiting.len);
                while (session->waiting.len > 0) {
                        struct qr_task *task = session->waiting.at[0];
                        array_del(session->waiting, 0);
@@ -1909,29 +1933,30 @@ int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle,
                        session_del_tasks(session, task);
                }
                session_close(session);
-               return kr_error(ECONNRESET);
+               return kr_ok();
        }
 
        if (session->outgoing) {
                uv_timer_stop(&session->timeout);
                timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0);
-               if (session->bytes_to_skip) {
-                       session->buffering = NULL;
-                       ssize_t min_len = MIN(session->bytes_to_skip, len);
-                       len -= min_len;
-                       msg += min_len;
-                       session->bytes_to_skip -= min_len;
-                       if (len < 0 || session->bytes_to_skip < 0) {
-                               /* Something gone wrong.
-                                * Better kill the connection */
-                               assert(false);
-                               return kr_error(EILSEQ);
-                       }
-                       if (len == 0) {
-                               return kr_ok();
-                       }
-                       assert(session->bytes_to_skip == 0);
+       }
+
+       if (session->bytes_to_skip) {
+               assert(session->buffering == NULL);
+               ssize_t min_len = MIN(session->bytes_to_skip, len);
+               len -= min_len;
+               msg += min_len;
+               session->bytes_to_skip -= min_len;
+               if (len < 0 || session->bytes_to_skip < 0) {
+                       /* Something gone wrong.
+                        * Better kill the connection */
+                       assert(false);
+                       return kr_error(EILSEQ);
                }
+               if (len == 0) {
+                       return kr_ok();
+               }
+               assert(session->bytes_to_skip == 0);
        }
 
        int submitted = 0;
@@ -1973,8 +1998,8 @@ int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle,
                                        task->ctx->source.session = NULL;
                                }
                                array_del(session->waiting, 0);
-                               qr_task_unref(task);
                                session_del_tasks(session, task);
+                               qr_task_unref(task);
                        }
                        while (session->tasks.len > 0) {
                                struct qr_task *task = session->tasks.at[0];
@@ -1988,7 +2013,7 @@ int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle,
                        }
                        session_close(session);
 
-                       return kr_error(EILSEQ);
+                       return kr_ok();
                }
 
                /* get task */
@@ -2019,8 +2044,8 @@ int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle,
                         * that we will get multiple matches sooner or later (!) */
                        if (task) {
                                knot_pkt_clear(task->pktbuf);
+                               assert(task->leading == false);
                        } else  {
-                               session->buffering = NULL;
                                session->bytes_to_skip = msg_size - 2;
                                ssize_t min_len = MIN(session->bytes_to_skip, len);
                                len -= min_len;
@@ -2033,11 +2058,15 @@ int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle,
                                        return kr_error(EILSEQ);
                                }
                                if (len == 0) {
-                                       return kr_ok();
+                                       return submitted;
                                }
                                assert(session->bytes_to_skip == 0);
                                int ret = worker_process_tcp(worker, handle, msg, len);
-                               submitted += ret;
+                               if (ret < 0) {
+                                       submitted = ret;
+                               } else {
+                                       submitted += ret;
+                               }
                                return submitted;
                        }
                }
@@ -2046,6 +2075,7 @@ int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle,
                knot_wire_set_id(pkt_buf->wire, msg_id);
                pkt_buf->size = 2;
                task->bytes_remaining = msg_size - 2;
+               assert(session->buffering == NULL);
                session->buffering = task;
        }
        /* At this point session must have either created new task
@@ -2057,15 +2087,32 @@ int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle,
        if (pkt_buf->size + to_read > pkt_buf->max_size) {
                // TODO reallocate pkt_buf
                pkt_buf->size = 0;
+               len -= to_read;
+               msg += to_read;
                session->bytes_to_skip = task->bytes_remaining - to_read;
                task->bytes_remaining = 0;
-               session->buffering = NULL;
+               if (session->buffering) {
+                       if (!session->outgoing) {
+                               qr_task_complete(session->buffering);
+                       }
+                       session->buffering = NULL;
+               }
+               if (len > 0) {
+                       int ret = worker_process_tcp(worker, handle, msg, len);
+                       if (ret < 0) {
+                               submitted = ret;
+                       } else {
+                               submitted += ret;
+                       }
+               }
                return submitted;
        }
        /* Buffer message and check if it's complete */
        memcpy(pkt_buf->wire + pkt_buf->size, msg, to_read);
        pkt_buf->size += to_read;
        task->bytes_remaining -= to_read;
+       len -= to_read;
+       msg += to_read;
        if (task->bytes_remaining == 0) {
                /* Message was assembled, clear temporary. */
                session->buffering = NULL;
@@ -2094,9 +2141,9 @@ int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle,
                        const struct sockaddr *addr = session->outgoing ? &session->peer.ip : NULL;
                        ret = qr_task_step(task, addr, pkt_buf);
                }
-               if (len - to_read > 0) {
+               if (len > 0) {
                        /* TODO: this is simple via iteration; recursion doesn't really help */
-                       ret = worker_process_tcp(worker, handle, msg + to_read, len - to_read);
+                       ret = worker_process_tcp(worker, handle, msg, len);
                        if (ret < 0) {
                                assert(false);
                                return ret;
@@ -2104,6 +2151,7 @@ int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle,
                        submitted += ret;
                }
        }
+       assert(submitted >= 0);
        return submitted;
 }