};
-int32_t tcp_connected = 0;
-int32_t tcp_waiting = 0;
-
-/* @internal Union of various libuv objects for freelist. */
-struct req
-{
- union {
- /* Socket handles, these have session as their `handle->data` and own it. */
- uv_udp_t udp;
- uv_tcp_t tcp;
- /* I/O events, these have only a reference to the task they're operating on. */
- uv_udp_send_t send;
- uv_write_t write;
- uv_connect_t connect;
- /* Timer events */
- uv_timer_t timer;
- } as;
-};
-
/* Convenience macros */
#define qr_task_ref(task) \
do { ++(task)->refs; } while(0)
return uv_default_loop()->data;
}
-static inline struct req *req_borrow(struct worker_ctx *worker)
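+/* @internal Borrow a handle from the worker freelist, or allocate a fresh
+ * one when the pool is empty; recycled slots are unpoisoned for ASan. */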
+static inline void *iohandle_borrow(struct worker_ctx *worker)
+{
+ void *h = NULL;
+
+ const size_t size = sizeof(union uv_handles);
+ if (worker->pool_iohandles.len > 0) {
+ h = array_tail(worker->pool_iohandles);
+ array_pop(worker->pool_iohandles);
+ kr_asan_unpoison(h, size);
+ } else {
+ h = malloc(size);
+ }
+
+ return h;
+}
+
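+/* @internal Return a handle to the freelist (poisoned for ASan) while the
+ * pool is below MP_FREELIST_SIZE, otherwise free it outright. */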
+static inline void iohandle_release(struct worker_ctx *worker, void *h)
{
- struct req *req = NULL;
- if (worker->pool_ioreq.len > 0) {
- req = array_tail(worker->pool_ioreq);
- array_pop(worker->pool_ioreq);
- kr_asan_unpoison(req, sizeof(*req));
+ assert(h);
+
+ const size_t size = sizeof(union uv_handles);
+ if (worker->pool_iohandles.len < MP_FREELIST_SIZE) {
+ array_push(worker->pool_iohandles, h);
+ kr_asan_poison(h, size);
} else {
- req = malloc(sizeof(*req));
+ free(h);
}
- return req;
}
-static inline void req_release(struct worker_ctx *worker, struct req *req)
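+/* Exported wrappers so other modules can recycle handles through the
+ * worker freelist without reaching into struct worker_ctx. */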
+void *worker_iohandle_borrow(struct worker_ctx *worker)
{
- if (!req || worker->pool_ioreq.len < 4 * MP_FREELIST_SIZE) {
- array_push(worker->pool_ioreq, req);
- kr_asan_poison(req, sizeof(*req));
+ return iohandle_borrow(worker);
+}
+
+void worker_iohandle_release(struct worker_ctx *worker, void *h)
+{
+ iohandle_release(worker, h);
+}
+
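+/* @internal Borrow/release for uv requests (union uv_reqs) mirrors the
+ * handle freelist above; one slot fits uv_udp_send_t, uv_write_t and
+ * uv_connect_t. */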
+static inline void *iorequest_borrow(struct worker_ctx *worker)
+{
+ void *r = NULL;
+
+ const size_t size = sizeof(union uv_reqs);
+ if (worker->pool_ioreqs.len > 0) {
+ r = array_tail(worker->pool_ioreqs);
+ array_pop(worker->pool_ioreqs);
+ kr_asan_unpoison(r, size);
} else {
- free(req);
+ r = malloc(size);
}
+
+ return r;
}
+static inline void iorequest_release(struct worker_ctx *worker, void *r)
+{
+ assert(r);
+
+ const size_t size = sizeof(union uv_reqs);
+ if (worker->pool_ioreqs.len < MP_FREELIST_SIZE) {
+ array_push(worker->pool_ioreqs, r);
+ kr_asan_poison(r, size);
+ } else {
+ free(r);
+ }
+}
+
/*! @internal Create a UDP/TCP handle for an outgoing AF_INET* connection.
* socktype is SOCK_* */
static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype, sa_family_t family)
}
/* Create connection for iterative query */
struct worker_ctx *worker = task->ctx->worker;
- uv_handle_t *handle = (uv_handle_t *)req_borrow(worker);
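+ /* Borrow a pooled slot sized for any member of union uv_handles. */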
+ void *h = iohandle_borrow(worker);
+ uv_handle_t *handle = (uv_handle_t *)h;
if (!handle) {
return NULL;
}
if (addr->ip.sa_family != AF_UNSPEC) {
assert(addr->ip.sa_family == family);
if (socktype == SOCK_DGRAM) {
- ret = uv_udp_bind((uv_udp_t *)handle, &addr->ip, 0);
- } else {
- ret = uv_tcp_bind((uv_tcp_t *)handle, &addr->ip, 0);
+ uv_udp_t *udp = (uv_udp_t *)handle;
+ ret = uv_udp_bind(udp, &addr->ip, 0);
+ } else if (socktype == SOCK_STREAM) {
+ uv_tcp_t *tcp = (uv_tcp_t *)handle;
+ ret = uv_tcp_bind(tcp, &addr->ip, 0);
}
}
}
if (ret < 0) {
io_deinit(handle);
- req_release(worker, (struct req *)handle);
+ iohandle_release(worker, h);
return NULL;
}
/* Connect or issue query datagram */
static void on_session_close(uv_handle_t *handle)
{
- struct worker_ctx *worker = get_worker();
+ uv_loop_t *loop = handle->loop;
+ struct worker_ctx *worker = loop->data;
struct session *session = handle->data;
- if (!session->outgoing) {
- assert(session->handle->type == UV_TCP);
- }
- bool free_handle = false;
- if (!session->outgoing && session->handle->type == UV_TCP) {
- free_handle = true;
- }
+ assert(session->handle == handle);
+ session->handle = NULL;
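+ /* Handles for incoming and outgoing sessions alike now come from the
+ * worker freelist, so recycle uniformly instead of the old
+ * free()-vs-req_release() split. */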
io_deinit(handle);
- if (free_handle) {
- free(handle);
- } else {
- req_release(worker, (struct req *)handle);
- }
+ iohandle_release(worker, handle);
}
static void on_session_timer_close(uv_handle_t *timer)
{
struct session *session = timer->data;
uv_handle_t *handle = session->handle;
+ assert(handle && handle->data == session);
+ assert(session->outgoing || handle->type == UV_TCP);
if (!uv_is_closing(handle)) {
uv_close(handle, on_session_close);
}
session->buffering = NULL;
}
+ uv_handle_t *handle = session->handle;
+ io_stop_read(handle);
session->closing = true;
if (session->outgoing &&
session->peer.ip.sa_family != AF_UNSPEC) {
/* TODO Relocate pool to struct request */
ctx->worker = worker;
array_init(ctx->tasks);
- ctx->source.session = handle ? handle->data : NULL;
+ struct session *session = handle ? handle->data : NULL;
+ if (session) {
+ assert(session->outgoing == false);
+ }
+ ctx->source.session = session;
struct kr_request *req = &ctx->req;
req->pool = pool;
/* Process source session. */
if (source_session) {
/* Walk the session task list and remove itself. */
+ assert(source_session->outgoing == false);
session_del_tasks(source_session, task);
/* Start reading again if the session is throttled and
* the number of outgoing requests is below watermark. */
uv_handle_t *handle = source_session->handle;
if (handle && source_session->tasks.len < worker->tcp_pipeline_max/2) {
if (!uv_is_closing(handle) && source_session->throttled) {
+ assert(source_session->closing == false);
io_start_read(handle);
source_session->throttled = false;
}
/*@ Register new qr_task within session. */
static int qr_task_register(struct qr_task *task, struct session *session)
{
- assert(session->outgoing == false);
+ assert(session->outgoing == false && session->handle->type == UV_TCP);
int ret = array_reserve(session->tasks, session->tasks.len + 1);
if (ret != 0) {
/* This is called when we send subrequest / answer */
static int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status)
{
+ assert(handle == NULL || uv_is_closing(handle) == false);
if (task->finished) {
assert(task->leading == false);
qr_task_complete(task);
int ret = qr_task_send(t, (uv_handle_t *)handle,
&session->peer.ip, t->pktbuf);
if (ret != kr_ok()) {
+ uv_timer_t *timer = &session->timeout;
+ uv_timer_stop(timer);
while (session->waiting.len > 0) {
struct qr_task *task = session->waiting.at[0];
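+ /* Fail outgoing tasks outright; for incoming tasks only detach the
+ * dying source session so no answer is sent over it. */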
+ if (session->outgoing) {
+ qr_task_finalize(task, KR_STATE_FAIL);
+ } else {
+ assert(task->ctx->source.session == session);
+ task->ctx->source.session = NULL;
+ }
array_del(session->waiting, 0);
- qr_task_finalize(task, KR_STATE_FAIL);
+ qr_task_unref(task);
+ session_del_tasks(session, task);
}
while (session->tasks.len > 0) {
struct qr_task *task = session->tasks.at[0];
- array_del(session->tasks, 0);
- qr_task_finalize(task, KR_STATE_FAIL);
+ if (session->outgoing) {
+ qr_task_finalize(task, KR_STATE_FAIL);
+ } else {
+ assert(task->ctx->source.session == session);
+ task->ctx->source.session = NULL;
+ }
+ session_del_tasks(session, task);
}
session_close(session);
return status;
}
}
}
- io_start_read(handle); /* Start reading new query */
+ if (!uv_is_closing(handle)) {
+ io_start_read(handle); /* Start reading new query */
+ }
}
return status;
}
static void on_send(uv_udp_send_t *req, int status)
{
- struct worker_ctx *worker = get_worker();
+ uv_handle_t *handle = (uv_handle_t *)(req->handle);
+ uv_loop_t *loop = handle->loop;
+ struct worker_ctx *worker = loop->data;
+ assert(worker == get_worker());
struct qr_task *task = req->data;
- if (qr_valid_handle(task, (uv_handle_t *)req->handle)) {
- qr_task_on_send(task, (uv_handle_t *)req->handle, status);
+ if (qr_valid_handle(task, handle)) {
+ qr_task_on_send(task, handle, status);
}
qr_task_unref(task);
- req_release(worker, (struct req *)req);
+ iorequest_release(worker, req);
}
static void on_write(uv_write_t *req, int status)
{
- struct worker_ctx *worker = get_worker();
+ uv_handle_t *handle = (uv_handle_t *)(req->handle);
+ uv_loop_t *loop = handle->loop;
+ struct worker_ctx *worker = loop->data;
+ assert(worker == get_worker());
struct qr_task *task = req->data;
- if (qr_valid_handle(task, (uv_handle_t *)req->handle)) {
- qr_task_on_send(task, (uv_handle_t *)req->handle, status);
+ if (qr_valid_handle(task, handle)) {
+ qr_task_on_send(task, handle, status);
}
qr_task_unref(task);
- req_release(worker, (struct req *)req);
+ iorequest_release(worker, req);
}
static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockaddr *addr, knot_pkt_t *pkt)
/* Synchronous push to TLS context, bypassing event loop. */
struct session *session = handle->data;
+ assert(session->closing == false);
if (session->has_tls) {
struct kr_request *req = &task->ctx->req;
int ret = kr_ok();
struct request_ctx *ctx = task->ctx;
struct worker_ctx *worker = ctx->worker;
struct kr_request *req = &ctx->req;
- struct req *send_req = req_borrow(worker);
- if (!send_req) {
+ void *ioreq = iorequest_borrow(worker);
+ if (!ioreq) {
return qr_task_on_send(task, handle, kr_error(ENOMEM));
}
if (knot_wire_get_qr(pkt->wire) == 0) {
handle->type == UV_UDP ? SOCK_DGRAM : SOCK_STREAM,
pkt);
if (ret != kr_ok()) {
- req_release(worker, send_req);
+ iorequest_release(worker, ioreq);
return ret;
}
}
/* Send using given protocol */
if (handle->type == UV_UDP) {
+ uv_udp_send_t *send_req = (uv_udp_send_t *)ioreq;
uv_buf_t buf = { (char *)pkt->wire, pkt->size };
- send_req->as.send.data = task;
- ret = uv_udp_send(&send_req->as.send, (uv_udp_t *)handle, &buf, 1, addr, &on_send);
- } else {
+ send_req->data = task;
+ ret = uv_udp_send(send_req, (uv_udp_t *)handle, &buf, 1, addr, &on_send);
+ } else if (handle->type == UV_TCP) {
+ uv_write_t *write_req = (uv_write_t *)ioreq;
uint16_t pkt_size = htons(pkt->size);
uv_buf_t buf[2] = {
{ (char *)&pkt_size, sizeof(pkt_size) },
{ (char *)pkt->wire, pkt->size }
};
- send_req->as.write.data = task;
- ret = uv_write(&send_req->as.write, (uv_stream_t *)handle, buf, 2, &on_write);
+ write_req->data = task;
+ ret = uv_write(write_req, (uv_stream_t *)handle, buf, 2, &on_write);
+ } else {
+ assert(false);
}
+
if (ret == 0) {
qr_task_ref(task); /* Pending ioreq on current task */
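+ /* EMFILE back-pressure: accept new work again once concurrency falls
+ * a fixed margin below the recorded high watermark. */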
if (worker->too_many_open &&
worker->stats.rconcurrent <
- worker->rconcurrent_highwatermark - (worker->rconcurrent_highwatermark / 4)) {
+ worker->rconcurrent_highwatermark - 10) {
worker->too_many_open = false;
}
} else {
- req_release(worker, send_req);
+ iorequest_release(worker, ioreq);
if (ret == UV_EMFILE) {
worker->too_many_open = true;
worker->rconcurrent_highwatermark = worker->stats.rconcurrent;
if (status == UV_ECANCELED) {
worker_del_tcp_waiting(worker, &peer->ip);
assert(session->closing && session->waiting.len == 0 && session->tasks.len == 0);
- req_release(worker, (struct req *)req);
+ iorequest_release(worker, req);
return;
}
if (session->closing) {
worker_del_tcp_waiting(worker, &peer->ip);
assert(session->waiting.len == 0 && session->tasks.len == 0);
- req_release(worker, (struct req *)req);
+ iorequest_release(worker, req);
return;
}
qr_task_unref(task);
}
assert(session->tasks.len == 0);
- req_release(worker, (struct req *)req);
+ iorequest_release(worker, req);
session_close(session);
return;
}
qr_task_unref(task);
}
assert(session->tasks.len == 0);
- req_release(worker, (struct req *)req);
+ iorequest_release(worker, req);
session_close(session);
return;
}
ret = tls_client_connect_start(session->tls_client_ctx,
session, session_tls_hs_cb);
if (ret == kr_error(EAGAIN)) {
- req_release(worker, (struct req *)req);
+ iorequest_release(worker, req);
io_start_read(session->handle);
return;
}
ret = session_next_waiting_send(session);
if (ret == kr_ok()) {
worker_add_tcp_connected(worker, &session->peer.ip, session);
- req_release(worker, (struct req *)req);
+ iorequest_release(worker, req);
return;
}
}
assert(session->tasks.len == 0);
- req_release(worker, (struct req *)req);
+ iorequest_release(worker, req);
session_close(session);
}
session_del_tasks(session, task);
array_del(session->waiting, 0);
qr_task_unref(task);
- assert(task->refs == 1);
qr_task_finalize(task, KR_STATE_FAIL);
}
static void on_udp_timeout(uv_timer_t *timer)
{
struct session *session = timer->data;
- uv_timer_stop(timer);
+ uv_handle_t *handle = session->handle;
+ assert(handle->data == session);
+
+ uv_timer_stop(timer);
assert(session->tasks.len == 1);
assert(session->waiting.len == 0);
task->finished = true;
/* Send back answer */
if (ctx->source.session != NULL) {
- (void) qr_task_send(task, ctx->source.session->handle,
+ uv_handle_t *handle = ctx->source.session->handle;
+ assert(ctx->source.session->closing == false);
+ assert(handle->data == ctx->source.session);
+ assert(!uv_is_closing(handle));
+ assert(ctx->source.addr.ip.sa_family != AF_UNSPEC);
+ (void) qr_task_send(task, handle,
(struct sockaddr *)&ctx->source.addr,
ctx->req.answer);
} else {
return kr_error(ESTALE);
}
-
/* Close pending I/O requests */
subreq_finalize(task, packet_source, packet);
/* Consume input and produce next query */
struct request_ctx *ctx = task->ctx;
+ assert(ctx);
struct kr_request *req = &ctx->req;
struct worker_ctx *worker = ctx->worker;
int sock_type = -1;
if (worker->too_many_open) {
struct kr_rplan *rplan = &req->rplan;
if (worker->stats.rconcurrent <
- worker->rconcurrent_highwatermark - (worker->rconcurrent_highwatermark / 4)) {
+ worker->rconcurrent_highwatermark - 10) {
worker->too_many_open = false;
} else if (packet && kr_rplan_empty(rplan)) {
/* new query; TODO - make this detection more obvious */
}
struct session* session = NULL;
if ((session = worker_find_tcp_waiting(ctx->worker, addr)) != NULL) {
+ assert(session->outgoing);
if (session->closing) {
subreq_finalize(task, packet_source, packet);
return qr_task_finalize(task, KR_STATE_FAIL);
task->pending_count += 1;
} else {
/* Make connection */
- uv_connect_t *conn = (uv_connect_t *)req_borrow(ctx->worker);
+ uv_connect_t *conn = (uv_connect_t *)iorequest_borrow(ctx->worker);
if (!conn) {
return qr_task_step(task, NULL, NULL);
}
uv_handle_t *client = ioreq_spawn(task, sock_type,
addr->sa_family);
if (!client) {
- req_release(ctx->worker, (struct req *)conn);
+ iorequest_release(ctx->worker, conn);
subreq_finalize(task, packet_source, packet);
return qr_task_finalize(task, KR_STATE_FAIL);
}
ret = worker_add_tcp_waiting(ctx->worker, addr, session);
if (ret < 0) {
session_del_tasks(session, task);
- req_release(ctx->worker, (struct req *)conn);
+ iorequest_release(ctx->worker, conn);
subreq_finalize(task, packet_source, packet);
return qr_task_finalize(task, KR_STATE_FAIL);
}
if (ret < 0) {
session_del_tasks(session, task);
worker_del_tcp_waiting(ctx->worker, addr);
- req_release(ctx->worker, (struct req *)conn);
+ iorequest_release(ctx->worker, conn);
subreq_finalize(task, packet_source, packet);
return qr_task_finalize(task, KR_STATE_FAIL);
}
session_del_tasks(session, task);
session_del_waiting(session, task);
worker_del_tcp_waiting(ctx->worker, addr);
- req_release(ctx->worker, (struct req *)conn);
+ iorequest_release(ctx->worker, conn);
return qr_task_step(task, NULL, NULL);
}
tls_client_ctx_set_params(tls_ctx, entry);
session_del_tasks(session, task);
session_del_waiting(session, task);
worker_del_tcp_waiting(ctx->worker, addr);
- req_release(ctx->worker, (struct req *)conn);
+ iorequest_release(ctx->worker, conn);
subreq_finalize(task, packet_source, packet);
return qr_task_finalize(task, KR_STATE_FAIL);
}
session_del_tasks(session, task);
session_del_waiting(session, task);
worker_del_tcp_waiting(ctx->worker, addr);
- req_release(ctx->worker, (struct req *)conn);
+ iorequest_release(ctx->worker, conn);
return qr_task_step(task, NULL, NULL);
}
}
}
} else if (msg) { /* response from upstream */
task = find_task(session, knot_wire_get_id(msg->wire));
+ if (task == NULL) {
+ return kr_error(ENOENT);
+ }
+ assert(session->closing == false);
}
+ assert(uv_is_closing(session->handle) == false);
/* Consume input and produce next message */
return qr_task_step(task, NULL, msg);
assert(addr);
const char *key = tcpsess_key(addr);
assert(key);
- tcp_connected += 1;
assert(map_contains(&worker->tcp_connected, key) == 0);
return map_add_tcp_session(&worker->tcp_connected, addr, session);
}
const char *key = tcpsess_key(addr);
assert(key);
int ret = map_del_tcp_session(&worker->tcp_connected, addr);
- if (ret == 0) {
- tcp_connected -= 1;
- }
return ret;
}
assert(key);
assert(map_contains(&worker->tcp_waiting, key) == 0);
int ret = map_add_tcp_session(&worker->tcp_waiting, addr, session);
- if (ret == 0) {
- tcp_waiting += 1;
- }
return ret;
}
const char *key = tcpsess_key(addr);
assert(key);
int ret = map_del_tcp_session(&worker->tcp_waiting, addr);
- if (ret == 0) {
- tcp_waiting -= 1;
- }
return ret;
}
}
/* Connection error or forced disconnect */
struct session *session = handle->data;
+ assert(session);
if (session->closing) {
return kr_ok();
}
struct qr_task *task = session->waiting.at[0];
if (session->outgoing) {
qr_task_finalize(task, KR_STATE_FAIL);
+ } else {
+ assert(task->ctx->source.session == session);
+ task->ctx->source.session = NULL;
}
array_del(session->waiting, 0);
qr_task_unref(task);
struct qr_task *task = session->tasks.at[0];
if (session->outgoing) {
qr_task_finalize(task, KR_STATE_FAIL);
+ } else {
+ assert(task->ctx->source.session == session);
+ task->ctx->source.session = NULL;
}
session_del_tasks(session, task);
}
uint16_t msg_id = knot_wire_get_id(session->msg_hdr + 2);
if (msg_size < KNOT_WIRE_HEADER_SIZE) {
/* better kill the connection; we would probably get out of sync */
- assert(false);
+ uv_timer_t *timer = &session->timeout;
+ uv_timer_stop(timer);
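+ /* The stream framing is unrecoverable: abort every waiting and active
+ * task on this session, then close it. */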
+ while (session->waiting.len > 0) {
+ struct qr_task *task = session->waiting.at[0];
+ if (session->outgoing) {
+ qr_task_finalize(task, KR_STATE_FAIL);
+ } else {
+ assert(task->ctx->source.session == session);
+ task->ctx->source.session = NULL;
+ }
+ array_del(session->waiting, 0);
+ qr_task_unref(task);
+ session_del_tasks(session, task);
+ }
+ while (session->tasks.len > 0) {
+ struct qr_task *task = session->tasks.at[0];
+ if (session->outgoing) {
+ qr_task_finalize(task, KR_STATE_FAIL);
+ } else {
+ assert(task->ctx->source.session == session);
+ task->ctx->source.session = NULL;
+ }
+ session_del_tasks(session, task);
+ }
+ session_close(session);
+
return kr_error(EILSEQ);
}
if (task) {
knot_pkt_clear(task->pktbuf);
} else {
- /* TODO: only ignore one message without killing connection */
session->buffering = NULL;
session->bytes_to_skip = msg_size - 2;
ssize_t min_len = MIN(session->bytes_to_skip, len);
ssize_t to_read = MIN(len, task->bytes_remaining);
if (pkt_buf->size + to_read > pkt_buf->max_size) {
pkt_buf->size = 0;
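+ /* Message too large for the buffer: skip its remaining bytes and keep
+ * the connection alive instead of returning EMSGSIZE. */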
+ session->bytes_to_skip = task->bytes_remaining - to_read;
task->bytes_remaining = 0;
- /* TODO: only ignore one message without killing connection */
session->buffering = NULL;
- return kr_error(EMSGSIZE);
+ return submitted;
}
/* Buffer message and check if it's complete */
memcpy(pkt_buf->wire + pkt_buf->size, msg, to_read);
/* Message was assembled, clear temporary. */
session->buffering = NULL;
session->msg_hdr_idx = 0;
- session_del_tasks(session, task);
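+ /* Only outgoing sessions drop the task here; an incoming task stays
+ * registered until its answer has been sent. */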
+ if (session->outgoing) {
+ session_del_tasks(session, task);
+ }
/* Parse the packet and start resolving complete query */
int ret = parse_packet(pkt_buf);
if (ret == 0 && !session->outgoing) {
/* TODO: this is simple via iteration; recursion doesn't really help */
ret = worker_process_tcp(worker, handle, msg + to_read, len - to_read);
if (ret < 0) {
+ assert(false);
return ret;
}
submitted += ret;
static int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen)
{
array_init(worker->pool_mp);
- array_init(worker->pool_ioreq);
+ array_init(worker->pool_ioreqs);
+ array_init(worker->pool_iohandles);
array_init(worker->pool_sessions);
if (array_reserve(worker->pool_mp, ring_maxlen) ||
- array_reserve(worker->pool_ioreq, ring_maxlen) ||
+ array_reserve(worker->pool_ioreqs, ring_maxlen) ||
+ array_reserve(worker->pool_iohandles, ring_maxlen) ||
array_reserve(worker->pool_sessions, ring_maxlen)) {
return kr_error(ENOMEM);
}
#define reclaim_freelist(list, type, cb) \
for (unsigned i = 0; i < list.len; ++i) { \
- type *elm = list.at[i]; \
+ void *elm = list.at[i]; \
kr_asan_unpoison(elm, sizeof(type)); \
cb(elm); \
} \
void worker_reclaim(struct worker_ctx *worker)
{
reclaim_freelist(worker->pool_mp, struct mempool, mp_delete);
- reclaim_freelist(worker->pool_ioreq, struct req, free);
+ reclaim_freelist(worker->pool_ioreqs, union uv_reqs, free);
+ reclaim_freelist(worker->pool_iohandles, union uv_handles, free);
reclaim_freelist(worker->pool_sessions, struct session, session_free);
mp_delete(worker->pkt_pool.ctx);
worker->pkt_pool.ctx = NULL;