From: Oto Šťáva Date: Tue, 31 Jan 2023 08:24:03 +0000 (+0100) Subject: daemon: make closing more session-centric X-Git-Tag: v6.0.2~42^2~29 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=b7b75f6aeec6160d1d4aff1141617700f55d4a4e;p=thirdparty%2Fknot-resolver.git daemon: make closing more session-centric Should clear up some memory ownership issues and strange behaviour when closing I/O handles. Also, sessions now count their owned handles, because libUV apparently does not guarantee the order of uv_close, so sometimes the closure of a session handle would cause a use-after-free because the session's timer has not yet been properly closed. --- diff --git a/daemon/io.c b/daemon/io.c index 13dde0ae0..f8a1b7d92 100644 --- a/daemon/io.c +++ b/daemon/io.c @@ -516,12 +516,9 @@ static void _tcp_accept(uv_stream_t *master, int status, enum protolayer_grp grp return; } - uv_tcp_t *client = malloc(sizeof(uv_tcp_t)); - if (!client) { - return; - } - int res = io_create(master->loop, (uv_handle_t *)client, - SOCK_STREAM, AF_UNSPEC, grp, NULL, 0, false); + struct session2 *s; + int res = io_create(master->loop, &s, SOCK_STREAM, AF_UNSPEC, grp, + NULL, 0, false); if (res) { if (res == UV_EMFILE) { the_worker->too_many_open = true; @@ -530,14 +527,12 @@ static void _tcp_accept(uv_stream_t *master, int status, enum protolayer_grp grp /* Since res isn't OK struct session wasn't allocated \ borrowed. * We must release client handle only. */ - free(client); return; } - /* struct session was allocated \ borrowed from memory pool. */ - struct session2 *s = client->data; kr_require(s->outgoing == false); + uv_tcp_t *client = (uv_tcp_t *)session2_get_handle(s); if (uv_accept(master, (uv_stream_t *)client) != 0) { /* close session, close underlying uv handles and * deallocate (or return to memory pool) memory. 
*/ @@ -820,7 +815,7 @@ void io_tty_alloc(uv_handle_t *handle, size_t suggested, uv_buf_t *buf) buf->base = malloc(suggested); } -struct io_stream_data *io_tty_alloc_data() { +struct io_stream_data *io_tty_alloc_data(void) { knot_mm_t *pool = mm_ctx_mempool2(MM_DEFAULT_BLKSIZE); if (!pool) { return NULL; @@ -1025,18 +1020,29 @@ int io_listen_xdp(uv_loop_t *loop, struct endpoint *ep, const char *ifname) } #endif -int io_create(uv_loop_t *loop, uv_handle_t *handle, int type, unsigned family, - enum protolayer_grp grp, - struct protolayer_data_param *layer_param, - size_t layer_param_count, - bool outgoing) +int io_create(uv_loop_t *loop, struct session2 **out_session, int type, + unsigned family, enum protolayer_grp grp, + struct protolayer_data_param *layer_param, + size_t layer_param_count, bool outgoing) { + *out_session = NULL; int ret = -1; + uv_handle_t *handle; if (type == SOCK_DGRAM) { - ret = uv_udp_init(loop, (uv_udp_t *)handle); + uv_udp_t *udp = malloc(sizeof(uv_udp_t)); + kr_require(udp); + ret = uv_udp_init(loop, udp); + + handle = (uv_handle_t *)udp; } else if (type == SOCK_STREAM) { - ret = uv_tcp_init_ex(loop, (uv_tcp_t *)handle, family); - uv_tcp_nodelay((uv_tcp_t *)handle, 1); + uv_tcp_t *tcp = malloc(sizeof(uv_tcp_t)); + kr_require(tcp); + ret = uv_tcp_init_ex(loop, tcp, family); + uv_tcp_nodelay(tcp, 1); + + handle = (uv_handle_t *)tcp; + } else { + kr_require(false && "io_create: invalid socket type"); } if (ret != 0) { return ret; @@ -1046,6 +1052,8 @@ int io_create(uv_loop_t *loop, uv_handle_t *handle, int type, unsigned family, if (s == NULL) { ret = -1; } + + *out_session = s; return ret; } @@ -1055,13 +1063,13 @@ static void io_deinit(uv_handle_t *handle) return; } if (handle->type != UV_POLL) { - session2_free(handle->data); + session2_unhandle(handle->data); } else { #if ENABLE_XDP xdp_handle_data_t *xhd = handle->data; uv_idle_stop(&xhd->tx_waker); uv_close((uv_handle_t *)&xhd->tx_waker, NULL); - session2_free(xhd->session); + 
session2_unhandle(xhd->session); knot_xdp_deinit(xhd->socket); queue_deinit(xhd->tx_waker_queue); free(xhd); diff --git a/daemon/io.h b/daemon/io.h index 6733cd954..305f26cfe 100644 --- a/daemon/io.h +++ b/daemon/io.h @@ -44,11 +44,10 @@ void tcp_timeout_trigger(uv_timer_t *timer); * \param type = SOCK_* * \param family = AF_* * \param has_tls has meanings only when type is SOCK_STREAM */ -int io_create(uv_loop_t *loop, uv_handle_t *handle, int type, - unsigned family, enum protolayer_grp grp, - struct protolayer_data_param *layer_param, - size_t layer_param_count, - bool outgoing); +int io_create(uv_loop_t *loop, struct session2 **out_session, int type, + unsigned family, enum protolayer_grp grp, + struct protolayer_data_param *layer_param, + size_t layer_param_count, bool outgoing); void io_free(uv_handle_t *handle); int io_start_read(uv_handle_t *handle); diff --git a/daemon/network.c b/daemon/network.c index 504a2545d..cc170bf97 100644 --- a/daemon/network.c +++ b/daemon/network.c @@ -235,7 +235,8 @@ static void endpoint_close(struct endpoint *ep, bool force) io_free(ep->handle); } } else { /* Asynchronous close */ - uv_close(ep->handle, io_free); + struct session2 *s = ep->handle->data; + session2_event(s, PROTOLAYER_EVENT_CLOSE, NULL); } } diff --git a/daemon/session2.c b/daemon/session2.c index d9b5ba159..38558e704 100644 --- a/daemon/session2.c +++ b/daemon/session2.c @@ -709,13 +709,16 @@ struct session2 *session2_new(enum session2_transport_type transport_type, ret = uv_timer_init(uv_default_loop(), &s->timer); kr_require(!ret); s->timer.data = s; + s->uv_count++; /* Session owns the timer */ session2_touch(s); return s; } -void session2_free(struct session2 *s) +/** De-allocates the session. Must only be called once the underlying IO handle + * and timer are already closed, otherwise may leak resources. 
*/ +static void session2_free(struct session2 *s) { protolayer_manager_free(s->layers); wire_buf_deinit(&s->wire_buf); @@ -725,6 +728,18 @@ void session2_free(struct session2 *s) free(s); } +void session2_unhandle(struct session2 *s) +{ + if (kr_fails_assert(s->uv_count > 0)) { + session2_free(s); + return; + } + + s->uv_count--; + if (s->uv_count <= 0) + session2_free(s); +} + int session2_start_read(struct session2 *session) { if (session->transport.type == SESSION2_TRANSPORT_IO) @@ -1373,6 +1388,11 @@ static void on_session2_handle_close(uv_handle_t *handle) io_free(handle); } +static void on_session2_timer_close(uv_handle_t *handle) +{ + session2_unhandle(handle->data); +} + static int session2_handle_close(struct session2 *s, uv_handle_t *handle) { if (kr_fails_assert(s->transport.type == SESSION2_TRANSPORT_IO @@ -1380,7 +1400,7 @@ static int session2_handle_close(struct session2 *s, uv_handle_t *handle) return kr_error(EINVAL); io_stop_read(handle); - uv_close((uv_handle_t *)&s->timer, NULL); + uv_close((uv_handle_t *)&s->timer, on_session2_timer_close); uv_close(handle, on_session2_handle_close); return kr_ok(); } diff --git a/daemon/session2.h b/daemon/session2.h index 5b63a27c7..12cfa66b6 100644 --- a/daemon/session2.h +++ b/daemon/session2.h @@ -713,6 +713,9 @@ struct session2 { queue_t(struct qr_task *) waiting; /**< List of tasks waiting for * sending to upstream. */ + int uv_count; /**< Number of unclosed libUV handles owned by this + * session. */ + /** Communication information. Typically written into by one of the * first layers facilitating transport protocol processing. * Zero-initialized by default. 
*/ @@ -783,6 +786,7 @@ static inline struct session2 *session2_new_io(uv_handle_t *handle, layer_param, layer_param_count, outgoing); s->transport.io.handle = handle; handle->data = s; + s->uv_count++; /* Session owns the handle */ return s; } @@ -800,9 +804,9 @@ static inline struct session2 *session2_new_child(struct session2 *parent, return s; } -/** De-allocates the session. Must only be called once the underlying IO handle - * and timer are already closed, otherwise may leak resources. */ -void session2_free(struct session2 *s); +/** Used when a libUV handle owned by the session is closed. Once all owned + * handles are closed, the session is freed. */ +void session2_unhandle(struct session2 *s); /** Start reading from the underlying transport. */ int session2_start_read(struct session2 *session); diff --git a/daemon/worker.c b/daemon/worker.c index 2233fc725..532edfc51 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -125,10 +125,10 @@ struct worker_ctx *the_worker = NULL; /*! @internal Create a UDP/TCP handle for an outgoing AF_INET* connection. * socktype is SOCK_* */ -static uv_handle_t *ioreq_spawn(int socktype, sa_family_t family, - enum protolayer_grp grp, - struct protolayer_data_param *layer_param, - size_t layer_param_count) +static struct session2 *ioreq_spawn(int socktype, sa_family_t family, + enum protolayer_grp grp, + struct protolayer_data_param *layer_param, + size_t layer_param_count) { bool precond = (socktype == SOCK_DGRAM || socktype == SOCK_STREAM) && (family == AF_INET || family == AF_INET6); @@ -138,18 +138,14 @@ static uv_handle_t *ioreq_spawn(int socktype, sa_family_t family, } /* Create connection for iterative query */ - uv_handle_t *handle = malloc(socktype == SOCK_DGRAM - ? 
sizeof(uv_udp_t) : sizeof(uv_tcp_t)); - kr_require(handle); - - int ret = io_create(the_worker->loop, handle, socktype, family, grp, + struct session2 *s; + int ret = io_create(the_worker->loop, &s, socktype, family, grp, layer_param, layer_param_count, true); if (ret) { if (ret == UV_EMFILE) { the_worker->too_many_open = true; the_worker->rconcurrent_highwatermark = the_worker->stats.rconcurrent; } - free(handle); return NULL; } @@ -162,28 +158,25 @@ static uv_handle_t *ioreq_spawn(int socktype, sa_family_t family, } if (addr->ip.sa_family != AF_UNSPEC) { if (kr_fails_assert(addr->ip.sa_family == family)) { - io_free(handle); + session2_event(s, PROTOLAYER_EVENT_FORCE_CLOSE, NULL); return NULL; } if (socktype == SOCK_DGRAM) { - uv_udp_t *udp = (uv_udp_t *)handle; + uv_udp_t *udp = (uv_udp_t *)session2_get_handle(s); ret = uv_udp_bind(udp, &addr->ip, 0); } else if (socktype == SOCK_STREAM){ - uv_tcp_t *tcp = (uv_tcp_t *)handle; + uv_tcp_t *tcp = (uv_tcp_t *)session2_get_handle(s); ret = uv_tcp_bind(tcp, &addr->ip, 0); } } if (ret != 0) { - io_free(handle); + session2_event(s, PROTOLAYER_EVENT_FORCE_CLOSE, NULL); return NULL; } - /* Set current handle as a subrequest type. 
*/ - struct session2 *session = handle->data; - kr_assert(session->outgoing); /* Connect or issue query datagram */ - return handle; + return s; } static void ioreq_kill_pending(struct qr_task *task) @@ -842,13 +835,12 @@ static int transmit(struct qr_task *task) if (ret) return ret; - uv_handle_t *handle = ioreq_spawn(SOCK_DGRAM, choice->sin6_family, + struct session2 *session = ioreq_spawn(SOCK_DGRAM, choice->sin6_family, PROTOLAYER_GRP_DOUDP, NULL, 0); - if (!handle) + if (!session) return kr_error(EINVAL); struct sockaddr *addr = (struct sockaddr *)choice; - struct session2 *session = handle->data; struct sockaddr *peer = session2_get_peer(session); kr_assert(peer->sa_family == AF_UNSPEC && session->outgoing); kr_require(addr->sa_family == AF_INET || addr->sa_family == AF_INET6); @@ -1095,7 +1087,7 @@ static int tcp_task_make_connection(struct qr_task *task, const struct sockaddr if (!conn) { return kr_error(EINVAL); } - uv_handle_t *client; + struct session2 *session; bool has_tls = tls_entry; if (has_tls) { @@ -1103,17 +1095,16 @@ static int tcp_task_make_connection(struct qr_task *task, const struct sockaddr .protocol = PROTOLAYER_TLS, .param = tls_entry }; - client = ioreq_spawn(SOCK_STREAM, addr->sa_family, + session = ioreq_spawn(SOCK_STREAM, addr->sa_family, PROTOLAYER_GRP_DOTLS, &param, 1); } else { - client = ioreq_spawn(SOCK_STREAM, addr->sa_family, + session = ioreq_spawn(SOCK_STREAM, addr->sa_family, PROTOLAYER_GRP_DOTCP, NULL, 0); } - if (!client) { + if (!session) { free(conn); return kr_error(EINVAL); } - struct session2 *session = client->data; if (kr_fails_assert(session->secure == has_tls)) { free(conn); return kr_error(EINVAL); @@ -1150,7 +1141,8 @@ static int tcp_task_make_connection(struct qr_task *task, const struct sockaddr } /* Start connection process to upstream. 
*/ - ret = uv_tcp_connect(conn, (uv_tcp_t *)client, addr , on_connect); + ret = uv_tcp_connect(conn, (uv_tcp_t *)session2_get_handle(session), + addr , on_connect); if (ret != 0) { session2_timer_stop(session); worker_del_tcp_waiting(addr);