#define VERBOSE_MSG(qry, fmt...) QRVERBOSE(qry, "wrkr", fmt)
- /* @internal Union of various libuv objects for freelist. */
- struct req
- {
- union {
- /* Socket handles, these have session as their `handle->data` and own it. */
- uv_udp_t udp;
- uv_tcp_t tcp;
- /* I/O events, these have only a reference to the task they're operating on. */
- uv_udp_send_t send;
- uv_write_t write;
- uv_connect_t connect;
- /* Timer events */
- uv_timer_t timer;
- } as;
+ /** Client request state. */
+ struct request_ctx
+ {
+ struct kr_request req;
+ struct {
+ union inaddr addr;
+ union inaddr dst_addr;
+ /* uv_handle_t *handle; */
+
+ /** NULL if the request didn't come over the network. */
+ struct session *session;
+ } source;
+ struct worker_ctx *worker;
+ qr_tasklist_t tasks;
+ };
+
+ /** Query resolution task. */
+ struct qr_task
+ {
+ struct request_ctx *ctx;
+ knot_pkt_t *pktbuf;
+ qr_tasklist_t waiting;
+ uv_handle_t *pending[MAX_PENDING];
+ uint16_t pending_count;
+ uint16_t addrlist_count;
+ uint16_t addrlist_turn;
+ uint16_t timeouts;
+ uint16_t iter_count;
+ uint16_t bytes_remaining;
+ struct sockaddr *addrlist;
- worker_cb_t on_complete;
- void *baton;
+ uint32_t refs;
+ bool finished : 1;
+ bool leading : 1;
};
+
/* Convenience macros */
#define qr_task_ref(task) \
do { ++(task)->refs; } while(0)
#define qr_task_unref(task) \
- do { if (--(task)->refs == 0) { qr_task_free(task); } } while (0)
+ do { if (task && --(task)->refs == 0) { qr_task_free(task); } } while (0)
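+
+ /* Reference-counting sketch (illustrative, mirroring qr_task_send() below):
+ * every pending libuv request borrows the task and drops the reference
+ * in its completion callback:
+ *
+ *	send_req->data = task;
+ *	qr_task_ref(task);	// pending ioreq on current task
+ *	...
+ *	// later, in on_send()/on_write():
+ *	struct qr_task *task = req->data;
+ *	qr_task_unref(task);	// may end up in qr_task_free()
+ */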
#define qr_valid_handle(task, checked) \
- (!uv_is_closing((checked)) || (task)->source.handle == (checked))
+ (!uv_is_closing((checked)) || (task)->ctx->source.session->handle == (checked))
+
+ /** @internal Get the key for a TCP session.
+ * @note kr_straddr() returns a pointer to a static string.
+ */
+ #define tcpsess_key(addr) kr_straddr(addr)
/* Forward decls */
static void qr_task_free(struct qr_task *task);
return uv_default_loop()->data;
}
- static inline struct req *req_borrow(struct worker_ctx *worker)
+ static inline void *iohandle_borrow(struct worker_ctx *worker)
+ {
+ void *h = NULL;
+
+ const size_t size = sizeof(uv_handles_t);
+ if (worker->pool_iohandles.len > 0) {
+ h = array_tail(worker->pool_iohandles);
+ array_pop(worker->pool_iohandles);
+ kr_asan_unpoison(h, size);
+ } else {
+ h = malloc(size);
+ }
+
+ return h;
+ }
+
+ static inline void iohandle_release(struct worker_ctx *worker, void *h)
+ {
+ assert(h);
+
- const size_t size = sizeof(uv_handles_t);
+ if (worker->pool_iohandles.len < MP_FREELIST_SIZE) {
+ array_push(worker->pool_iohandles, h);
- kr_asan_poison(h, size);
++ kr_asan_poison(h, sizeof(uv_handles_t));
+ } else {
+ free(h);
+ }
+ }
+
+ void *worker_iohandle_borrow(struct worker_ctx *worker)
+ {
+ return iohandle_borrow(worker);
+ }
+
+ void worker_iohandle_release(struct worker_ctx *worker, void *h)
+ {
+ iohandle_release(worker, h);
+ }
+
+ static inline void *iorequest_borrow(struct worker_ctx *worker)
{
- struct req *req = NULL;
- if (worker->pool_ioreq.len > 0) {
- req = array_tail(worker->pool_ioreq);
- array_pop(worker->pool_ioreq);
- kr_asan_unpoison(req, sizeof(*req));
+ void *r = NULL;
+
+ const size_t size = sizeof(uv_reqs_t);
+ if (worker->pool_ioreqs.len > 0) {
+ r = array_tail(worker->pool_ioreqs);
+ array_pop(worker->pool_ioreqs);
+ kr_asan_unpoison(r, size);
} else {
- req = malloc(sizeof(*req));
+ r = malloc(size);
}
- return req;
+
+ return r;
}
- static inline void req_release(struct worker_ctx *worker, struct req *req)
+ static inline void iorequest_release(struct worker_ctx *worker, void *r)
{
- if (!req || worker->pool_ioreq.len < 4 * MP_FREELIST_SIZE) {
- array_push(worker->pool_ioreq, req);
- kr_asan_poison(req, sizeof(*req));
+ assert(r);
+
- const size_t size = sizeof(uv_reqs_t);
+ if (worker->pool_ioreqs.len < MP_FREELIST_SIZE) {
+ array_push(worker->pool_ioreqs, r);
- kr_asan_poison(r, size);
++ kr_asan_poison(r, sizeof(uv_reqs_t));
} else {
- free(req);
+ free(r);
}
}
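+
+ /* Usage sketch (hypothetical caller; compare qr_task_step() below):
+ * I/O requests come from a per-worker freelist and must be returned
+ * with iorequest_release() rather than free()d directly:
+ *
+ *	uv_connect_t *conn = (uv_connect_t *)iorequest_borrow(worker);
+ *	if (conn && uv_tcp_connect(conn, (uv_tcp_t *)client,
+ *				   addr, on_connect) != 0) {
+ *		iorequest_release(worker, conn);
+ *	}
+ */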
.alloc = (knot_mm_alloc_t) mp_alloc
};
- /* Create resolution task */
- struct qr_task *task = mm_alloc(&pool, sizeof(*task));
- if (!task) {
- mp_delete(pool.ctx);
+ /* Create request context */
+ struct request_ctx *ctx = mm_alloc(&pool, sizeof(*ctx));
+ if (!ctx) {
+ pool_release(worker, pool.ctx);
return NULL;
}
- memset(&task->req, 0, sizeof(task->req));
+
- /* Create packet buffers for answer and subrequests */
- task->req.pool = pool;
- knot_pkt_t *pktbuf = knot_pkt_new(NULL, pktbuf_max, &task->req.pool);
- if (!pktbuf) {
- mp_delete(pool.ctx);
- return NULL;
+ memset(ctx, 0, sizeof(*ctx));
+
+ /* TODO Relocate pool to struct request */
+ ctx->worker = worker;
+ array_init(ctx->tasks);
+ struct session *session = handle ? handle->data : NULL;
+ if (session) {
+ assert(session->outgoing == false);
}
- pktbuf->size = 0;
- task->pktbuf = pktbuf;
- array_init(task->waiting);
- task->addrlist = NULL;
- task->pending_count = 0;
- task->bytes_remaining = 0;
- task->iter_count = 0;
- task->timeouts = 0;
- task->refs = 1;
- task->finished = false;
- task->leading = false;
- task->worker = worker;
- task->session = NULL;
- task->source.handle = handle;
- task->timeout = NULL;
+ ctx->source.session = session;
+
+ struct kr_request *req = &ctx->req;
+ req->pool = pool;
+
/* Remember query source addr */
- if (addr) {
+ if (!addr || (addr->sa_family != AF_INET && addr->sa_family != AF_INET6)) {
+ ctx->source.addr.ip.sa_family = AF_UNSPEC;
+ } else {
size_t addr_len = sizeof(struct sockaddr_in);
if (addr->sa_family == AF_INET6)
addr_len = sizeof(struct sockaddr_in6);
static void qr_task_complete(struct qr_task *task)
{
- struct worker_ctx *worker = ctx->worker;
+ struct request_ctx *ctx = task->ctx;
++
/* Kill pending I/O requests */
- ioreq_killall(task);
+ ioreq_kill_pending(task);
assert(task->waiting.len == 0);
assert(task->leading == false);
- /* Run the completion callback. */
- if (task->on_complete) {
- task->on_complete(worker, &ctx->req, task->baton);
- }
++
+ struct session *source_session = ctx->source.session;
+ if (source_session) {
+ assert(source_session->outgoing == false &&
+ source_session->waiting.len == 0);
+ session_del_tasks(source_session, task);
+ }
++
/* Release primary reference to task. */
- qr_task_unref(task);
+ request_del_tasks(ctx, task);
}
/* This is called when we send a subrequest or an answer */
return ret;
}
}
- /* Send using given protocol */
- if (handle->type == UV_UDP) {
- uv_buf_t buf = { (char *)pkt->wire, pkt->size };
- send_req->as.send.data = task;
- ret = uv_udp_send(&send_req->as.send, (uv_udp_t *)handle, &buf, 1, addr, &on_send);
- } else {
- uint16_t pkt_size = htons(pkt->size);
- uv_buf_t buf[2] = {
- { (char *)&pkt_size, sizeof(pkt_size) },
- { (char *)pkt->wire, pkt->size }
- };
- send_req->as.write.data = task;
- ret = uv_write(&send_req->as.write, (uv_stream_t *)handle, buf, 2, &on_write);
- }
- if (ret == 0) {
- qr_task_ref(task); /* Pending ioreq on current task */
- } else {
- req_release(task->worker, send_req);
+ /* Send using given protocol */
+ if (handle->type == UV_UDP) {
+ uv_udp_send_t *send_req = (uv_udp_send_t *)ioreq;
+ uv_buf_t buf = { (char *)pkt->wire, pkt->size };
+ send_req->data = task;
+ ret = uv_udp_send(send_req, (uv_udp_t *)handle, &buf, 1, addr, &on_send);
+ } else if (handle->type == UV_TCP) {
+ uv_write_t *write_req = (uv_write_t *)ioreq;
+ uint16_t pkt_size = htons(pkt->size);
+ uv_buf_t buf[2] = {
+ { (char *)&pkt_size, sizeof(pkt_size) },
+ { (char *)pkt->wire, pkt->size }
+ };
+ write_req->data = task;
+ ret = uv_write(write_req, (uv_stream_t *)handle, buf, 2, &on_write);
+ } else {
+ assert(false);
+ }
+
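+ /* Backpressure on EMFILE: when a send fails with UV_EMFILE we record
+ * the request-concurrency level at which descriptors ran out, and we
+ * clear too_many_open only once concurrency has dropped a little
+ * below that high-water mark. */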
+ if (ret == 0) {
+ qr_task_ref(task); /* Pending ioreq on current task */
+ if (worker->too_many_open &&
+ worker->stats.rconcurrent <
+ worker->rconcurrent_highwatermark - 10) {
+ worker->too_many_open = false;
+ }
+ } else {
+ iorequest_release(worker, ioreq);
+ if (ret == UV_EMFILE) {
+ worker->too_many_open = true;
+ worker->rconcurrent_highwatermark = worker->stats.rconcurrent;
+ }
+ }
+
+ /* Update statistics */
+ if (ctx->source.session &&
+ handle != ctx->source.session->handle &&
+ addr) {
+ if (handle->type == UV_UDP)
+ worker->stats.udp += 1;
+ else
+ worker->stats.tcp += 1;
+ if (addr->sa_family == AF_INET6)
+ worker->stats.ipv6 += 1;
+ else if (addr->sa_family == AF_INET)
+ worker->stats.ipv4 += 1;
+ }
+ return ret;
+ }
+
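+ /** @internal Send the first waiting task's packet over this session
+ * and (re)arm the TCP inactivity watchdog. */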
+ static int session_next_waiting_send(struct session *session)
+ {
+ union inaddr *peer = &session->peer;
+ int ret = kr_ok();
+ if (session->waiting.len > 0) {
+ struct qr_task *task = session->waiting.at[0];
+ ret = qr_task_send(task, session->handle, &peer->ip, task->pktbuf);
+ }
+ session->timeout.data = session;
+ timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0);
+ return ret;
+ }
+
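+ /** @internal TLS handshake completion callback: on failure, penalize
+ * the peer's RTT estimate; on success, flush the first waiting query
+ * and move the session into the connected map. */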
+ static int session_tls_hs_cb(struct session *session, int status)
+ {
+ VERBOSE_MSG(NULL, "=> server: '%s' TLS handshake has %s\n",
+ kr_straddr(&session->peer.ip), status ? "failed" : "completed");
+
+ struct worker_ctx *worker = get_worker();
+ union inaddr *peer = &session->peer;
+ int deletion_res = worker_del_tcp_waiting(worker, &peer->ip);
+
+ if (status) {
+ for (size_t i = 0; i < session->waiting.len; ++i) {
+ struct qr_task *task = session->waiting.at[i];
+ struct kr_query *qry = array_tail(task->ctx->req.rplan.pending);
+ kr_nsrep_update_rtt(&qry->ns, &peer->ip, KR_NS_TIMEOUT,
+ worker->engine->resolver.cache_rtt, KR_NS_UPDATE);
+ }
+ } else {
+ if (deletion_res != 0) {
+ /* Session isn't in the list of waiting queries;
+ * something went wrong. */
+ while (session->waiting.len > 0) {
+ struct qr_task *task = session->waiting.at[0];
+ session_del_tasks(session, task);
+ array_del(session->waiting, 0);
+ qr_task_finalize(task, KR_STATE_FAIL);
+ qr_task_unref(task);
+ }
+ assert(session->tasks.len == 0);
+ session_close(session);
+ return kr_ok();
+ }
+
+ int ret = session_next_waiting_send(session);
+ if (ret == kr_ok()) {
+ ret = worker_add_tcp_connected(worker, &peer->ip, session);
+ assert(ret == 0);
+ }
+ }
+ return kr_ok();
+ }
+
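++/** @internal Pick the query to attribute verbose log messages to:
++ * the tail query of the first waiting task, or NULL if nothing is
++ * waiting. */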
++static struct kr_query *session_current_query(struct session *session)
++{
++ if (session->waiting.len == 0) {
++ return NULL;
++ }
++
++ struct qr_task *task = session->waiting.at[0];
++ if (task->ctx->req.rplan.pending.len == 0) {
++ return NULL;
++ }
++
++ return array_tail(task->ctx->req.rplan.pending);
++}
++
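+ /* libuv callback for outgoing TCP connections: handles cancellation
+ * and connect errors, optionally starts the TLS handshake, and then
+ * flushes the first waiting query. */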
+ static void on_connect(uv_connect_t *req, int status)
+ {
+ struct worker_ctx *worker = get_worker();
+ uv_stream_t *handle = req->handle;
+ struct session *session = handle->data;
+
+ union inaddr *peer = &session->peer;
+ uv_timer_stop((uv_timer_t *)&session->timeout);
+
+ if (status == UV_ECANCELED) {
+ worker_del_tcp_waiting(worker, &peer->ip);
+ assert(session->closing && session->waiting.len == 0 && session->tasks.len == 0);
+ iorequest_release(worker, req);
+ return;
+ }
+
+ if (session->closing) {
+ worker_del_tcp_waiting(worker, &peer->ip);
+ assert(session->waiting.len == 0 && session->tasks.len == 0);
+ iorequest_release(worker, req);
+ return;
+ }
+
+ if (status != 0) {
+ worker_del_tcp_waiting(worker, &peer->ip);
+ while (session->waiting.len > 0) {
+ struct qr_task *task = session->waiting.at[0];
+ session_del_tasks(session, task);
+ array_del(session->waiting, 0);
+ assert(task->refs > 1);
+ qr_task_unref(task);
+ qr_task_step(task, NULL, NULL);
+ }
+ assert(session->tasks.len == 0);
+ iorequest_release(worker, req);
+ session_close(session);
+ return;
+ }
+
+ if (!session->has_tls) {
+ /* If TLS is in use the session is still waiting for the handshake,
+ * otherwise remove it from the waiting list. */
+ if (worker_del_tcp_waiting(worker, &peer->ip) != 0) {
+ /* Session isn't in the list of waiting queries;
+ * something went wrong. */
+ while (session->waiting.len > 0) {
+ struct qr_task *task = session->waiting.at[0];
+ session_del_tasks(session, task);
+ array_del(session->waiting, 0);
+ qr_task_finalize(task, KR_STATE_FAIL);
+ qr_task_unref(task);
+ }
+ assert(session->tasks.len == 0);
+ iorequest_release(worker, req);
+ session_close(session);
+ return;
+ }
+ }
+
- WITH_VERBOSE {
++ struct kr_query *qry = session_current_query(session);
++ WITH_VERBOSE (qry) {
+ char addr_str[INET6_ADDRSTRLEN];
+ inet_ntop(session->peer.ip.sa_family, kr_inaddr(&session->peer.ip),
+ addr_str, sizeof(addr_str));
- VERBOSE_MSG(NULL, "=> connected to '%s'\n", addr_str);
++ VERBOSE_MSG(qry, "=> connected to '%s'\n", addr_str);
+ }
+
+ session->connected = true;
+ session->handle = (uv_handle_t *)handle;
+
+ int ret = kr_ok();
+ if (session->has_tls) {
+ ret = tls_client_connect_start(session->tls_client_ctx,
+ session, session_tls_hs_cb);
+ if (ret == kr_error(EAGAIN)) {
+ iorequest_release(worker, req);
+ io_start_read(session->handle);
+ return;
+ }
+ }
+
+ if (ret == kr_ok()) {
+ ret = session_next_waiting_send(session);
+ if (ret == kr_ok()) {
+ worker_add_tcp_connected(worker, &session->peer.ip, session);
+ iorequest_release(worker, req);
+ return;
+ }
}
- /* Update statistics */
- if (handle != task->source.handle && addr) {
- if (handle->type == UV_UDP)
- task->worker->stats.udp += 1;
- else
- task->worker->stats.tcp += 1;
- if (addr->sa_family == AF_INET6)
- task->worker->stats.ipv6 += 1;
- else
- task->worker->stats.ipv4 += 1;
+ while (session->waiting.len > 0) {
+ struct qr_task *task = session->waiting.at[0];
+ session_del_tasks(session, task);
+ array_del(session->waiting, 0);
+ qr_task_finalize(task, KR_STATE_FAIL);
+ qr_task_unref(task);
}
- return ret;
+
+ assert(session->tasks.len == 0);
+
+ iorequest_release(worker, req);
+ session_close(session);
}
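+ /* The outgoing connection did not complete within KR_CONN_RTT_MAX:
+ * drop the session from the waiting map and let each queued task
+ * retry elsewhere via qr_task_step(). */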
- static void on_connect(uv_connect_t *req, int status)
+ static void on_tcp_connect_timeout(uv_timer_t *timer)
{
+ struct session *session = timer->data;
+
+ uv_timer_stop(timer);
struct worker_ctx *worker = get_worker();
- struct qr_task *task = req->data;
- uv_stream_t *handle = req->handle;
- if (qr_valid_handle(task, (uv_handle_t *)req->handle)) {
- if (status == 0) {
- struct sockaddr_storage addr;
- int addr_len = sizeof(addr);
- uv_tcp_getpeername((uv_tcp_t *)handle, (struct sockaddr *)&addr, &addr_len);
- qr_task_send(task, (uv_handle_t *)handle, (struct sockaddr *)&addr, task->pktbuf);
- } else {
- qr_task_step(task, task->addrlist, NULL);
- }
+
+ assert(session->waiting.len == session->tasks.len);
+
+ union inaddr *peer = &session->peer;
+ worker_del_tcp_waiting(worker, &peer->ip);
+
- WITH_VERBOSE {
++ struct kr_query *qry = session_current_query(session);
++ WITH_VERBOSE (qry) {
+ char addr_str[INET6_ADDRSTRLEN];
+ inet_ntop(peer->ip.sa_family, kr_inaddr(&peer->ip), addr_str, sizeof(addr_str));
- VERBOSE_MSG(NULL, "=> connection to '%s' failed\n", addr_str);
++ VERBOSE_MSG(qry, "=> connection to '%s' failed\n", addr_str);
}
- qr_task_unref(task);
- req_release(worker, (struct req *)req);
+
+ while (session->waiting.len > 0) {
+ struct qr_task *task = session->waiting.at[0];
+ struct request_ctx *ctx = task->ctx;
+ assert(ctx);
+ task->timeouts += 1;
+ worker->stats.timeout += 1;
+ session_del_tasks(session, task);
+ array_del(session->waiting, 0);
+ assert(task->refs > 1);
+ qr_task_unref(task);
+ qr_task_step(task, NULL, NULL);
+ }
+
+ assert(session->tasks.len == 0);
+ session_close(session);
}
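+ /* An established outgoing TCP session has been idle for too long:
+ * fail all waiting and attached tasks, then close the session. */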
- static void on_timer_close(uv_handle_t *handle)
+ static void on_tcp_watchdog_timeout(uv_timer_t *timer)
{
- struct qr_task *task = handle->data;
- req_release(task->worker, (struct req *)handle);
- qr_task_unref(task);
+ struct session *session = timer->data;
+
+ assert(session->outgoing);
+ uv_timer_stop(timer);
+ struct worker_ctx *worker = get_worker();
+
+ if (session->outgoing) {
+ worker_del_tcp_connected(worker, &session->peer.ip);
+
+ while (session->waiting.len > 0) {
+ struct qr_task *task = session->waiting.at[0];
+ task->timeouts += 1;
+ worker->stats.timeout += 1;
+ array_del(session->waiting, 0);
+ session_del_tasks(session, task);
+ qr_task_finalize(task, KR_STATE_FAIL);
+ qr_task_unref(task);
+ }
+ }
+
+ while (session->tasks.len > 0) {
+ struct qr_task *task = session->tasks.at[0];
+ task->timeouts += 1;
+ worker->stats.timeout += 1;
+ assert(task->refs > 1);
+ array_del(session->tasks, 0);
+ qr_task_finalize(task, KR_STATE_FAIL);
+ qr_task_unref(task);
+ }
+
+ session_close(session);
}
/* This is called when I/O times out */
* @note Only UDP can lead I/O as it doesn't touch 'task->pktbuf' for reassembly.
*/
subreq_lead(task);
- } else {
- uv_connect_t *conn = (uv_connect_t *)req_borrow(task->worker);
- if (!conn) {
- return qr_task_step(task, NULL, NULL);
+ struct session *session = handle->data;
+ assert(session->handle->type == UV_UDP);
+ ret = timer_start(session, on_retransmit, timeout, 0);
+ /* Start next step with timeout, fatal if can't start a timer. */
+ if (ret != 0) {
+ subreq_finalize(task, packet_source, packet);
+ return qr_task_finalize(task, KR_STATE_FAIL);
}
+ } else {
+ assert(sock_type == SOCK_STREAM);
const struct sockaddr *addr =
packet_source ? packet_source : task->addrlist;
- uv_handle_t *client = ioreq_spawn(task, sock_type, addr->sa_family);
- if (!client) {
- req_release(task->worker, (struct req *)conn);
- return qr_task_step(task, NULL, NULL);
- }
- conn->data = task;
- if (uv_tcp_connect(conn, (uv_tcp_t *)client, addr , on_connect) != 0) {
- req_release(task->worker, (struct req *)conn);
- return qr_task_step(task, NULL, NULL);
+ if (addr->sa_family == AF_UNSPEC) {
+ subreq_finalize(task, packet_source, packet);
+ return qr_task_finalize(task, KR_STATE_FAIL);
}
- qr_task_ref(task); /* Connect request borrows task */
- ret = timer_start(task, on_timeout, KR_CONN_RTT_MAX, 0);
- }
+ struct session *session = NULL;
+ if ((session = worker_find_tcp_waiting(ctx->worker, addr)) != NULL) {
+ assert(session->outgoing);
+ if (session->closing) {
+ subreq_finalize(task, packet_source, packet);
+ return qr_task_finalize(task, KR_STATE_FAIL);
+ }
+ /* There are tasks waiting on this session, so a connection
+ * attempt or a data transfer is already in flight. The task
+ * will be notified in on_connect() or qr_task_on_send(). */
+ ret = session_add_waiting(session, task);
+ if (ret < 0) {
+ subreq_finalize(task, packet_source, packet);
+ return qr_task_finalize(task, KR_STATE_FAIL);
+ }
+ ret = session_add_tasks(session, task);
+ if (ret < 0) {
+ session_del_waiting(session, task);
+ subreq_finalize(task, packet_source, packet);
+ return qr_task_finalize(task, KR_STATE_FAIL);
+ }
+ } else if ((session = worker_find_tcp_connected(ctx->worker, addr)) != NULL) {
+ /* Connection has already been established */
+ assert(session->outgoing);
+ if (session->closing) {
+ session_del_tasks(session, task);
+ subreq_finalize(task, packet_source, packet);
+ return qr_task_finalize(task, KR_STATE_FAIL);
+ }
- /* Start next step with timeout, fatal if can't start a timer. */
- if (ret != 0) {
- subreq_finalize(task, packet_source, packet);
- return qr_task_finalize(task, KR_STATE_FAIL);
+ if (session->tasks.len >= worker->tcp_pipeline_max) {
+ session_del_tasks(session, task);
+ subreq_finalize(task, packet_source, packet);
+ return qr_task_finalize(task, KR_STATE_FAIL);
+ }
+
+ /* will be removed in qr_task_on_send() */
+ ret = session_add_waiting(session, task);
+ if (ret < 0) {
+ session_del_tasks(session, task);
+ subreq_finalize(task, packet_source, packet);
+ return qr_task_finalize(task, KR_STATE_FAIL);
+ }
+ ret = session_add_tasks(session, task);
+ if (ret < 0) {
+ session_del_waiting(session, task);
+ session_del_tasks(session, task);
+ subreq_finalize(task, packet_source, packet);
+ return qr_task_finalize(task, KR_STATE_FAIL);
+ }
+ if (session->waiting.len == 1) {
+ ret = qr_task_send(task, session->handle,
+ &session->peer.ip, task->pktbuf);
+ if (ret < 0) {
+ session_del_waiting(session, task);
+ session_del_tasks(session, task);
+ subreq_finalize(task, packet_source, packet);
+ return qr_task_finalize(task, KR_STATE_FAIL);
+ }
+ ret = timer_start(session, on_tcp_watchdog_timeout,
+ KR_CONN_RTT_MAX, 0);
+ if (ret < 0) {
+ assert(false);
+ session_del_waiting(session, task);
+ session_del_tasks(session, task);
+ subreq_finalize(task, packet_source, packet);
+ return qr_task_finalize(task, KR_STATE_FAIL);
+ }
+ }
+ task->pending[task->pending_count] = session->handle;
+ task->pending_count += 1;
+ } else {
+ /* Make connection */
+ uv_connect_t *conn = (uv_connect_t *)iorequest_borrow(ctx->worker);
+ if (!conn) {
+ return qr_task_step(task, NULL, NULL);
+ }
+ uv_handle_t *client = ioreq_spawn(task, sock_type,
+ addr->sa_family);
+ if (!client) {
+ iorequest_release(ctx->worker, conn);
+ subreq_finalize(task, packet_source, packet);
+ return qr_task_finalize(task, KR_STATE_FAIL);
+ }
+ session = client->data;
+ ret = worker_add_tcp_waiting(ctx->worker, addr, session);
+ if (ret < 0) {
+ session_del_tasks(session, task);
+ iorequest_release(ctx->worker, conn);
+ subreq_finalize(task, packet_source, packet);
+ return qr_task_finalize(task, KR_STATE_FAIL);
+ }
+ /* will be removed in qr_task_on_send() */
+ ret = session_add_waiting(session, task);
+ if (ret < 0) {
+ session_del_tasks(session, task);
+ worker_del_tcp_waiting(ctx->worker, addr);
+ iorequest_release(ctx->worker, conn);
+ subreq_finalize(task, packet_source, packet);
+ return qr_task_finalize(task, KR_STATE_FAIL);
+ }
+
+ /* Check whether TLS is configured for this upstream */
+ struct engine *engine = ctx->worker->engine;
+ struct network *net = &engine->net;
+ const char *key = tcpsess_key(addr);
+ struct tls_client_paramlist_entry *entry = map_get(&net->tls_client_params, key);
+ if (entry) {
+ assert(session->tls_client_ctx == NULL);
+ struct tls_client_ctx_t *tls_ctx = tls_client_ctx_new(entry);
+ if (!tls_ctx) {
+ session_del_tasks(session, task);
+ session_del_waiting(session, task);
+ worker_del_tcp_waiting(ctx->worker, addr);
+ iorequest_release(ctx->worker, conn);
+ subreq_finalize(task, packet_source, packet);
+ return qr_task_step(task, NULL, NULL);
+ }
+ tls_client_ctx_set_params(tls_ctx, entry, session);
+ session->tls_client_ctx = tls_ctx;
+ session->has_tls = true;
+ }
+
+ conn->data = session;
+ memcpy(&session->peer, addr, sizeof(session->peer));
+
+ ret = timer_start(session, on_tcp_connect_timeout,
+ KR_CONN_RTT_MAX, 0);
+ if (ret != 0) {
+ session_del_tasks(session, task);
+ session_del_waiting(session, task);
+ worker_del_tcp_waiting(ctx->worker, addr);
+ iorequest_release(ctx->worker, conn);
+ subreq_finalize(task, packet_source, packet);
+ return qr_task_finalize(task, KR_STATE_FAIL);
+ }
+
- WITH_VERBOSE {
++ struct kr_query *qry = session_current_query(session);
++ WITH_VERBOSE (qry) {
+ char addr_str[INET6_ADDRSTRLEN];
+ inet_ntop(session->peer.ip.sa_family, kr_inaddr(&session->peer.ip), addr_str, sizeof(addr_str));
- VERBOSE_MSG(NULL, "=> connecting to: '%s'\n", addr_str);
++ VERBOSE_MSG(qry, "=> connecting to: '%s'\n", addr_str);
+ }
+
+ if (uv_tcp_connect(conn, (uv_tcp_t *)client,
+ addr, on_connect) != 0) {
+ uv_timer_stop(&session->timeout);
+ session_del_tasks(session, task);
+ session_del_waiting(session, task);
+ worker_del_tcp_waiting(ctx->worker, addr);
+ iorequest_release(ctx->worker, conn);
+ subreq_finalize(task, packet_source, packet);
+ return qr_task_step(task, NULL, NULL);
+ }
+ }
}
- return 0;
+ return kr_ok();
}
static int parse_packet(knot_pkt_t *query)
if (len <= 0 || !msg) {
/* If we have pending tasks, we must dissociate them from the
* connection so they don't try to access closed and freed handle.
- * @warning Do not modify task if this is outgoing request as it is shared with originator.
+ * @warning Do not modify the task if this is an outgoing request,
+ * as it is shared with the originator.
*/
- if (!session->outgoing) {
- for (size_t i = 0; i < session->tasks.len; ++i) {
- struct qr_task *task = session->tasks.at[i];
- task->session = NULL;
- task->source.handle = NULL;
- WITH_VERBOSE {
++ struct kr_query *qry = session_current_query(session);
++ WITH_VERBOSE (qry) {
+ char addr_str[INET6_ADDRSTRLEN];
+ inet_ntop(session->peer.ip.sa_family, kr_inaddr(&session->peer.ip),
+ addr_str, sizeof(addr_str));
- VERBOSE_MSG(NULL, "=> connection to '%s' closed by peer\n", addr_str);
++ VERBOSE_MSG(qry, "=> connection to '%s' closed by peer\n", addr_str);
+ }
+ uv_timer_t *timer = &session->timeout;
+ uv_timer_stop(timer);
+ struct sockaddr *peer = &session->peer.ip;
+ worker_del_tcp_connected(worker, peer);
+ session->connected = false;
+
+ if (session->tls_client_ctx) {
+ /* Avoid gnutls_bye() call */
+ tls_client_set_hs_state(session->tls_client_ctx,
+ TLS_HS_NOT_STARTED);
+ }
+
+ if (session->outgoing && session->buffering) {
+ session->buffering = NULL;
+ }
+
+ assert(session->tasks.len >= session->waiting.len);
+ while (session->waiting.len > 0) {
+ struct qr_task *task = session->waiting.at[0];
+ array_del(session->waiting, 0);
+ assert(task->refs > 1);
+ session_del_tasks(session, task);
+ if (session->outgoing) {
+ if (task->ctx->req.options.FORWARD) {
+ /* We are in TCP_FORWARD mode.
+ * To prevent failure in kr_resolve_consume(),
+ * qry.flags.TCP must be cleared.
+ * TODO: this needs refactoring. */
+ struct kr_request *req = &task->ctx->req;
+ struct kr_rplan *rplan = &req->rplan;
+ struct kr_query *qry = array_tail(rplan->pending);
+ qry->flags.TCP = false;
+ }
+ qr_task_step(task, NULL, NULL);
+ } else {
+ assert(task->ctx->source.session == session);
+ task->ctx->source.session = NULL;
+ }
+ qr_task_unref(task);
+ }
+ while (session->tasks.len > 0) {
+ struct qr_task *task = session->tasks.at[0];
+ if (session->outgoing) {
+ if (task->ctx->req.options.FORWARD) {
+ struct kr_request *req = &task->ctx->req;
+ struct kr_rplan *rplan = &req->rplan;
+ struct kr_query *qry = array_tail(rplan->pending);
+ qry->flags.TCP = false;
+ }
+ qr_task_step(task, NULL, NULL);
+ } else {
+ assert(task->ctx->source.session == session);
+ task->ctx->source.session = NULL;
}
- session->tasks.len = 0;
+ session_del_tasks(session, task);
+ }
+ session_close(session);
+ return kr_ok();
+ }
+
+ if (session->outgoing) {
+ uv_timer_stop(&session->timeout);
+ timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0);
+ }
+
+ if (session->bytes_to_skip) {
+ assert(session->buffering == NULL);
+ ssize_t min_len = MIN(session->bytes_to_skip, len);
+ len -= min_len;
+ msg += min_len;
+ session->bytes_to_skip -= min_len;
+ if (len < 0 || session->bytes_to_skip < 0) {
+ /* Something went wrong;
+ * better kill the connection. */
+ assert(false);
+ return kr_error(EILSEQ);
+ }
+ if (len == 0) {
+ return kr_ok();
}
- return kr_error(ECONNRESET);
+ assert(session->bytes_to_skip == 0);
}
int submitted = 0;
struct qr_task *task = session->buffering;
+ knot_pkt_t *pkt_buf = NULL;
+ if (task) {
+ pkt_buf = task->pktbuf;
+ } else {
+ /* Accumulate the 2-byte length prefix and message ID into
+ * session->msg_hdr; msg_hdr_idx tracks how much we have so far. */
+ assert(session->msg_hdr_idx <= sizeof(session->msg_hdr));
+ ssize_t hdr_amount = sizeof(session->msg_hdr) -
+ session->msg_hdr_idx;
+ if (hdr_amount > len) {
+ hdr_amount = len;
+ }
+ if (hdr_amount > 0) {
+ memcpy(session->msg_hdr + session->msg_hdr_idx, msg, hdr_amount);
+ session->msg_hdr_idx += hdr_amount;
+ len -= hdr_amount;
+ msg += hdr_amount;
+ }
+ if (len == 0) { /* no data beyond msg_hdr -> not much to do */
+ return kr_ok();
+ }
+ assert(session->msg_hdr_idx == sizeof(session->msg_hdr));
+ session->msg_hdr_idx = 0;
+ uint16_t msg_size = get_msg_size(session->msg_hdr);
+ uint16_t msg_id = knot_wire_get_id(session->msg_hdr + 2);
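+ /* DNS/TCP framing (RFC 1035, section 4.2.2): each message is
+ * preceded by a two-octet length field in network byte order,
+ * so a plausible get_msg_size() is simply
+ *	((uint16_t)msg_hdr[0] << 8) | msg_hdr[1]
+ * (sketch; the actual helper is defined elsewhere). */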
+ if (msg_size < KNOT_WIRE_HEADER_SIZE) {
+ /* better kill the connection; we would probably get out of sync */
+ uv_timer_t *timer = &session->timeout;
+ uv_timer_stop(timer);
+ while (session->waiting.len > 0) {
+ struct qr_task *task = session->waiting.at[0];
+ if (session->outgoing) {
+ qr_task_finalize(task, KR_STATE_FAIL);
+ } else {
+ assert(task->ctx->source.session == session);
+ task->ctx->source.session = NULL;
+ }
+ array_del(session->waiting, 0);
+ session_del_tasks(session, task);
+ qr_task_unref(task);
+ }
+ while (session->tasks.len > 0) {
+ struct qr_task *task = session->tasks.at[0];
+ if (session->outgoing) {
+ qr_task_finalize(task, KR_STATE_FAIL);
+ } else {
+ assert(task->ctx->source.session == session);
+ task->ctx->source.session = NULL;
+ }
+ session_del_tasks(session, task);
+ }
+ session_close(session);
- /* If this is a new query, create a new task that we can use
- * to buffer incoming message until it's complete. */
- if (!session->outgoing) {
- if (!task) {
- /* Get TCP peer name, keep zeroed address if it fails. */
- struct sockaddr_storage addr;
- memset(&addr, 0, sizeof(addr));
- int addr_len = sizeof(addr);
- uv_tcp_getpeername((uv_tcp_t *)handle, (struct sockaddr *)&addr, &addr_len);
- task = qr_task_create(worker, (uv_handle_t *)handle, (struct sockaddr *)&addr);
+ return kr_ok();
+ }
+
+ /* get task */
+ if (!session->outgoing) {
+ /* This is a new query; create a new task that we can use
+ * to buffer the incoming message until it's complete. */
+ struct sockaddr *addr = &(session->peer.ip);
+ assert(addr->sa_family != AF_UNSPEC);
+ struct request_ctx *ctx = request_create(worker,
+ (uv_handle_t *)handle,
+ addr);
+ if (!ctx) {
+ assert(false);
+ return kr_error(ENOMEM);
+ }
+ task = qr_task_create(ctx);
if (!task) {
+ assert(false);
+ request_free(ctx);
return kr_error(ENOMEM);
}
- session->buffering = task;
+ } else {
+ /* Start of response from upstream.
+ * The session task list must contain a task
+ * with the same msg id. */
+ task = find_task(session, msg_id);
+ /* FIXME: on high load over one connection, it's likely
+ * that we will get multiple matches sooner or later (!) */
+ if (task) {
+ knot_pkt_clear(task->pktbuf);
+ assert(task->leading == false);
+ } else {
+ session->bytes_to_skip = msg_size - 2;
+ ssize_t min_len = MIN(session->bytes_to_skip, len);
+ len -= min_len;
+ msg += min_len;
+ session->bytes_to_skip -= min_len;
+ if (len < 0 || session->bytes_to_skip < 0) {
+ /* Something went wrong;
+ * better kill the connection. */
+ assert(false);
+ return kr_error(EILSEQ);
+ }
+ if (len == 0) {
+ return submitted;
+ }
+ assert(session->bytes_to_skip == 0);
+ int ret = worker_process_tcp(worker, handle, msg, len);
+ if (ret < 0) {
+ submitted = ret;
+ } else {
+ submitted += ret;
+ }
+ return submitted;
+ }
}
- } else {
- assert(session->tasks.len > 0);
- task = array_tail(session->tasks);
+
+ pkt_buf = task->pktbuf;
+ knot_wire_set_id(pkt_buf->wire, msg_id);
+ pkt_buf->size = 2;
+ task->bytes_remaining = msg_size - 2;
+ assert(session->buffering == NULL);
+ session->buffering = task;
}
- /* At this point session must have either created new task or it's already assigned. */
+ /* At this point session must have either created new task
+ * or it's already assigned. */
assert(task);
assert(len > 0);
- /* Start reading DNS/TCP message length */
- knot_pkt_t *pkt_buf = task->pktbuf;
- if (task->bytes_remaining == 0 && pkt_buf->size == 0) {
- knot_pkt_clear(pkt_buf);
- /* Make sure we can process maximum packet sizes over TCP for outbound queries.
- * Previous packet is allocated with mempool, so there's no need to free it manually. */
- if (session->outgoing && pkt_buf->max_size < KNOT_WIRE_MAX_PKTSIZE) {
- pkt_buf = knot_pkt_new(NULL, KNOT_WIRE_MAX_PKTSIZE, &task->req.pool);
- if (!pkt_buf) {
- return kr_error(ENOMEM);
- }
- task->pktbuf = pkt_buf;
- }
- /* Read only one byte as TCP fragment may end at a 1B boundary
- * which would lead to OOB read or improper reassembly length. */
- pkt_buf->size = 1;
- pkt_buf->wire[0] = msg[0];
- len -= 1;
- msg += 1;
- if (len == 0) {
- return 0;
- }
- }
- /* Finish reading DNS/TCP message length. */
- if (task->bytes_remaining == 0 && pkt_buf->size == 1) {
- pkt_buf->wire[1] = msg[0];
- ssize_t nbytes = msg_size(pkt_buf->wire);
- len -= 1;
- msg += 1;
- /* Cut off fragment length and start reading DNS message. */
- pkt_buf->size = 0;
- task->bytes_remaining = nbytes;
- }
++
/* Message is too long, can't process it. */
ssize_t to_read = MIN(len, task->bytes_remaining);
if (pkt_buf->size + to_read > pkt_buf->max_size) {
return submitted;
}
-int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query,
- struct kr_qflags options, worker_cb_t on_complete,
- void *baton)
+struct qr_task *worker_resolve_start(struct worker_ctx *worker, knot_pkt_t *query, struct kr_qflags options)
{
if (!worker || !query) {
- assert(false);
- return kr_error(EINVAL);
+ return NULL;
}
- struct qr_task *task = qr_task_create(worker, NULL, NULL);
+ struct request_ctx *ctx = request_create(worker, NULL, NULL);
+ if (!ctx) {
- return kr_error(ENOMEM);
++ return NULL;
+ }
+
+ /* Create task */
+ struct qr_task *task = qr_task_create(ctx);
if (!task) {
- return kr_error(ENOMEM);
+ request_free(ctx);
+ return NULL;
}
- task->baton = baton;
- task->on_complete = on_complete;
+
- int ret = qr_task_start(task, query);
+ /* Start task */
+ int ret = request_start(ctx, query);
+ if (ret != 0) {
++ request_free(ctx);
+ qr_task_unref(task);
+ return NULL;
+ }
/* Set options late, as request_start() -> kr_resolve_begin() rewrites it. */
- kr_qflags_set(&task->req.options, options);
+ kr_qflags_set(&task->ctx->req.options, options);
+ return task;
+}
- if (ret != 0) {
- request_free(ctx);
- qr_task_unref(task);
- return ret;
+int worker_resolve_exec(struct qr_task *task, knot_pkt_t *query)
+{
+ if (!task) {
+ return kr_error(EINVAL);
}
return qr_task_step(task, NULL, query);
}
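+
+/* Intended two-phase use now that the worker_resolve() wrapper is gone
+ * (sketch; error handling elided):
+ *
+ *	struct qr_task *task = worker_resolve_start(worker, query, options);
+ *	int ret = task ? worker_resolve_exec(task, query) : kr_error(ENOMEM);
+ */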
- int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query, struct kr_qflags options)
++struct kr_request *worker_task_request(struct qr_task *task)
+{
- if (!worker || !query) {
- return kr_error(EINVAL);
++ if (!task || !task->ctx) {
++ return NULL;
+ }
+
- /* Create task */
- struct qr_task *task = worker_resolve_start(worker, query, options);
- if (!task) {
- return kr_error(ENOMEM);
- }
++ return &task->ctx->req;
++}
+
- return worker_resolve_exec(task, query);
+ void worker_session_close(struct session *session)
+ {
+ session_close(session);
}
/** Reserve worker buffers */