--- /dev/null
+#include <assert.h>
+
+#include <libknot/packet/pkt.h>
+
+#include "lib/defines.h"
+#include "daemon/session.h"
+#include "daemon/engine.h"
+#include "daemon/tls.h"
+#include "daemon/worker.h"
+#include "daemon/io.h"
+
+/** List of tasks assigned to a session. */
+typedef array_t(struct qr_task *) session_tasklist_t;
+
+/** Boolean state flags of a session, packed into single bits. */
+struct session_flags {
+	bool outgoing : 1;      /**< True: to upstream; false: from a client. */
+	bool throttled : 1;     /**< True: data reading from peer is temporarily stopped. */
+	bool has_tls : 1;       /**< True: given session uses TLS. */
+	bool connected : 1;     /**< True: TCP connection is established. */
+	bool closing : 1;       /**< True: session close sequence is in progress. */
+	bool wirebuf_error : 1; /**< True: last operation with wirebuf ended up with an error. */
+};
+
+
+/* Per-session (TCP or UDP) persistent structure,
+ * that exists between remote counterpart and a local socket.
+ */
+struct session {
+	struct session_flags sflags;  /**< Miscellaneous flags. */
+	union inaddr peer;            /**< Address of peer; is not set for client's UDP sessions. */
+	uv_handle_t *handle;          /**< libuv handle for IO operations. */
+	uv_timer_t timeout;           /**< libuv handle for timer. */
+
+	struct tls_ctx_t *tls_ctx;               /**< Server side tls-related data. */
+	struct tls_client_ctx_t *tls_client_ctx; /**< Client side tls-related data. */
+
+	session_tasklist_t tasks;   /**< List of tasks associated with given session. */
+	session_tasklist_t waiting; /**< List of tasks waiting for IO (subset of tasks). */
+
+	uint8_t *wire_buf;     /**< Buffer for DNS message; owned only for TCP —
+	                        *   UDP borrows the worker's buffer (see session_set_handle()). */
+	ssize_t wire_buf_size; /**< Buffer size. */
+	ssize_t wire_buf_idx;  /**< The number of bytes in wire_buf filled so far. */
+};
+
+/** Final step of session teardown: libuv callback invoked once the IO
+ * handle has fully closed; deinitializes IO state and releases the
+ * handle back to the worker via worker_iohandle_release(). */
+static void on_session_close(uv_handle_t *handle)
+{
+	uv_loop_t *loop = handle->loop;
+	struct worker_ctx *worker = loop->data;
+	struct session *session = handle->data;
+	assert(session->handle == handle);
+	io_deinit(handle);
+	worker_iohandle_release(worker, handle);
+}
+
+/** libuv callback fired after the session's timer handle closes:
+ * continues the close sequence by closing the IO handle itself
+ * (unless it is already closing). */
+static void on_session_timer_close(uv_handle_t *timer)
+{
+	struct session *session = timer->data;
+	uv_handle_t *handle = session->handle;
+	assert(handle && handle->data == session);
+	assert (session->sflags.outgoing || handle->type == UV_TCP);
+	if (!uv_is_closing(handle)) {
+		uv_close(handle, on_session_close);
+	}
+}
+
+/** Destroy a session: release owned resources, then the structure itself.
+ * NULL is a no-op.  Both task lists must already be empty. */
+void session_free(struct session *s)
+{
+	if (!s) {
+		return;
+	}
+	assert(s->tasks.len == 0 && s->waiting.len == 0);
+	session_clear(s);
+	free(s);
+}
+
+/** Reset a session to a pristine zeroed state without freeing it.
+ * The wire buffer is freed only for TCP sessions: UDP sessions borrow
+ * the worker-owned buffer (see session_set_handle()).
+ * Both task lists must already be empty. */
+void session_clear(struct session *s)
+{
+	assert(s->tasks.len == 0 && s->waiting.len == 0);
+	if (s->handle && s->handle->type == UV_TCP) {
+		free(s->wire_buf);
+	}
+	array_clear(s->tasks);
+	array_clear(s->waiting);
+	tls_free(s->tls_ctx);
+	tls_client_ctx_free(s->tls_client_ctx);
+	memset(s, 0, sizeof(*s));
+}
+
+/** Allocate a new session; all fields start zeroed.
+ * @return new session, or NULL on allocation failure. */
+struct session *session_new(void)
+{
+	struct session *s = calloc(1, sizeof(struct session));
+	return s;
+}
+
+/** Start the close sequence for a session.  Idempotent: returns at once
+ * if closing is already in progress.  Stops reading, deregisters a
+ * connected outgoing peer from the worker, shuts down TLS on both sides
+ * (if present) and closes the timer; the IO handle itself is closed
+ * later from on_session_timer_close().  Task lists must be empty. */
+void session_close(struct session *session)
+{
+	assert(session->tasks.len == 0 && session->waiting.len == 0);
+
+	if (session->sflags.closing) {
+		return;
+	}
+
+	uv_handle_t *handle = session->handle;
+	io_stop_read(handle);
+	session->sflags.closing = true;
+	if (session->sflags.outgoing &&
+	    session->peer.ip.sa_family != AF_UNSPEC) {
+		struct worker_ctx *worker = handle->loop->data;
+		struct sockaddr *peer = &session->peer.ip;
+		worker_del_tcp_connected(worker, peer);
+		session->sflags.connected = false;
+	}
+
+	if (!uv_is_closing((uv_handle_t *)&session->timeout)) {
+		uv_timer_stop(&session->timeout);
+		if (session->tls_client_ctx) {
+			tls_close(&session->tls_client_ctx->c);
+		}
+		if (session->tls_ctx) {
+			tls_close(&session->tls_ctx->c);
+		}
+
+		session->timeout.data = session;
+		uv_close((uv_handle_t *)&session->timeout, on_session_timer_close);
+	}
+}
+
+/** Resume reading from the session's IO handle. */
+int session_start_read(struct session *session)
+{
+	return io_start_read(session->handle);
+}
+
+/** Append a task to the waiting list, taking a reference to it.
+ * If the task is already listed, its index is returned and no new
+ * reference is taken.
+ * @return non-negative index on success, negative array_push() error. */
+int session_waitinglist_add(struct session *session, struct qr_task *task)
+{
+	/* size_t avoids a signed/unsigned comparison against the array
+	 * length (consistent with session_tasklist_find()). */
+	for (size_t i = 0; i < session->waiting.len; ++i) {
+		if (session->waiting.at[i] == task) {
+			return (int)i;
+		}
+	}
+	int ret = array_push(session->waiting, task);
+	if (ret >= 0) {
+		worker_task_ref(task);
+	}
+	return ret;
+}
+
+/** Remove a task from the waiting list and drop the list's reference.
+ * @return kr_ok() if found and removed, kr_error(ENOENT) otherwise. */
+int session_waitinglist_del(struct session *session, struct qr_task *task)
+{
+	int ret = kr_error(ENOENT);
+	/* size_t index: the array length is unsigned. */
+	for (size_t i = 0; i < session->waiting.len; ++i) {
+		if (session->waiting.at[i] == task) {
+			array_del(session->waiting, i);
+			worker_task_unref(task);
+			ret = kr_ok();
+			break;
+		}
+	}
+	return ret;
+}
+
+/** Remove the waiting-list entry at @a index, dropping its reference.
+ * @return kr_ok() on success, kr_error(ENOENT) if index is out of range. */
+int session_waitinglist_del_index(struct session *session, int index)
+{
+	int ret = kr_error(ENOENT);
+	/* Reject negative indices explicitly instead of relying on the
+	 * implicit signed->unsigned conversion in the comparison. */
+	if (index >= 0 && (size_t)index < session->waiting.len) {
+		struct qr_task *task = session->waiting.at[index];
+		array_del(session->waiting, index);
+		worker_task_unref(task);
+		ret = kr_ok();
+	}
+	return ret;
+}
+
+/** Append a task to the session's task list, taking a reference to it.
+ * If the task is already listed, its index is returned and no new
+ * reference is taken.
+ * @return non-negative index on success, negative array_push() error. */
+int session_tasklist_add(struct session *session, struct qr_task *task)
+{
+	/* size_t avoids a signed/unsigned comparison against the array
+	 * length (consistent with session_tasklist_find()). */
+	for (size_t i = 0; i < session->tasks.len; ++i) {
+		if (session->tasks.at[i] == task) {
+			return (int)i;
+		}
+	}
+	int ret = array_push(session->tasks, task);
+	if (ret >= 0) {
+		worker_task_ref(task);
+	}
+	return ret;
+}
+
+/** Remove a task from the task list and drop the list's reference.
+ * @return kr_ok() if found and removed, kr_error(ENOENT) otherwise. */
+int session_tasklist_del(struct session *session, struct qr_task *task)
+{
+	int ret = kr_error(ENOENT);
+	/* size_t index: the array length is unsigned. */
+	for (size_t i = 0; i < session->tasks.len; ++i) {
+		if (session->tasks.at[i] == task) {
+			array_del(session->tasks, i);
+			worker_task_unref(task);
+			ret = kr_ok();
+			break;
+		}
+	}
+	return ret;
+}
+
+/** Remove the task-list entry at @a index, dropping its reference.
+ * @return kr_ok() on success, kr_error(ENOENT) if index is out of range. */
+int session_tasklist_del_index(struct session *session, int index)
+{
+	int ret = kr_error(ENOENT);
+	/* Reject negative indices explicitly instead of relying on the
+	 * implicit signed->unsigned conversion in the comparison. */
+	if (index >= 0 && (size_t)index < session->tasks.len) {
+		struct qr_task *task = session->tasks.at[index];
+		array_del(session->tasks, index);
+		worker_task_unref(task);
+		ret = kr_ok();
+	}
+	return ret;
+}
+
+/** Linear scan of the task list for the task whose outgoing query
+ * carries DNS message id @a msg_id.
+ * @return the matching task, or NULL if none matches. */
+struct qr_task* session_tasklist_find(const struct session *session, uint16_t msg_id)
+{
+	const session_tasklist_t *list = &session->tasks;
+	for (size_t i = 0; i < list->len; ++i) {
+		struct qr_task *task = list->at[i];
+		knot_pkt_t *pkt = worker_task_get_pktbuf(task);
+		if (knot_wire_get_id(pkt->wire) == msg_id) {
+			return task;
+		}
+	}
+	return NULL;
+}
+
+/** @return true if the session faces an upstream (outgoing) peer. */
+bool session_is_outgoing(const struct session *session)
+{
+	return session->sflags.outgoing;
+}
+
+/** Set the session direction flag (true = to upstream). */
+void session_set_outgoing(struct session *session, bool outgoing)
+{
+	session->sflags.outgoing = outgoing;
+}
+
+/** @return true if the close sequence has started for this session. */
+bool session_is_closing(const struct session *session)
+{
+	return session->sflags.closing;
+}
+
+/** Set the closing flag. */
+void session_set_closing(struct session *session, bool closing)
+{
+	session->sflags.closing = closing;
+}
+
+/** @return true if the TCP connection is established. */
+bool session_is_connected(const struct session *session)
+{
+	return session->sflags.connected;
+}
+
+/** Set the connected flag. */
+void session_set_connected(struct session *session, bool connected)
+{
+	session->sflags.connected = connected;
+}
+
+/** @return true if reading from the peer is temporarily stopped. */
+bool session_is_throttled(const struct session *session)
+{
+	return session->sflags.throttled;
+}
+
+/** Set the throttled flag. */
+void session_set_throttled(struct session *session, bool throttled)
+{
+	session->sflags.throttled = throttled;
+}
+
+/** @return pointer to the session's peer address storage;
+ * sa_family is AF_UNSPEC when no peer has been set. */
+struct sockaddr *session_get_peer(struct session *session)
+{
+	return &session->peer.ip;
+}
+
+/** @return server-side TLS context (NULL when not set). */
+struct tls_ctx_t *session_tls_get_server_ctx(const struct session *session)
+{
+	return session->tls_ctx;
+}
+
+/** Attach a server-side TLS context to the session. */
+void session_tls_set_server_ctx(struct session *session, struct tls_ctx_t *ctx)
+{
+	session->tls_ctx = ctx;
+}
+
+/** @return client-side TLS context (NULL when not set). */
+struct tls_client_ctx_t *session_tls_get_client_ctx(const struct session *session)
+{
+	return session->tls_client_ctx;
+}
+
+/** Attach a client-side TLS context to the session. */
+void session_tls_set_client_ctx(struct session *session, struct tls_client_ctx_t *ctx)
+{
+	session->tls_client_ctx = ctx;
+}
+
+/** @return the direction-appropriate common TLS context, or NULL when
+ * that side has no TLS context set.
+ * BUGFIX: the previous code unconditionally computed
+ * `&session->tls_client_ctx->c` / `&session->tls_ctx->c`, which is
+ * undefined behavior when the respective pointer is NULL. */
+struct tls_common_ctx *session_tls_get_common_ctx(const struct session *session)
+{
+	if (session->sflags.outgoing) {
+		return session->tls_client_ctx ? &session->tls_client_ctx->c : NULL;
+	}
+	return session->tls_ctx ? &session->tls_ctx->c : NULL;
+}
+
+/** @return the libuv IO handle bound to this session (NULL if unset). */
+uv_handle_t *session_get_handle(struct session *session)
+{
+	return session->handle;
+}
+
+/** Bind a libuv handle to the session and set up its wire buffer.
+ * TCP: allocates an owned buffer of KNOT_WIRE_MAX_PKTSIZE bytes (freed
+ * in session_clear()).  UDP: borrows the worker's shared buffer.
+ * NOTE(review): binding a second TCP handle would leak the previous
+ * buffer — presumably callers bind exactly once; confirm at call sites.
+ * @return kr_ok(), kr_error(EINVAL) for NULL handle, kr_error(ENOMEM). */
+int session_set_handle(struct session *session, uv_handle_t *h)
+{
+	if (!h) {
+		return kr_error(EINVAL);
+	}
+
+	if (h->type == UV_TCP) {
+		uint8_t *wire_buf = malloc(KNOT_WIRE_MAX_PKTSIZE);
+		if (!wire_buf) {
+			return kr_error(ENOMEM);
+		}
+		session->wire_buf = wire_buf;
+		session->wire_buf_size = KNOT_WIRE_MAX_PKTSIZE;
+	} else if (h->type == UV_UDP) {
+		assert(h->loop->data);
+		struct worker_ctx *worker = h->loop->data;
+		session->wire_buf = worker->wire_buf;
+		session->wire_buf_size = sizeof(worker->wire_buf);
+	}
+
+	session->handle = h;
+	h->data = session;
+	return kr_ok();
+}
+
+/** @return the session's timer handle. */
+uv_timer_t *session_get_timer(struct session *session)
+{
+	return &session->timeout;
+}
+
+/** @return number of tasks in the task list. */
+size_t session_tasklist_get_len(const struct session *session)
+{
+	return session->tasks.len;
+}
+
+/** @return number of tasks in the waiting list. */
+size_t session_waitinglist_get_len(const struct session *session)
+{
+	return session->waiting.len;
+}
+
+/** @return true when the task list is empty. */
+bool session_tasklist_is_empty(const struct session *session)
+{
+	return session_tasklist_get_len(session) == 0;
+}
+
+/** @return true when the waiting list is empty. */
+bool session_waitinglist_is_empty(const struct session *session)
+{
+	return session_waitinglist_get_len(session) == 0;
+}
+
+/** @return true when both the task and waiting lists are empty. */
+bool session_is_empty(const struct session *session)
+{
+	return session_tasklist_is_empty(session) &&
+	       session_waitinglist_is_empty(session);
+}
+
+/** @return true if the session uses TLS. */
+bool session_has_tls(const struct session *session)
+{
+	return session->sflags.has_tls;
+}
+
+/** Set the TLS flag. */
+void session_set_has_tls(struct session *session, bool has_tls)
+{
+	session->sflags.has_tls = has_tls;
+}
+
+/** @return first task in the waiting list, or NULL when it is empty. */
+struct qr_task *session_waitinglist_get_first(const struct session *session)
+{
+	return session->waiting.len > 0 ? session->waiting.at[0] : NULL;
+}
+
+/** @return first task in the task list, or NULL when it is empty. */
+struct qr_task *session_tasklist_get_first(const struct session *session)
+{
+	return session->tasks.len > 0 ? session->tasks.at[0] : NULL;
+}
+
+/** Re-issue every waiting task: each is removed from both lists,
+ * optionally charged a timeout, and stepped again so resolution can
+ * continue elsewhere.  session_tasklist_del() drops the task-list
+ * reference; the explicit unref drops the waiting-list one.  The assert
+ * documents that yet another reference must keep the task alive across
+ * worker_task_step(). */
+void session_waitinglist_retry(struct session *session, bool increase_timeout_cnt)
+{
+	while (session->waiting.len > 0) {
+		struct qr_task *task = session->waiting.at[0];
+		session_tasklist_del(session, task);
+		array_del(session->waiting, 0);
+		assert(worker_task_numrefs(task) > 1);
+		if (increase_timeout_cnt) {
+			worker_task_timeout_inc(task);
+		}
+		worker_task_unref(task);
+		worker_task_step(task, NULL, NULL);
+	}
+}
+
+/** Drain the waiting list, also removing each task from the task list.
+ * Outgoing sessions: finalize the task with @a status.  Client sessions:
+ * detach the task's request from this (source) session instead. */
+void session_waitinglist_finalize(struct session *session, int status)
+{
+	while (session->waiting.len > 0) {
+		struct qr_task *t = session->waiting.at[0];
+		array_del(session->waiting, 0);
+		session_tasklist_del(session, t);
+		if (session->sflags.outgoing) {
+			worker_task_finalize(t, status);
+		} else {
+			struct request_ctx *ctx = worker_task_get_request(t);
+			assert(worker_request_get_source_session(ctx) == session);
+			worker_request_set_source_session(ctx, NULL);
+		}
+		worker_task_unref(t);
+	}
+}
+
+/** Drain the task list.  Outgoing sessions: finalize each task with
+ * @a status.  Client sessions: detach the task's request from this
+ * (source) session.  The unref balances the task-list reference. */
+void session_tasklist_finalize(struct session *session, int status)
+{
+	while (session->tasks.len > 0) {
+		struct qr_task *t = session->tasks.at[0];
+		array_del(session->tasks, 0);
+		if (session->sflags.outgoing) {
+			worker_task_finalize(t, status);
+		} else {
+			struct request_ctx *ctx = worker_task_get_request(t);
+			assert(worker_request_get_source_session(ctx) == session);
+			worker_request_set_source_session(ctx, NULL);
+		}
+		worker_task_unref(t);
+	}
+}
+
+/** Finalize all tasks of the session: first the waiting list, then the
+ * remaining task list, each with @a status. */
+void session_tasks_finalize(struct session *session, int status)
+{
+	session_waitinglist_finalize(session, status);
+	session_tasklist_finalize(session, status);
+}
+
+/** Arm the session's timer with callback @a cb.
+ * The timer's data pointer must already reference this session.
+ * @return 0 on success, kr_error(ENOMEM) if libuv refuses to start it. */
+int session_timer_start(struct session *session, uv_timer_cb cb,
+			uint64_t timeout, uint64_t repeat)
+{
+	uv_timer_t *t = &session->timeout;
+	assert(t->data == session);
+	if (uv_timer_start(t, cb, timeout, repeat) != 0) {
+		uv_timer_stop(t);
+		return kr_error(ENOMEM);
+	}
+	return 0;
+}
+
+/** Restart the timer using its previous repeat value (uv_timer_again). */
+int session_timer_restart(struct session *session)
+{
+	return uv_timer_again(&session->timeout);
+}
+
+/** Stop the session's timer. */
+int session_timer_stop(struct session *session)
+{
+	return uv_timer_stop(&session->timeout);
+}
+
+/** Account @a len bytes the caller wrote at the buffer's free position.
+ * @a data must be exactly session_wirebuf_get_free_start() and the new
+ * bytes must fit into the remaining capacity.
+ * @return len on success, kr_error(EINVAL) on inconsistent input. */
+ssize_t session_wirebuf_consume(struct session *session, const uint8_t *data, ssize_t len)
+{
+	if (data != &session->wire_buf[session->wire_buf_idx]) {
+		/* Shouldn't happen: caller must write at the free start. */
+		return kr_error(EINVAL);
+	}
+
+	if (session->wire_buf_idx + len > session->wire_buf_size) {
+		/* Shouldn't happen: the write would overflow the buffer. */
+		return kr_error(EINVAL);
+	}
+
+	session->wire_buf_idx += len;
+	return len;
+}
+
+/** Parse one DNS message from the session's wire buffer into a packet.
+ * UDP: the whole buffered payload is the message.  TCP: a 2-byte length
+ * prefix is expected; NULL is returned *without* error while the prefix
+ * or the message body is still incomplete.  The returned packet's wire
+ * points directly into the session buffer (no copy) — discard it with
+ * session_discard_packet() when done.  sflags.wirebuf_error stays set
+ * only when handle is missing or knot_pkt_new() fails. */
+knot_pkt_t *session_produce_packet(struct session *session, knot_mm_t *mm)
+{
+	if (session->wire_buf_idx == 0) {
+		session->sflags.wirebuf_error = false;
+		return NULL;
+	}
+
+	const uv_handle_t *handle = session->handle;
+	uint8_t *msg_start = session->wire_buf;
+	uint16_t msg_size = session->wire_buf_idx;
+
+	/* Pessimistically flag an error; cleared on every success path. */
+	session->sflags.wirebuf_error = true;
+	if (!handle) {
+		return NULL;
+	} else if (handle->type == UV_TCP) {
+		if (session->wire_buf_idx < 2) {
+			/* Length prefix not complete yet — not an error. */
+			session->sflags.wirebuf_error = false;
+			return NULL;
+		}
+		msg_size = knot_wire_read_u16(session->wire_buf);
+		if (msg_size + 2 > session->wire_buf_idx) {
+			/* Message body not complete yet — not an error. */
+			session->sflags.wirebuf_error = false;
+			return NULL;
+		}
+		msg_start += 2;
+	}
+
+	knot_pkt_t *pkt = knot_pkt_new(msg_start, msg_size, mm);
+	if (pkt) {
+		session->sflags.wirebuf_error = false;
+	}
+	return pkt;
+}
+
+/** Drop the message corresponding to @a pkt from the wire buffer.
+ * Verifies that pkt->wire/pkt->size exactly match the first buffered
+ * message (including the 2-byte TCP length prefix), then shifts any
+ * remaining bytes to the start of the buffer.
+ * @return kr_ok(), or kr_error(EINVAL) with sflags.wirebuf_error set. */
+int session_discard_packet(struct session *session, const knot_pkt_t *pkt)
+{
+	uv_handle_t *handle = session->handle;
+	uint8_t *wirebuf_data_start = session->wire_buf;
+	size_t wirebuf_msg_data_size = session->wire_buf_idx;
+	uint8_t *wirebuf_msg_start = session->wire_buf;
+	size_t wirebuf_msg_size = session->wire_buf_idx;
+	uint8_t *pkt_msg_start = pkt->wire;
+	size_t pkt_msg_size = pkt->size;
+
+	/* Pessimistically flag an error; cleared on success below. */
+	session->sflags.wirebuf_error = true;
+	if (!handle) {
+		return kr_error(EINVAL);
+	} else if (handle->type == UV_TCP) {
+		if (session->wire_buf_idx < 2) {
+			return kr_error(EINVAL);
+		}
+		/* TCP: message body starts after the 2-byte length prefix. */
+		wirebuf_msg_size = knot_wire_read_u16(wirebuf_data_start);
+		wirebuf_msg_start += 2;
+		wirebuf_msg_data_size = wirebuf_msg_size + 2;
+	}
+
+	/* pkt must be exactly the first buffered message. */
+	if (wirebuf_msg_start != pkt_msg_start || wirebuf_msg_size != pkt_msg_size) {
+		return kr_error(EINVAL);
+	}
+
+	if (wirebuf_msg_data_size > session->wire_buf_idx) {
+		return kr_error(EINVAL);
+	}
+
+	uint16_t wirebuf_data_amount = session->wire_buf_idx - wirebuf_msg_data_size;
+	if (wirebuf_data_amount) {
+		/* Shift the remainder down; memmove only when source and
+		 * destination ranges can overlap. */
+		if (wirebuf_msg_data_size < wirebuf_data_amount) {
+			memmove(wirebuf_data_start, &wirebuf_data_start[wirebuf_msg_data_size],
+				wirebuf_data_amount);
+		} else {
+			memcpy(wirebuf_data_start, &wirebuf_data_start[wirebuf_msg_data_size],
+			       wirebuf_data_amount);
+		}
+	}
+
+	session->wire_buf_idx = wirebuf_data_amount;
+	session->sflags.wirebuf_error = false;
+
+	return kr_ok();
+}
+
+/** @return true when the last wire-buffer operation failed. */
+bool session_wirebuf_error(struct session *session)
+{
+	return session->sflags.wirebuf_error;
+}
+
+/** @return start of the session's wire buffer. */
+uint8_t *session_wirebuf_get_start(struct session *session)
+{
+	return session->wire_buf;
+}
+
+/** @return number of bytes currently buffered. */
+size_t session_wirebuf_get_len(struct session *session)
+{
+	return session->wire_buf_idx;
+}
+
+/** @return total capacity of the session's wire buffer in bytes.
+ * BUGFIX: `sizeof(session->wire_buf)` measured the pointer itself
+ * (e.g. 8 bytes), not the buffer; return the stored capacity instead,
+ * consistent with session_wirebuf_get_free_size(). */
+size_t session_wirebuf_get_size(struct session *session)
+{
+	return session->wire_buf_size;
+}
+
+/** @return pointer just past the buffered data (next write position). */
+uint8_t *session_wirebuf_get_free_start(struct session *session)
+{
+	return &session->wire_buf[session->wire_buf_idx];
+}
+
+/** @return remaining free capacity of the wire buffer in bytes. */
+size_t session_wirebuf_get_free_size(struct session *session)
+{
+	return session->wire_buf_size - session->wire_buf_idx;
+}
+
+/** ASan helper: mark the whole session object as unreadable
+ * (no-op when not built with AddressSanitizer). */
+void session_poison(struct session *session)
+{
+	kr_asan_poison(session, sizeof(*session));
+}
+
+/** ASan helper: make the session object accessible again. */
+void session_unpoison(struct session *session)
+{
+	kr_asan_unpoison(session, sizeof(*session));
+}
+
+/** Process all complete messages currently in the wire buffer.
+ * Each produced packet is submitted to the worker and then discarded
+ * from the buffer.  At most 100 messages are handled per call to bound
+ * the time spent here; leftover data is handled on the next call.
+ * @return number of messages processed, or -1 on wire-buffer error. */
+int session_wirebuf_process(struct session *session)
+{
+	int ret = 0;
+	if (session->wire_buf_idx == 0) {
+		return ret;
+	}
+	struct worker_ctx *worker = session_get_handle(session)->loop->data;
+	knot_pkt_t *query = NULL;
+	/* Check the cap before producing the next packet.  The old code
+	 * asserted `ret < 100` after the loop, which fired spuriously
+	 * whenever exactly 100 messages were legitimately processed. */
+	while (ret < 100 &&
+	       (query = session_produce_packet(session, &worker->pkt_pool)) != NULL) {
+		worker_submit(session, query);
+		if (session_discard_packet(session, query) < 0) {
+			/* Buffer no longer matches the produced packet. */
+			break;
+		}
+		ret += 1;
+	}
+	if (session_wirebuf_error(session)) {
+		ret = -1;
+	}
+	return ret;
+}
+
#include "daemon/io.h"
#include "daemon/tls.h"
#include "daemon/zimport.h"
+#include "daemon/session.h"
#define VERBOSE_MSG(qry, fmt...) QRVERBOSE(qry, "wrkr", fmt)
struct sockaddr *addr, knot_pkt_t *pkt);
static int qr_task_finalize(struct qr_task *task, int state);
static void qr_task_complete(struct qr_task *task);
-static int worker_add_tcp_connected(struct worker_ctx *worker,
- const struct sockaddr *addr,
- struct session *session);
-static int worker_del_tcp_connected(struct worker_ctx *worker,
- const struct sockaddr *addr);
static struct session* worker_find_tcp_connected(struct worker_ctx *worker,
const struct sockaddr *addr);
static int worker_add_tcp_waiting(struct worker_ctx *worker,
const struct sockaddr *addr);
static struct session* worker_find_tcp_waiting(struct worker_ctx *worker,
const struct sockaddr *addr);
-static int session_add_waiting(struct session *session, struct qr_task *task);
-static int session_del_waiting(struct session *session, struct qr_task *task);
-static int session_add_tasks(struct session *session, struct qr_task *task);
-static int session_del_tasks(struct session *session, struct qr_task *task);
-static void session_close(struct session *session);
static void on_session_idle_timeout(uv_timer_t *timer);
-static int timer_start(struct session *session, uv_timer_cb cb,
- uint64_t timeout, uint64_t repeat);
static void on_tcp_connect_timeout(uv_timer_t *timer);
static void on_tcp_watchdog_timeout(uv_timer_t *timer);
/* Set current handle as a subrequest type. */
struct session *session = handle->data;
if (ret == 0) {
- session->outgoing = true;
- ret = session_add_tasks(session, task);
+ session_set_outgoing(session, true);
+ ret = session_tasklist_add(session, task);
}
if (ret < 0) {
io_deinit(handle);
return handle;
}
-static void on_session_close(uv_handle_t *handle)
-{
- uv_loop_t *loop = handle->loop;
- struct worker_ctx *worker = loop->data;
- struct session *session = handle->data;
- assert(session->handle == handle);
- session->handle = NULL;
- io_deinit(handle);
- iohandle_release(worker, handle);
-}
-
-static void on_session_timer_close(uv_handle_t *timer)
-{
- struct session *session = timer->data;
- uv_handle_t *handle = session->handle;
- assert(handle && handle->data == session);
- assert (session->outgoing || handle->type == UV_TCP);
- if (!uv_is_closing(handle)) {
- uv_close(handle, on_session_close);
- }
-}
-
static void ioreq_kill_udp(uv_handle_t *req, struct qr_task *task)
{
assert(req);
- struct session *session = req->data;
- assert(session->outgoing);
- if (session->closing) {
+ struct session *s = req->data;
+ assert(session_is_outgoing(s));
+ if (session_is_closing(s)) {
return;
}
- uv_timer_stop(&session->timeout);
- session_del_tasks(session, task);
- assert(session->tasks.len == 0);
- session_close(session);
+ uv_timer_t *t = session_get_timer(s);
+ uv_timer_stop(t);
+ session_tasklist_del(s, task);
+ assert(session_tasklist_is_empty(s));
+ session_close(s);
}
static void ioreq_kill_tcp(uv_handle_t *req, struct qr_task *task)
{
assert(req);
- struct session *session = req->data;
- assert(session->outgoing);
- if (session->closing) {
+ struct session *s = req->data;
+ assert(session_is_outgoing(s));
+ if (session_is_closing(s)) {
return;
}
- session_del_waiting(session, task);
- session_del_tasks(session, task);
+ session_waitinglist_del(s, task);
+ session_tasklist_del(s, task);
int res = 0;
- if (session->outgoing && session->peer.ip.sa_family != AF_UNSPEC &&
- session->tasks.len == 0 && session->waiting.len == 0 && !session->closing) {
- assert(session->peer.ip.sa_family == AF_INET ||
- session->peer.ip.sa_family == AF_INET6);
+ const struct sockaddr *peer = session_get_peer(s);
+ if (peer->sa_family != AF_UNSPEC && session_is_empty(s) && !session_is_closing(s)) {
+ assert(peer->sa_family == AF_INET || peer->sa_family == AF_INET6);
res = 1;
- if (session->connected) {
+ if (session_is_connected(s)) {
/* This is outbound TCP connection which can be reused.
* Close it after timeout */
- uv_timer_t *timer = &session->timeout;
- timer->data = session;
- uv_timer_stop(timer);
- res = uv_timer_start(timer, on_session_idle_timeout,
+ uv_timer_t *t = session_get_timer(s);
+ t->data = s;
+ uv_timer_stop(t);
+ res = uv_timer_start(t, on_session_idle_timeout,
KR_CONN_RTT_MAX, 0);
}
}
if (res != 0) {
/* if any errors, close the session immediately */
- session_close(session);
+ session_close(s);
}
}
task->pending_count = 0;
}
-static void session_close(struct session *session)
-{
- assert(session->tasks.len == 0 && session->waiting.len == 0);
-
- if (session->closing) {
- return;
- }
-
- if (!session->outgoing && session->buffering != NULL) {
- qr_task_complete(session->buffering);
- }
- session->buffering = NULL;
-
- uv_handle_t *handle = session->handle;
- io_stop_read(handle);
- session->closing = true;
- if (session->outgoing &&
- session->peer.ip.sa_family != AF_UNSPEC) {
- struct worker_ctx *worker = get_worker();
- struct sockaddr *peer = &session->peer.ip;
- worker_del_tcp_connected(worker, peer);
- session->connected = false;
- }
-
- if (!uv_is_closing((uv_handle_t *)&session->timeout)) {
- uv_timer_stop(&session->timeout);
- if (session->tls_client_ctx) {
- tls_close(&session->tls_client_ctx->c);
- }
- if (session->tls_ctx) {
- tls_close(&session->tls_ctx->c);
- }
-
- session->timeout.data = session;
- uv_close((uv_handle_t *)&session->timeout, on_session_timer_close);
- }
-}
-
-static int session_add_waiting(struct session *session, struct qr_task *task)
-{
- for (int i = 0; i < session->waiting.len; ++i) {
- if (session->waiting.at[i] == task) {
- return i;
- }
- }
- int ret = array_push(session->waiting, task);
- if (ret >= 0) {
- qr_task_ref(task);
- }
- return ret;
-}
-
-static int session_del_waiting(struct session *session, struct qr_task *task)
-{
- int ret = kr_error(ENOENT);
- for (int i = 0; i < session->waiting.len; ++i) {
- if (session->waiting.at[i] == task) {
- array_del(session->waiting, i);
- qr_task_unref(task);
- ret = kr_ok();
- break;
- }
- }
- return ret;
-}
-
-static int session_add_tasks(struct session *session, struct qr_task *task)
-{
- for (int i = 0; i < session->tasks.len; ++i) {
- if (session->tasks.at[i] == task) {
- return i;
- }
- }
- int ret = array_push(session->tasks, task);
- if (ret >= 0) {
- qr_task_ref(task);
- }
- return ret;
-}
-
-static int session_del_tasks(struct session *session, struct qr_task *task)
-{
- int ret = kr_error(ENOENT);
- for (int i = 0; i < session->tasks.len; ++i) {
- if (session->tasks.at[i] == task) {
- array_del(session->tasks, i);
- qr_task_unref(task);
- ret = kr_ok();
- break;
- }
- }
- return ret;
-}
-
/** @cond This memory layout is internal to mempool.c, use only for debugging. */
#if defined(__SANITIZE_ADDRESS__)
struct mempool_chunk {
/* TODO Relocate pool to struct request */
ctx->worker = worker;
array_init(ctx->tasks);
- struct session *session = handle ? handle->data : NULL;
- if (session) {
- assert(session->outgoing == false);
+ struct session *s = handle ? handle->data : NULL;
+ if (s) {
+ assert(session_is_outgoing(s) == false);
}
- ctx->source.session = session;
+ ctx->source.session = s;
struct kr_request *req = &ctx->req;
req->pool = pool;
struct kr_request *req = &ctx->req;
/* source.session can be empty if request was generated by kresd itself */
- if (!ctx->source.session ||
- ctx->source.session->handle->type == UV_TCP) {
+ struct session *s = ctx->source.session;
+ if (!s || session_get_handle(s)->type == UV_TCP) {
answer_max = KNOT_WIRE_MAX_PKTSIZE;
} else if (knot_pkt_has_edns(query)) { /* EDNS */
answer_max = MAX(knot_edns_get_payload(query->opt_rr),
return ret;
}
-
static struct qr_task *qr_task_create(struct request_ctx *ctx)
{
/* How much can client handle? */
if (!task) {
return NULL;
}
- memset(task, 0, sizeof(*task)); /* avoid accidentally unitialized fields */
+	memset(task, 0, sizeof(*task)); /* avoid accidentally uninitialized fields */
/* Create packet buffers for answer and subrequests */
knot_pkt_t *pktbuf = knot_pkt_new(NULL, pktbuf_max, &ctx->req.pool);
assert(ctx);
/* Process outbound session. */
- struct session *source_session = ctx->source.session;
+ struct session *s = ctx->source.session;
struct worker_ctx *worker = ctx->worker;
/* Process source session. */
- if (source_session &&
- source_session->tasks.len < worker->tcp_pipeline_max/2 &&
- !source_session->closing && source_session->throttled) {
- uv_handle_t *handle = source_session->handle;
+ if (s && session_tasklist_get_len(s) < worker->tcp_pipeline_max/2 &&
+ !session_is_closing(s) && !session_is_throttled(s)) {
+ uv_handle_t *handle = session_get_handle(s);
/* Start reading again if the session is throttled and
* the number of outgoing requests is below watermark. */
if (handle) {
io_start_read(handle);
- source_session->throttled = false;
+ session_set_throttled(s, false);
}
}
/*@ Register new qr_task within session. */
static int qr_task_register(struct qr_task *task, struct session *session)
{
- assert(session->outgoing == false && session->handle->type == UV_TCP);
+ assert(session_is_outgoing(session) == false &&
+ session_get_handle(session)->type == UV_TCP);
- int ret = array_reserve(session->tasks, session->tasks.len + 1);
- if (ret != 0) {
+ int ret = session_tasklist_add(session, task);
+ if (ret < 0) {
return kr_error(ENOMEM);
}
- session_add_tasks(session, task);
-
struct request_ctx *ctx = task->ctx;
assert(ctx && (ctx->source.session == NULL || ctx->source.session == session));
ctx->source.session = session;
* an in effect shrink TCP window size. To get more precise throttling,
* we would need to copy remainder of the unread buffer and reassemble
* when resuming reading. This is NYI. */
- if (session->tasks.len >= task->ctx->worker->tcp_pipeline_max) {
- uv_handle_t *handle = session->handle;
- if (handle && !session->throttled && !session->closing) {
+ if (session_tasklist_get_len(session) >= task->ctx->worker->tcp_pipeline_max) {
+ uv_handle_t *handle = session_get_handle(session);
+ if (handle && !session_is_throttled(session) && !session_is_closing(session)) {
io_stop_read(handle);
- session->throttled = true;
+ session_set_throttled(session, true);
}
}
assert(task->waiting.len == 0);
assert(task->leading == false);
- struct session *source_session = ctx->source.session;
- if (source_session) {
- assert(source_session->outgoing == false &&
- source_session->waiting.len == 0);
- session_del_tasks(source_session, task);
+ struct session *s = ctx->source.session;
+ if (s) {
+ assert(!session_is_outgoing(s) && session_waitinglist_is_empty(s));
+ session_tasklist_del(s, task);
}
/* Release primary reference to task. */
if (!handle || handle->type != UV_TCP) {
return status;
}
- struct session* session = handle->data;
- assert(session);
- if (!session->outgoing ||
- session->waiting.len == 0) {
+ struct session* s = handle->data;
+ assert(s);
+ if (!session_is_outgoing(s) || session_waitinglist_is_empty(s)) {
return status;
}
}
if (handle) {
- struct session* session = handle->data;
- if (!session->outgoing && task->ctx->source.session) {
- assert (task->ctx->source.session->handle == handle);
+ struct session* s = handle->data;
+ bool outgoing = session_is_outgoing(s);
+ if (!outgoing) {
+ struct session* source_s = task->ctx->source.session;
+ if (source_s) {
+ assert (session_get_handle(source_s) == handle);
+ }
}
- if (handle->type == UV_TCP && session->outgoing &&
- session->waiting.len > 0) {
- session_del_waiting(session, task);
- if (session->closing) {
+ if (handle->type == UV_TCP && outgoing && !session_waitinglist_is_empty(s)) {
+ session_waitinglist_del(s, task);
+ if (session_is_closing(s)) {
return status;
}
/* Finalize the task, if any errors.
* (for instance: tls; send->tls_push->too many non-critical errors->
* on_send with nonzero status->re-add to waiting->send->etc).*/
if (status != 0) {
- if (session->outgoing) {
+ if (outgoing) {
qr_task_finalize(task, KR_STATE_FAIL);
} else {
- assert(task->ctx->source.session == session);
+ assert(task->ctx->source.session == s);
task->ctx->source.session = NULL;
}
- session_del_tasks(session, task);
+ session_tasklist_del(s, task);
}
- if (session->waiting.len > 0) {
- struct qr_task *t = session->waiting.at[0];
- int ret = qr_task_send(t, handle, &session->peer.ip, t->pktbuf);
+ struct qr_task *waiting_task = session_waitinglist_get_first(s);
+ if (waiting_task) {
+ struct sockaddr *peer = session_get_peer(s);
+ knot_pkt_t *pkt = waiting_task->pktbuf;
+ int ret = qr_task_send(waiting_task, handle, peer, pkt);
if (ret != kr_ok()) {
- while (session->waiting.len > 0) {
- struct qr_task *t = session->waiting.at[0];
- if (session->outgoing) {
- qr_task_finalize(t, KR_STATE_FAIL);
- } else {
- assert(t->ctx->source.session == session);
- t->ctx->source.session = NULL;
- }
- array_del(session->waiting, 0);
- session_del_tasks(session, t);
- qr_task_unref(t);
- }
- while (session->tasks.len > 0) {
- struct qr_task *t = session->tasks.at[0];
- if (session->outgoing) {
- qr_task_finalize(t, KR_STATE_FAIL);
- } else {
- assert(t->ctx->source.session == session);
- t->ctx->source.session = NULL;
- }
- session_del_tasks(session, t);
- }
- session_close(session);
+ session_tasks_finalize(s, KR_STATE_FAIL);
+ session_close(s);
return status;
}
}
}
- if (!session->closing) {
+ if (!session_is_closing(s)) {
io_start_read(handle); /* Start reading new query */
}
}
/* Update statistics */
if (ctx->source.session &&
- handle != ctx->source.session->handle &&
+ handle != session_get_handle(ctx->source.session) &&
addr) {
if (session->has_tls)
worker->stats.tls += 1;
static int session_next_waiting_send(struct session *session)
{
- union inaddr *peer = &session->peer;
int ret = kr_ok();
- if (session->waiting.len > 0) {
- struct qr_task *task = session->waiting.at[0];
- ret = qr_task_send(task, session->handle, &peer->ip, task->pktbuf);
+ if (!session_waitinglist_is_empty(session)) {
+ struct sockaddr *peer = session_get_peer(session);
+ struct qr_task *task = session_waitinglist_get_first(session);
+ uv_handle_t *handle = session_get_handle(session);
+ ret = qr_task_send(task, handle, peer, task->pktbuf);
}
return ret;
}
static int session_tls_hs_cb(struct session *session, int status)
{
- struct worker_ctx *worker = get_worker();
- union inaddr *peer = &session->peer;
- int deletion_res = worker_del_tcp_waiting(worker, &peer->ip);
+ assert(session_is_outgoing(session));
+ uv_handle_t *handle = session_get_handle(session);
+ uv_loop_t *loop = handle->loop;
+ struct worker_ctx *worker = loop->data;
+ struct sockaddr *peer = session_get_peer(session);
+ int deletion_res = worker_del_tcp_waiting(worker, peer);
int ret = kr_ok();
if (status) {
- kr_nsrep_update_rtt(NULL, &peer->ip, KR_NS_DEAD,
+ kr_nsrep_update_rtt(NULL, peer, KR_NS_DEAD,
worker->engine->resolver.cache_rtt,
KR_NS_UPDATE_NORESET);
return ret;
}
/* handshake was completed successfully */
- struct tls_client_ctx_t *tls_client_ctx = session->tls_client_ctx;
+ struct tls_client_ctx_t *tls_client_ctx = session_tls_get_client_ctx(session);
struct tls_client_paramlist_entry *tls_params = tls_client_ctx->params;
gnutls_session_t tls_session = tls_client_ctx->c.tls_session;
if (gnutls_session_is_resumed(tls_session) != 0) {
}
}
- ret = worker_add_tcp_connected(worker, &peer->ip, session);
+ ret = worker_add_tcp_connected(worker, peer, session);
if (deletion_res == kr_ok() && ret == kr_ok()) {
ret = session_next_waiting_send(session);
} else {
* Session isn't in the list of waiting sessions,
* or addition to the list of connected sessions failed,
* or write to upstream failed. */
- while (session->waiting.len > 0) {
- struct qr_task *task = session->waiting.at[0];
- session_del_tasks(session, task);
- array_del(session->waiting, 0);
- qr_task_finalize(task, KR_STATE_FAIL);
- qr_task_unref(task);
- }
- worker_del_tcp_connected(worker, &peer->ip);
- assert(session->tasks.len == 0);
+ session_waitinglist_finalize(session, KR_STATE_FAIL);
+ worker_del_tcp_connected(worker, peer);
+ assert(session_tasklist_is_empty(session));
session_close(session);
} else {
- uv_timer_stop(&session->timeout);
- session->timeout.data = session;
- timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0);
+ uv_timer_t *t = session_get_timer(session);
+ uv_timer_stop(t);
+ t->data = session;
+ session_timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0);
}
return kr_ok();
}
-static struct kr_query *session_current_query(struct session *session)
-{
- if (session->waiting.len == 0) {
- return NULL;
- }
- struct qr_task *task = session->waiting.at[0];
- if (task->ctx->req.rplan.pending.len == 0) {
+static struct kr_query *task_get_last_pending_query(struct qr_task *task)
+{
+ if (!task || task->ctx->req.rplan.pending.len == 0) {
return NULL;
}
return array_tail(task->ctx->req.rplan.pending);
}
+
static void on_connect(uv_connect_t *req, int status)
{
struct worker_ctx *worker = get_worker();
uv_stream_t *handle = req->handle;
struct session *session = handle->data;
- union inaddr *peer = &session->peer;
+ struct sockaddr *peer = session_get_peer(session);
+
+ assert(session_is_outgoing(session));
if (status == UV_ECANCELED) {
- worker_del_tcp_waiting(worker, &peer->ip);
- assert(session->closing && session->waiting.len == 0 && session->tasks.len == 0);
+ worker_del_tcp_waiting(worker, peer);
+ assert(session_is_empty(session) && session_is_closing(session));
iorequest_release(worker, req);
return;
}
- if (session->closing) {
- worker_del_tcp_waiting(worker, &peer->ip);
- assert(session->waiting.len == 0 && session->tasks.len == 0);
+ if (session_is_closing(session)) {
+ worker_del_tcp_waiting(worker, peer);
+ assert(session_is_empty(session));
iorequest_release(worker, req);
return;
}
- uv_timer_stop(&session->timeout);
+ uv_timer_t *t = session_get_timer(session);
+ uv_timer_stop(t);
if (status != 0) {
- worker_del_tcp_waiting(worker, &peer->ip);
- while (session->waiting.len > 0) {
- struct qr_task *task = session->waiting.at[0];
- session_del_tasks(session, task);
- array_del(session->waiting, 0);
- assert(task->refs > 1);
- qr_task_unref(task);
- qr_task_step(task, NULL, NULL);
- }
- assert(session->tasks.len == 0);
+ worker_del_tcp_waiting(worker, peer);
+ session_waitinglist_retry(session, false);
+ assert(session_tasklist_is_empty(session));
iorequest_release(worker, req);
session_close(session);
return;
}
- if (!session->has_tls) {
+ if (!session_has_tls(session)) {
/* if there is a TLS, session still waiting for handshake,
* otherwise remove it from waiting list */
- if (worker_del_tcp_waiting(worker, &peer->ip) != 0) {
+ if (worker_del_tcp_waiting(worker, peer) != 0) {
/* session isn't in list of waiting queries, *
* something gone wrong */
- while (session->waiting.len > 0) {
- struct qr_task *task = session->waiting.at[0];
- session_del_tasks(session, task);
- array_del(session->waiting, 0);
- ioreq_kill_pending(task);
- assert(task->pending_count == 0);
- qr_task_finalize(task, KR_STATE_FAIL);
- qr_task_unref(task);
- }
- assert(session->tasks.len == 0);
+ session_waitinglist_finalize(session, KR_STATE_FAIL);
+ assert(session_tasklist_is_empty(session));
iorequest_release(worker, req);
session_close(session);
return;
}
}
- struct kr_query *qry = session_current_query(session);
+ struct qr_task *task = session_waitinglist_get_first(session);
+ struct kr_query *qry = task_get_last_pending_query(task);
WITH_VERBOSE (qry) {
- char addr_str[INET6_ADDRSTRLEN];
- inet_ntop(session->peer.ip.sa_family, kr_inaddr(&session->peer.ip),
- addr_str, sizeof(addr_str));
- VERBOSE_MSG(qry, "=> connected to '%s'\n", addr_str);
+ /* Reuse the 'peer' local declared at the top of on_connect()
+  * instead of re-declaring a shadowing copy (-Wshadow). */
+ char peer_str[INET6_ADDRSTRLEN];
+ inet_ntop(peer->sa_family, kr_inaddr(peer), peer_str, sizeof(peer_str));
+ VERBOSE_MSG(qry, "=> connected to '%s'\n", peer_str);
}
- session->connected = true;
- session->handle = (uv_handle_t *)handle;
+ session_set_connected(session, true);
+ session_set_handle(session,(uv_handle_t *)handle);
int ret = kr_ok();
- if (session->has_tls) {
- ret = tls_client_connect_start(session->tls_client_ctx,
- session, session_tls_hs_cb);
+ if (session_has_tls(session)) {
+ struct tls_client_ctx_t *tls_ctx = session_tls_get_client_ctx(session);
+ ret = tls_client_connect_start(tls_ctx, session, session_tls_hs_cb);
if (ret == kr_error(EAGAIN)) {
iorequest_release(worker, req);
- io_start_read(session->handle);
- timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0);
+ session_start_read(session);
+ session_timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0);
return;
}
}
if (ret == kr_ok()) {
ret = session_next_waiting_send(session);
if (ret == kr_ok()) {
- timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0);
- worker_add_tcp_connected(worker, &session->peer.ip, session);
+ session_timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0);
+ /* 'peer' is already in scope (fetched at the top of on_connect());
+  * re-declaring it here would shadow that local. */
+ worker_add_tcp_connected(worker, peer, session);
iorequest_release(worker, req);
return;
}
}
- while (session->waiting.len > 0) {
- struct qr_task *task = session->waiting.at[0];
- session_del_tasks(session, task);
- array_del(session->waiting, 0);
- ioreq_kill_pending(task);
- assert(task->pending_count == 0);
- qr_task_finalize(task, KR_STATE_FAIL);
- qr_task_unref(task);
- }
-
- assert(session->tasks.len == 0);
-
+ session_waitinglist_finalize(session, KR_STATE_FAIL);
+ assert(session_tasklist_is_empty(session));
iorequest_release(worker, req);
session_close(session);
}
uv_timer_stop(timer);
struct worker_ctx *worker = get_worker();
- assert (session->waiting.len == session->tasks.len);
+ assert (session_waitinglist_get_len(session) == session_tasklist_get_len(session));
- union inaddr *peer = &session->peer;
- worker_del_tcp_waiting(worker, &peer->ip);
+ struct sockaddr *peer = session_get_peer(session);
+ worker_del_tcp_waiting(worker, peer);
- struct kr_query *qry = session_current_query(session);
+ struct qr_task *task = session_waitinglist_get_first(session);
+ struct kr_query *qry = task_get_last_pending_query(task);
WITH_VERBOSE (qry) {
- char addr_str[INET6_ADDRSTRLEN];
- inet_ntop(peer->ip.sa_family, kr_inaddr(&peer->ip), addr_str, sizeof(addr_str));
- VERBOSE_MSG(qry, "=> connection to '%s' failed\n", addr_str);
+ char peer_str[INET6_ADDRSTRLEN];
+ inet_ntop(peer->sa_family, kr_inaddr(peer), peer_str, sizeof(peer_str));
+ VERBOSE_MSG(qry, "=> connection to '%s' failed\n", peer_str);
}
- kr_nsrep_update_rtt(NULL, &peer->ip, KR_NS_DEAD,
+ kr_nsrep_update_rtt(NULL, peer, KR_NS_DEAD,
worker->engine->resolver.cache_rtt,
KR_NS_UPDATE_NORESET);
- while (session->waiting.len > 0) {
- struct qr_task *task = session->waiting.at[0];
- assert(task->ctx);
- task->timeouts += 1;
- worker->stats.timeout += 1;
- session_del_tasks(session, task);
- array_del(session->waiting, 0);
- assert(task->refs > 1);
- qr_task_unref(task);
- qr_task_step(task, NULL, NULL);
- }
-
- assert (session->tasks.len == 0);
+ worker->stats.timeout += session_waitinglist_get_len(session);
+ session_waitinglist_retry(session, true);
+ assert (session_tasklist_is_empty(session));
session_close(session);
}
static void on_tcp_watchdog_timeout(uv_timer_t *timer)
{
struct session *session = timer->data;
+ struct worker_ctx *worker = timer->loop->data;
+ struct sockaddr *peer = session_get_peer(session);
+
+ assert(session_is_outgoing(session));
- assert(session->outgoing);
uv_timer_stop(timer);
- struct worker_ctx *worker = get_worker();
- if (session->outgoing) {
- if (session->has_tls) {
- worker_del_tcp_waiting(worker, &session->peer.ip);
- }
- worker_del_tcp_connected(worker, &session->peer.ip);
-
- while (session->waiting.len > 0) {
- struct qr_task *task = session->waiting.at[0];
- task->timeouts += 1;
- worker->stats.timeout += 1;
- array_del(session->waiting, 0);
- session_del_tasks(session, task);
- ioreq_kill_pending(task);
- assert(task->pending_count == 0);
- qr_task_finalize(task, KR_STATE_FAIL);
- qr_task_unref(task);
- }
- }
- while (session->tasks.len > 0) {
- struct qr_task *task = session->tasks.at[0];
- task->timeouts += 1;
- worker->stats.timeout += 1;
- assert(task->refs > 1);
- array_del(session->tasks, 0);
- ioreq_kill_pending(task);
- assert(task->pending_count == 0);
- qr_task_finalize(task, KR_STATE_FAIL);
- qr_task_unref(task);
+ if (session_has_tls(session)) {
+ worker_del_tcp_waiting(worker, peer);
}
+ worker_del_tcp_connected(worker, peer);
+ worker->stats.timeout += session_waitinglist_get_len(session);
+ session_waitinglist_finalize(session, KR_STATE_FAIL);
+ worker->stats.timeout += session_tasklist_get_len(session);
+ session_tasklist_finalize(session, KR_STATE_FAIL);
session_close(session);
}
static void on_udp_timeout(uv_timer_t *timer)
{
struct session *session = timer->data;
- assert(session->handle->data == session);
+ assert(session_get_handle(session)->data == session);
+ assert(session_tasklist_get_len(session) == 1);
+ assert(session_waitinglist_is_empty(session));
uv_timer_stop(timer);
- assert(session->tasks.len == 1);
- assert(session->waiting.len == 0);
/* Penalize all tried nameservers with a timeout. */
- struct qr_task *task = session->tasks.at[0];
+ struct qr_task *task = session_tasklist_get_first(session);
struct worker_ctx *worker = task->ctx->worker;
if (task->leading && task->pending_count > 0) {
struct kr_query *qry = array_tail(task->ctx->req.rplan.pending);
struct session *s = timer->data;
assert(s);
uv_timer_stop(timer);
- if (s->closing) {
+ if (session_is_closing(s)) {
return;
}
/* session was not in use during timer timeout
* remove it from connection list and close
*/
- assert(s->tasks.len == 0 && s->waiting.len == 0);
+ assert(session_is_empty(s));
session_close(s);
}
}
struct sockaddr *addr = (struct sockaddr *)choice;
struct session *session = ret->data;
- assert (session->peer.ip.sa_family == AF_UNSPEC && session->outgoing);
- memcpy(&session->peer, addr, sizeof(session->peer));
+ struct sockaddr *peer = session_get_peer(session);
+ assert (peer->sa_family == AF_UNSPEC && session_is_outgoing(session));
+ memcpy(peer, addr, kr_sockaddr_len(addr));
if (qr_task_send(task, ret, (struct sockaddr *)choice,
task->pktbuf) == 0) {
task->addrlist_turn = (task->addrlist_turn + 1) %
static void on_retransmit(uv_timer_t *req)
{
struct session *session = req->data;
- assert(session->tasks.len == 1);
+ assert(session_tasklist_get_len(session) == 1);
uv_timer_stop(req);
- struct qr_task *task = session->tasks.at[0];
+ struct qr_task *task = session_tasklist_get_first(session);
if (retransmit(task) == NULL) {
/* Not possible to spawn request, start timeout timer with remaining deadline. */
uint64_t timeout = KR_CONN_RTT_MAX - task->pending_count * KR_CONN_RETRY;
}
}
-static int timer_start(struct session *session, uv_timer_cb cb,
- uint64_t timeout, uint64_t repeat)
-{
- uv_timer_t *timer = &session->timeout;
- assert(timer->data == session);
- int ret = uv_timer_start(timer, cb, timeout, repeat);
- if (ret != 0) {
- uv_timer_stop(timer);
- return kr_error(ENOMEM);
- }
- return 0;
-}
-
static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *pkt)
{
/* Close pending timer */
return true;
}
-
static int qr_task_finalize(struct qr_task *task, int state)
{
assert(task && task->leading == false);
/* Send back answer */
struct session *source_session = ctx->source.session;
- uv_handle_t *handle = source_session->handle;
- assert(source_session->closing == false);
+ uv_handle_t *handle = session_get_handle(source_session);
+ assert(!session_is_closing(source_session));
assert(handle && handle->data == ctx->source.session);
assert(ctx->source.addr.ip.sa_family != AF_UNSPEC);
int res = qr_task_send(task, handle,
if (res != kr_ok()) {
(void) qr_task_on_send(task, NULL, kr_error(EIO));
/* Since source session is erroneous detach all tasks. */
- while (source_session->tasks.len > 0) {
- struct qr_task *t = source_session->tasks.at[0];
+ while (!session_tasklist_is_empty(source_session)) {
+ struct qr_task *t = session_tasklist_get_first(source_session);
struct request_ctx *c = t->ctx;
assert(c->source.session == source_session);
c->source.session = NULL;
/* Don't finalize them as there can be other tasks
* waiting for answer to this particular task.
* (ie. task->leading is true) */
- session_del_tasks(source_session, t);
+ session_tasklist_del_index(source_session, 0);
}
session_close(source_session);
} else if (handle->type == UV_TCP && ctx->source.session) {
/* Don't try to close source session at least
* retry_interval_for_timeout_timer milliseconds */
- uv_timer_again(&ctx->source.session->timeout);
+ session_timer_restart(ctx->source.session);
}
qr_task_unref(task);
task->addrlist = NULL;
task->addrlist_count = 0;
task->addrlist_turn = 0;
- req->has_tls = (ctx->source.session && ctx->source.session->has_tls);
+ req->has_tls = (ctx->source.session && session_has_tls(ctx->source.session));
if (worker->too_many_open) {
struct kr_rplan *rplan = &req->rplan;
*/
subreq_lead(task);
struct session *session = handle->data;
- assert(session->handle->type == UV_UDP);
- ret = timer_start(session, on_retransmit, timeout, 0);
+ assert(session_get_handle(session) == handle && (handle->type == UV_UDP));
+ ret = session_timer_start(session, on_retransmit, timeout, 0);
/* Start next step with timeout, fatal if can't start a timer. */
if (ret != 0) {
subreq_finalize(task, packet_source, packet);
}
struct session* session = NULL;
if ((session = worker_find_tcp_waiting(ctx->worker, addr)) != NULL) {
- assert(session->outgoing);
- if (session->closing) {
+ assert(session_is_outgoing(session));
+ if (session_is_closing(session)) {
subreq_finalize(task, packet_source, packet);
return qr_task_finalize(task, KR_STATE_FAIL);
}
* It means that connection establishing or data sending
* is coming right now. */
/* Task will be notified in on_connect() or qr_task_on_send(). */
- ret = session_add_waiting(session, task);
+ ret = session_waitinglist_add(session, task);
if (ret < 0) {
subreq_finalize(task, packet_source, packet);
return qr_task_finalize(task, KR_STATE_FAIL);
}
- ret = session_add_tasks(session, task);
+ ret = session_tasklist_add(session, task);
if (ret < 0) {
- session_del_waiting(session, task);
+ session_waitinglist_del(session, task);
subreq_finalize(task, packet_source, packet);
return qr_task_finalize(task, KR_STATE_FAIL);
}
assert(task->pending_count == 0);
- task->pending[task->pending_count] = session->handle;
+ task->pending[task->pending_count] = session_get_handle(session);
task->pending_count += 1;
} else if ((session = worker_find_tcp_connected(ctx->worker, addr)) != NULL) {
/* Connection has been already established */
- assert(session->outgoing);
- if (session->closing) {
- session_del_tasks(session, task);
+ assert(session_is_outgoing(session));
+ if (session_is_closing(session)) {
+ session_tasklist_del(session, task);
subreq_finalize(task, packet_source, packet);
return qr_task_finalize(task, KR_STATE_FAIL);
}
- if (session->tasks.len >= worker->tcp_pipeline_max) {
- session_del_tasks(session, task);
+ if (session_tasklist_get_len(session) >= worker->tcp_pipeline_max) {
+ session_tasklist_del(session, task);
subreq_finalize(task, packet_source, packet);
return qr_task_finalize(task, KR_STATE_FAIL);
}
/* will be removed in qr_task_on_send() */
- ret = session_add_waiting(session, task);
+ ret = session_waitinglist_add(session, task);
if (ret < 0) {
- session_del_tasks(session, task);
+ session_tasklist_del(session, task);
subreq_finalize(task, packet_source, packet);
return qr_task_finalize(task, KR_STATE_FAIL);
}
- ret = session_add_tasks(session, task);
+ ret = session_tasklist_add(session, task);
if (ret < 0) {
- session_del_waiting(session, task);
- session_del_tasks(session, task);
+ session_waitinglist_del(session, task);
+ session_tasklist_del(session, task);
subreq_finalize(task, packet_source, packet);
return qr_task_finalize(task, KR_STATE_FAIL);
}
- if (session->waiting.len == 1) {
- ret = qr_task_send(task, session->handle,
- &session->peer.ip, task->pktbuf);
+ if (session_waitinglist_get_len(session) == 1) {
+ ret = qr_task_send(task, session_get_handle(session),
+ session_get_peer(session), task->pktbuf);
if (ret < 0) {
- session_del_waiting(session, task);
- session_del_tasks(session, task);
- while (session->tasks.len != 0) {
- struct qr_task *t = session->tasks.at[0];
- qr_task_finalize(t, KR_STATE_FAIL);
- session_del_tasks(session, t);
- }
+ session_waitinglist_del(session, task);
+ session_tasklist_del(session, task);
+ session_tasklist_finalize(session, KR_STATE_FAIL);
subreq_finalize(task, packet_source, packet);
session_close(session);
return qr_task_finalize(task, KR_STATE_FAIL);
}
- if (session->tasks.len == 1) {
- uv_timer_stop(&session->timeout);
- ret = timer_start(session, on_tcp_watchdog_timeout,
- MAX_TCP_INACTIVITY, 0);
+ if (session_tasklist_get_len(session) == 1) {
+ session_timer_stop(session);
+ ret = session_timer_start(session, on_tcp_watchdog_timeout,
+ MAX_TCP_INACTIVITY, 0);
}
if (ret < 0) {
- session_del_waiting(session, task);
- session_del_tasks(session, task);
- while (session->tasks.len != 0) {
- struct qr_task *t = session->tasks.at[0];
- qr_task_finalize(t, KR_STATE_FAIL);
- session_del_tasks(session, t);
- }
+ session_waitinglist_del(session, task);
+ session_tasklist_del(session, task);
+ session_tasklist_finalize(session, KR_STATE_FAIL);
subreq_finalize(task, packet_source, packet);
session_close(session);
return qr_task_finalize(task, KR_STATE_FAIL);
}
}
assert(task->pending_count == 0);
- task->pending[task->pending_count] = session->handle;
+ task->pending[task->pending_count] = session_get_handle(session);
task->pending_count += 1;
} else {
/* Make connection */
session = client->data;
ret = worker_add_tcp_waiting(ctx->worker, addr, session);
if (ret < 0) {
- session_del_tasks(session, task);
+ session_tasklist_del(session, task);
iorequest_release(ctx->worker, conn);
subreq_finalize(task, packet_source, packet);
return qr_task_finalize(task, KR_STATE_FAIL);
}
/* will be removed in qr_task_on_send() */
- ret = session_add_waiting(session, task);
+ ret = session_waitinglist_add(session, task);
if (ret < 0) {
- session_del_tasks(session, task);
+ session_tasklist_del(session, task);
worker_del_tcp_waiting(ctx->worker, addr);
iorequest_release(ctx->worker, conn);
subreq_finalize(task, packet_source, packet);
const char *key = tcpsess_key(addr);
struct tls_client_paramlist_entry *entry = map_get(&net->tls_client_params, key);
if (entry) {
- assert(session->tls_client_ctx == NULL);
+ assert(session_tls_get_client_ctx(session) == NULL);
struct tls_client_ctx_t *tls_ctx = tls_client_ctx_new(entry, worker);
if (!tls_ctx) {
- session_del_tasks(session, task);
- session_del_waiting(session, task);
+ session_tasklist_del(session, task);
+ session_waitinglist_del(session, task);
worker_del_tcp_waiting(ctx->worker, addr);
iorequest_release(ctx->worker, conn);
subreq_finalize(task, packet_source, packet);
return qr_task_step(task, NULL, NULL);
}
tls_client_ctx_set_session(tls_ctx, session);
- session->tls_client_ctx = tls_ctx;
- session->has_tls = true;
+ session_tls_set_client_ctx(session, tls_ctx);
+ session_set_has_tls(session, true);
}
conn->data = session;
- memcpy(&session->peer, addr, sizeof(session->peer));
+ struct sockaddr *peer = session_get_peer(session);
+ memcpy(peer, addr, kr_sockaddr_len(addr));
- ret = timer_start(session, on_tcp_connect_timeout,
- KR_CONN_RTT_MAX, 0);
+ ret = session_timer_start(session, on_tcp_connect_timeout,
+ KR_CONN_RTT_MAX, 0);
if (ret != 0) {
- session_del_tasks(session, task);
- session_del_waiting(session, task);
+ session_tasklist_del(session, task);
+ session_waitinglist_del(session, task);
worker_del_tcp_waiting(ctx->worker, addr);
iorequest_release(ctx->worker, conn);
subreq_finalize(task, packet_source, packet);
return qr_task_finalize(task, KR_STATE_FAIL);
}
- struct kr_query *qry = session_current_query(session);
+ /* NOTE: 'task' is already a local of qr_task_step(); name the head of
+  * the waiting list differently to avoid shadowing it (-Wshadow). */
+ struct qr_task *t = session_waitinglist_get_first(session);
+ struct kr_query *qry = task_get_last_pending_query(t);
WITH_VERBOSE (qry) {
- char addr_str[INET6_ADDRSTRLEN];
- inet_ntop(session->peer.ip.sa_family, kr_inaddr(&session->peer.ip), addr_str, sizeof(addr_str));
- VERBOSE_MSG(qry, "=> connecting to: '%s'\n", addr_str);
+ char peer_str[INET6_ADDRSTRLEN];
+ inet_ntop(peer->sa_family, kr_inaddr(peer), peer_str, sizeof(peer_str));
+ VERBOSE_MSG(qry, "=> connecting to: '%s'\n", peer_str);
}
if (uv_tcp_connect(conn, (uv_tcp_t *)client,
addr , on_connect) != 0) {
- uv_timer_stop(&session->timeout);
- session_del_tasks(session, task);
- session_del_waiting(session, task);
+ session_timer_stop(session);
+ session_tasklist_del(session, task);
+ session_waitinglist_del(session, task);
worker_del_tcp_waiting(ctx->worker, addr);
iorequest_release(ctx->worker, conn);
subreq_finalize(task, packet_source, packet);
return ret;
}
-static struct qr_task* find_task(const struct session *session, uint16_t msg_id)
+int worker_submit(struct session *session, knot_pkt_t *query)
{
- struct qr_task *ret = NULL;
- const qr_tasklist_t *tasklist = &session->tasks;
- for (size_t i = 0; i < tasklist->len; ++i) {
- struct qr_task *task = tasklist->at[i];
- uint16_t task_msg_id = knot_wire_get_id(task->pktbuf->wire);
- if (task_msg_id == msg_id) {
- ret = task;
- break;
- }
+ if (!session) {
+ assert(false);
+ return kr_error(EINVAL);
}
- return ret;
-}
-
-int worker_submit(struct worker_ctx *worker, uv_handle_t *handle,
- knot_pkt_t *query, const struct sockaddr* addr)
-{
- bool OK = worker && handle && handle->data;
+ uv_handle_t *handle = session_get_handle(session);
+ bool OK = handle && handle->loop->data;
if (!OK) {
assert(false);
return kr_error(EINVAL);
}
- struct session *session = handle->data;
+ struct worker_ctx *worker = handle->loop->data;
/* Parse packet */
int ret = parse_packet(query);
/* Start new task on listening sockets,
* or resume if this is subrequest */
struct qr_task *task = NULL;
- if (!session->outgoing) { /* request from a client */
+ struct sockaddr *addr = NULL;
+ if (!session_is_outgoing(session)) { /* request from a client */
/* Ignore badly formed queries. */
if (!query || ret != 0 || knot_wire_get_qr(query->wire)) {
if (query) worker->stats.dropped += 1;
return kr_error(EILSEQ);
}
- struct request_ctx *ctx = request_create(worker, handle, addr);
+ struct request_ctx *ctx = request_create(worker, handle,
+ session_get_peer(session));
if (!ctx) {
return kr_error(ENOMEM);
}
/* Ignore badly formed responses. */
return kr_error(EILSEQ);
}
- task = find_task(session, knot_wire_get_id(query->wire));
+ task = session_tasklist_find(session, knot_wire_get_id(query->wire));
if (task == NULL) {
return kr_error(ENOENT);
}
- assert(session->closing == false);
+ assert(!session_is_closing(session));
}
- assert(uv_is_closing(session->handle) == false);
+ assert(uv_is_closing(session_get_handle(session)) == false);
/* Consume input and produce next message */
return qr_task_step(task, addr, query);
return ret;
}
-static int worker_add_tcp_connected(struct worker_ctx *worker,
+int worker_add_tcp_connected(struct worker_ctx *worker,
const struct sockaddr* addr,
struct session *session)
{
return map_add_tcp_session(&worker->tcp_connected, addr, session);
}
-static int worker_del_tcp_connected(struct worker_ctx *worker,
+int worker_del_tcp_connected(struct worker_ctx *worker,
const struct sockaddr* addr)
{
assert(addr && tcpsess_key(addr));
return wire_read_u16(msg);
}
-/* If buffering, close last task as it isn't live yet. */
-static void discard_buffered(struct session *session)
-{
- if (session->buffering) {
- qr_task_free(session->buffering);
- session->buffering = NULL;
- session->msg_hdr_idx = 0;
- }
-}
-
-int worker_end_tcp(struct worker_ctx *worker, uv_handle_t *handle)
+int worker_end_tcp(struct session *session)
{
- if (!worker || !handle) {
+ if (!session) {
return kr_error(EINVAL);
}
- /* If this is subrequest, notify parent task with empty input
- * because in this case session doesn't own tasks, it has just
- * borrowed the task from parent session. */
- struct session *session = handle->data;
- if (session->outgoing) {
- worker_submit(worker, handle, NULL, NULL);
- } else {
- discard_buffered(session);
- }
- return 0;
-}
-int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle,
- const uint8_t *msg, ssize_t len)
+ session_timer_stop(session);
+
+ uv_handle_t *handle = session_get_handle(session);
+ struct worker_ctx *worker = handle->loop->data;
+ struct sockaddr *peer = session_get_peer(session);
+ worker_del_tcp_connected(worker, peer);
+ session_set_connected(session, false);
-{
- if (!worker || !handle) {
- return kr_error(EINVAL);
+ struct tls_client_ctx_t *tls_client_ctx = session_tls_get_client_ctx(session);
+ if (tls_client_ctx) {
+ /* Avoid gnutls_bye() call */
+ tls_set_hs_state(&tls_client_ctx->c, TLS_HS_NOT_STARTED);
}
- /* Connection error or forced disconnect */
- struct session *session = handle->data;
- assert(session && session->handle == (uv_handle_t *)handle && handle->type == UV_TCP);
- if (session->closing) {
- return kr_ok();
- }
- if (len <= 0 || !msg) {
- /* If we have pending tasks, we must dissociate them from the
- * connection so they don't try to access closed and freed handle.
- * @warning Do not modify task if this is outgoing request
- * as it is shared with originator.
- */
- struct kr_query *qry = session_current_query(session);
- WITH_VERBOSE (qry) {
- char addr_str[INET6_ADDRSTRLEN];
- inet_ntop(session->peer.ip.sa_family, kr_inaddr(&session->peer.ip),
- addr_str, sizeof(addr_str));
- VERBOSE_MSG(qry, "=> connection to '%s' closed by peer\n", addr_str);
- }
- uv_timer_t *timer = &session->timeout;
- uv_timer_stop(timer);
- struct sockaddr *peer = &session->peer.ip;
- worker_del_tcp_connected(worker, peer);
- session->connected = false;
-
- if (session->tls_client_ctx) {
- /* Avoid gnutls_bye() call */
- tls_set_hs_state(&session->tls_client_ctx->c,
- TLS_HS_NOT_STARTED);
- }
- if (session->tls_ctx) {
- /* Avoid gnutls_bye() call */
- tls_set_hs_state(&session->tls_ctx->c,
- TLS_HS_NOT_STARTED);
- }
-
- if (session->outgoing && session->buffering) {
- session->buffering = NULL;
- }
+ struct tls_ctx_t *tls_ctx = session_tls_get_server_ctx(session);
+ if (tls_ctx) {
+ /* Avoid gnutls_bye() call */
+ tls_set_hs_state(&tls_ctx->c, TLS_HS_NOT_STARTED);
+ }
- assert(session->tasks.len >= session->waiting.len);
- while (session->waiting.len > 0) {
- struct qr_task *task = session->waiting.at[0];
- array_del(session->waiting, 0);
- assert(task->refs > 1);
- session_del_tasks(session, task);
- if (session->outgoing) {
- if (task->ctx->req.options.FORWARD) {
- /* We are in TCP_FORWARD mode.
- * To prevent failing at kr_resolve_consume()
- * qry.flags.TCP must be cleared.
- * TODO - refactoring is needed. */
+ assert(session_tasklist_get_len(session) >= session_waitinglist_get_len(session));
+ while (!session_waitinglist_is_empty(session)) {
+ struct qr_task *task = session_waitinglist_get_first(session);
+ session_waitinglist_del_index(session, 0);
+ assert(task->refs > 1);
+ session_tasklist_del(session, task);
+ if (session_is_outgoing(session)) {
+ if (task->ctx->req.options.FORWARD) {
+ /* We are in TCP_FORWARD mode.
+ * To prevent failing at kr_resolve_consume()
+ * qry.flags.TCP must be cleared.
+ * TODO - refactoring is needed. */
struct kr_request *req = &task->ctx->req;
struct kr_rplan *rplan = &req->rplan;
struct kr_query *qry = array_tail(rplan->pending);
qry->flags.TCP = false;
- }
- qr_task_step(task, NULL, NULL);
- } else {
- assert(task->ctx->source.session == session);
- task->ctx->source.session = NULL;
- }
- qr_task_unref(task);
- }
- while (session->tasks.len > 0) {
- struct qr_task *task = session->tasks.at[0];
- if (session->outgoing) {
- if (task->ctx->req.options.FORWARD) {
- struct kr_request *req = &task->ctx->req;
- struct kr_rplan *rplan = &req->rplan;
- struct kr_query *qry = array_tail(rplan->pending);
- qry->flags.TCP = false;
- }
- qr_task_step(task, NULL, NULL);
- } else {
- assert(task->ctx->source.session == session);
- task->ctx->source.session = NULL;
- }
- session_del_tasks(session, task);
- }
- session_close(session);
- return kr_ok();
- }
-
- if (session->bytes_to_skip) {
- assert(session->buffering == NULL);
- ssize_t min_len = MIN(session->bytes_to_skip, len);
- len -= min_len;
- msg += min_len;
- session->bytes_to_skip -= min_len;
- if (len < 0 || session->bytes_to_skip < 0) {
- /* Something gone wrong.
- * Better kill the connection */
- return kr_error(EILSEQ);
- }
- if (len == 0) {
- return kr_ok();
- }
- assert(session->bytes_to_skip == 0);
- }
-
- int submitted = 0;
- struct qr_task *task = session->buffering;
- knot_pkt_t *pkt_buf = NULL;
- if (task) {
- pkt_buf = task->pktbuf;
- } else {
- /* Update DNS header in session->msg_hdr* */
- assert(session->msg_hdr_idx <= sizeof(session->msg_hdr));
- ssize_t hdr_amount = sizeof(session->msg_hdr) -
- session->msg_hdr_idx;
- if (hdr_amount > len) {
- hdr_amount = len;
- }
- if (hdr_amount > 0) {
- memcpy(session->msg_hdr + session->msg_hdr_idx, msg, hdr_amount);
- session->msg_hdr_idx += hdr_amount;
- len -= hdr_amount;
- msg += hdr_amount;
- }
- if (len == 0) { /* no data beyond msg_hdr -> not much to do */
- return kr_ok();
- }
- assert(session->msg_hdr_idx == sizeof(session->msg_hdr));
- session->msg_hdr_idx = 0;
- uint16_t msg_size = get_msg_size(session->msg_hdr);
- uint16_t msg_id = knot_wire_get_id(session->msg_hdr + 2);
- if (msg_size < KNOT_WIRE_HEADER_SIZE) {
- /* better kill the connection; we would probably get out of sync */
- uv_timer_t *timer = &session->timeout;
- uv_timer_stop(timer);
- while (session->waiting.len > 0) {
- struct qr_task *task = session->waiting.at[0];
- if (session->outgoing) {
- qr_task_finalize(task, KR_STATE_FAIL);
- } else {
- assert(task->ctx->source.session == session);
- task->ctx->source.session = NULL;
- }
- array_del(session->waiting, 0);
- session_del_tasks(session, task);
- qr_task_unref(task);
- }
- while (session->tasks.len > 0) {
- struct qr_task *task = session->tasks.at[0];
- if (session->outgoing) {
- qr_task_finalize(task, KR_STATE_FAIL);
- } else {
- assert(task->ctx->source.session == session);
- task->ctx->source.session = NULL;
- }
- session_del_tasks(session, task);
- }
- session_close(session);
-
- return kr_ok();
- }
-
- /* get task */
- if (!session->outgoing) {
- /* This is a new query, create a new task that we can use
- * to buffer incoming message until it's complete. */
- struct sockaddr *addr = &(session->peer.ip);
- assert(addr->sa_family != AF_UNSPEC);
- struct request_ctx *ctx = request_create(worker,
- (uv_handle_t *)handle,
- addr);
- if (!ctx) {
- return kr_error(ENOMEM);
- }
- task = qr_task_create(ctx);
- if (!task) {
- request_free(ctx);
- return kr_error(ENOMEM);
}
+ qr_task_step(task, NULL, NULL);
} else {
- /* Start of response from upstream.
- * The session task list must contain a task
- * with the same msg id. */
- task = find_task(session, msg_id);
- /* FIXME: on high load over one connection, it's likely
- * that we will get multiple matches sooner or later (!) */
- if (task) {
- /* Make sure we can process maximum packet sizes over TCP for outbound queries.
- * Previous packet is allocated with mempool, so there's no need to free it manually. */
- if (task->pktbuf->max_size < KNOT_WIRE_MAX_PKTSIZE) {
- knot_mm_t *pool = &task->pktbuf->mm;
- pkt_buf = knot_pkt_new(NULL, KNOT_WIRE_MAX_PKTSIZE, pool);
- if (!pkt_buf) {
- return kr_error(ENOMEM);
- }
- task->pktbuf = pkt_buf;
- }
- knot_pkt_clear(task->pktbuf);
- assert(task->leading == false);
- } else {
- session->bytes_to_skip = msg_size - 2;
- ssize_t min_len = MIN(session->bytes_to_skip, len);
- len -= min_len;
- msg += min_len;
- session->bytes_to_skip -= min_len;
- if (len < 0 || session->bytes_to_skip < 0) {
- /* Something gone wrong.
- * Better kill the connection */
- return kr_error(EILSEQ);
- }
- if (len == 0) {
- return submitted;
- }
- assert(session->bytes_to_skip == 0);
- int ret = worker_process_tcp(worker, handle, msg, len);
- if (ret < 0) {
- submitted = ret;
- } else {
- submitted += ret;
- }
- return submitted;
- }
+ assert(task->ctx->source.session == session);
+ task->ctx->source.session = NULL;
}
-
- pkt_buf = task->pktbuf;
- knot_wire_set_id(pkt_buf->wire, msg_id);
- pkt_buf->size = 2;
- task->bytes_remaining = msg_size - 2;
- assert(session->buffering == NULL);
- session->buffering = task;
+ qr_task_unref(task);
}
- /* At this point session must have either created new task
- * or it's already assigned. */
- assert(task);
- assert(len > 0);
-
- /* Message is too long, can't process it. */
- ssize_t to_read = MIN(len, task->bytes_remaining);
- if (pkt_buf->size + to_read > pkt_buf->max_size) {
- // TODO reallocate pkt_buf
- pkt_buf->size = 0;
- len -= to_read;
- msg += to_read;
- session->bytes_to_skip = task->bytes_remaining - to_read;
- task->bytes_remaining = 0;
- if (session->buffering) {
- if (!session->outgoing) {
- qr_task_complete(session->buffering);
- }
- session->buffering = NULL;
- }
- if (len > 0) {
- int ret = worker_process_tcp(worker, handle, msg, len);
- if (ret < 0) {
- submitted = ret;
- } else {
- submitted += ret;
+ while (!session_tasklist_is_empty(session)) {
+ struct qr_task *task = session_tasklist_get_first(session);
+ session_tasklist_del_index(session, 0);
+ if (session_is_outgoing(session)) {
+ if (task->ctx->req.options.FORWARD) {
+ struct kr_request *req = &task->ctx->req;
+ struct kr_rplan *rplan = &req->rplan;
+ struct kr_query *qry = array_tail(rplan->pending);
+ qry->flags.TCP = false;
}
- }
- return submitted;
- }
- /* Buffer message and check if it's complete */
- memcpy(pkt_buf->wire + pkt_buf->size, msg, to_read);
- pkt_buf->size += to_read;
- task->bytes_remaining -= to_read;
- len -= to_read;
- msg += to_read;
- if (task->bytes_remaining == 0) {
- /* Message was assembled, clear temporary. */
- session->buffering = NULL;
- session->msg_hdr_idx = 0;
- const struct sockaddr *addr = NULL;
- knot_pkt_t *pkt = pkt_buf;
- if (session->outgoing) {
- addr = &session->peer.ip;
- assert ((task->pending_count == 1) && (task->pending[0] == session->handle));
- task->pending_count = 0;
- session_del_tasks(session, task);
- }
- /* Parse the packet and start resolving complete query */
- int ret = parse_packet(pkt);
- if (ret == 0) {
- if (session->outgoing) {
- /* To prevent slow lorris attack restart watchdog only after
- * the whole message was successfully assembled and parsed */
- if (session->tasks.len > 0 || session->waiting.len > 0) {
- uv_timer_stop(&session->timeout);
- timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0);
- }
- } else {
- /* Start only new queries,
- * not subrequests that are already pending */
- ret = request_start(task->ctx, pkt);
- if (ret != 0) {
- /* Allocation of answer buffer has failed.
- * We can't notify client about failure,
- * so just end the task processing. */
- qr_task_complete(task);
- goto next_msg;
- }
-
- ret = qr_task_register(task, session);
- if (ret != 0) {
- /* Answer buffer has been allocated,
- * but task can't be attached to the given
- * session due to memory problems.
- * Finalize the task, otherwise it becomes orphaned. */
- knot_pkt_init_response(task->ctx->req.answer, pkt);
- qr_task_finalize(task, KR_STATE_FAIL);
- goto next_msg;
- }
- submitted += 1;
- if (task->leading) {
- assert(false);
- }
- }
- } else if (session->outgoing) {
- /* Drop malformed packet and retry resolution */
- pkt = NULL;
- ret = 0;
+ qr_task_step(task, NULL, NULL);
} else {
- qr_task_complete(task);
- }
- /* Only proceed if the message is valid, or it's an invalid response to
- * an outbound query which needs to be treated as a timeout. */
- if (ret == 0) {
- /* since there can be next dns message, we must to proceed
- * even if qr_task_step() returns error */
- qr_task_step(task, addr, pkt);
- }
-next_msg:
- if (len > 0) {
- /* TODO: this is simple via iteration; recursion doesn't really help */
- ret = worker_process_tcp(worker, handle, msg, len);
- if (ret < 0) {
- return ret;
- }
- submitted += ret;
+ assert(task->ctx->source.session == session);
+ task->ctx->source.session = NULL;
}
}
- assert(submitted >= 0);
- return submitted;
+ session_close(session);
+ return kr_ok();
}
struct qr_task *worker_resolve_start(struct worker_ctx *worker, knot_pkt_t *query, struct kr_qflags options)
return qr_task_step(task, NULL, query);
}
+int worker_task_numrefs(const struct qr_task *task)
+{
+ return task->refs;
+}
+
struct kr_request *worker_task_request(struct qr_task *task)
{
if (!task || !task->ctx) {
return qr_task_finalize(task, state);
}
-void worker_session_close(struct session *session)
+int worker_task_step(struct qr_task *task, const struct sockaddr *packet_source,
+		     knot_pkt_t *packet)
+{
+	return qr_task_step(task, packet_source, packet);
+}
+
+void worker_task_complete(struct qr_task *task)
{
- session_close(session);
+ return qr_task_complete(task);
+}
+
+void worker_task_ref(struct qr_task *task)
+{
+ qr_task_ref(task);
+}
+
+void worker_task_unref(struct qr_task *task)
+{
+ qr_task_unref(task);
+}
+
+void worker_task_timeout_inc(struct qr_task *task)
+{
+ task->timeouts += 1;
+}
+
+struct session *worker_session_borrow(struct worker_ctx *worker)
+{
+ struct session *s = NULL;
+ if (worker->pool_sessions.len > 0) {
+ s = array_tail(worker->pool_sessions);
+ array_pop(worker->pool_sessions);
+ kr_asan_custom_unpoison(session, s);
+ } else {
+ s = session_new();
+ }
+ return s;
+}
+
+void worker_session_release(struct worker_ctx *worker, uv_handle_t *handle)
+{
+ if (!worker || !handle) {
+ return;
+ }
+ struct session *s = handle->data;
+ if (!s) {
+ return;
+ }
+ assert(session_is_empty(s));
+ if (worker->pool_sessions.len < MP_FREELIST_SIZE) {
+ session_clear(s);
+ array_push(worker->pool_sessions, s);
+ kr_asan_custom_poison(session, s);
+ } else {
+ session_free(s);
+ }
+}
+
+knot_pkt_t *worker_task_get_pktbuf(const struct qr_task *task)
+{
+ return task->pktbuf;
+}
+
+struct request_ctx *worker_task_get_request(struct qr_task *task)
+{
+ return task->ctx;
+}
+
+struct session *worker_request_get_source_session(struct request_ctx *ctx)
+{
+ return ctx->source.session;
+}
+
+void worker_request_set_source_session(struct request_ctx *ctx, struct session *session)
+{
+ ctx->source.session = session;
}
/** Reserve worker buffers */
} \
array_clear(list)
+#define reclaim_freelist_custom(list, type, cb) \
+ for (unsigned i = 0; i < list.len; ++i) { \
+ void *elm = list.at[i]; \
+ kr_asan_custom_unpoison(type, elm); \
+ cb(elm); \
+ } \
+ array_clear(list)
+
void worker_reclaim(struct worker_ctx *worker)
{
reclaim_freelist(worker->pool_mp, struct mempool, mp_delete);
reclaim_freelist(worker->pool_ioreqs, uv_reqs_t, free);
reclaim_freelist(worker->pool_iohandles, uv_handles_t, free);
- reclaim_freelist(worker->pool_sessions, struct session, session_free);
+ reclaim_freelist_custom(worker->pool_sessions, session, session_free);
mp_delete(worker->pkt_pool.ctx);
worker->pkt_pool.ctx = NULL;
trie_free(worker->subreq_out);