From: Grigorii Demidov Date: Tue, 14 Nov 2017 12:03:06 +0000 (+0100) Subject: daemon: improved reliability under heavy load; bugfixing & minor refactoring X-Git-Tag: v2.0.0~43^2~24 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=c55cbb028c9250693db07160687972e096328389;p=thirdparty%2Fknot-resolver.git daemon: improved reliability under heavy load; bugfixing & minor refactoring --- diff --git a/daemon/io.c b/daemon/io.c index 112b19611..18232aea3 100644 --- a/daemon/io.c +++ b/daemon/io.c @@ -33,6 +33,8 @@ } \ } while (0) +void io_release(uv_handle_t *handle); + static void check_bufsize(uv_handle_t* handle) { /* We want to buffer at least N waves in advance. @@ -108,7 +110,7 @@ static void session_release(struct worker_ctx *worker, uv_handle_t *handle) static uv_stream_t *handle_alloc(uv_loop_t *loop) { - uv_stream_t *handle = calloc(1, sizeof(*handle)); + uv_stream_t *handle = calloc(1, sizeof(union uv_handles)); if (!handle) { return NULL; } @@ -116,6 +118,17 @@ static uv_stream_t *handle_alloc(uv_loop_t *loop) return handle; } +static uv_stream_t *handle_borrow(uv_loop_t *loop) +{ + struct worker_ctx *worker = loop->data; + void *req = worker_iohandle_borrow(worker); + if (!req) { + return NULL; + } + + return (uv_stream_t *)req; +} + static void handle_getbuf(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { /* Worker has single buffer which is reused for all incoming @@ -144,6 +157,10 @@ void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, { uv_loop_t *loop = handle->loop; struct worker_ctx *worker = loop->data; + struct session *s = handle->data; + if (s->closing) { + return; + } if (nread <= 0) { if (nread < 0) { /* Error response, notify resolver */ worker_submit(worker, (uv_handle_t *)handle, NULL, addr); @@ -165,6 +182,7 @@ static int udp_bind_finalize(uv_handle_t *handle) /* Handle is already created, just create context. */ struct session *session = session_new(); assert(session); + session->outgoing = false; session->handle = handle; handle->data = session; return io_start_read((uv_handle_t *)handle); @@ -249,14 +267,14 @@ static void _tcp_accept(uv_stream_t *master, int status, bool tls) if (status != 0) { return; } - uv_stream_t *client = handle_alloc(master->loop); + uv_stream_t *client = handle_borrow(master->loop); if (!client) { return; } memset(client, 0, sizeof(*client)); io_create(master->loop, (uv_handle_t *)client, SOCK_STREAM); if (uv_accept(master, client) != 0) { - uv_close((uv_handle_t *)client, io_free); + uv_close((uv_handle_t *)client, io_release); return; } @@ -264,13 +282,12 @@ static void _tcp_accept(uv_stream_t *master, int status, bool tls) * It will re-check every half of a request time limit if the connection * is idle and should be terminated, this is an educated guess. */ struct session *session = client->data; + assert(session->outgoing == false); session->has_tls = tls; if (tls && !session->tls_ctx) { session->tls_ctx = tls_new(master->loop->data); } uv_timer_t *timer = &session->timeout; - uv_timer_init(master->loop, timer); - timer->data = session; uv_timer_start(timer, tcp_timeout_trigger, KR_CONN_RTT_MAX/2, KR_CONN_RTT_MAX/2); io_start_read((uv_handle_t *)client); } @@ -376,13 +393,14 @@ int tcp_bindfd_tls(uv_tcp_t *handle, int fd) void io_create(uv_loop_t *loop, uv_handle_t *handle, int type) { + int ret = -1; if (type == SOCK_DGRAM) { - uv_udp_init(loop, (uv_udp_t *)handle); - } else { - uv_tcp_init(loop, (uv_tcp_t *)handle); + ret = uv_udp_init(loop, (uv_udp_t *)handle); + } else if (type == SOCK_STREAM) { + ret = uv_tcp_init(loop, (uv_tcp_t *)handle); uv_tcp_nodelay((uv_tcp_t *)handle, 1); } - + assert(ret == 0); struct worker_ctx *worker = loop->data; struct session *session = session_borrow(worker); assert(session); @@ -417,13 +435,25 @@ void io_free(uv_handle_t *handle) free(handle); } +void io_release(uv_handle_t *handle) +{ + if (!handle) { + return; + } + uv_loop_t *loop = handle->loop; + struct worker_ctx *worker = loop->data; + io_deinit(handle); + worker_iohandle_release(worker, handle); +} + int io_start_read(uv_handle_t *handle) { if (handle->type == UV_UDP) { return uv_udp_recv_start((uv_udp_t *)handle, &handle_getbuf, &udp_recv); - } else { + } else if (handle->type == UV_TCP) { return uv_read_start((uv_stream_t *)handle, &handle_getbuf, &tcp_recv); } + assert(false); } int io_stop_read(uv_handle_t *handle) diff --git a/daemon/network.c b/daemon/network.c index e181cae70..040d4ed9c 100644 --- a/daemon/network.c +++ b/daemon/network.c @@ -140,7 +140,7 @@ static int open_endpoint(struct network *net, struct endpoint *ep, struct sockad { int ret = 0; if (flags & NET_UDP) { - ep->udp = malloc(sizeof(*ep->udp)); + ep->udp = malloc(sizeof(union uv_handles)); if (!ep->udp) { return kr_error(ENOMEM); } @@ -153,7 +153,7 @@ static int open_endpoint(struct network *net, struct endpoint *ep, struct sockad ep->flags |= NET_UDP; } if (flags & NET_TCP) { - ep->tcp = malloc(sizeof(*ep->tcp)); + ep->tcp = malloc(sizeof(union uv_handles)); if (!ep->tcp) { return kr_error(ENOMEM); } @@ -185,7 +185,7 @@ static int open_endpoint_fd(struct network *net, struct endpoint *ep, int fd, in if (ep->udp) { return kr_error(EEXIST); } - ep->udp = malloc(sizeof(*ep->udp)); + ep->udp = malloc(sizeof(union uv_handles));// malloc(sizeof(*ep->udp)); if (!ep->udp) { return kr_error(ENOMEM); } @@ -197,12 +197,11 @@ static int open_endpoint_fd(struct network *net, struct endpoint *ep, int fd, in } ep->flags |= NET_UDP; return kr_ok(); - } - if (sock_type == SOCK_STREAM) { + } else if (sock_type == SOCK_STREAM) { if (ep->tcp) { return kr_error(EEXIST); } - ep->tcp = malloc(sizeof(*ep->tcp)); + ep->tcp = malloc(sizeof(union uv_handles)); if (!ep->tcp) { return kr_error(ENOMEM); } diff --git a/daemon/worker.c b/daemon/worker.c index a7c96f739..603be8e73 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -76,25 +76,6 @@ struct qr_task }; -int32_t tcp_connected = 0; -int32_t tcp_waiting = 0; - -/* @internal Union of various libuv objects for freelist. */ -struct req -{ - union { - /* Socket handles, these have session as their `handle->data` and own it. */ - uv_udp_t udp; - uv_tcp_t tcp; - /* I/O events, these have only a reference to the task they're operating on. */ - uv_udp_send_t send; - uv_write_t write; - uv_connect_t connect; - /* Timer events */ - uv_timer_t timer; - } as; -}; - /* Convenience macros */ #define qr_task_ref(task) \ do { ++(task)->refs; } while(0) @@ -148,29 +129,75 @@ static inline struct worker_ctx *get_worker(void) return uv_default_loop()->data; } -static inline struct req *req_borrow(struct worker_ctx *worker) +static inline void *iohandle_borrow(struct worker_ctx *worker) +{ + void *h = NULL; + + const size_t size = sizeof(union uv_handles); + if (worker->pool_iohandles.len > 0) { + h = array_tail(worker->pool_iohandles); + array_pop(worker->pool_iohandles); + kr_asan_unpoison(h, size); + } else { + h = malloc(size); + } + + return h; +} + +static inline void iohandle_release(struct worker_ctx *worker, void *h) { - struct req *req = NULL; - if (worker->pool_ioreq.len > 0) { - req = array_tail(worker->pool_ioreq); - array_pop(worker->pool_ioreq); - kr_asan_unpoison(req, sizeof(*req)); + assert(h); + + const size_t size = sizeof(union uv_handles); + if (worker->pool_iohandles.len < MP_FREELIST_SIZE) { + array_push(worker->pool_iohandles, h); + kr_asan_poison(h, size); } else { - req = malloc(sizeof(*req)); + free(h); } - return req; } -static inline void req_release(struct worker_ctx *worker, struct req *req) +void *worker_iohandle_borrow(struct worker_ctx *worker) { - if (!req || worker->pool_ioreq.len < 4 * MP_FREELIST_SIZE) { - array_push(worker->pool_ioreq, req); - kr_asan_poison(req, sizeof(*req)); + return iohandle_borrow(worker); +} + +void worker_iohandle_release(struct worker_ctx *worker, void *h) +{ + iohandle_release(worker, h); +} + +static inline void *iorequest_borrow(struct worker_ctx *worker) +{ + void *r = NULL; + + const size_t size = sizeof(union uv_reqs); + if (worker->pool_ioreqs.len > 0) { + r = array_tail(worker->pool_ioreqs); + array_pop(worker->pool_ioreqs); + kr_asan_unpoison(r, size); } else { - free(req); + r = malloc(size); } + + return r; } +static inline void iorequest_release(struct worker_ctx *worker, void *r) +{ + assert(r); + + const size_t size = sizeof(union uv_reqs); + if (worker->pool_ioreqs.len < MP_FREELIST_SIZE) { + array_push(worker->pool_ioreqs, r); + kr_asan_poison(r, size); + } else { + free(r); + } +} + + /*! @internal Create a UDP/TCP handle for an outgoing AF_INET* connection. * socktype is SOCK_* */ static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype, sa_family_t family) @@ -188,7 +215,8 @@ static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype, sa_family_t } /* Create connection for iterative query */ struct worker_ctx *worker = task->ctx->worker; - uv_handle_t *handle = (uv_handle_t *)req_borrow(worker); + void *h = iohandle_borrow(worker); + uv_handle_t *handle = (uv_handle_t *)h; if (!handle) { return NULL; } @@ -205,9 +233,11 @@ static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype, sa_family_t if (addr->ip.sa_family != AF_UNSPEC) { assert(addr->ip.sa_family == family); if (socktype == SOCK_DGRAM) { - ret = uv_udp_bind((uv_udp_t *)handle, &addr->ip, 0); - } else { - ret = uv_tcp_bind((uv_tcp_t *)handle, &addr->ip, 0); + uv_udp_t *udp = (uv_udp_t *)handle; + ret = uv_udp_bind(udp, &addr->ip, 0); + } else if (socktype == SOCK_STREAM){ + uv_tcp_t *tcp = (uv_tcp_t *)handle; + ret = uv_tcp_bind(tcp, &addr->ip, 0); } } @@ -219,7 +249,7 @@ static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype, sa_family_t } if (ret < 0) { io_deinit(handle); - req_release(worker, (struct req *)handle); + iohandle_release(worker, h); return NULL; } /* Connect or issue query datagram */ @@ -230,27 +260,21 @@ static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype, sa_family_t static void on_session_close(uv_handle_t *handle) { - struct worker_ctx *worker = get_worker(); + uv_loop_t *loop = handle->loop; + struct worker_ctx *worker = loop->data; struct session *session = handle->data; - if (!session->outgoing) { - assert(session->handle->type == UV_TCP); - } - bool free_handle = false; - if (!session->outgoing && session->handle->type == UV_TCP) { - free_handle = true; - } + assert(session->handle == handle); + session->handle = NULL; io_deinit(handle); - if (free_handle) { - free(handle); - } else { - req_release(worker, (struct req *)handle); - } + iohandle_release(worker, handle); } static void on_session_timer_close(uv_handle_t *timer) { struct session *session = timer->data; uv_handle_t *handle = session->handle; + assert(handle && handle->data == session); + assert (session->outgoing || handle->type == UV_TCP); if (!uv_is_closing(handle)) { uv_close(handle, on_session_close); } @@ -331,6 +355,8 @@ static void session_close(struct session *session) session->buffering = NULL; } + uv_handle_t *handle = session->handle; + io_stop_read(handle); session->closing = true; if (session->outgoing && session->peer.ip.sa_family != AF_UNSPEC) { @@ -488,7 +514,11 @@ static struct request_ctx *request_create(struct worker_ctx *worker, /* TODO Relocate pool to struct request */ ctx->worker = worker; array_init(ctx->tasks); - ctx->source.session = handle ? handle->data : NULL; + struct session *session = handle ? handle->data : NULL; + if (session) { + assert(session->outgoing == false); + } + ctx->source.session = session; struct kr_request *req = &ctx->req; req->pool = pool; @@ -672,12 +702,14 @@ static void qr_task_free(struct qr_task *task) /* Process source session. */ if (source_session) { /* Walk the session task list and remove itself. */ + assert(source_session->outgoing == false); session_del_tasks(source_session, task); /* Start reading again if the session is throttled and * the number of outgoing requests is below watermark. */ uv_handle_t *handle = source_session->handle; if (handle && source_session->tasks.len < worker->tcp_pipeline_max/2) { if (!uv_is_closing(handle) && source_session->throttled) { + assert(source_session->closing == false); io_start_read(handle); source_session->throttled = false; } @@ -696,7 +728,7 @@ static void qr_task_free(struct qr_task *task) /*@ Register new qr_task within session. */ static int qr_task_register(struct qr_task *task, struct session *session) { - assert(session->outgoing == false); + assert(session->outgoing == false && session->handle->type == UV_TCP); int ret = array_reserve(session->tasks, session->tasks.len + 1); if (ret != 0) { @@ -743,6 +775,7 @@ static void qr_task_complete(struct qr_task *task) /* This is called when we send subrequest / answer */ static int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status) { + assert(handle == NULL || uv_is_closing(handle) == false); if (task->finished) { assert(task->leading == false); qr_task_complete(task); @@ -767,46 +800,68 @@ static int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status int ret = qr_task_send(t, (uv_handle_t *)handle, &session->peer.ip, t->pktbuf); if (ret != kr_ok()) { + uv_timer_t *timer = &session->timeout; + uv_timer_stop(timer); while (session->waiting.len > 0) { struct qr_task *task = session->waiting.at[0]; + if (session->outgoing) { + qr_task_finalize(task, KR_STATE_FAIL); + } else { + assert(task->ctx->source.session == session); + task->ctx->source.session = NULL; + } array_del(session->waiting, 0); - qr_task_finalize(task, KR_STATE_FAIL); + qr_task_unref(task); + session_del_tasks(session, task); } while (session->tasks.len > 0) { struct qr_task *task = session->tasks.at[0]; - array_del(session->tasks, 0); - qr_task_finalize(task, KR_STATE_FAIL); + if (session->outgoing) { + qr_task_finalize(task, KR_STATE_FAIL); + } else { + assert(task->ctx->source.session == session); + task->ctx->source.session = NULL; + } + session_del_tasks(session, task); } session_close(session); return status; } } } - io_start_read(handle); /* Start reading new query */ + if (!uv_is_closing(handle)) { + io_start_read(handle); /* Start reading new query */ + } } return status; } static void on_send(uv_udp_send_t *req, int status) { - struct worker_ctx *worker = get_worker(); + uv_handle_t *handle = (uv_handle_t *)(req->handle); + uv_loop_t *loop = handle->loop; + struct worker_ctx *worker = loop->data; + assert(worker == get_worker()); struct qr_task *task = req->data; - if (qr_valid_handle(task, (uv_handle_t *)req->handle)) { - qr_task_on_send(task, (uv_handle_t *)req->handle, status); + if (qr_valid_handle(task, handle)) { + qr_task_on_send(task, handle, status); } qr_task_unref(task); - req_release(worker, (struct req *)req); + iorequest_release(worker, req); } static void on_write(uv_write_t *req, int status) { - struct worker_ctx *worker = get_worker(); + uv_handle_t *handle = (uv_handle_t *)(req->handle); + uv_loop_t *loop = handle->loop; + struct worker_ctx *worker = loop->data; + assert(worker == get_worker()); struct qr_task *task = req->data; - if (qr_valid_handle(task, (uv_handle_t *)req->handle)) { - qr_task_on_send(task, (uv_handle_t *)req->handle, status); + if (qr_valid_handle(task, handle)) { + qr_task_on_send(task, handle, status); } qr_task_unref(task); - req_release(worker, (struct req *)req); + iorequest_release(worker, req); } static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockaddr *addr, knot_pkt_t *pkt) @@ -817,6 +872,7 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockad /* Synchronous push to TLS context, bypassing event loop. */ struct session *session = handle->data; + assert(session->closing == false); if (session->has_tls) { struct kr_request *req = &task->ctx->req; int ret = kr_ok(); @@ -837,8 +893,8 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockad struct request_ctx *ctx = task->ctx; struct worker_ctx *worker = ctx->worker; struct kr_request *req = &ctx->req; - struct req *send_req = req_borrow(worker); - if (!send_req) { + void *ioreq = iorequest_borrow(worker); + if (!ioreq) { return qr_task_on_send(task, handle, kr_error(ENOMEM)); } if (knot_wire_get_qr(pkt->wire) == 0) { @@ -858,33 +914,38 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockad handle->type == UV_UDP ? SOCK_DGRAM : SOCK_STREAM, pkt); if (ret != kr_ok()) { - req_release(worker, send_req); + iorequest_release(worker, ioreq); return ret; } } /* Send using given protocol */ if (handle->type == UV_UDP) { + uv_udp_send_t *send_req = (uv_udp_send_t *)ioreq; uv_buf_t buf = { (char *)pkt->wire, pkt->size }; - send_req->as.send.data = task; - ret = uv_udp_send(&send_req->as.send, (uv_udp_t *)handle, &buf, 1, addr, &on_send); - } else { + send_req->data = task; + ret = uv_udp_send(send_req, (uv_udp_t *)handle, &buf, 1, addr, &on_send); + } else if (handle->type == UV_TCP) { + uv_write_t *write_req = (uv_write_t *)ioreq; uint16_t pkt_size = htons(pkt->size); uv_buf_t buf[2] = { { (char *)&pkt_size, sizeof(pkt_size) }, { (char *)pkt->wire, pkt->size } }; - send_req->as.write.data = task; - ret = uv_write(&send_req->as.write, (uv_stream_t *)handle, buf, 2, &on_write); + write_req->data = task; + ret = uv_write(write_req, (uv_stream_t *)handle, buf, 2, &on_write); + } else { + assert(false); } + if (ret == 0) { qr_task_ref(task); /* Pending ioreq on current task */ if (worker->too_many_open && worker->stats.rconcurrent < - worker->rconcurrent_highwatermark - (worker->rconcurrent_highwatermark / 4)) { + worker->rconcurrent_highwatermark - 10) { worker->too_many_open = false; } } else { - req_release(worker, send_req); + iorequest_release(worker, ioreq); if (ret == UV_EMFILE) { worker->too_many_open = true; worker->rconcurrent_highwatermark = worker->stats.rconcurrent; @@ -950,14 +1011,14 @@ static void on_connect(uv_connect_t *req, int status) if (status == UV_ECANCELED) { worker_del_tcp_waiting(worker, &peer->ip); assert(session->closing && session->waiting.len == 0 && session->tasks.len == 0); - req_release(worker, (struct req *)req); + iorequest_release(worker, req); return; } if (session->closing) { worker_del_tcp_waiting(worker, &peer->ip); assert(session->waiting.len == 0 && session->tasks.len == 0); - req_release(worker, (struct req *)req); + iorequest_release(worker, req); return; } @@ -972,7 +1033,7 @@ static void on_connect(uv_connect_t *req, int status) qr_task_unref(task); } assert(session->tasks.len == 0); - req_release(worker, (struct req *)req); + iorequest_release(worker, req); session_close(session); return; } @@ -986,7 +1047,7 @@ static void on_connect(uv_connect_t *req, int status) qr_task_unref(task); } assert(session->tasks.len == 0); - req_release(worker, (struct req *)req); + iorequest_release(worker, req); session_close(session); return; } @@ -999,7 +1060,7 @@ static void on_connect(uv_connect_t *req, int status) ret = tls_client_connect_start(session->tls_client_ctx, session, session_tls_hs_cb); if (ret == kr_error(EAGAIN)) { - req_release(worker, (struct req *)req); + iorequest_release(worker, req); io_start_read(session->handle); return; } @@ -1009,7 +1070,7 @@ static void on_connect(uv_connect_t *req, int status) ret = session_next_waiting_send(session); if (ret == kr_ok()) { worker_add_tcp_connected(worker, &session->peer.ip, session); - req_release(worker, (struct req *)req); + iorequest_release(worker, req); return; } } @@ -1024,7 +1085,7 @@ static void on_connect(uv_connect_t *req, int status) assert(session->tasks.len == 0); - req_release(worker, (struct req *)req); + iorequest_release(worker, req); session_close(session); } @@ -1045,7 +1106,6 @@ static void on_tcp_connect_timeout(uv_timer_t *timer) session_del_tasks(session, task); array_del(session->waiting, 0); qr_task_unref(task); - assert(task->refs == 1); qr_task_finalize(task, KR_STATE_FAIL); } @@ -1090,8 +1150,11 @@ static void on_tcp_watchdog_timeout(uv_timer_t *timer) static void on_udp_timeout(uv_timer_t *timer) { struct session *session = timer->data; - uv_timer_stop(timer); + uv_handle_t *handle = session->handle; + assert(handle->data == session); + + uv_timer_stop(timer); assert(session->tasks.len == 1); assert(session->waiting.len == 0); @@ -1247,7 +1310,12 @@ static int qr_task_finalize(struct qr_task *task, int state) task->finished = true; /* Send back answer */ if (ctx->source.session != NULL) { - (void) qr_task_send(task, ctx->source.session->handle, + uv_handle_t *handle = ctx->source.session->handle; + assert(ctx->source.session->closing == false); + assert(handle->data == ctx->source.session); + assert(!uv_is_closing(handle)); + assert(ctx->source.addr.ip.sa_family != AF_UNSPEC); + (void) qr_task_send(task, handle, (struct sockaddr *)&ctx->source.addr, ctx->req.answer); } else { @@ -1264,11 +1332,11 @@ static int qr_task_step(struct qr_task *task, return kr_error(ESTALE); } - /* Close pending I/O requests */ subreq_finalize(task, packet_source, packet); /* Consume input and produce next query */ struct request_ctx *ctx = task->ctx; + assert(ctx); struct kr_request *req = &ctx->req; struct worker_ctx *worker = ctx->worker; int sock_type = -1; @@ -1280,7 +1348,7 @@ static int qr_task_step(struct qr_task *task, if (worker->too_many_open) { struct kr_rplan *rplan = &req->rplan; if (worker->stats.rconcurrent < - worker->rconcurrent_highwatermark - (worker->rconcurrent_highwatermark / 4)) { + worker->rconcurrent_highwatermark - 10) { worker->too_many_open = false; } else if (packet && kr_rplan_empty(rplan)) { /* new query; TODO - make this detection more obvious */ @@ -1358,6 +1426,7 @@ static int qr_task_step(struct qr_task *task, } struct session* session = NULL; if ((session = worker_find_tcp_waiting(ctx->worker, addr)) != NULL) { + assert(session->outgoing); if (session->closing) { subreq_finalize(task, packet_source, packet); return qr_task_finalize(task, KR_STATE_FAIL); @@ -1429,14 +1498,14 @@ static int qr_task_step(struct qr_task *task, task->pending_count += 1; } else { /* Make connection */ - uv_connect_t *conn = (uv_connect_t *)req_borrow(ctx->worker); + uv_connect_t *conn = (uv_connect_t *)iorequest_borrow(ctx->worker); if (!conn) { return qr_task_step(task, NULL, NULL); } uv_handle_t *client = ioreq_spawn(task, sock_type, addr->sa_family); if (!client) { - req_release(ctx->worker, (struct req *)conn); + iorequest_release(ctx->worker, conn); subreq_finalize(task, packet_source, packet); return qr_task_finalize(task, KR_STATE_FAIL); } @@ -1444,7 +1513,7 @@ static int qr_task_step(struct qr_task *task, ret = worker_add_tcp_waiting(ctx->worker, addr, session); if (ret < 0) { session_del_tasks(session, task); - req_release(ctx->worker, (struct req *)conn); + iorequest_release(ctx->worker, conn); subreq_finalize(task, packet_source, packet); return qr_task_finalize(task, KR_STATE_FAIL); } @@ -1453,7 +1522,7 @@ static int qr_task_step(struct qr_task *task, if (ret < 0) { session_del_tasks(session, task); worker_del_tcp_waiting(ctx->worker, addr); - req_release(ctx->worker, (struct req *)conn); + iorequest_release(ctx->worker, conn); subreq_finalize(task, packet_source, packet); return qr_task_finalize(task, KR_STATE_FAIL); } @@ -1470,7 +1539,7 @@ static int qr_task_step(struct qr_task *task, session_del_tasks(session, task); session_del_waiting(session, task); worker_del_tcp_waiting(ctx->worker, addr); - req_release(ctx->worker, (struct req *)conn); + iorequest_release(ctx->worker, conn); return qr_task_step(task, NULL, NULL); } tls_client_ctx_set_params(tls_ctx, entry); @@ -1487,7 +1556,7 @@ static int qr_task_step(struct qr_task *task, session_del_tasks(session, task); session_del_waiting(session, task); worker_del_tcp_waiting(ctx->worker, addr); - req_release(ctx->worker, (struct req *)conn); + iorequest_release(ctx->worker, conn); subreq_finalize(task, packet_source, packet); return qr_task_finalize(task, KR_STATE_FAIL); } @@ -1497,7 +1566,7 @@ static int qr_task_step(struct qr_task *task, session_del_tasks(session, task); session_del_waiting(session, task); worker_del_tcp_waiting(ctx->worker, addr); - req_release(ctx->worker, (struct req *)conn); + iorequest_release(ctx->worker, conn); return qr_task_step(task, NULL, NULL); } } @@ -1582,7 +1651,13 @@ int worker_submit(struct worker_ctx *worker, uv_handle_t *handle, } } else if (msg) { /* response from upstream */ task = find_task(session, knot_wire_get_id(msg->wire)); + if (task == NULL) { + return kr_error(ENOENT); + } + assert(session->closing == false); } + fflush(stdout); + assert(uv_is_closing(session->handle) == false); /* Consume input and produce next message */ return qr_task_step(task, NULL, msg); @@ -1625,7 +1700,6 @@ static int worker_add_tcp_connected(struct worker_ctx *worker, assert(addr); const char *key = tcpsess_key(addr); assert(key); - tcp_connected += 1; assert(map_contains(&worker->tcp_connected, key) == 0); return map_add_tcp_session(&worker->tcp_connected, addr, session); } @@ -1637,9 +1711,6 @@ static int worker_del_tcp_connected(struct worker_ctx *worker, const char *key = tcpsess_key(addr); assert(key); int ret = map_del_tcp_session(&worker->tcp_connected, addr); - if (ret == 0) { - tcp_connected -= 1; - } return ret; } @@ -1658,9 +1729,6 @@ static int worker_add_tcp_waiting(struct worker_ctx *worker, assert(key); assert(map_contains(&worker->tcp_waiting, key) == 0); int ret = map_add_tcp_session(&worker->tcp_waiting, addr, session); - if (ret == 0) { - tcp_waiting += 1; - } return ret; } @@ -1671,9 +1739,6 @@ static int worker_del_tcp_waiting(struct worker_ctx *worker, const char *key = tcpsess_key(addr); assert(key); int ret = map_del_tcp_session(&worker->tcp_waiting, addr); - if (ret == 0) { - tcp_waiting -= 1; - } return ret; } @@ -1725,6 +1790,7 @@ int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle, } /* Connection error or forced disconnect */ struct session *session = handle->data; + assert(session); if (session->closing) { return kr_ok(); } @@ -1740,6 +1806,9 @@ int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle, struct qr_task *task = session->waiting.at[0]; if (session->outgoing) { qr_task_finalize(task, KR_STATE_FAIL); + } else { + assert(task->ctx->source.session == session); + task->ctx->source.session = NULL; } array_del(session->waiting, 0); qr_task_unref(task); @@ -1749,6 +1818,9 @@ int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle, struct qr_task *task = session->tasks.at[0]; if (session->outgoing) { qr_task_finalize(task, KR_STATE_FAIL); + } else { + assert(task->ctx->source.session == session); + task->ctx->source.session = NULL; } session_del_tasks(session, task); } @@ -1806,7 +1878,32 @@ int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle, uint16_t msg_id = knot_wire_get_id(session->msg_hdr + 2); if (msg_size < KNOT_WIRE_HEADER_SIZE) { /* better kill the connection; we would probably get out of sync */ - assert(false); + uv_timer_t *timer = &session->timeout; + uv_timer_stop(timer); + while (session->waiting.len > 0) { + struct qr_task *task = session->waiting.at[0]; + if (session->outgoing) { + qr_task_finalize(task, KR_STATE_FAIL); + } else { + assert(task->ctx->source.session == session); + task->ctx->source.session = NULL; + } + array_del(session->waiting, 0); + qr_task_unref(task); + session_del_tasks(session, task); + } + while (session->tasks.len > 0) { + struct qr_task *task = session->tasks.at[0]; + if (session->outgoing) { + qr_task_finalize(task, KR_STATE_FAIL); + } else { + assert(task->ctx->source.session == session); + task->ctx->source.session = NULL; + } + session_del_tasks(session, task); + } + session_close(session); + return kr_error(EILSEQ); } @@ -1844,7 +1941,6 @@ int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle, if (task) { knot_pkt_clear(task->pktbuf); } else { - /* TODO: only ignore one message without killing connection */ session->buffering = NULL; session->bytes_to_skip = msg_size - 2; ssize_t min_len = MIN(session->bytes_to_skip, len); @@ -1881,10 +1977,10 @@ int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle, ssize_t to_read = MIN(len, task->bytes_remaining); if (pkt_buf->size + to_read > pkt_buf->max_size) { pkt_buf->size = 0; + session->bytes_to_skip = task->bytes_remaining - to_read; task->bytes_remaining = 0; - /* TODO: only ignore one message without killing connection */ session->buffering = NULL; - return kr_error(EMSGSIZE); + return submitted; } /* Buffer message and check if it's complete */ memcpy(pkt_buf->wire + pkt_buf->size, msg, to_read); @@ -1894,7 +1990,9 @@ int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle, /* Message was assembled, clear temporary. */ session->buffering = NULL; session->msg_hdr_idx = 0; - session_del_tasks(session, task); + if (session->outgoing) { + session_del_tasks(session, task); + } /* Parse the packet and start resolving complete query */ int ret = parse_packet(pkt_buf); if (ret == 0 && !session->outgoing) { @@ -1920,6 +2018,7 @@ int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle, /* TODO: this is simple via iteration; recursion doesn't really help */ ret = worker_process_tcp(worker, handle, msg + to_read, len - to_read); if (ret < 0) { + assert(false); return ret; } submitted += ret; @@ -1973,10 +2072,12 @@ void worker_session_close(struct session *session) static int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen) { array_init(worker->pool_mp); - array_init(worker->pool_ioreq); + array_init(worker->pool_ioreqs); + array_init(worker->pool_iohandles); array_init(worker->pool_sessions); if (array_reserve(worker->pool_mp, ring_maxlen) || - array_reserve(worker->pool_ioreq, ring_maxlen) || + array_reserve(worker->pool_ioreqs, ring_maxlen) || + array_reserve(worker->pool_iohandles, ring_maxlen) || array_reserve(worker->pool_sessions, ring_maxlen)) { return kr_error(ENOMEM); } @@ -1993,7 +2094,7 @@ static int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen) #define reclaim_freelist(list, type, cb) \ for (unsigned i = 0; i < list.len; ++i) { \ - type *elm = list.at[i]; \ + void *elm = list.at[i]; \ kr_asan_unpoison(elm, sizeof(type)); \ cb(elm); \ } \ @@ -2002,7 +2103,8 @@ static int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen) void worker_reclaim(struct worker_ctx *worker) { reclaim_freelist(worker->pool_mp, struct mempool, mp_delete); - reclaim_freelist(worker->pool_ioreq, struct req, free); + reclaim_freelist(worker->pool_ioreqs, union uv_reqs, free); + reclaim_freelist(worker->pool_iohandles, union uv_handles, free); reclaim_freelist(worker->pool_sessions, struct session, session_free); mp_delete(worker->pkt_pool.ctx); worker->pkt_pool.ctx = NULL; diff --git a/daemon/worker.h b/daemon/worker.h index 97883e761..c19ad477a 100644 --- a/daemon/worker.h +++ b/daemon/worker.h @@ -27,6 +27,7 @@ struct qr_task; struct worker_ctx; /** Transport session (opaque). */ struct session; +/** Union of various libuv objects for freelist. */ /** Worker callback */ typedef void (*worker_cb_t)(struct worker_ctx *worker, struct kr_request *req, void *baton); @@ -80,6 +81,11 @@ void worker_reclaim(struct worker_ctx *worker); /** Closes given session */ void worker_session_close(struct session *session); +void *worker_iohandle_borrow(struct worker_ctx *worker); + +void worker_iohandle_release(struct worker_ctx *worker, void *h); + + /** @cond internal */ @@ -135,11 +141,31 @@ struct worker_ctx { map_t tcp_waiting; map_t outgoing; mp_freelist_t pool_mp; - mp_freelist_t pool_ioreq; + mp_freelist_t pool_ioreqs; mp_freelist_t pool_sessions; + mp_freelist_t pool_iohandles; knot_mm_t pkt_pool; }; +/* @internal Union of derivatives from libuv uv_handle_t for freelist. + * These have session as their `handle->data` and own it. */ +union uv_handles { + uv_handle_t handle; + uv_stream_t stream; + uv_udp_t udp; + uv_tcp_t tcp; + uv_timer_t timer; +}; + +/* @internal Union of derivatives from uv_req_t libuv request handles for freelist. + * These have only a reference to the task they're operating on. */ +union uv_reqs { + uv_req_t req; + uv_shutdown_t sdown; + uv_write_t write; + uv_connect_t connect; + uv_udp_send_t send; +}; /** @endcond */