daemon: improved reliability under heavy load; bugfixing & minor refactoring
author    Grigorii Demidov <grigorii.demidov@nic.cz>
          Tue, 14 Nov 2017 12:03:06 +0000 (13:03 +0100)
committer Petr Špaček <petr.spacek@nic.cz>
          Mon, 8 Jan 2018 11:00:59 +0000 (12:00 +0100)
daemon/io.c
daemon/network.c
daemon/worker.c
daemon/worker.h

diff --git a/daemon/io.c b/daemon/io.c
index 112b19611bef0668f124a25ffec6e8e35ee099a9..18232aea30775e01bc5b615db7ec221ff8523a11 100644
--- a/daemon/io.c
+++ b/daemon/io.c
@@ -33,6 +33,8 @@
        } \
 } while (0)
 
+void io_release(uv_handle_t *handle);
+
 static void check_bufsize(uv_handle_t* handle)
 {
        /* We want to buffer at least N waves in advance.
@@ -108,7 +110,7 @@ static void session_release(struct worker_ctx *worker, uv_handle_t *handle)
 
 static uv_stream_t *handle_alloc(uv_loop_t *loop)
 {
-       uv_stream_t *handle = calloc(1, sizeof(*handle));
+       uv_stream_t *handle = calloc(1, sizeof(union uv_handles));
        if (!handle) {
                return NULL;
        }
@@ -116,6 +118,17 @@ static uv_stream_t *handle_alloc(uv_loop_t *loop)
        return handle;
 }
 
+static uv_stream_t *handle_borrow(uv_loop_t *loop)
+{
+       struct worker_ctx *worker = loop->data;
+       void *req = worker_iohandle_borrow(worker);
+       if (!req) {
+               return NULL;
+       }
+
+       return (uv_stream_t *)req;
+}
+
 static void handle_getbuf(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
 {
        /* Worker has single buffer which is reused for all incoming
@@ -144,6 +157,10 @@ void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
 {
        uv_loop_t *loop = handle->loop;
        struct worker_ctx *worker = loop->data;
+       struct session *s = handle->data;
+       if (s->closing) {
+               return;
+       }
        if (nread <= 0) {
                if (nread < 0) { /* Error response, notify resolver */
                        worker_submit(worker, (uv_handle_t *)handle, NULL, addr);
@@ -165,6 +182,7 @@ static int udp_bind_finalize(uv_handle_t *handle)
        /* Handle is already created, just create context. */
        struct session *session = session_new();
        assert(session);
+       session->outgoing = false;
        session->handle = handle;
        handle->data = session;
        return io_start_read((uv_handle_t *)handle);
@@ -249,14 +267,14 @@ static void _tcp_accept(uv_stream_t *master, int status, bool tls)
        if (status != 0) {
                return;
        }
-       uv_stream_t *client = handle_alloc(master->loop);
+       uv_stream_t *client = handle_borrow(master->loop);
        if (!client) {
                return;
        }
        memset(client, 0, sizeof(*client));
        io_create(master->loop, (uv_handle_t *)client, SOCK_STREAM);
        if (uv_accept(master, client) != 0) {
-               uv_close((uv_handle_t *)client, io_free);
+               uv_close((uv_handle_t *)client, io_release);
                return;
        }
 
@@ -264,13 +282,12 @@ static void _tcp_accept(uv_stream_t *master, int status, bool tls)
         * It will re-check every half of a request time limit if the connection
         * is idle and should be terminated, this is an educated guess. */
        struct session *session = client->data;
+       assert(session->outgoing == false);
        session->has_tls = tls;
        if (tls && !session->tls_ctx) {
                session->tls_ctx = tls_new(master->loop->data);
        }
        uv_timer_t *timer = &session->timeout;
-       uv_timer_init(master->loop, timer);
-       timer->data = session;
        uv_timer_start(timer, tcp_timeout_trigger, KR_CONN_RTT_MAX/2, KR_CONN_RTT_MAX/2);
        io_start_read((uv_handle_t *)client);
 }
@@ -376,13 +393,14 @@ int tcp_bindfd_tls(uv_tcp_t *handle, int fd)
 
 void io_create(uv_loop_t *loop, uv_handle_t *handle, int type)
 {
+       int ret = -1;
        if (type == SOCK_DGRAM) {
-               uv_udp_init(loop, (uv_udp_t *)handle);
-       } else {
-               uv_tcp_init(loop, (uv_tcp_t *)handle);
+               ret = uv_udp_init(loop, (uv_udp_t *)handle);
+       } else if (type == SOCK_STREAM) {
+               ret = uv_tcp_init(loop, (uv_tcp_t *)handle);
                uv_tcp_nodelay((uv_tcp_t *)handle, 1);
        }
-
+       assert(ret == 0);
        struct worker_ctx *worker = loop->data;
        struct session *session = session_borrow(worker);
        assert(session);
@@ -417,13 +435,25 @@ void io_free(uv_handle_t *handle)
        free(handle);
 }
 
+void io_release(uv_handle_t *handle)
+{
+       if (!handle) {
+               return;
+       }
+       uv_loop_t *loop = handle->loop;
+       struct worker_ctx *worker = loop->data;
+       io_deinit(handle);
+       worker_iohandle_release(worker, handle);
+}
+
 int io_start_read(uv_handle_t *handle)
 {
        if (handle->type == UV_UDP) {
                return uv_udp_recv_start((uv_udp_t *)handle, &handle_getbuf, &udp_recv);
-       } else {
+       } else if (handle->type == UV_TCP) {
                return uv_read_start((uv_stream_t *)handle, &handle_getbuf, &tcp_recv);
        }
+       assert(false);
 }
 
 int io_stop_read(uv_handle_t *handle)
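
The daemon/io.c changes move per-connection handles off the heap-per-accept path: handle_borrow() takes a block sized for union uv_handles from the worker freelist via worker_iohandle_borrow(), and io_release() replaces io_free() as the uv_close() callback so the block is returned to that pool instead of being free()d. A minimal sketch of the accept path after this change (function names as in the hunks above; TLS setup, the idle timer and error handling are trimmed, and the declarations from daemon/io.h and daemon/worker.h are assumed):

    static void accept_sketch(uv_stream_t *master)
    {
            /* pooled, handle-sized block instead of calloc() */
            uv_stream_t *client = handle_borrow(master->loop);
            if (!client) {
                    return;
            }
            memset(client, 0, sizeof(*client));
            /* attaches a borrowed session as client->data */
            io_create(master->loop, (uv_handle_t *)client, SOCK_STREAM);
            if (uv_accept(master, client) != 0) {
                    /* hand the block back to the pool once libuv is done with it */
                    uv_close((uv_handle_t *)client, io_release);
                    return;
            }
            io_start_read((uv_handle_t *)client);
    }
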
diff --git a/daemon/network.c b/daemon/network.c
index e181cae703c7210533d1c7372019f31baf2771c7..040d4ed9c6cb94783b4293f0ea173061319853df 100644
--- a/daemon/network.c
+++ b/daemon/network.c
@@ -140,7 +140,7 @@ static int open_endpoint(struct network *net, struct endpoint *ep, struct sockad
 {
        int ret = 0;
        if (flags & NET_UDP) {
-               ep->udp = malloc(sizeof(*ep->udp));
+               ep->udp = malloc(sizeof(union uv_handles));
                if (!ep->udp) {
                        return kr_error(ENOMEM);
                }
@@ -153,7 +153,7 @@ static int open_endpoint(struct network *net, struct endpoint *ep, struct sockad
                ep->flags |= NET_UDP;
        }
        if (flags & NET_TCP) {
-               ep->tcp = malloc(sizeof(*ep->tcp));
+               ep->tcp = malloc(sizeof(union uv_handles));
                if (!ep->tcp) {
                        return kr_error(ENOMEM);
                }
@@ -185,7 +185,7 @@ static int open_endpoint_fd(struct network *net, struct endpoint *ep, int fd, in
                if (ep->udp) {
                        return kr_error(EEXIST);
                }
-               ep->udp = malloc(sizeof(*ep->udp));
+               ep->udp = malloc(sizeof(union uv_handles));// malloc(sizeof(*ep->udp));
                if (!ep->udp) {
                        return kr_error(ENOMEM);
                }
@@ -197,12 +197,11 @@ static int open_endpoint_fd(struct network *net, struct endpoint *ep, int fd, in
                }
                ep->flags |= NET_UDP;
                return kr_ok();
-       }
-       if (sock_type == SOCK_STREAM) {
+       } else if (sock_type == SOCK_STREAM) {
                if (ep->tcp) {
                        return kr_error(EEXIST);
                }
-               ep->tcp = malloc(sizeof(*ep->tcp));
+               ep->tcp = malloc(sizeof(union uv_handles));
                if (!ep->tcp) {
                        return kr_error(ENOMEM);
                }
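
The daemon/network.c hunks change every endpoint handle allocation from sizeof(*ep->udp) / sizeof(*ep->tcp) to sizeof(union uv_handles), presumably so that each handle occupies a block of the same, maximal size whether it is used as a uv_udp_t or a uv_tcp_t, keeping it interchangeable with the blocks managed by the worker's handle freelist. An illustrative sanity check (not part of the patch) spelling out that invariant:

    #include <assert.h>
    #include <uv.h>
    #include "daemon/worker.h"   /* union uv_handles, added below in this commit */

    /* A pooled block must be able to hold either handle kind an endpoint
     * may allocate. */
    static void check_pooled_block_size(void)
    {
            assert(sizeof(union uv_handles) >= uv_handle_size(UV_UDP));
            assert(sizeof(union uv_handles) >= uv_handle_size(UV_TCP));
    }
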
diff --git a/daemon/worker.c b/daemon/worker.c
index a7c96f739181d905ce6f6961f1f4eb109d417ef4..603be8e7350440390d828e6f2badbc79408f9200 100644
--- a/daemon/worker.c
+++ b/daemon/worker.c
@@ -76,25 +76,6 @@ struct qr_task
 };
 
 
-int32_t tcp_connected = 0;
-int32_t tcp_waiting = 0;
-
-/* @internal Union of various libuv objects for freelist. */
-struct req
-{
-       union {
-               /* Socket handles, these have session as their `handle->data` and own it. */
-               uv_udp_t      udp;
-               uv_tcp_t      tcp;
-               /* I/O events, these have only a reference to the task they're operating on. */
-               uv_udp_send_t send;
-               uv_write_t    write;
-               uv_connect_t  connect;
-               /* Timer events */
-               uv_timer_t    timer;
-       } as;
-};
-
 /* Convenience macros */
 #define qr_task_ref(task) \
        do { ++(task)->refs; } while(0)
@@ -148,29 +129,75 @@ static inline struct worker_ctx *get_worker(void)
        return uv_default_loop()->data;
 }
 
-static inline struct req *req_borrow(struct worker_ctx *worker)
+static inline void *iohandle_borrow(struct worker_ctx *worker)
+{
+       void *h = NULL;
+
+       const size_t size = sizeof(union uv_handles);
+       if (worker->pool_iohandles.len > 0) {
+               h = array_tail(worker->pool_iohandles);
+               array_pop(worker->pool_iohandles);
+               kr_asan_unpoison(h, size);
+       } else {
+               h = malloc(size);
+       }
+
+       return h;
+}
+
+static inline void iohandle_release(struct worker_ctx *worker, void *h)
 {
-       struct req *req = NULL;
-       if (worker->pool_ioreq.len > 0) {
-               req = array_tail(worker->pool_ioreq);
-               array_pop(worker->pool_ioreq);
-               kr_asan_unpoison(req, sizeof(*req));
+       assert(h);
+
+       const size_t size = sizeof(union uv_handles);
+       if (worker->pool_iohandles.len < MP_FREELIST_SIZE) {
+               array_push(worker->pool_iohandles, h);
+               kr_asan_poison(h, size);
        } else {
-               req = malloc(sizeof(*req));
+               free(h);
        }
-       return req;
 }
 
-static inline void req_release(struct worker_ctx *worker, struct req *req)
+void *worker_iohandle_borrow(struct worker_ctx *worker)
 {
-       if (!req || worker->pool_ioreq.len < 4 * MP_FREELIST_SIZE) {
-               array_push(worker->pool_ioreq, req);
-               kr_asan_poison(req, sizeof(*req));
+       return iohandle_borrow(worker);
+}
+
+void worker_iohandle_release(struct worker_ctx *worker, void *h)
+{
+       iohandle_release(worker, h);
+}
+
+static inline void *iorequest_borrow(struct worker_ctx *worker)
+{
+       void *r = NULL;
+
+       const size_t size = sizeof(union uv_reqs);
+       if (worker->pool_ioreqs.len > 0) {
+               r = array_tail(worker->pool_ioreqs);
+               array_pop(worker->pool_ioreqs);
+               kr_asan_unpoison(r, size);
        } else {
-               free(req);
+               r = malloc(size);
        }
+
+       return r;
 }
 
+static inline void iorequest_release(struct worker_ctx *worker, void *r)
+{
+       assert(r);
+
+       const size_t size = sizeof(union uv_reqs);
+       if (worker->pool_ioreqs.len < MP_FREELIST_SIZE) {
+               array_push(worker->pool_ioreqs, r);
+               kr_asan_poison(r, size);
+       } else {
+               free(r);
+       }
+}
+
+
 /*! @internal Create a UDP/TCP handle for an outgoing AF_INET* connection.
  *  socktype is SOCK_* */
 static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype, sa_family_t family)
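
iohandle_borrow()/iohandle_release() and iorequest_borrow()/iorequest_release() above are two instances of the same bounded freelist: pop a cached block if one exists, otherwise malloc(); on release, keep the block (up to MP_FREELIST_SIZE entries) and poison it for AddressSanitizer so that any use after release is reported in ASAN builds. A self-contained sketch of that pattern with illustrative names (kr_asan_poison/kr_asan_unpoison are knot-resolver macros over the ASAN interface; the guard below is the simplified GCC-style one):

    #include <stdlib.h>
    #if defined(__SANITIZE_ADDRESS__)
    #include <sanitizer/asan_interface.h>
    #define POISON(p, n)   ASAN_POISON_MEMORY_REGION((p), (n))
    #define UNPOISON(p, n) ASAN_UNPOISON_MEMORY_REGION((p), (n))
    #else
    #define POISON(p, n)   ((void)0)
    #define UNPOISON(p, n) ((void)0)
    #endif

    #define FREELIST_MAX 256          /* stands in for MP_FREELIST_SIZE */

    struct freelist {
            void  *items[FREELIST_MAX];
            size_t len;
            size_t item_size;         /* e.g. sizeof(union uv_handles) */
    };

    static void *freelist_borrow(struct freelist *fl)
    {
            if (fl->len > 0) {
                    void *p = fl->items[--fl->len];
                    UNPOISON(p, fl->item_size);   /* block is live again */
                    return p;
            }
            return malloc(fl->item_size);         /* pool empty, fall back to heap */
    }

    static void freelist_release(struct freelist *fl, void *p)
    {
            if (fl->len < FREELIST_MAX) {
                    fl->items[fl->len++] = p;
                    POISON(p, fl->item_size);     /* any touch until re-borrow is a bug */
            } else {
                    free(p);
            }
    }
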
@@ -188,7 +215,8 @@ static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype, sa_family_t
        }
        /* Create connection for iterative query */
        struct worker_ctx *worker = task->ctx->worker;
-       uv_handle_t *handle = (uv_handle_t *)req_borrow(worker);
+       void *h = iohandle_borrow(worker);
+       uv_handle_t *handle = (uv_handle_t *)h;
        if (!handle) {
                return NULL;
        }
@@ -205,9 +233,11 @@ static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype, sa_family_t
        if (addr->ip.sa_family != AF_UNSPEC) {
                assert(addr->ip.sa_family == family);
                if (socktype == SOCK_DGRAM) {
-                       ret = uv_udp_bind((uv_udp_t *)handle, &addr->ip, 0);
-               } else {
-                       ret = uv_tcp_bind((uv_tcp_t *)handle, &addr->ip, 0);
+                       uv_udp_t *udp = (uv_udp_t *)handle;
+                       ret = uv_udp_bind(udp, &addr->ip, 0);
+               } else if (socktype == SOCK_STREAM){
+                       uv_tcp_t *tcp = (uv_tcp_t *)handle;
+                       ret = uv_tcp_bind(tcp, &addr->ip, 0);
                }
        }
 
@@ -219,7 +249,7 @@ static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype, sa_family_t
        }
        if (ret < 0) {
                io_deinit(handle);
-               req_release(worker, (struct req *)handle);
+               iohandle_release(worker, h);
                return NULL;
        }
        /* Connect or issue query datagram */
@@ -230,27 +260,21 @@ static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype, sa_family_t
 
 static void on_session_close(uv_handle_t *handle)
 {
-       struct worker_ctx *worker = get_worker();
+       uv_loop_t *loop = handle->loop;
+       struct worker_ctx *worker = loop->data;
        struct session *session = handle->data;
-       if (!session->outgoing) {
-               assert(session->handle->type == UV_TCP);
-       }
-       bool free_handle = false;
-       if (!session->outgoing && session->handle->type == UV_TCP) {
-               free_handle = true;
-       }
+       assert(session->handle == handle);
+       session->handle = NULL;
        io_deinit(handle);
-       if (free_handle) {
-               free(handle);
-       } else {
-               req_release(worker, (struct req *)handle);
-       }
+       iohandle_release(worker, handle);
 }
 
 static void on_session_timer_close(uv_handle_t *timer)
 {
        struct session *session = timer->data;
        uv_handle_t *handle = session->handle;
+       assert(handle && handle->data == session);
+       assert (session->outgoing || handle->type == UV_TCP);
        if (!uv_is_closing(handle)) {
                uv_close(handle, on_session_close);
        }
@@ -331,6 +355,8 @@ static void session_close(struct session *session)
                session->buffering = NULL;
        }
 
+       uv_handle_t *handle = session->handle;
+       io_stop_read(handle);
        session->closing = true;
        if (session->outgoing &&
            session->peer.ip.sa_family != AF_UNSPEC) {
@@ -488,7 +514,11 @@ static struct request_ctx *request_create(struct worker_ctx *worker,
        /* TODO Relocate pool to struct request */
        ctx->worker = worker;
        array_init(ctx->tasks);
-       ctx->source.session = handle ? handle->data : NULL;
+       struct session *session = handle ? handle->data : NULL;
+       if (session) {
+               assert(session->outgoing == false);
+       }
+       ctx->source.session = session;
 
        struct kr_request *req = &ctx->req;
        req->pool = pool;
@@ -672,12 +702,14 @@ static void qr_task_free(struct qr_task *task)
        /* Process source session. */
        if (source_session) {
                /* Walk the session task list and remove itself. */
+               assert(source_session->outgoing == false);
                session_del_tasks(source_session, task);
                /* Start reading again if the session is throttled and
                 * the number of outgoing requests is below watermark. */
                uv_handle_t *handle = source_session->handle;
                if (handle && source_session->tasks.len < worker->tcp_pipeline_max/2) {
                        if (!uv_is_closing(handle) && source_session->throttled) {
+                               assert(source_session->closing == false);
                                io_start_read(handle);
                                source_session->throttled = false;
                        }
@@ -696,7 +728,7 @@ static void qr_task_free(struct qr_task *task)
 /*@ Register new qr_task within session. */
 static int qr_task_register(struct qr_task *task, struct session *session)
 {
-       assert(session->outgoing == false);
+       assert(session->outgoing == false && session->handle->type == UV_TCP);
 
        int ret = array_reserve(session->tasks, session->tasks.len + 1);
        if (ret != 0) {
@@ -743,6 +775,7 @@ static void qr_task_complete(struct qr_task *task)
 /* This is called when we send subrequest / answer */
 static int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status)
 {
+       assert(handle == NULL || uv_is_closing(handle) == false);
        if (task->finished) {
                assert(task->leading == false);
                qr_task_complete(task);
@@ -767,46 +800,68 @@ static int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status
                                int ret = qr_task_send(t, (uv_handle_t *)handle,
                                                       &session->peer.ip, t->pktbuf);
                                if (ret != kr_ok()) {
+                                       uv_timer_t *timer = &session->timeout;
+                                       uv_timer_stop(timer);
                                        while (session->waiting.len > 0) {
                                                struct qr_task *task = session->waiting.at[0];
+                                               if (session->outgoing) {
+                                                       qr_task_finalize(task, KR_STATE_FAIL);
+                                               } else {
+                                                       assert(task->ctx->source.session == session);
+                                                       task->ctx->source.session = NULL;
+                                               }
                                                array_del(session->waiting, 0);
-                                               qr_task_finalize(task, KR_STATE_FAIL);
+                                               qr_task_unref(task);
+                                               session_del_tasks(session, task);
                                        }
                                        while (session->tasks.len > 0) {
                                                struct qr_task *task = session->tasks.at[0];
-                                               array_del(session->tasks, 0);
-                                               qr_task_finalize(task, KR_STATE_FAIL);
+                                               if (session->outgoing) {
+                                                       qr_task_finalize(task, KR_STATE_FAIL);
+                                               } else {
+                                                       assert(task->ctx->source.session == session);
+                                                       task->ctx->source.session = NULL;
+                                               }
+                                               session_del_tasks(session, task);
                                        }
                                        session_close(session);
                                        return status;
                                }
                        }
                }
-               io_start_read(handle); /* Start reading new query */
+               if (!uv_is_closing(handle)) {
+                       io_start_read(handle); /* Start reading new query */
+               }
        }
        return status;
 }
 
 static void on_send(uv_udp_send_t *req, int status)
 {
-       struct worker_ctx *worker = get_worker();
+       uv_handle_t *handle = (uv_handle_t *)(req->handle);
+       uv_loop_t *loop = handle->loop;
+       struct worker_ctx *worker = loop->data;
+       assert(worker == get_worker());
        struct qr_task *task = req->data;
-       if (qr_valid_handle(task, (uv_handle_t *)req->handle)) {
-               qr_task_on_send(task, (uv_handle_t *)req->handle, status);
+       if (qr_valid_handle(task, handle)) {
+               qr_task_on_send(task, handle, status);
        }
        qr_task_unref(task);
-       req_release(worker, (struct req *)req);
+       iorequest_release(worker, req);
 }
 
 static void on_write(uv_write_t *req, int status)
 {
-       struct worker_ctx *worker = get_worker();
+       uv_handle_t *handle = (uv_handle_t *)(req->handle);
+       uv_loop_t *loop = handle->loop;
+       struct worker_ctx *worker = loop->data;
+       assert(worker == get_worker());
        struct qr_task *task = req->data;
-       if (qr_valid_handle(task, (uv_handle_t *)req->handle)) {
-               qr_task_on_send(task, (uv_handle_t *)req->handle, status);
+       if (qr_valid_handle(task, handle)) {
+               qr_task_on_send(task, handle, status);
        }
        qr_task_unref(task);
-       req_release(worker, (struct req *)req);
+       iorequest_release(worker, req);
 }
 
 static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockaddr *addr, knot_pkt_t *pkt)
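
The error branch above makes the teardown policy explicit: when a session dies, tasks on an outgoing (upstream) session are failed with KR_STATE_FAIL, while tasks that originated from an incoming (client) session are only detached by clearing task->ctx->source.session, so they finish resolving but no answer is sent for them (qr_task_finalize() skips the send when source.session is NULL). The same drain loop reappears almost verbatim in worker_process_tcp() further down; a hypothetical helper capturing that shared shape (illustration only, not part of the patch) could look like:

    static void session_drain_tasks(struct session *session)
    {
            while (session->waiting.len > 0) {
                    struct qr_task *task = session->waiting.at[0];
                    if (session->outgoing) {
                            /* upstream connection is gone: fail the query */
                            qr_task_finalize(task, KR_STATE_FAIL);
                    } else {
                            /* client connection is gone: just detach it */
                            assert(task->ctx->source.session == session);
                            task->ctx->source.session = NULL;
                    }
                    array_del(session->waiting, 0);
                    qr_task_unref(task);
                    session_del_tasks(session, task);
            }
            while (session->tasks.len > 0) {
                    struct qr_task *task = session->tasks.at[0];
                    if (session->outgoing) {
                            qr_task_finalize(task, KR_STATE_FAIL);
                    } else {
                            assert(task->ctx->source.session == session);
                            task->ctx->source.session = NULL;
                    }
                    session_del_tasks(session, task);
            }
    }
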
@@ -817,6 +872,7 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockad
 
        /* Synchronous push to TLS context, bypassing event loop. */
        struct session *session = handle->data;
+       assert(session->closing == false);
        if (session->has_tls) {
                struct kr_request *req = &task->ctx->req;
                int ret = kr_ok();
@@ -837,8 +893,8 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockad
        struct request_ctx *ctx = task->ctx;
        struct worker_ctx *worker = ctx->worker;
        struct kr_request *req = &ctx->req;
-       struct req *send_req = req_borrow(worker);
-       if (!send_req) {
+       void *ioreq = iorequest_borrow(worker);
+       if (!ioreq) {
                return qr_task_on_send(task, handle, kr_error(ENOMEM));
        }
        if (knot_wire_get_qr(pkt->wire) == 0) {
@@ -858,33 +914,38 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockad
                                          handle->type == UV_UDP ? SOCK_DGRAM : SOCK_STREAM,
                                          pkt);
                if (ret != kr_ok()) {
-                       req_release(worker, send_req);
+                       iorequest_release(worker, ioreq);
                        return ret;
                }
        }
        /* Send using given protocol */
        if (handle->type == UV_UDP) {
+               uv_udp_send_t *send_req = (uv_udp_send_t *)ioreq;
                uv_buf_t buf = { (char *)pkt->wire, pkt->size };
-               send_req->as.send.data = task;
-               ret = uv_udp_send(&send_req->as.send, (uv_udp_t *)handle, &buf, 1, addr, &on_send);
-       } else {
+               send_req->data = task;
+               ret = uv_udp_send(send_req, (uv_udp_t *)handle, &buf, 1, addr, &on_send);
+       } else if (handle->type == UV_TCP) {
+               uv_write_t *write_req = (uv_write_t *)ioreq;
                uint16_t pkt_size = htons(pkt->size);
                uv_buf_t buf[2] = {
                        { (char *)&pkt_size, sizeof(pkt_size) },
                        { (char *)pkt->wire, pkt->size }
                };
-               send_req->as.write.data = task;
-               ret = uv_write(&send_req->as.write, (uv_stream_t *)handle, buf, 2, &on_write);
+               write_req->data = task;
+               ret = uv_write(write_req, (uv_stream_t *)handle, buf, 2, &on_write);
+       } else {
+               assert(false);
        }
+
        if (ret == 0) {
                qr_task_ref(task); /* Pending ioreq on current task */
                if (worker->too_many_open &&
                    worker->stats.rconcurrent <
-                       worker->rconcurrent_highwatermark - (worker->rconcurrent_highwatermark / 4)) {
+                       worker->rconcurrent_highwatermark - 10) {
                        worker->too_many_open = false;
                }
        } else {
-               req_release(worker, send_req);
+               iorequest_release(worker, ioreq);
                if (ret == UV_EMFILE) {
                        worker->too_many_open = true;
                        worker->rconcurrent_highwatermark = worker->stats.rconcurrent;
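
The hunk above also tightens the EMFILE back-pressure rule: when uv_udp_send()/uv_write() fails with UV_EMFILE, the current request concurrency is recorded as rconcurrent_highwatermark and too_many_open is set; new queries are then refused until concurrency drops a fixed 10 requests below that watermark, where it previously had to drop by a quarter of the watermark. A hypothetical helper expressing the rule (illustration only; the fields are the worker_ctx members used in the hunk):

    #include <stdbool.h>
    #include "daemon/worker.h"

    static bool may_start_new_request(const struct worker_ctx *worker)
    {
            if (!worker->too_many_open) {
                    return true;    /* no EMFILE back-pressure active */
            }
            /* resume only once we are 10 requests under the recorded watermark */
            return worker->stats.rconcurrent <
                   worker->rconcurrent_highwatermark - 10;
    }
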
@@ -950,14 +1011,14 @@ static void on_connect(uv_connect_t *req, int status)
        if (status == UV_ECANCELED) {
                worker_del_tcp_waiting(worker, &peer->ip);
                assert(session->closing && session->waiting.len == 0 && session->tasks.len == 0);
-               req_release(worker, (struct req *)req);
+               iorequest_release(worker, req);
                return;
        }
 
        if (session->closing) {
                worker_del_tcp_waiting(worker, &peer->ip);
                assert(session->waiting.len == 0 && session->tasks.len == 0);
-               req_release(worker, (struct req *)req);
+               iorequest_release(worker, req);
                return;
        }
 
@@ -972,7 +1033,7 @@ static void on_connect(uv_connect_t *req, int status)
                        qr_task_unref(task);
                }
                assert(session->tasks.len == 0);
-               req_release(worker, (struct req *)req);
+               iorequest_release(worker, req);
                session_close(session);
                return;
        }
@@ -986,7 +1047,7 @@ static void on_connect(uv_connect_t *req, int status)
                        qr_task_unref(task);
                }
                assert(session->tasks.len == 0);
-               req_release(worker, (struct req *)req);
+               iorequest_release(worker, req);
                session_close(session);
                return;
        }
@@ -999,7 +1060,7 @@ static void on_connect(uv_connect_t *req, int status)
                ret = tls_client_connect_start(session->tls_client_ctx,
                                               session, session_tls_hs_cb);
                if (ret == kr_error(EAGAIN)) {
-                       req_release(worker, (struct req *)req);
+                       iorequest_release(worker, req);
                        io_start_read(session->handle);
                        return;
                }
@@ -1009,7 +1070,7 @@ static void on_connect(uv_connect_t *req, int status)
                ret = session_next_waiting_send(session);
                if (ret == kr_ok()) {
                        worker_add_tcp_connected(worker, &session->peer.ip, session);
-                       req_release(worker, (struct req *)req);
+                       iorequest_release(worker, req);
                        return;
                }
        }
@@ -1024,7 +1085,7 @@ static void on_connect(uv_connect_t *req, int status)
 
        assert(session->tasks.len == 0);
 
-       req_release(worker, (struct req *)req);
+       iorequest_release(worker, req);
        session_close(session);
 }
 
@@ -1045,7 +1106,6 @@ static void on_tcp_connect_timeout(uv_timer_t *timer)
                session_del_tasks(session, task);
                array_del(session->waiting, 0);
                qr_task_unref(task);
-               assert(task->refs == 1);
                qr_task_finalize(task, KR_STATE_FAIL);
        }
 
@@ -1090,8 +1150,11 @@ static void on_tcp_watchdog_timeout(uv_timer_t *timer)
 static void on_udp_timeout(uv_timer_t *timer)
 {
        struct session *session = timer->data;
-       uv_timer_stop(timer);
 
+       uv_handle_t *handle = session->handle;
+       assert(handle->data == session);
+
+       uv_timer_stop(timer);
        assert(session->tasks.len == 1);
        assert(session->waiting.len == 0);
 
@@ -1247,7 +1310,12 @@ static int qr_task_finalize(struct qr_task *task, int state)
        task->finished = true;
        /* Send back answer */
        if (ctx->source.session != NULL) {
-               (void) qr_task_send(task, ctx->source.session->handle,
+               uv_handle_t *handle = ctx->source.session->handle;
+               assert(ctx->source.session->closing == false);
+               assert(handle->data == ctx->source.session);
+               assert(!uv_is_closing(handle));
+               assert(ctx->source.addr.ip.sa_family != AF_UNSPEC);
+               (void) qr_task_send(task, handle,
                                    (struct sockaddr *)&ctx->source.addr,
                                    ctx->req.answer);
        } else {
@@ -1264,11 +1332,11 @@ static int qr_task_step(struct qr_task *task,
                return kr_error(ESTALE);
        }
 
-
        /* Close pending I/O requests */
        subreq_finalize(task, packet_source, packet);
        /* Consume input and produce next query */
        struct request_ctx *ctx = task->ctx;
+       assert(ctx);
        struct kr_request *req = &ctx->req;
        struct worker_ctx *worker = ctx->worker;
        int sock_type = -1;
@@ -1280,7 +1348,7 @@ static int qr_task_step(struct qr_task *task,
        if (worker->too_many_open) {
                struct kr_rplan *rplan = &req->rplan;
                if (worker->stats.rconcurrent <
-                       worker->rconcurrent_highwatermark - (worker->rconcurrent_highwatermark / 4)) {
+                       worker->rconcurrent_highwatermark - 10) {
                        worker->too_many_open = false;
                } else if (packet && kr_rplan_empty(rplan)) {
                        /* new query; TODO - make this detection more obvious */
@@ -1358,6 +1426,7 @@ static int qr_task_step(struct qr_task *task,
                }
                struct session* session = NULL;
                if ((session = worker_find_tcp_waiting(ctx->worker, addr)) != NULL) {
+                       assert(session->outgoing);
                        if (session->closing) {
                                subreq_finalize(task, packet_source, packet);
                                return qr_task_finalize(task, KR_STATE_FAIL);
@@ -1429,14 +1498,14 @@ static int qr_task_step(struct qr_task *task,
                        task->pending_count += 1;
                } else {
                        /* Make connection */
-                       uv_connect_t *conn = (uv_connect_t *)req_borrow(ctx->worker);
+                       uv_connect_t *conn = (uv_connect_t *)iorequest_borrow(ctx->worker);
                        if (!conn) {
                                return qr_task_step(task, NULL, NULL);
                        }
                        uv_handle_t *client = ioreq_spawn(task, sock_type,
                                                          addr->sa_family);
                        if (!client) {
-                               req_release(ctx->worker, (struct req *)conn);
+                               iorequest_release(ctx->worker, conn);
                                subreq_finalize(task, packet_source, packet);
                                return qr_task_finalize(task, KR_STATE_FAIL);
                        }
@@ -1444,7 +1513,7 @@ static int qr_task_step(struct qr_task *task,
                        ret = worker_add_tcp_waiting(ctx->worker, addr, session);
                        if (ret < 0) {
                                session_del_tasks(session, task);
-                               req_release(ctx->worker, (struct req *)conn);
+                               iorequest_release(ctx->worker, conn);
                                subreq_finalize(task, packet_source, packet);
                                return qr_task_finalize(task, KR_STATE_FAIL);
                        }
@@ -1453,7 +1522,7 @@ static int qr_task_step(struct qr_task *task,
                        if (ret < 0) {
                                session_del_tasks(session, task);
                                worker_del_tcp_waiting(ctx->worker, addr);
-                               req_release(ctx->worker, (struct req *)conn);
+                               iorequest_release(ctx->worker, conn);
                                subreq_finalize(task, packet_source, packet);
                                return qr_task_finalize(task, KR_STATE_FAIL);
                        }
@@ -1470,7 +1539,7 @@ static int qr_task_step(struct qr_task *task,
                                        session_del_tasks(session, task);
                                        session_del_waiting(session, task);
                                        worker_del_tcp_waiting(ctx->worker, addr);
-                                       req_release(ctx->worker, (struct req *)conn);
+                                       iorequest_release(ctx->worker, conn);
                                        return qr_task_step(task, NULL, NULL);
                                }
                                tls_client_ctx_set_params(tls_ctx, entry);
@@ -1487,7 +1556,7 @@ static int qr_task_step(struct qr_task *task,
                                session_del_tasks(session, task);
                                session_del_waiting(session, task);
                                worker_del_tcp_waiting(ctx->worker, addr);
-                               req_release(ctx->worker, (struct req *)conn);
+                               iorequest_release(ctx->worker, conn);
                                subreq_finalize(task, packet_source, packet);
                                return qr_task_finalize(task, KR_STATE_FAIL);
                        }
@@ -1497,7 +1566,7 @@ static int qr_task_step(struct qr_task *task,
                                session_del_tasks(session, task);
                                session_del_waiting(session, task);
                                worker_del_tcp_waiting(ctx->worker, addr);
-                               req_release(ctx->worker, (struct req *)conn);
+                               iorequest_release(ctx->worker, conn);
                                return qr_task_step(task, NULL, NULL);
                        }
                }
@@ -1582,7 +1651,13 @@ int worker_submit(struct worker_ctx *worker, uv_handle_t *handle,
                }
        } else if (msg) { /* response from upstream */
                task = find_task(session, knot_wire_get_id(msg->wire));
+               if (task == NULL) {
+                       return kr_error(ENOENT);
+               }
+               assert(session->closing == false);
        }
+       fflush(stdout);
+       assert(uv_is_closing(session->handle) == false);
 
        /* Consume input and produce next message */
        return qr_task_step(task, NULL, msg);
@@ -1625,7 +1700,6 @@ static int worker_add_tcp_connected(struct worker_ctx *worker,
        assert(addr);
        const char *key = tcpsess_key(addr);
        assert(key);
-       tcp_connected += 1;
        assert(map_contains(&worker->tcp_connected, key) == 0);
        return map_add_tcp_session(&worker->tcp_connected, addr, session);
 }
@@ -1637,9 +1711,6 @@ static int worker_del_tcp_connected(struct worker_ctx *worker,
        const char *key = tcpsess_key(addr);
        assert(key);
        int ret = map_del_tcp_session(&worker->tcp_connected, addr);
-       if (ret == 0) {
-               tcp_connected -= 1;
-       }
        return ret;
 }
 
@@ -1658,9 +1729,6 @@ static int worker_add_tcp_waiting(struct worker_ctx *worker,
        assert(key);
        assert(map_contains(&worker->tcp_waiting, key) == 0);
        int ret = map_add_tcp_session(&worker->tcp_waiting, addr, session);
-       if (ret == 0) {
-               tcp_waiting += 1;
-       }
        return ret;
 }
 
@@ -1671,9 +1739,6 @@ static int worker_del_tcp_waiting(struct worker_ctx *worker,
        const char *key = tcpsess_key(addr);
        assert(key);
        int ret = map_del_tcp_session(&worker->tcp_waiting, addr);
-       if (ret == 0) {
-               tcp_waiting -= 1;
-       }
        return ret;
 }
 
@@ -1725,6 +1790,7 @@ int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle,
        }
        /* Connection error or forced disconnect */
        struct session *session = handle->data;
+       assert(session);
        if (session->closing) {
                return kr_ok();
        }
@@ -1740,6 +1806,9 @@ int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle,
                        struct qr_task *task = session->waiting.at[0];
                        if (session->outgoing) {
                                qr_task_finalize(task, KR_STATE_FAIL);
+                       } else {
+                               assert(task->ctx->source.session == session);
+                               task->ctx->source.session = NULL;
                        }
                        array_del(session->waiting, 0);
                        qr_task_unref(task);
@@ -1749,6 +1818,9 @@ int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle,
                        struct qr_task *task = session->tasks.at[0];
                        if (session->outgoing) {
                                qr_task_finalize(task, KR_STATE_FAIL);
+                       } else {
+                               assert(task->ctx->source.session == session);
+                               task->ctx->source.session = NULL;
                        }
                        session_del_tasks(session, task);
                }
@@ -1806,7 +1878,32 @@ int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle,
                uint16_t msg_id = knot_wire_get_id(session->msg_hdr + 2);
                if (msg_size < KNOT_WIRE_HEADER_SIZE) {
                        /* better kill the connection; we would probably get out of sync */
-                       assert(false);
+                       uv_timer_t *timer = &session->timeout;
+                       uv_timer_stop(timer);
+                       while (session->waiting.len > 0) {
+                               struct qr_task *task = session->waiting.at[0];
+                               if (session->outgoing) {
+                                       qr_task_finalize(task, KR_STATE_FAIL);
+                               } else {
+                                       assert(task->ctx->source.session == session);
+                                       task->ctx->source.session = NULL;
+                               }
+                               array_del(session->waiting, 0);
+                               qr_task_unref(task);
+                               session_del_tasks(session, task);
+                       }
+                       while (session->tasks.len > 0) {
+                               struct qr_task *task = session->tasks.at[0];
+                               if (session->outgoing) {
+                                       qr_task_finalize(task, KR_STATE_FAIL);
+                               } else {
+                                       assert(task->ctx->source.session == session);
+                                       task->ctx->source.session = NULL;
+                               }
+                               session_del_tasks(session, task);
+                       }
+                       session_close(session);
+
                        return kr_error(EILSEQ);
                }
 
@@ -1844,7 +1941,6 @@ int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle,
                        if (task) {
                                knot_pkt_clear(task->pktbuf);
                        } else  {
-                               /* TODO: only ignore one message without killing connection */
                                session->buffering = NULL;
                                session->bytes_to_skip = msg_size - 2;
                                ssize_t min_len = MIN(session->bytes_to_skip, len);
@@ -1881,10 +1977,10 @@ int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle,
        ssize_t to_read = MIN(len, task->bytes_remaining);
        if (pkt_buf->size + to_read > pkt_buf->max_size) {
                pkt_buf->size = 0;
+               session->bytes_to_skip = task->bytes_remaining - to_read;
                task->bytes_remaining = 0;
-               /* TODO: only ignore one message without killing connection */
                session->buffering = NULL;
-               return kr_error(EMSGSIZE);
+               return submitted;
        }
        /* Buffer message and check if it's complete */
        memcpy(pkt_buf->wire + pkt_buf->size, msg, to_read);
@@ -1894,7 +1990,9 @@ int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle,
                /* Message was assembled, clear temporary. */
                session->buffering = NULL;
                session->msg_hdr_idx = 0;
-               session_del_tasks(session, task);
+               if (session->outgoing) {
+                       session_del_tasks(session, task);
+               }
                /* Parse the packet and start resolving complete query */
                int ret = parse_packet(pkt_buf);
                if (ret == 0 && !session->outgoing) {
@@ -1920,6 +2018,7 @@ int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle,
                        /* TODO: this is simple via iteration; recursion doesn't really help */
                        ret = worker_process_tcp(worker, handle, msg + to_read, len - to_read);
                        if (ret < 0) {
+                               assert(false);
                                return ret;
                        }
                        submitted += ret;
@@ -1973,10 +2072,12 @@ void worker_session_close(struct session *session)
 static int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen)
 {
        array_init(worker->pool_mp);
-       array_init(worker->pool_ioreq);
+       array_init(worker->pool_ioreqs);
+       array_init(worker->pool_iohandles);
        array_init(worker->pool_sessions);
        if (array_reserve(worker->pool_mp, ring_maxlen) ||
-               array_reserve(worker->pool_ioreq, ring_maxlen) ||
+               array_reserve(worker->pool_ioreqs, ring_maxlen) ||
+               array_reserve(worker->pool_iohandles, ring_maxlen) ||
                array_reserve(worker->pool_sessions, ring_maxlen)) {
                return kr_error(ENOMEM);
        }
@@ -1993,7 +2094,7 @@ static int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen)
 
 #define reclaim_freelist(list, type, cb) \
        for (unsigned i = 0; i < list.len; ++i) { \
-               type *elm = list.at[i]; \
+               void *elm = list.at[i]; \
                kr_asan_unpoison(elm, sizeof(type)); \
                cb(elm); \
        } \
@@ -2002,7 +2103,8 @@ static int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen)
 void worker_reclaim(struct worker_ctx *worker)
 {
        reclaim_freelist(worker->pool_mp, struct mempool, mp_delete);
-       reclaim_freelist(worker->pool_ioreq, struct req, free);
+       reclaim_freelist(worker->pool_ioreqs, union uv_reqs, free);
+       reclaim_freelist(worker->pool_iohandles, union uv_handles, free);
        reclaim_freelist(worker->pool_sessions, struct session, session_free);
        mp_delete(worker->pkt_pool.ctx);
        worker->pkt_pool.ctx = NULL;
diff --git a/daemon/worker.h b/daemon/worker.h
index 97883e76129f642f2128a7b32d8a292931e38978..c19ad477a6d6886db3bc3061fa172e10663fb766 100644
--- a/daemon/worker.h
+++ b/daemon/worker.h
@@ -27,6 +27,7 @@ struct qr_task;
 struct worker_ctx;
 /** Transport session (opaque). */
 struct session;
+/** Union of various libuv objects for freelist. */
 /** Worker callback */
 typedef void (*worker_cb_t)(struct worker_ctx *worker, struct kr_request *req, void *baton);
 
@@ -80,6 +81,11 @@ void worker_reclaim(struct worker_ctx *worker);
 /** Closes given session */
 void worker_session_close(struct session *session);
 
+void *worker_iohandle_borrow(struct worker_ctx *worker);
+
+void worker_iohandle_release(struct worker_ctx *worker, void *h);
+
+
 
 /** @cond internal */
 
@@ -135,11 +141,31 @@ struct worker_ctx {
        map_t tcp_waiting;
        map_t outgoing;
        mp_freelist_t pool_mp;
-       mp_freelist_t pool_ioreq;
+       mp_freelist_t pool_ioreqs;
        mp_freelist_t pool_sessions;
+       mp_freelist_t pool_iohandles;
        knot_mm_t pkt_pool;
 };
 
+/* @internal Union of derivatives from libuv uv_handle_t for freelist.
+ * These have session as their `handle->data` and own it. */
+union uv_handles {
+       uv_handle_t   handle;
+       uv_stream_t   stream;
+       uv_udp_t      udp;
+       uv_tcp_t      tcp;
+       uv_timer_t    timer;
+};
+
+/* @internal Union of derivatives from uv_req_t libuv request handles for freelist.
+ * These have only a reference to the task they're operating on. */
+union uv_reqs {
+       uv_req_t      req;
+       uv_shutdown_t sdown;
+       uv_write_t    write;
+       uv_connect_t  connect;
+       uv_udp_send_t send;
+};
 
 /** @endcond */
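
With the freelist split into pool_iohandles (blocks of sizeof(union uv_handles)) and pool_ioreqs (blocks of sizeof(union uv_reqs)), a borrowed block is simply cast to whichever union member is needed. A minimal fragment showing how io_create() uses such a block (names as in this commit; error handling omitted and loop, worker and socktype assumed to be in scope):

    void *block = worker_iohandle_borrow(worker);
    uv_handle_t *handle = (uv_handle_t *)block;    /* any member view is valid */
    if (socktype == SOCK_DGRAM) {
            uv_udp_init(loop, (uv_udp_t *)handle);
    } else if (socktype == SOCK_STREAM) {
            uv_tcp_init(loop, (uv_tcp_t *)handle);
            uv_tcp_nodelay((uv_tcp_t *)handle, 1);
    }
    /* ... later: uv_close(handle, io_release); the close callback calls
     * worker_iohandle_release() and the block returns to pool_iohandles. */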