#include "daemon/tls.h"
#include "daemon/worker.h"
#include "daemon/io.h"
-
-/** List of tasks. */
-typedef array_t(struct qr_task *) session_tasklist_t;
+#include "lib/generic/queue.h"
/* Per-session (TCP or UDP) persistent structure,
* that exists between remote counterpart and a local socket.
struct tls_ctx_t *tls_ctx; /**< server side tls-related data. */
struct tls_client_ctx_t *tls_client_ctx; /**< client side tls-related data. */
- session_tasklist_t tasks; /**< list of tasks which assotiated with given session. */
- session_tasklist_t waiting; /**< list of tasks been waiting for IO (subset of taska). */
+ trie_t *tasks; /**< list of tasks assotiated with given session. */
+ queue_t(struct qr_task *) waiting; /**< list of tasks waiting for sending to upstream. */
uint8_t *wire_buf; /**< Buffer for DNS message. */
ssize_t wire_buf_size; /**< Buffer size. */
void session_free(struct session *session)
{
if (session) {
- assert(session->tasks.len == 0 && session->waiting.len == 0);
+ assert(session_is_empty(session));
session_clear(session);
free(session);
}
void session_clear(struct session *session)
{
- assert(session->tasks.len == 0 && session->waiting.len == 0);
+ assert(session_is_empty(session));
if (session->handle && session->handle->type == UV_TCP) {
free(session->wire_buf);
}
- array_clear(session->tasks);
- array_clear(session->waiting);
+ trie_clear(session->tasks);
+ trie_free(session->tasks);
+ queue_deinit(session->waiting);
tls_free(session->tls_ctx);
tls_client_ctx_free(session->tls_client_ctx);
memset(session, 0, sizeof(*session));
void session_close(struct session *session)
{
- assert(session->tasks.len == 0 && session->waiting.len == 0);
-
+ assert(session_is_empty(session));
if (session->sflags.closing) {
return;
}
uv_handle_t *handle = session->handle;
io_stop_read(handle);
session->sflags.closing = true;
- if (session->sflags.outgoing &&
- session->peer.ip.sa_family != AF_UNSPEC) {
+ if (session->peer.ip.sa_family != AF_UNSPEC) {
struct worker_ctx *worker = handle->loop->data;
struct sockaddr *peer = &session->peer.ip;
worker_del_tcp_connected(worker, peer);
return io_start_read(session->handle);
}
-int session_waitinglist_add(struct session *session, struct qr_task *task)
+int session_waitinglist_push(struct session *session, struct qr_task *task)
{
- for (int i = 0; i < session->waiting.len; ++i) {
- if (session->waiting.at[i] == task) {
- return i;
- }
- }
- int ret = array_push(session->waiting, task);
- if (ret >= 0) {
- worker_task_ref(task);
- }
- return ret;
+ queue_push(session->waiting, task);
+ worker_task_ref(task);
+ return kr_ok();
}
-int session_waitinglist_del(struct session *session, struct qr_task *task)
+struct qr_task *session_waitinglist_get(const struct session *session)
{
- int ret = kr_error(ENOENT);
- for (int i = 0; i < session->waiting.len; ++i) {
- if (session->waiting.at[i] == task) {
- array_del(session->waiting, i);
- worker_task_unref(task);
- ret = kr_ok();
- break;
- }
- }
- return ret;
+ return queue_head(session->waiting);
}
-int session_waitinglist_del_index(struct session *session, int index)
+struct qr_task *session_waitinglist_pop(struct session *session, bool deref)
{
- int ret = kr_error(ENOENT);
- if (index < session->waiting.len) {
- struct qr_task *task = session->waiting.at[index];
- array_del(session->waiting, index);
- worker_task_unref(task);
- ret = kr_ok();
+ struct qr_task *t = session_waitinglist_get(session);
+ queue_pop(session->waiting);
+ if (deref) {
+ worker_task_unref(t);
}
- return ret;
+ return t;
}
int session_tasklist_add(struct session *session, struct qr_task *task)
{
- for (int i = 0; i < session->tasks.len; ++i) {
- if (session->tasks.at[i] == task) {
- return i;
- }
+ trie_t *t = session->tasks;
+ uint16_t task_msg_id = 0;
+ const char *key = NULL;
+ size_t key_len = 0;
+ if (session->sflags.outgoing) {
+ knot_pkt_t *pktbuf = worker_task_get_pktbuf(task);
+ task_msg_id = knot_wire_get_id(pktbuf->wire);
+ key = (const char *)&task_msg_id;
+ key_len = sizeof(task_msg_id);
+ } else {
+ key = (const char *)task;
+ key_len = sizeof(task);
+ }
+ trie_val_t *v = trie_get_ins(t, key, key_len);
+ if (unlikely(!v)) {
+ assert(false);
+ return kr_error(ENOMEM);
}
- int ret = array_push(session->tasks, task);
- if (ret >= 0) {
+ if (*v == NULL) {
+ *v = task;
worker_task_ref(task);
+ } else if (*v != task) {
+ assert(false);
+ return kr_error(ENOMEM);
}
- return ret;
+ return kr_ok();
}
int session_tasklist_del(struct session *session, struct qr_task *task)
{
- int ret = kr_error(ENOENT);
- for (int i = 0; i < session->tasks.len; ++i) {
- if (session->tasks.at[i] == task) {
- array_del(session->tasks, i);
- worker_task_unref(task);
- ret = kr_ok();
- break;
- }
+ trie_t *t = session->tasks;
+ uint16_t task_msg_id = 0;
+ const char *key = NULL;
+ size_t key_len = 0;
+ trie_val_t val;
+ if (session->sflags.outgoing) {
+ knot_pkt_t *pktbuf = worker_task_get_pktbuf(task);
+ task_msg_id = knot_wire_get_id(pktbuf->wire);
+ key = (const char *)&task_msg_id;
+ key_len = sizeof(task_msg_id);
+ } else {
+ key = (const char *)task;
+ key_len = sizeof(task);
+ }
+ int ret = trie_del(t, key, key_len, &val);
+ if (ret == kr_ok()) {
+ assert(val == task);
+ worker_task_unref(val);
}
return ret;
}
-int session_tasklist_del_index(struct session *session, int index)
+struct qr_task *session_tasklist_get_first(struct session *session)
{
- int ret = kr_error(ENOENT);
- if (index < session->tasks.len) {
- struct qr_task *task = session->tasks.at[index];
- array_del(session->tasks, index);
- worker_task_unref(task);
- ret = kr_ok();
+ trie_val_t *val = trie_get_first(session->tasks, NULL, NULL);
+ return val ? (struct qr_task *) *val : NULL;
+}
+
+struct qr_task *session_tasklist_del_first(struct session *session, bool deref)
+{
+ trie_val_t val = NULL;
+ int res = trie_del_first(session->tasks, NULL, NULL, &val);
+ if (res != kr_ok()) {
+ val = NULL;
+ } else if (deref) {
+ worker_task_unref(val);
+ }
+ return (struct qr_task *)val;
+}
+struct qr_task* session_tasklist_del_msgid(const struct session *session, uint16_t msg_id)
+{
+ trie_t *t = session->tasks;
+ assert(session->sflags.outgoing);
+ struct qr_task *ret = NULL;
+ const char *key = (const char *)&msg_id;
+ size_t key_len = sizeof(msg_id);
+ trie_val_t val;
+ int res = trie_del(t, key, key_len, &val);
+ if (res == kr_ok()) {
+ ret = val;
+ assert(worker_task_numrefs(ret) > 1);
+ worker_task_unref(ret);
}
return ret;
}
-struct qr_task* session_tasklist_find(const struct session *session, uint16_t msg_id)
+struct qr_task* session_tasklist_find_msgid(const struct session *session, uint16_t msg_id)
{
+ trie_t *t = session->tasks;
+ assert(session->sflags.outgoing);
struct qr_task *ret = NULL;
- const session_tasklist_t *tasklist = &session->tasks;
- for (size_t i = 0; i < tasklist->len; ++i) {
- struct qr_task *task = tasklist->at[i];
- knot_pkt_t *pktbuf = worker_task_get_pktbuf(task);
- uint16_t task_msg_id = knot_wire_get_id(pktbuf->wire);
- if (task_msg_id == msg_id) {
- ret = task;
- break;
- }
+ trie_val_t *val = trie_get_try(t, (char *)&msg_id, sizeof(msg_id));
+ if (val) {
+ ret = *val;
}
return ret;
}
return session->handle;
}
+struct session *session_get(uv_handle_t *h)
+{
+ return h->data;
+}
+
struct session *session_new(uv_handle_t *handle)
{
if (!handle) {
return NULL;
}
+ queue_init(session->waiting);
+ session->tasks = trie_create(NULL);
if (handle->type == UV_TCP) {
uint8_t *wire_buf = malloc(KNOT_WIRE_MAX_PKTSIZE);
if (!wire_buf) {
size_t session_tasklist_get_len(const struct session *session)
{
- return session->tasks.len;
+ return trie_weight(session->tasks);
}
size_t session_waitinglist_get_len(const struct session *session)
{
- return session->waiting.len;
+ return queue_len(session->waiting);
}
bool session_tasklist_is_empty(const struct session *session)
session->sflags.has_tls = has_tls;
}
-struct qr_task *session_waitinglist_get_first(const struct session *session)
-{
- struct qr_task *t = NULL;
- if (session->waiting.len > 0) {
- t = session->waiting.at[0];
- }
- return t;
-}
-
-struct qr_task *session_tasklist_get_first(const struct session *session)
-{
- struct qr_task *t = NULL;
- if (session->tasks.len > 0) {
- t = session->tasks.at[0];
- }
- return t;
-}
-
void session_waitinglist_retry(struct session *session, bool increase_timeout_cnt)
{
- while (session->waiting.len > 0) {
- struct qr_task *task = session->waiting.at[0];
- session_tasklist_del(session, task);
- array_del(session->waiting, 0);
+ while (!session_waitinglist_is_empty(session)) {
+ struct qr_task *task = session_waitinglist_pop(session, false);
assert(worker_task_numrefs(task) > 1);
if (increase_timeout_cnt) {
worker_task_timeout_inc(task);
void session_waitinglist_finalize(struct session *session, int status)
{
- while (session->waiting.len > 0) {
- struct qr_task *t = session->waiting.at[0];
- array_del(session->waiting, 0);
- session_tasklist_del(session, t);
+ while (!session_waitinglist_is_empty(session)) {
+ struct qr_task *t = session_waitinglist_pop(session, false);
if (session->sflags.outgoing) {
worker_task_finalize(t, status);
} else {
void session_tasklist_finalize(struct session *session, int status)
{
- while (session->tasks.len > 0) {
- struct qr_task *t = session->tasks.at[0];
- array_del(session->tasks, 0);
+ while (session_tasklist_get_len(session) > 0) {
+ struct qr_task *t = session_tasklist_del_first(session, false);
+ assert(worker_task_numrefs(t) > 0);
if (session->sflags.outgoing) {
worker_task_finalize(t, status);
} else {
}
/* TCP-specific code now. */
if (s->handle->type != UV_TCP) abort();
- session_waitinglist_del(s, task);
- session_tasklist_del(s, task);
int res = 0;
session_close(s);
}
}
-
static int qr_task_step(struct qr_task *task,
const struct sockaddr *packet_source,
knot_pkt_t *packet);
-static int qr_task_send(struct qr_task *task, uv_handle_t *handle,
+static int qr_task_send(struct qr_task *task, struct session *session,
struct sockaddr *addr, knot_pkt_t *pkt);
static int qr_task_finalize(struct qr_task *task, int state);
static void qr_task_complete(struct qr_task *task);
/*! @internal Create a UDP/TCP handle for an outgoing AF_INET* connection.
* socktype is SOCK_* */
-static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype, sa_family_t family)
+static uv_handle_t *ioreq_spawn(struct worker_ctx *worker, int socktype, sa_family_t family)
{
bool precond = (socktype == SOCK_DGRAM || socktype == SOCK_STREAM)
&& (family == AF_INET || family == AF_INET6);
return NULL;
}
- if (task->pending_count >= MAX_PENDING) {
- return NULL;
- }
/* Create connection for iterative query */
- struct worker_ctx *worker = task->ctx->worker;
uv_handle_t *handle = malloc(socktype == SOCK_DGRAM
? sizeof(uv_udp_t) : sizeof(uv_tcp_t));
if (!handle) {
}
}
- /* Set current handle as a subrequest type. */
- struct session *session = handle->data;
- if (ret == 0) {
- session_flags(session)->outgoing = true;
- ret = session_tasklist_add(session, task);
- }
- if (ret < 0) {
+ if (ret != 0) {
io_deinit(handle);
free(handle);
return NULL;
}
+
+ /* Set current handle as a subrequest type. */
+ struct session *session = handle->data;
+ session_flags(session)->outgoing = true;
/* Connect or issue query datagram */
- task->pending[task->pending_count] = handle;
- task->pending_count += 1;
return handle;
}
assert (session_get_handle(source_s) == handle);
}
}
- if (handle->type == UV_TCP && outgoing && !session_waitinglist_is_empty(s)) {
- session_waitinglist_del(s, task);
- if (session_flags(s)->closing) {
- return status;
- }
- /* Finalize the task, if any errors.
- * We can't add it to the end of waiting list for retrying
- * since it may lead endless loop in some circumstances
- * (for instance: tls; send->tls_push->too many non-critical errors->
- * on_send with nonzero status->re-add to waiting->send->etc).*/
- if (status != 0) {
- if (outgoing) {
- qr_task_finalize(task, KR_STATE_FAIL);
- } else {
- assert(task->ctx->source.session == s);
- task->ctx->source.session = NULL;
- }
- session_tasklist_del(s, task);
- }
- struct qr_task *waiting_task = session_waitinglist_get_first(s);
- if (waiting_task) {
- struct sockaddr *peer = session_get_peer(s);
- knot_pkt_t *pkt = waiting_task->pktbuf;
- int ret = qr_task_send(waiting_task, handle, peer, pkt);
- if (ret != kr_ok()) {
- session_tasks_finalize(s, KR_STATE_FAIL);
- session_close(s);
- return status;
- }
- }
- }
if (!session_flags(s)->closing) {
io_start_read(handle); /* Start reading new query */
}
static void on_send(uv_udp_send_t *req, int status)
{
struct qr_task *task = req->data;
- qr_task_on_send(task, (uv_handle_t *)(req->handle), status);
+ uv_handle_t *h = (uv_handle_t *)req->handle;
+ qr_task_on_send(task, h, status);
qr_task_unref(task);
free(req);
}
-static void on_task_write(uv_write_t *req, int status)
+
+static void on_write(uv_write_t *req, int status)
{
struct qr_task *task = req->data;
- qr_task_on_send(task, (uv_handle_t *)(req->handle), status);
+ uv_handle_t *h = (uv_handle_t *)req->handle;
+ qr_task_on_send(task, h, status);
qr_task_unref(task);
free(req);
}
-static int qr_task_send(struct qr_task *task, uv_handle_t *handle,
+static int qr_task_send(struct qr_task *task, struct session *session,
struct sockaddr *addr, knot_pkt_t *pkt)
{
- if (!handle) {
- return qr_task_on_send(task, handle, kr_error(EIO));
+ if (!session) {
+ return qr_task_on_send(task, NULL, kr_error(EIO));
}
int ret = 0;
struct request_ctx *ctx = task->ctx;
struct kr_request *req = &ctx->req;
+ uv_handle_t *handle = session_get_handle(session);
+ assert(handle && handle->data == session);
const bool is_stream = handle->type == UV_TCP;
if (!is_stream && handle->type != UV_UDP) abort();
+ if (addr == NULL) {
+ addr = session_get_peer(session);
+ }
+
+ if (pkt == NULL) {
+ pkt = worker_task_get_pktbuf(task);
+ }
+
+ if (session_flags(session)->outgoing) {
+ size_t try_limit = session_tasklist_get_len(session) + 1;
+ uint16_t msg_id = knot_wire_get_id(pkt->wire);
+ int try_count = 0;
+ while (session_tasklist_find_msgid(session, msg_id) &&
+ try_count <= try_limit) {
+ ++msg_id;
+ ++try_count;
+ }
+ if (try_count > try_limit) {
+ return qr_task_on_send(task, handle, kr_error(EIO));
+ }
+ worker_task_pkt_set_msgid(task, msg_id);
+ }
+
if (knot_wire_get_qr(pkt->wire) == 0) {
/*
* Query must be finalised using destination address before
/* Pending ioreq on current task */
qr_task_ref(task);
+ struct worker_ctx *worker = ctx->worker;
/* Send using given protocol */
- struct session *session = handle->data;
assert(!session_flags(session)->closing);
if (session_flags(session)->has_tls) {
uv_write_t *write_req = (uv_write_t *)ioreq;
write_req->data = task;
- ret = tls_write(write_req, handle, pkt, &on_task_write);
+ ret = tls_write(write_req, handle, pkt, &on_write);
} else if (handle->type == UV_UDP) {
uv_udp_send_t *send_req = (uv_udp_send_t *)ioreq;
uv_buf_t buf = { (char *)pkt->wire, pkt->size };
{ (char *)pkt->wire, pkt->size }
};
write_req->data = task;
- ret = uv_write(write_req, (uv_stream_t *)handle, buf, 2, &on_task_write);
+ ret = uv_write(write_req, (uv_stream_t *)handle, buf, 2, &on_write);
} else {
assert(false);
}
- struct worker_ctx *worker = ctx->worker;
if (ret == 0) {
+ if (session_flags(session)->outgoing) {
+ session_tasklist_add(session, task);
+ }
if (worker->too_many_open &&
worker->stats.rconcurrent <
worker->rconcurrent_highwatermark - 10) {
if (ret == UV_EMFILE) {
worker->too_many_open = true;
worker->rconcurrent_highwatermark = worker->stats.rconcurrent;
+ ret = kr_error(UV_EMFILE);
}
}
return ret;
}
-static int session_next_waiting_send(struct session *session)
-{
- int ret = kr_ok();
- if (!session_waitinglist_is_empty(session)) {
- struct sockaddr *peer = session_get_peer(session);
- struct qr_task *task = session_waitinglist_get_first(session);
- uv_handle_t *handle = session_get_handle(session);
- ret = qr_task_send(task, handle, peer, task->pktbuf);
- }
- return ret;
-}
-
static int session_tls_hs_cb(struct session *session, int status)
{
assert(session_flags(session)->outgoing);
ret = worker_add_tcp_connected(worker, peer, session);
if (deletion_res == kr_ok() && ret == kr_ok()) {
- ret = session_next_waiting_send(session);
+ while (!session_waitinglist_is_empty(session)) {
+ struct qr_task *t = session_waitinglist_get(session);
+ ret = qr_task_send(t, session, NULL, NULL);
+ if (ret != 0) {
+ break;
+ }
+ session_waitinglist_pop(session, true);
+ }
} else {
ret = kr_error(EINVAL);
}
uv_stream_t *handle = req->handle;
struct session *session = handle->data;
struct sockaddr *peer = session_get_peer(session);
+ free(req);
assert(session_flags(session)->outgoing);
if (status == UV_ECANCELED) {
worker_del_tcp_waiting(worker, peer);
assert(session_is_empty(session) && session_flags(session)->closing);
- free(req);
return;
}
if (session_flags(session)->closing) {
worker_del_tcp_waiting(worker, peer);
assert(session_is_empty(session));
- free(req);
return;
}
if (status != 0) {
worker_del_tcp_waiting(worker, peer);
- session_waitinglist_retry(session, false);
assert(session_tasklist_is_empty(session));
- free(req);
+ session_waitinglist_retry(session, false);
session_close(session);
return;
}
* something gone wrong */
session_waitinglist_finalize(session, KR_STATE_FAIL);
assert(session_tasklist_is_empty(session));
- free(req);
session_close(session);
return;
}
}
- struct qr_task *task = session_waitinglist_get_first(session);
+ struct qr_task *task = session_waitinglist_get(session);
struct kr_query *qry = task_get_last_pending_query(task);
WITH_VERBOSE (qry) {
struct sockaddr *peer = session_get_peer(session);
struct tls_client_ctx_t *tls_ctx = session_tls_get_client_ctx(session);
ret = tls_client_connect_start(tls_ctx, session, session_tls_hs_cb);
if (ret == kr_error(EAGAIN)) {
- free(req);
session_start_read(session);
session_timer_start(session, on_tcp_watchdog_timeout,
MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
} else {
worker_add_tcp_connected(worker, peer, session);
}
-
- if (ret == kr_ok()) {
- ret = session_next_waiting_send(session);
- if (ret == kr_ok()) {
- session_timer_start(session, on_tcp_watchdog_timeout,
- MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
- free(req);
+ while (!session_waitinglist_is_empty(session)) {
+ struct qr_task *t = session_waitinglist_get(session);
+ ret = qr_task_send(t, session, NULL, NULL);
+ if (ret != 0) {
+ assert(session_tasklist_is_empty(session));
+ assert(false);
+ worker_del_tcp_connected(worker, peer);
+ session_waitinglist_finalize(session, KR_STATE_FAIL);
+ session_close(session);
return;
}
+ session_waitinglist_pop(session, true);
}
-
- session_waitinglist_finalize(session, KR_STATE_FAIL);
- assert(session_tasklist_is_empty(session));
- free(req);
- session_close(session);
}
static void on_tcp_connect_timeout(uv_timer_t *timer)
uv_timer_stop(timer);
struct worker_ctx *worker = get_worker();
- assert (session_waitinglist_get_len(session) == session_tasklist_get_len(session));
+ assert (session_tasklist_is_empty(session));
struct sockaddr *peer = session_get_peer(session);
worker_del_tcp_waiting(worker, peer);
- struct qr_task *task = session_waitinglist_get_first(session);
+ struct qr_task *task = session_waitinglist_get(session);
struct kr_query *qry = task_get_last_pending_query(task);
WITH_VERBOSE (qry) {
char peer_str[INET6_ADDRSTRLEN];
static void on_tcp_watchdog_timeout(uv_timer_t *timer)
{
struct session *session = timer->data;
- struct worker_ctx *worker = timer->loop->data;
- struct sockaddr *peer = session_get_peer(session);
assert(session_flags(session)->outgoing);
+ assert(!session_flags(session)->closing);
+
+ struct worker_ctx *worker = timer->loop->data;
+ struct sockaddr *peer = session_get_peer(session);
uv_timer_stop(timer);
if (!choice) {
return ret;
}
- ret = ioreq_spawn(task, SOCK_DGRAM, choice->sin6_family);
+ if (task->pending_count >= MAX_PENDING) {
+ return ret;
+ }
+ ret = ioreq_spawn(task->ctx->worker, SOCK_DGRAM, choice->sin6_family);
if (!ret) {
return ret;
}
struct sockaddr *peer = session_get_peer(session);
assert (peer->sa_family == AF_UNSPEC && session_flags(session)->outgoing);
memcpy(peer, addr, kr_sockaddr_len(addr));
- if (qr_task_send(task, ret, (struct sockaddr *)choice,
- task->pktbuf) == 0) {
+ if (qr_task_send(task, session, (struct sockaddr *)choice,
+ task->pktbuf) != 0) {
+ session_close(session);
+ ret = NULL;
+ } else {
+ task->pending[task->pending_count] = session_get_handle(session);
+ task->pending_count += 1;
task->addrlist_turn = (task->addrlist_turn + 1) %
task->addrlist_count; /* Round robin */
}
/* Send back answer */
struct session *source_session = ctx->source.session;
- uv_handle_t *handle = session_get_handle(source_session);
assert(!session_flags(source_session)->closing);
- assert(handle && handle->data == ctx->source.session);
assert(ctx->source.addr.ip.sa_family != AF_UNSPEC);
- int res = qr_task_send(task, handle,
+ int res = qr_task_send(task, source_session,
(struct sockaddr *)&ctx->source.addr,
ctx->req.answer);
if (res != kr_ok()) {
(void) qr_task_on_send(task, NULL, kr_error(EIO));
/* Since source session is erroneous detach all tasks. */
while (!session_tasklist_is_empty(source_session)) {
- struct qr_task *t = session_tasklist_get_first(source_session);
+ struct qr_task *t = session_tasklist_del_first(source_session, false);
struct request_ctx *c = t->ctx;
assert(c->source.session == source_session);
c->source.session = NULL;
/* Don't finalize them as there can be other tasks
* waiting for answer to this particular task.
* (ie. task->leading is true) */
- session_tasklist_del_index(source_session, 0);
+ worker_task_unref(t);
}
session_close(source_session);
- } else if (handle->type == UV_TCP && ctx->source.session) {
+ } else if (session_get_handle(source_session)->type == UV_TCP) {
/* Don't try to close source session at least
* retry_interval_for_timeout_timer milliseconds */
- session_timer_restart(ctx->source.session);
+ session_timer_restart(source_session);
}
qr_task_unref(task);
/* Start transmitting */
uv_handle_t *handle = retransmit(task);
if (handle == NULL) {
- return qr_task_step(task, NULL, NULL);
+ subreq_finalize(task, packet_source, packet);
+ return qr_task_finalize(task, KR_STATE_FAIL);
}
/* Check current query NSLIST */
struct kr_query *qry = array_tail(req->rplan.pending);
subreq_finalize(task, packet_source, packet);
return qr_task_finalize(task, KR_STATE_FAIL);
}
- /* There are waiting tasks.
- * It means that connection establishing or data sending
- * is coming right now. */
- /* Task will be notified in on_connect() or qr_task_on_send(). */
- ret = session_waitinglist_add(session, task);
- if (ret < 0) {
- subreq_finalize(task, packet_source, packet);
- return qr_task_finalize(task, KR_STATE_FAIL);
- }
- ret = session_tasklist_add(session, task);
+ /* Connection is in the list of waiting connections.
+ * It means that connection establishing is coming right now.
+ * Add task to the end of list of waiting tasks..
+ * It will be notified in on_connect() or qr_task_on_send(). */
+ ret = session_waitinglist_push(session, task);
if (ret < 0) {
- session_waitinglist_del(session, task);
subreq_finalize(task, packet_source, packet);
return qr_task_finalize(task, KR_STATE_FAIL);
}
- assert(task->pending_count == 0);
- task->pending[task->pending_count] = session_get_handle(session);
- task->pending_count += 1;
} else if ((session = worker_find_tcp_connected(ctx->worker, addr)) != NULL) {
/* Connection has been already established */
assert(session_flags(session)->outgoing);
if (session_flags(session)->closing) {
- session_tasklist_del(session, task);
subreq_finalize(task, packet_source, packet);
return qr_task_finalize(task, KR_STATE_FAIL);
}
- if (session_tasklist_get_len(session) >= worker->tcp_pipeline_max) {
- session_tasklist_del(session, task);
- subreq_finalize(task, packet_source, packet);
- return qr_task_finalize(task, KR_STATE_FAIL);
+ session_timer_stop(session);
+ while (!session_waitinglist_is_empty(session)) {
+ struct qr_task *t = session_waitinglist_get(session);
+ ret = qr_task_send(t, session, NULL, NULL);
+ if (ret != 0) {
+ session_waitinglist_finalize(session, KR_STATE_FAIL);
+ session_tasklist_finalize(session, KR_STATE_FAIL);
+ subreq_finalize(task, packet_source, packet);
+ session_close(session);
+ return qr_task_finalize(task, KR_STATE_FAIL);
+ }
+ session_waitinglist_pop(session, true);
}
- /* will be removed in qr_task_on_send() */
- ret = session_waitinglist_add(session, task);
- if (ret < 0) {
- session_tasklist_del(session, task);
+ ret = qr_task_send(task, session, NULL, NULL);
+ if (ret != 0 /* && ret != kr_error(EMFILE) */) {
+ session_tasklist_finalize(session, KR_STATE_FAIL);
subreq_finalize(task, packet_source, packet);
+ session_close(session);
return qr_task_finalize(task, KR_STATE_FAIL);
}
- ret = session_tasklist_add(session, task);
+ ret = session_timer_start(session, on_tcp_watchdog_timeout,
+ MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
if (ret < 0) {
- session_waitinglist_del(session, task);
- session_tasklist_del(session, task);
+ session_tasklist_finalize(session, KR_STATE_FAIL);
subreq_finalize(task, packet_source, packet);
+ session_close(session);
return qr_task_finalize(task, KR_STATE_FAIL);
}
- if (session_waitinglist_get_len(session) == 1) {
- ret = qr_task_send(task, session_get_handle(session),
- session_get_peer(session), task->pktbuf);
- if (ret < 0) {
- session_waitinglist_del(session, task);
- session_tasklist_del(session, task);
- session_tasklist_finalize(session, KR_STATE_FAIL);
- subreq_finalize(task, packet_source, packet);
- session_close(session);
- return qr_task_finalize(task, KR_STATE_FAIL);
- }
- if (session_tasklist_get_len(session) == 1) {
- session_timer_stop(session);
- ret = session_timer_start(session, on_tcp_watchdog_timeout,
- MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
- }
- if (ret < 0) {
- session_waitinglist_del(session, task);
- session_tasklist_del(session, task);
- session_tasklist_finalize(session, KR_STATE_FAIL);
- subreq_finalize(task, packet_source, packet);
- session_close(session);
- return qr_task_finalize(task, KR_STATE_FAIL);
- }
- }
+
assert(task->pending_count == 0);
task->pending[task->pending_count] = session_get_handle(session);
task->pending_count += 1;
if (!conn) {
return qr_task_step(task, NULL, NULL);
}
- uv_handle_t *client = ioreq_spawn(task, sock_type,
+ uv_handle_t *client = ioreq_spawn(worker, sock_type,
addr->sa_family);
if (!client) {
free(conn);
session = client->data;
ret = worker_add_tcp_waiting(ctx->worker, addr, session);
if (ret < 0) {
- session_tasklist_del(session, task);
- free(conn);
- subreq_finalize(task, packet_source, packet);
- return qr_task_finalize(task, KR_STATE_FAIL);
- }
- /* will be removed in qr_task_on_send() */
- ret = session_waitinglist_add(session, task);
- if (ret < 0) {
- session_tasklist_del(session, task);
- worker_del_tcp_waiting(ctx->worker, addr);
free(conn);
subreq_finalize(task, packet_source, packet);
return qr_task_finalize(task, KR_STATE_FAIL);
assert(session_tls_get_client_ctx(session) == NULL);
struct tls_client_ctx_t *tls_ctx = tls_client_ctx_new(entry, worker);
if (!tls_ctx) {
- session_tasklist_del(session, task);
- session_waitinglist_del(session, task);
worker_del_tcp_waiting(ctx->worker, addr);
free(conn);
subreq_finalize(task, packet_source, packet);
ret = session_timer_start(session, on_tcp_connect_timeout,
KR_CONN_RTT_MAX, 0);
if (ret != 0) {
- session_tasklist_del(session, task);
- session_waitinglist_del(session, task);
worker_del_tcp_waiting(ctx->worker, addr);
free(conn);
subreq_finalize(task, packet_source, packet);
return qr_task_finalize(task, KR_STATE_FAIL);
}
- struct qr_task *task = session_waitinglist_get_first(session);
struct kr_query *qry = task_get_last_pending_query(task);
WITH_VERBOSE (qry) {
char peer_str[INET6_ADDRSTRLEN];
if (uv_tcp_connect(conn, (uv_tcp_t *)client,
addr , on_connect) != 0) {
session_timer_stop(session);
- session_tasklist_del(session, task);
- session_waitinglist_del(session, task);
worker_del_tcp_waiting(ctx->worker, addr);
free(conn);
subreq_finalize(task, packet_source, packet);
return qr_task_step(task, NULL, NULL);
}
+
+ /* will be removed in on_connect() or qr_task_on_send() */
+ ret = session_waitinglist_push(session, task);
+ if (ret < 0) {
+ subreq_finalize(task, packet_source, packet);
+ return qr_task_finalize(task, KR_STATE_FAIL);
+ }
}
}
return kr_ok();
/* Parse packet */
int ret = parse_packet(query);
+ bool is_query = (knot_wire_get_qr(query->wire) == 0);
+ /* Ignore badly formed queries. */
+ if (!query ||
+ (ret != kr_ok() && ret != kr_error(EMSGSIZE)) ||
+ (is_query == session_flags(session)->outgoing)) {
+ if (query) worker->stats.dropped += 1;
+ return kr_error(EILSEQ);
+ }
+
/* Start new task on listening sockets,
* or resume if this is subrequest */
struct qr_task *task = NULL;
struct sockaddr *addr = NULL;
if (!session_flags(session)->outgoing) { /* request from a client */
- /* Ignore badly formed queries. */
- if (!query || (ret != kr_ok() && ret != kr_error(EMSGSIZE)) ||
- knot_wire_get_qr(query->wire)) {
- if (query) worker->stats.dropped += 1;
- return kr_error(EILSEQ);
- }
struct request_ctx *ctx = request_create(worker, handle,
session_get_peer(session));
if (!ctx) {
return kr_error(ENOMEM);
}
} else if (query) { /* response from upstream */
- if ((ret != kr_ok() && ret != kr_error(EMSGSIZE)) ||
- !knot_wire_get_qr(query->wire)) {
- /* Ignore badly formed responses. */
- return kr_error(EILSEQ);
- }
- task = session_tasklist_find(session, knot_wire_get_id(query->wire));
+ task = session_tasklist_del_msgid(session, knot_wire_get_id(query->wire));
if (task == NULL) {
return kr_error(ENOENT);
}
uv_handle_t *handle = session_get_handle(session);
struct worker_ctx *worker = handle->loop->data;
struct sockaddr *peer = session_get_peer(session);
+
worker_del_tcp_connected(worker, peer);
session_flags(session)->connected = false;
assert(session_tasklist_get_len(session) >= session_waitinglist_get_len(session));
while (!session_waitinglist_is_empty(session)) {
- struct qr_task *task = session_waitinglist_get_first(session);
- session_waitinglist_del_index(session, 0);
+ struct qr_task *task = session_waitinglist_pop(session, false);
assert(task->refs > 1);
session_tasklist_del(session, task);
if (session_flags(session)->outgoing) {
assert(task->ctx->source.session == session);
task->ctx->source.session = NULL;
}
+ worker_task_unref(task);
}
while (!session_tasklist_is_empty(session)) {
- struct qr_task *task = session_tasklist_get_first(session);
- session_tasklist_del_index(session, 0);
+ struct qr_task *task = session_tasklist_del_first(session, false);
if (session_flags(session)->outgoing) {
if (task->ctx->req.options.FORWARD) {
struct kr_request *req = &task->ctx->req;
assert(task->ctx->source.session == session);
task->ctx->source.session = NULL;
}
+ worker_task_unref(task);
}
session_close(session);
return kr_ok();
ctx->source.session = session;
}
+uint16_t worker_task_pkt_get_msgid(struct qr_task *task)
+{
+ knot_pkt_t *pktbuf = worker_task_get_pktbuf(task);
+ uint16_t msg_id = knot_wire_get_id(pktbuf->wire);
+ return msg_id;
+}
+
+void worker_task_pkt_set_msgid(struct qr_task *task, uint16_t msgid)
+{
+ knot_pkt_t *pktbuf = worker_task_get_pktbuf(task);
+ knot_wire_set_id(pktbuf->wire, msgid);
+}
+
/** Reserve worker buffers */
static int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen)
{