return;
}
- uv_tcp_t *client = malloc(sizeof(uv_tcp_t));
- if (!client) {
- return;
- }
- int res = io_create(master->loop, (uv_handle_t *)client,
- SOCK_STREAM, AF_UNSPEC, grp, NULL, 0, false);
+ struct session2 *s;
+ int res = io_create(master->loop, &s, SOCK_STREAM, AF_UNSPEC, grp,
+ NULL, 0, false);
if (res) {
if (res == UV_EMFILE) {
the_worker->too_many_open = true;
/* Since res isn't OK struct session wasn't allocated \ borrowed.
* We must release client handle only.
*/
- free(client);
return;
}
- /* struct session was allocated \ borrowed from memory pool. */
- struct session2 *s = client->data;
kr_require(s->outgoing == false);
+ uv_tcp_t *client = (uv_tcp_t *)session2_get_handle(s);
if (uv_accept(master, (uv_stream_t *)client) != 0) {
/* close session, close underlying uv handles and
* deallocate (or return to memory pool) memory. */
buf->base = malloc(suggested);
}
-struct io_stream_data *io_tty_alloc_data() {
+struct io_stream_data *io_tty_alloc_data(void) {
knot_mm_t *pool = mm_ctx_mempool2(MM_DEFAULT_BLKSIZE);
if (!pool) {
return NULL;
}
#endif
-int io_create(uv_loop_t *loop, uv_handle_t *handle, int type, unsigned family,
- enum protolayer_grp grp,
- struct protolayer_data_param *layer_param,
- size_t layer_param_count,
- bool outgoing)
+int io_create(uv_loop_t *loop, struct session2 **out_session, int type,
+ unsigned family, enum protolayer_grp grp,
+ struct protolayer_data_param *layer_param,
+ size_t layer_param_count, bool outgoing)
{
+ *out_session = NULL;
int ret = -1;
+ uv_handle_t *handle;
if (type == SOCK_DGRAM) {
- ret = uv_udp_init(loop, (uv_udp_t *)handle);
+ uv_udp_t *udp = malloc(sizeof(uv_udp_t));
+ kr_require(udp);
+ ret = uv_udp_init(loop, udp);
+
+ handle = (uv_handle_t *)udp;
} else if (type == SOCK_STREAM) {
- ret = uv_tcp_init_ex(loop, (uv_tcp_t *)handle, family);
- uv_tcp_nodelay((uv_tcp_t *)handle, 1);
+ uv_tcp_t *tcp = malloc(sizeof(uv_tcp_t));
+ kr_require(tcp);
+ ret = uv_tcp_init_ex(loop, tcp, family);
+ uv_tcp_nodelay(tcp, 1);
+
+ handle = (uv_handle_t *)tcp;
+ } else {
+ kr_require(false && "io_create: invalid socket type");
}
if (ret != 0) {
return ret;
if (s == NULL) {
ret = -1;
}
+
+ *out_session = s;
return ret;
}
return;
}
if (handle->type != UV_POLL) {
- session2_free(handle->data);
+ session2_unhandle(handle->data);
} else {
#if ENABLE_XDP
xdp_handle_data_t *xhd = handle->data;
uv_idle_stop(&xhd->tx_waker);
uv_close((uv_handle_t *)&xhd->tx_waker, NULL);
- session2_free(xhd->session);
+ session2_unhandle(xhd->session);
knot_xdp_deinit(xhd->socket);
queue_deinit(xhd->tx_waker_queue);
free(xhd);
* \param type = SOCK_*
* \param family = AF_*
* \param has_tls has meanings only when type is SOCK_STREAM */
-int io_create(uv_loop_t *loop, uv_handle_t *handle, int type,
- unsigned family, enum protolayer_grp grp,
- struct protolayer_data_param *layer_param,
- size_t layer_param_count,
- bool outgoing);
+int io_create(uv_loop_t *loop, struct session2 **out_session, int type,
+ unsigned family, enum protolayer_grp grp,
+ struct protolayer_data_param *layer_param,
+ size_t layer_param_count, bool outgoing);
void io_free(uv_handle_t *handle);
int io_start_read(uv_handle_t *handle);
io_free(ep->handle);
}
} else { /* Asynchronous close */
- uv_close(ep->handle, io_free);
+ struct session2 *s = ep->handle->data;
+ session2_event(s, PROTOLAYER_EVENT_CLOSE, NULL);
}
}
ret = uv_timer_init(uv_default_loop(), &s->timer);
kr_require(!ret);
s->timer.data = s;
+ s->uv_count++; /* Session owns the timer */
session2_touch(s);
return s;
}
-void session2_free(struct session2 *s)
+/** De-allocates the session. Must only be called once the underlying IO handle
+ * and timer are already closed, otherwise may leak resources. */
+static void session2_free(struct session2 *s)
{
protolayer_manager_free(s->layers);
wire_buf_deinit(&s->wire_buf);
free(s);
}
+void session2_unhandle(struct session2 *s)
+{
+ if (kr_fails_assert(s->uv_count > 0)) {
+ session2_free(s);
+ return;
+ }
+
+ s->uv_count--;
+ if (s->uv_count <= 0)
+ session2_free(s);
+}
+
int session2_start_read(struct session2 *session)
{
if (session->transport.type == SESSION2_TRANSPORT_IO)
io_free(handle);
}
+static void on_session2_timer_close(uv_handle_t *handle)
+{
+ session2_unhandle(handle->data);
+}
+
static int session2_handle_close(struct session2 *s, uv_handle_t *handle)
{
if (kr_fails_assert(s->transport.type == SESSION2_TRANSPORT_IO
return kr_error(EINVAL);
io_stop_read(handle);
- uv_close((uv_handle_t *)&s->timer, NULL);
+ uv_close((uv_handle_t *)&s->timer, on_session2_timer_close);
uv_close(handle, on_session2_handle_close);
return kr_ok();
}
queue_t(struct qr_task *) waiting; /**< List of tasks waiting for
* sending to upstream. */
+ int uv_count; /**< Number of unclosed libUV handles owned by this
+ * session. */
+
/** Communication information. Typically written into by one of the
* first layers facilitating transport protocol processing.
* Zero-initialized by default. */
layer_param, layer_param_count, outgoing);
s->transport.io.handle = handle;
handle->data = s;
+ s->uv_count++; /* Session owns the handle */
return s;
}
return s;
}
-/** De-allocates the session. Must only be called once the underlying IO handle
- * and timer are already closed, otherwise may leak resources. */
-void session2_free(struct session2 *s);
+/** Used when a libUV handle owned by the session is closed. Once all owned
+ * handles are closed, the session is freed. */
+void session2_unhandle(struct session2 *s);
/** Start reading from the underlying transport. */
int session2_start_read(struct session2 *session);
/*! @internal Create a UDP/TCP handle for an outgoing AF_INET* connection.
* socktype is SOCK_* */
-static uv_handle_t *ioreq_spawn(int socktype, sa_family_t family,
- enum protolayer_grp grp,
- struct protolayer_data_param *layer_param,
- size_t layer_param_count)
+static struct session2 *ioreq_spawn(int socktype, sa_family_t family,
+ enum protolayer_grp grp,
+ struct protolayer_data_param *layer_param,
+ size_t layer_param_count)
{
bool precond = (socktype == SOCK_DGRAM || socktype == SOCK_STREAM)
&& (family == AF_INET || family == AF_INET6);
}
/* Create connection for iterative query */
- uv_handle_t *handle = malloc(socktype == SOCK_DGRAM
- ? sizeof(uv_udp_t) : sizeof(uv_tcp_t));
- kr_require(handle);
-
- int ret = io_create(the_worker->loop, handle, socktype, family, grp,
+ struct session2 *s;
+ int ret = io_create(the_worker->loop, &s, socktype, family, grp,
layer_param, layer_param_count, true);
if (ret) {
if (ret == UV_EMFILE) {
the_worker->too_many_open = true;
the_worker->rconcurrent_highwatermark = the_worker->stats.rconcurrent;
}
- free(handle);
return NULL;
}
}
if (addr->ip.sa_family != AF_UNSPEC) {
if (kr_fails_assert(addr->ip.sa_family == family)) {
- io_free(handle);
+ session2_event(s, PROTOLAYER_EVENT_FORCE_CLOSE, NULL);
return NULL;
}
if (socktype == SOCK_DGRAM) {
- uv_udp_t *udp = (uv_udp_t *)handle;
+ uv_udp_t *udp = (uv_udp_t *)session2_get_handle(s);
ret = uv_udp_bind(udp, &addr->ip, 0);
} else if (socktype == SOCK_STREAM){
- uv_tcp_t *tcp = (uv_tcp_t *)handle;
+ uv_tcp_t *tcp = (uv_tcp_t *)session2_get_handle(s);
ret = uv_tcp_bind(tcp, &addr->ip, 0);
}
}
if (ret != 0) {
- io_free(handle);
+ session2_event(s, PROTOLAYER_EVENT_FORCE_CLOSE, NULL);
return NULL;
}
- /* Set current handle as a subrequest type. */
- struct session2 *session = handle->data;
- kr_assert(session->outgoing);
/* Connect or issue query datagram */
- return handle;
+ return s;
}
static void ioreq_kill_pending(struct qr_task *task)
if (ret)
return ret;
- uv_handle_t *handle = ioreq_spawn(SOCK_DGRAM, choice->sin6_family,
+ struct session2 *session = ioreq_spawn(SOCK_DGRAM, choice->sin6_family,
PROTOLAYER_GRP_DOUDP, NULL, 0);
- if (!handle)
+ if (!session)
return kr_error(EINVAL);
struct sockaddr *addr = (struct sockaddr *)choice;
- struct session2 *session = handle->data;
struct sockaddr *peer = session2_get_peer(session);
kr_assert(peer->sa_family == AF_UNSPEC && session->outgoing);
kr_require(addr->sa_family == AF_INET || addr->sa_family == AF_INET6);
if (!conn) {
return kr_error(EINVAL);
}
- uv_handle_t *client;
+ struct session2 *session;
bool has_tls = tls_entry;
if (has_tls) {
.protocol = PROTOLAYER_TLS,
.param = tls_entry
};
- client = ioreq_spawn(SOCK_STREAM, addr->sa_family,
+ session = ioreq_spawn(SOCK_STREAM, addr->sa_family,
PROTOLAYER_GRP_DOTLS, ¶m, 1);
} else {
- client = ioreq_spawn(SOCK_STREAM, addr->sa_family,
+ session = ioreq_spawn(SOCK_STREAM, addr->sa_family,
PROTOLAYER_GRP_DOTCP, NULL, 0);
}
- if (!client) {
+ if (!session) {
free(conn);
return kr_error(EINVAL);
}
- struct session2 *session = client->data;
if (kr_fails_assert(session->secure == has_tls)) {
free(conn);
return kr_error(EINVAL);
}
/* Start connection process to upstream. */
- ret = uv_tcp_connect(conn, (uv_tcp_t *)client, addr , on_connect);
+ ret = uv_tcp_connect(conn, (uv_tcp_t *)session2_get_handle(session),
+ addr , on_connect);
if (ret != 0) {
session2_timer_stop(session);
worker_del_tcp_waiting(addr);