From 308635573afef415cccd191fb07e7403118d4eff Mon Sep 17 00:00:00 2001 From: =?utf8?q?Vladim=C3=ADr=20=C4=8Cun=C3=A1t?= Date: Wed, 19 Sep 2018 18:39:17 +0200 Subject: [PATCH] worker: remove freelists for iohandle and iorequest A quick profiling showed no change in performance, and in particular no change in time spent in malloc/free. Some of the types in the union differed in size by a multiple. If their performance won't be satisfying, replacements should be considered first (e.g. jemalloc) before rolling our own stuff. --- daemon/io.c | 18 +----- daemon/session.c | 5 +- daemon/worker.c | 139 +++++++++++------------------------------------ daemon/worker.h | 30 ---------- 4 files changed, 36 insertions(+), 156 deletions(-) diff --git a/daemon/io.c b/daemon/io.c index 09719f946..5cad0057f 100644 --- a/daemon/io.c +++ b/daemon/io.c @@ -34,8 +34,6 @@ } \ } while (0) -void io_release(uv_handle_t *handle); - static void check_bufsize(uv_handle_t* handle) { /* We want to buffer at least N waves in advance. @@ -235,11 +233,10 @@ static void _tcp_accept(uv_stream_t *master, int status, bool tls) } struct worker_ctx *worker = (struct worker_ctx *)master->loop->data; - uv_stream_t *client = worker_iohandle_borrow(worker); + uv_stream_t *client = malloc(sizeof(uv_tcp_t)); if (!client) { return; } - memset(client, 0, sizeof(*client)); int res = io_create(master->loop, (uv_handle_t *)client, SOCK_STREAM, AF_UNSPEC); if (res) { if (res == UV_EMFILE) { @@ -249,7 +246,7 @@ static void _tcp_accept(uv_stream_t *master, int status, bool tls) /* Since res isn't OK struct session wasn't allocated \ borrowed. * We must release client handle only. */ - worker_iohandle_release(worker, client); + free(client); return; } @@ -448,17 +445,6 @@ void io_free(uv_handle_t *handle) free(handle); } -void io_release(uv_handle_t *handle) -{ - if (!handle) { - return; - } - uv_loop_t *loop = handle->loop; - struct worker_ctx *worker = loop->data; - io_deinit(handle); - worker_iohandle_release(worker, handle); -} - int io_start_read(uv_handle_t *handle) { switch (handle->type) { diff --git a/daemon/session.c b/daemon/session.c index e8109fd89..176ab8dd3 100644 --- a/daemon/session.c +++ b/daemon/session.c @@ -35,12 +35,9 @@ struct session { static void on_session_close(uv_handle_t *handle) { - uv_loop_t *loop = handle->loop; - struct worker_ctx *worker = loop->data; struct session *session = handle->data; assert(session->handle == handle); - io_deinit(handle); - worker_iohandle_release(worker, handle); + io_free(handle); } static void on_session_timer_close(uv_handle_t *timer) diff --git a/daemon/worker.c b/daemon/worker.c index 12ee439ad..4fa08d34a 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -116,73 +116,6 @@ static inline struct worker_ctx *get_worker(void) return uv_default_loop()->data; } -static inline void *iohandle_borrow(struct worker_ctx *worker) -{ - void *h = NULL; - - const size_t size = sizeof(uv_handles_t); - if (worker->pool_iohandles.len > 0) { - h = array_tail(worker->pool_iohandles); - array_pop(worker->pool_iohandles); - kr_asan_unpoison(h, size); - } else { - h = malloc(size); - } - - return h; -} - -static inline void iohandle_release(struct worker_ctx *worker, void *h) -{ - assert(h); - - if (worker->pool_iohandles.len < MP_FREELIST_SIZE) { - array_push(worker->pool_iohandles, h); - kr_asan_poison(h, sizeof(uv_handles_t)); - } else { - free(h); - } -} - -void *worker_iohandle_borrow(struct worker_ctx *worker) -{ - return iohandle_borrow(worker); -} - -void worker_iohandle_release(struct worker_ctx *worker, void *h) -{ - iohandle_release(worker, h); -} - -static inline void *iorequest_borrow(struct worker_ctx *worker) -{ - void *r = NULL; - - const size_t size = sizeof(uv_reqs_t); - if (worker->pool_ioreqs.len > 0) { - r = array_tail(worker->pool_ioreqs); - array_pop(worker->pool_ioreqs); - kr_asan_unpoison(r, size); - } else { - r = malloc(size); - } - - return r; -} - -static inline void iorequest_release(struct worker_ctx *worker, void *r) -{ - assert(r); - - if (worker->pool_ioreqs.len < MP_FREELIST_SIZE) { - array_push(worker->pool_ioreqs, r); - kr_asan_poison(r, sizeof(uv_reqs_t)); - } else { - free(r); - } -} - - /*! @internal Create a UDP/TCP handle for an outgoing AF_INET* connection. * socktype is SOCK_* */ static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype, sa_family_t family) @@ -200,8 +133,8 @@ static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype, sa_family_t } /* Create connection for iterative query */ struct worker_ctx *worker = task->ctx->worker; - void *h = iohandle_borrow(worker); - uv_handle_t *handle = (uv_handle_t *)h; + uv_handle_t *handle = malloc(socktype == SOCK_DGRAM + ? sizeof(uv_udp_t) : sizeof(uv_tcp_t)); if (!handle) { return NULL; } @@ -211,7 +144,7 @@ static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype, sa_family_t worker->too_many_open = true; worker->rconcurrent_highwatermark = worker->stats.rconcurrent; } - iohandle_release(worker, h); + free(handle); return NULL; } @@ -241,7 +174,7 @@ static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype, sa_family_t } if (ret < 0) { io_deinit(handle); - iohandle_release(worker, h); + free(handle); return NULL; } /* Connect or issue query datagram */ @@ -678,24 +611,20 @@ static void on_send(uv_udp_send_t *req, int status) { uv_handle_t *handle = (uv_handle_t *)(req->handle); uv_loop_t *loop = handle->loop; - struct worker_ctx *worker = loop->data; - assert(worker == get_worker()); struct qr_task *task = req->data; qr_task_on_send(task, handle, status); qr_task_unref(task); - iorequest_release(worker, req); + free(req); } - +// TODO: unify these two static void on_task_write(uv_write_t *req, int status) { uv_handle_t *handle = (uv_handle_t *)(req->handle); uv_loop_t *loop = handle->loop; - struct worker_ctx *worker = loop->data; - assert(worker == get_worker()); struct qr_task *task = req->data; qr_task_on_send(task, handle, status); qr_task_unref(task); - iorequest_release(worker, req); + free(req); } static int qr_task_send(struct qr_task *task, uv_handle_t *handle, @@ -707,12 +636,11 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle, int ret = 0; struct request_ctx *ctx = task->ctx; - struct worker_ctx *worker = ctx->worker; struct kr_request *req = &ctx->req; - void *ioreq = iorequest_borrow(worker); - if (!ioreq) { - return qr_task_on_send(task, handle, kr_error(ENOMEM)); - } + + const bool is_stream = handle->type == UV_TCP; + if (!is_stream && handle->type != UV_UDP) abort(); + if (knot_wire_get_qr(pkt->wire) == 0) { /* * Query must be finalised using destination address before @@ -727,14 +655,18 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle, * trying to obtain the IP address from it. */ ret = kr_resolve_checkout(req, NULL, addr, - handle->type == UV_UDP ? SOCK_DGRAM : SOCK_STREAM, + is_stream ? SOCK_STREAM : SOCK_DGRAM, pkt); if (ret != 0) { - iorequest_release(worker, ioreq); return ret; } } + uv_handle_t *ioreq = malloc(is_stream ? sizeof(uv_write_t) : sizeof(uv_udp_send_t)); + if (!ioreq) { + return qr_task_on_send(task, handle, kr_error(ENOMEM)); + } + /* Pending ioreq on current task */ qr_task_ref(task); @@ -763,6 +695,7 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle, assert(false); } + struct worker_ctx *worker = ctx->worker; if (ret == 0) { if (worker->too_many_open && worker->stats.rconcurrent < @@ -770,7 +703,7 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle, worker->too_many_open = false; } } else { - iorequest_release(worker, ioreq); + free(ioreq); qr_task_unref(task); if (ret == UV_EMFILE) { worker->too_many_open = true; @@ -895,14 +828,14 @@ static void on_connect(uv_connect_t *req, int status) if (status == UV_ECANCELED) { worker_del_tcp_waiting(worker, peer); assert(session_is_empty(session) && session_flags(session)->closing); - iorequest_release(worker, req); + free(req); return; } if (session_flags(session)->closing) { worker_del_tcp_waiting(worker, peer); assert(session_is_empty(session)); - iorequest_release(worker, req); + free(req); return; } @@ -913,7 +846,7 @@ static void on_connect(uv_connect_t *req, int status) worker_del_tcp_waiting(worker, peer); session_waitinglist_retry(session, false); assert(session_tasklist_is_empty(session)); - iorequest_release(worker, req); + free(req); session_close(session); return; } @@ -926,7 +859,7 @@ static void on_connect(uv_connect_t *req, int status) * something gone wrong */ session_waitinglist_finalize(session, KR_STATE_FAIL); assert(session_tasklist_is_empty(session)); - iorequest_release(worker, req); + free(req); session_close(session); return; } @@ -948,7 +881,7 @@ static void on_connect(uv_connect_t *req, int status) struct tls_client_ctx_t *tls_ctx = session_tls_get_client_ctx(session); ret = tls_client_connect_start(tls_ctx, session, session_tls_hs_cb); if (ret == kr_error(EAGAIN)) { - iorequest_release(worker, req); + free(req); session_start_read(session); session_timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY); @@ -963,14 +896,14 @@ static void on_connect(uv_connect_t *req, int status) if (ret == kr_ok()) { session_timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY); - iorequest_release(worker, req); + free(req); return; } } session_waitinglist_finalize(session, KR_STATE_FAIL); assert(session_tasklist_is_empty(session)); - iorequest_release(worker, req); + free(req); session_close(session); } @@ -1409,14 +1342,14 @@ static int qr_task_step(struct qr_task *task, task->pending_count += 1; } else { /* Make connection */ - uv_connect_t *conn = (uv_connect_t *)iorequest_borrow(ctx->worker); + uv_connect_t *conn = malloc(sizeof(uv_connect_t)); if (!conn) { return qr_task_step(task, NULL, NULL); } uv_handle_t *client = ioreq_spawn(task, sock_type, addr->sa_family); if (!client) { - iorequest_release(ctx->worker, conn); + free(conn); subreq_finalize(task, packet_source, packet); return qr_task_finalize(task, KR_STATE_FAIL); } @@ -1424,7 +1357,7 @@ static int qr_task_step(struct qr_task *task, ret = worker_add_tcp_waiting(ctx->worker, addr, session); if (ret < 0) { session_tasklist_del(session, task); - iorequest_release(ctx->worker, conn); + free(conn); subreq_finalize(task, packet_source, packet); return qr_task_finalize(task, KR_STATE_FAIL); } @@ -1433,7 +1366,7 @@ static int qr_task_step(struct qr_task *task, if (ret < 0) { session_tasklist_del(session, task); worker_del_tcp_waiting(ctx->worker, addr); - iorequest_release(ctx->worker, conn); + free(conn); subreq_finalize(task, packet_source, packet); return qr_task_finalize(task, KR_STATE_FAIL); } @@ -1450,7 +1383,7 @@ static int qr_task_step(struct qr_task *task, session_tasklist_del(session, task); session_waitinglist_del(session, task); worker_del_tcp_waiting(ctx->worker, addr); - iorequest_release(ctx->worker, conn); + free(conn); subreq_finalize(task, packet_source, packet); return qr_task_step(task, NULL, NULL); } @@ -1469,7 +1402,7 @@ static int qr_task_step(struct qr_task *task, session_tasklist_del(session, task); session_waitinglist_del(session, task); worker_del_tcp_waiting(ctx->worker, addr); - iorequest_release(ctx->worker, conn); + free(conn); subreq_finalize(task, packet_source, packet); return qr_task_finalize(task, KR_STATE_FAIL); } @@ -1488,7 +1421,7 @@ static int qr_task_step(struct qr_task *task, session_tasklist_del(session, task); session_waitinglist_del(session, task); worker_del_tcp_waiting(ctx->worker, addr); - iorequest_release(ctx->worker, conn); + free(conn); subreq_finalize(task, packet_source, packet); return qr_task_step(task, NULL, NULL); } @@ -1882,12 +1815,8 @@ void worker_request_set_source_session(struct request_ctx *ctx, struct session * static int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen) { array_init(worker->pool_mp); - array_init(worker->pool_ioreqs); - array_init(worker->pool_iohandles); array_init(worker->pool_sessions); if (array_reserve(worker->pool_mp, ring_maxlen) || - array_reserve(worker->pool_ioreqs, ring_maxlen) || - array_reserve(worker->pool_iohandles, ring_maxlen) || array_reserve(worker->pool_sessions, ring_maxlen)) { return kr_error(ENOMEM); } @@ -1921,8 +1850,6 @@ static int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen) void worker_reclaim(struct worker_ctx *worker) { reclaim_freelist(worker->pool_mp, struct mempool, mp_delete); - reclaim_freelist(worker->pool_ioreqs, uv_reqs_t, free); - reclaim_freelist(worker->pool_iohandles, uv_handles_t, free); reclaim_freelist_custom(worker->pool_sessions, session, session_free); mp_delete(worker->pkt_pool.ctx); worker->pkt_pool.ctx = NULL; diff --git a/daemon/worker.h b/daemon/worker.h index eea4c595f..733f62cde 100644 --- a/daemon/worker.h +++ b/daemon/worker.h @@ -74,10 +74,6 @@ struct session *worker_session_borrow(struct worker_ctx *worker); void worker_session_release(struct worker_ctx *worker, uv_handle_t *handle); -void *worker_iohandle_borrow(struct worker_ctx *worker); - -void worker_iohandle_release(struct worker_ctx *worker, void *h); - int worker_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet); @@ -163,35 +159,9 @@ struct worker_ctx { /** Subrequest leaders (struct qr_task*), indexed by qname+qtype+qclass. */ trie_t *subreq_out; mp_freelist_t pool_mp; - mp_freelist_t pool_ioreqs; mp_freelist_t pool_sessions; - mp_freelist_t pool_iohandles; knot_mm_t pkt_pool; }; -/* @internal Union of some libuv handles for freelist. - * These have session as their `handle->data` and own it. - * Subset of uv_any_handle. */ -union uv_handles { - uv_handle_t handle; - uv_stream_t stream; - uv_udp_t udp; - uv_tcp_t tcp; - uv_timer_t timer; -}; -typedef union uv_any_handle uv_handles_t; - -/* @internal Union of derivatives from uv_req_t libuv request handles for freelist. - * These have only a reference to the task they're operating on. - * Subset of uv_any_req. */ -union uv_reqs { - uv_req_t req; - uv_shutdown_t sdown; - uv_write_t write; - uv_connect_t connect; - uv_udp_send_t send; -}; -typedef union uv_reqs uv_reqs_t; - /** @endcond */ -- 2.47.2