return uv_default_loop()->data;
}
-static inline void *iohandle_borrow(struct worker_ctx *worker)
-{
- void *h = NULL;
-
- const size_t size = sizeof(uv_handles_t);
- if (worker->pool_iohandles.len > 0) {
- h = array_tail(worker->pool_iohandles);
- array_pop(worker->pool_iohandles);
- kr_asan_unpoison(h, size);
- } else {
- h = malloc(size);
- }
-
- return h;
-}
-
-static inline void iohandle_release(struct worker_ctx *worker, void *h)
-{
- assert(h);
-
- if (worker->pool_iohandles.len < MP_FREELIST_SIZE) {
- array_push(worker->pool_iohandles, h);
- kr_asan_poison(h, sizeof(uv_handles_t));
- } else {
- free(h);
- }
-}
-
-void *worker_iohandle_borrow(struct worker_ctx *worker)
-{
- return iohandle_borrow(worker);
-}
-
-void worker_iohandle_release(struct worker_ctx *worker, void *h)
-{
- iohandle_release(worker, h);
-}
-
-static inline void *iorequest_borrow(struct worker_ctx *worker)
-{
- void *r = NULL;
-
- const size_t size = sizeof(uv_reqs_t);
- if (worker->pool_ioreqs.len > 0) {
- r = array_tail(worker->pool_ioreqs);
- array_pop(worker->pool_ioreqs);
- kr_asan_unpoison(r, size);
- } else {
- r = malloc(size);
- }
-
- return r;
-}
-
-static inline void iorequest_release(struct worker_ctx *worker, void *r)
-{
- assert(r);
-
- if (worker->pool_ioreqs.len < MP_FREELIST_SIZE) {
- array_push(worker->pool_ioreqs, r);
- kr_asan_poison(r, sizeof(uv_reqs_t));
- } else {
- free(r);
- }
-}
-
-
/*! @internal Create a UDP/TCP handle for an outgoing AF_INET* connection.
* socktype is SOCK_* */
static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype, sa_family_t family)
}
/* Create connection for iterative query */
struct worker_ctx *worker = task->ctx->worker;
- void *h = iohandle_borrow(worker);
- uv_handle_t *handle = (uv_handle_t *)h;
+ uv_handle_t *handle = malloc(socktype == SOCK_DGRAM
+ ? sizeof(uv_udp_t) : sizeof(uv_tcp_t));
if (!handle) {
return NULL;
}
worker->too_many_open = true;
worker->rconcurrent_highwatermark = worker->stats.rconcurrent;
}
- iohandle_release(worker, h);
+ free(handle);
return NULL;
}
}
if (ret < 0) {
io_deinit(handle);
- iohandle_release(worker, h);
+ free(handle);
return NULL;
}
/* Connect or issue query datagram */
{
uv_handle_t *handle = (uv_handle_t *)(req->handle);
uv_loop_t *loop = handle->loop;
- struct worker_ctx *worker = loop->data;
- assert(worker == get_worker());
struct qr_task *task = req->data;
qr_task_on_send(task, handle, status);
qr_task_unref(task);
- iorequest_release(worker, req);
+ free(req);
}
-
+// TODO: unify these two
static void on_task_write(uv_write_t *req, int status)
{
uv_handle_t *handle = (uv_handle_t *)(req->handle);
uv_loop_t *loop = handle->loop;
- struct worker_ctx *worker = loop->data;
- assert(worker == get_worker());
struct qr_task *task = req->data;
qr_task_on_send(task, handle, status);
qr_task_unref(task);
- iorequest_release(worker, req);
+ free(req);
}
static int qr_task_send(struct qr_task *task, uv_handle_t *handle,
int ret = 0;
struct request_ctx *ctx = task->ctx;
- struct worker_ctx *worker = ctx->worker;
struct kr_request *req = &ctx->req;
- void *ioreq = iorequest_borrow(worker);
- if (!ioreq) {
- return qr_task_on_send(task, handle, kr_error(ENOMEM));
- }
+
+ const bool is_stream = handle->type == UV_TCP;
+ if (!is_stream && handle->type != UV_UDP) abort();
+
if (knot_wire_get_qr(pkt->wire) == 0) {
/*
* Query must be finalised using destination address before
* trying to obtain the IP address from it.
*/
ret = kr_resolve_checkout(req, NULL, addr,
- handle->type == UV_UDP ? SOCK_DGRAM : SOCK_STREAM,
+ is_stream ? SOCK_STREAM : SOCK_DGRAM,
pkt);
if (ret != 0) {
- iorequest_release(worker, ioreq);
return ret;
}
}
+ uv_handle_t *ioreq = malloc(is_stream ? sizeof(uv_write_t) : sizeof(uv_udp_send_t));
+ if (!ioreq) {
+ return qr_task_on_send(task, handle, kr_error(ENOMEM));
+ }
+
/* Pending ioreq on current task */
qr_task_ref(task);
assert(false);
}
+ struct worker_ctx *worker = ctx->worker;
if (ret == 0) {
if (worker->too_many_open &&
worker->stats.rconcurrent <
worker->too_many_open = false;
}
} else {
- iorequest_release(worker, ioreq);
+ free(ioreq);
qr_task_unref(task);
if (ret == UV_EMFILE) {
worker->too_many_open = true;
if (status == UV_ECANCELED) {
worker_del_tcp_waiting(worker, peer);
assert(session_is_empty(session) && session_flags(session)->closing);
- iorequest_release(worker, req);
+ free(req);
return;
}
if (session_flags(session)->closing) {
worker_del_tcp_waiting(worker, peer);
assert(session_is_empty(session));
- iorequest_release(worker, req);
+ free(req);
return;
}
worker_del_tcp_waiting(worker, peer);
session_waitinglist_retry(session, false);
assert(session_tasklist_is_empty(session));
- iorequest_release(worker, req);
+ free(req);
session_close(session);
return;
}
* something gone wrong */
session_waitinglist_finalize(session, KR_STATE_FAIL);
assert(session_tasklist_is_empty(session));
- iorequest_release(worker, req);
+ free(req);
session_close(session);
return;
}
struct tls_client_ctx_t *tls_ctx = session_tls_get_client_ctx(session);
ret = tls_client_connect_start(tls_ctx, session, session_tls_hs_cb);
if (ret == kr_error(EAGAIN)) {
- iorequest_release(worker, req);
+ free(req);
session_start_read(session);
session_timer_start(session, on_tcp_watchdog_timeout,
MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
if (ret == kr_ok()) {
session_timer_start(session, on_tcp_watchdog_timeout,
MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
- iorequest_release(worker, req);
+ free(req);
return;
}
}
session_waitinglist_finalize(session, KR_STATE_FAIL);
assert(session_tasklist_is_empty(session));
- iorequest_release(worker, req);
+ free(req);
session_close(session);
}
task->pending_count += 1;
} else {
/* Make connection */
- uv_connect_t *conn = (uv_connect_t *)iorequest_borrow(ctx->worker);
+ uv_connect_t *conn = malloc(sizeof(uv_connect_t));
if (!conn) {
return qr_task_step(task, NULL, NULL);
}
uv_handle_t *client = ioreq_spawn(task, sock_type,
addr->sa_family);
if (!client) {
- iorequest_release(ctx->worker, conn);
+ free(conn);
subreq_finalize(task, packet_source, packet);
return qr_task_finalize(task, KR_STATE_FAIL);
}
ret = worker_add_tcp_waiting(ctx->worker, addr, session);
if (ret < 0) {
session_tasklist_del(session, task);
- iorequest_release(ctx->worker, conn);
+ free(conn);
subreq_finalize(task, packet_source, packet);
return qr_task_finalize(task, KR_STATE_FAIL);
}
if (ret < 0) {
session_tasklist_del(session, task);
worker_del_tcp_waiting(ctx->worker, addr);
- iorequest_release(ctx->worker, conn);
+ free(conn);
subreq_finalize(task, packet_source, packet);
return qr_task_finalize(task, KR_STATE_FAIL);
}
session_tasklist_del(session, task);
session_waitinglist_del(session, task);
worker_del_tcp_waiting(ctx->worker, addr);
- iorequest_release(ctx->worker, conn);
+ free(conn);
subreq_finalize(task, packet_source, packet);
return qr_task_step(task, NULL, NULL);
}
session_tasklist_del(session, task);
session_waitinglist_del(session, task);
worker_del_tcp_waiting(ctx->worker, addr);
- iorequest_release(ctx->worker, conn);
+ free(conn);
subreq_finalize(task, packet_source, packet);
return qr_task_finalize(task, KR_STATE_FAIL);
}
session_tasklist_del(session, task);
session_waitinglist_del(session, task);
worker_del_tcp_waiting(ctx->worker, addr);
- iorequest_release(ctx->worker, conn);
+ free(conn);
subreq_finalize(task, packet_source, packet);
return qr_task_step(task, NULL, NULL);
}
static int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen)
{
array_init(worker->pool_mp);
- array_init(worker->pool_ioreqs);
- array_init(worker->pool_iohandles);
array_init(worker->pool_sessions);
if (array_reserve(worker->pool_mp, ring_maxlen) ||
- array_reserve(worker->pool_ioreqs, ring_maxlen) ||
- array_reserve(worker->pool_iohandles, ring_maxlen) ||
array_reserve(worker->pool_sessions, ring_maxlen)) {
return kr_error(ENOMEM);
}
void worker_reclaim(struct worker_ctx *worker)
{
reclaim_freelist(worker->pool_mp, struct mempool, mp_delete);
- reclaim_freelist(worker->pool_ioreqs, uv_reqs_t, free);
- reclaim_freelist(worker->pool_iohandles, uv_handles_t, free);
reclaim_freelist_custom(worker->pool_sessions, session, session_free);
mp_delete(worker->pkt_pool.ctx);
worker->pkt_pool.ctx = NULL;