return udp_bind_finalize((uv_handle_t *)handle);
}
-static void tcp_timeout_trigger(uv_timer_t *timer)
+void tcp_timeout_trigger(uv_timer_t *timer)
{
struct session *s = timer->data;
- assert(!session_flags(s)->outgoing);
+ assert(!session_flags(s)->closing);
+ assert(session_waitinglist_is_empty(s));
+
+ struct worker_ctx *worker = timer->loop->data;
+
+ if (!session_tasklist_is_empty(s)) {
+ int finalized = session_tasklist_finalize_expired(s);
+ worker->stats.timeout += finalized;
+ /* session_tasklist_finalize_expired() may call worker_task_finalize().
+ * If session is a source session and there were IO errors,
+ * worker_task_finalize() can filnalize all tasks and close session. */
+ if (session_flags(s)->closing) {
+ return;
+ }
+
+ }
if (!session_tasklist_is_empty(s)) {
- uv_timer_again(timer);
- } else if (!session_flags(s)->closing) {
uv_timer_stop(timer);
- session_close(s);
+ session_timer_start(s, tcp_timeout_trigger,
+ KR_RESOLVE_TIME_LIMIT / 2,
+ KR_RESOLVE_TIME_LIMIT / 2);
+ } else {
+ const struct engine *engine = worker->engine;
+ const struct network *net = &engine->net;
+ uint64_t idle_in_timeout = net->tcp.in_idle_timeout;
+ uint64_t last_activity = session_last_input_activity(s);
+ uint64_t idle_time = kr_now() - last_activity;
+ if (idle_time < idle_in_timeout) {
+ idle_in_timeout -= idle_time;
+ uv_timer_stop(timer);
+ session_timer_start(s, tcp_timeout_trigger,
+ idle_in_timeout, idle_in_timeout);
+ } else {
+ session_close(s);
+ }
}
}
if (ret < 0) {
/* An error has occurred, close the session. */
worker_end_tcp(s);
- } else if (ret > 0 && !session_flags(s)->closing) {
- /* Connection spawned at least one request
- * or
- * valid answer has been received from upstream.
- * Reset deadline for next query.
- * https://tools.ietf.org/html/rfc7766#section-6.2.3
- */
- session_timer_restart(s);
}
session_wirebuf_compress(s);
mp_flush(worker->pkt_pool.ctx);
int tcp_bind_tls(uv_tcp_t *handle, struct sockaddr *addr);
int tcp_bindfd(uv_tcp_t *handle, int fd);
int tcp_bindfd_tls(uv_tcp_t *handle, int fd);
+void tcp_timeout_trigger(uv_timer_t *timer);
/** Initialize the handle, incl. ->data = struct session * instance.
* \param type = SOCK_*
* that exists between remote counterpart and a local socket.
*/
struct session {
- struct session_flags sflags; /**< miscellaneous flags. */
- union inaddr peer; /**< address of peer; is not set for client's UDP sessions. */
- uv_handle_t *handle; /**< libuv handle for IO operations. */
- uv_timer_t timeout; /**< libuv handle for timer. */
+ struct session_flags sflags; /**< miscellaneous flags. */
+ union inaddr peer; /**< address of peer; is not set for client's UDP sessions. */
+ uv_handle_t *handle; /**< libuv handle for IO operations. */
+ uv_timer_t timeout; /**< libuv handle for timer. */
- struct tls_ctx_t *tls_ctx; /**< server side tls-related data. */
+ struct tls_ctx_t *tls_ctx; /**< server side tls-related data. */
struct tls_client_ctx_t *tls_client_ctx; /**< client side tls-related data. */
- trie_t *tasks; /**< list of tasks assotiated with given session. */
+ trie_t *tasks; /**< list of tasks assotiated with given session. */
queue_t(struct qr_task *) waiting; /**< list of tasks waiting for sending to upstream. */
- uint8_t *wire_buf; /**< Buffer for DNS message. */
- ssize_t wire_buf_size; /**< Buffer size. */
- ssize_t wire_buf_start_idx; /**< Data start offset in wire_buf. */
- ssize_t wire_buf_end_idx; /**< Data end offset in wire_buf. */
+ uint8_t *wire_buf; /**< Buffer for DNS message. */
+ ssize_t wire_buf_size; /**< Buffer size. */
+ ssize_t wire_buf_start_idx; /**< Data start offset in wire_buf. */
+ ssize_t wire_buf_end_idx; /**< Data end offset in wire_buf. */
+ uint64_t last_input_activity; /**< Either creatoion time or time of peer's last activity */
};
static void on_session_close(uv_handle_t *handle)
worker_task_ref(task);
} else if (*v != task) {
assert(false);
- return kr_error(ENOMEM);
+ return kr_error(EINVAL);
}
return kr_ok();
}
trie_val_t val;
int res = trie_del(t, key, key_len, &val);
if (res == kr_ok()) {
- ret = val;
- assert(worker_task_numrefs(ret) > 1);
- worker_task_unref(ret);
+ if (worker_task_numrefs(val) > 1) {
+ ret = val;
+ }
+ worker_task_unref(val);
}
return ret;
}
session->handle = handle;
handle->data = session;
session->timeout.data = session;
+ session_touch(session);
return session;
}
{
while (!session_waitinglist_is_empty(session)) {
struct qr_task *task = session_waitinglist_pop(session, false);
- assert(worker_task_numrefs(task) > 1);
if (increase_timeout_cnt) {
worker_task_timeout_inc(task);
}
- worker_task_unref(task);
worker_task_step(task, NULL, NULL);
+ worker_task_unref(task);
}
}
{
while (!session_waitinglist_is_empty(session)) {
struct qr_task *t = session_waitinglist_pop(session, false);
- if (session->sflags.outgoing) {
- worker_task_finalize(t, status);
- } else {
- struct request_ctx *ctx = worker_task_get_request(t);
- assert(worker_request_get_source_session(ctx) == session);
- worker_request_set_source_session(ctx, NULL);
- }
+ worker_task_finalize(t, status);
worker_task_unref(t);
}
}
while (session_tasklist_get_len(session) > 0) {
struct qr_task *t = session_tasklist_del_first(session, false);
assert(worker_task_numrefs(t) > 0);
- if (session->sflags.outgoing) {
- worker_task_finalize(t, status);
- } else {
- struct request_ctx *ctx = worker_task_get_request(t);
- assert(worker_request_get_source_session(ctx) == session);
- worker_request_set_source_session(ctx, NULL);
- }
+ worker_task_finalize(t, status);
worker_task_unref(t);
}
}
-void session_tasks_finalize(struct session *session, int status)
+int session_tasklist_finalize_expired(struct session *session)
{
- session_waitinglist_finalize(session, status);
- session_tasklist_finalize(session, status);
+ int ret = 0;
+ queue_t(struct qr_task *) q;
+ uint64_t now = kr_now();
+ trie_t *t = session->tasks;
+ trie_it_t *it;
+ queue_init(q);
+ for (it = trie_it_begin(t); !trie_it_finished(it); trie_it_next(it)) {
+ trie_val_t *v = trie_it_val(it);
+ struct qr_task *task = (struct qr_task *)*v;
+ if ((now - worker_task_creation_time(task)) >= KR_RESOLVE_TIME_LIMIT) {
+ queue_push(q, task);
+ worker_task_ref(task);
+ }
+ }
+ trie_it_free(it);
+
+ struct qr_task *task = NULL;
+ uint16_t msg_id = 0;
+ char *key = (char *)&task;
+ int32_t keylen = sizeof(struct qr_task *);
+ if (session->sflags.outgoing) {
+ key = (char *)&msg_id;
+ keylen = sizeof(msg_id);
+ }
+ while (queue_len(q) > 0) {
+ task = queue_head(q);
+ if (session->sflags.outgoing) {
+ knot_pkt_t *pktbuf = worker_task_get_pktbuf(task);
+ msg_id = knot_wire_get_id(pktbuf->wire);
+ }
+ int res = trie_del(t, key, keylen, NULL);
+ if (!worker_task_finished(task)) {
+ /* task->pending_count must be zero,
+ * but there are can be followers,
+ * so run worker_task_subreq_finalize() to ensure retrying
+ * for all the followers. */
+ worker_task_subreq_finalize(task);
+ worker_task_finalize(task, KR_STATE_FAIL);
+ }
+ if (res == KNOT_EOK) {
+ worker_task_unref(task);
+ }
+ queue_pop(q);
+ worker_task_unref(task);
+ ++ret;
+ }
+
+ queue_deinit(q);
+ return ret;
}
int session_timer_start(struct session *session, uv_timer_cb cb,
return ret;
}
-static void on_session_idle_timeout(uv_timer_t *timer)
+void session_kill_ioreq(struct session *s, struct qr_task *task)
{
- struct session *s = timer->data;
- assert(s);
- uv_timer_stop(timer);
- if (s->sflags.closing) {
+ if (!s) {
return;
}
- /* session was not in use during timer timeout
- * remove it from connection list and close
- */
- assert(session_is_empty(s));
- session_close(s);
-}
-
-void session_kill_ioreq(struct session *s, struct qr_task *task)
-{
- assert(s && s->sflags.outgoing && s->handle);
+ assert(s->sflags.outgoing && s->handle);
if (s->sflags.closing) {
return;
}
+ session_tasklist_del(s, task);
if (s->handle->type == UV_UDP) {
- uv_timer_stop(&s->timeout);
- session_tasklist_del(s, task);
assert(session_tasklist_is_empty(s));
session_close(s);
return;
}
- /* TCP-specific code now. */
- if (s->handle->type != UV_TCP) abort();
-
- int res = 0;
-
- const struct sockaddr *peer = &s->peer.ip;
- if (peer->sa_family != AF_UNSPEC && session_is_empty(s) && !s->sflags.closing) {
- assert(peer->sa_family == AF_INET || peer->sa_family == AF_INET6);
- res = 1;
- if (s->sflags.connected) {
- /* This is outbound TCP connection which can be reused.
- * Close it after timeout */
- s->timeout.data = s;
- uv_timer_stop(&s->timeout);
- res = uv_timer_start(&s->timeout, on_session_idle_timeout,
- KR_CONN_RTT_MAX, 0);
- }
- }
+}
- if (res != 0) {
- /* if any errors, close the session immediately */
- session_close(s);
- }
+/** Update timestamp */
+void session_touch(struct session *s)
+{
+ s->last_input_activity = kr_now();
+}
+
+uint64_t session_last_input_activity(struct session *s)
+{
+ return s->last_input_activity;
}
struct qr_task* session_tasklist_find_msgid(const struct session *session, uint16_t msg_id);
/** Finalize all tasks in the list. */
void session_tasklist_finalize(struct session *session, int status);
+/** Finalize all expired tasks in the list. */
+int session_tasklist_finalize_expired(struct session *session);
/** Both of task lists (associated & waiting). */
/** Check if empty. */
bool session_is_empty(const struct session *session);
-/** Finalize all tasks. */
-void session_tasks_finalize(struct session *session, int status);
/** Get pointer to session flags */
struct session_flags *session_flags(struct session *session);
/** Get peer address. */
int session_discard_packet(struct session *session, const knot_pkt_t *pkt);
void session_kill_ioreq(struct session *s, struct qr_task *task);
+/** Update timestamp */
+void session_touch(struct session *s);
+uint64_t session_last_input_activity(struct session *s);
uint32_t refs;
bool finished : 1;
bool leading : 1;
+ uint64_t creation_time;
};
do { ++(task)->refs; } while(0)
#define qr_task_unref(task) \
do { if (task && --(task)->refs == 0) { qr_task_free(task); } } while (0)
-#define qr_valid_handle(task, checked) \
- (!uv_is_closing((checked)) || (task)->ctx->source.session->handle == (checked))
/** @internal get key for tcp session
* @note kr_straddr() return pointer to static string
static struct session* worker_find_tcp_waiting(struct worker_ctx *worker,
const struct sockaddr *addr);
static void on_tcp_connect_timeout(uv_timer_t *timer);
-static void on_tcp_watchdog_timeout(uv_timer_t *timer);
/** @internal Get singleton worker. */
static inline struct worker_ctx *get_worker(void)
ctx->task = task;
/* Make the primary reference to task. */
qr_task_ref(task);
+ task->creation_time = kr_now();
ctx->worker->stats.concurrent += 1;
return task;
}
assert(ctx);
- /* Process outbound session. */
- struct session *s = ctx->source.session;
struct worker_ctx *worker = ctx->worker;
- /* Process source session. */
- if (s && session_tasklist_get_len(s) < worker->tcp_pipeline_max/2 &&
- !session_flags(s)->closing && session_flags(s)->throttled) {
- uv_handle_t *handle = session_get_handle(s);
- /* Start reading again if the session is throttled and
- * the number of outgoing requests is below watermark. */
- if (handle) {
- io_start_read(handle);
- session_flags(s)->throttled = false;
- }
- }
-
- task->ctx = NULL;
-
if (ctx->task == NULL) {
request_free(ctx);
}
struct session *s = ctx->source.session;
if (s) {
assert(!session_flags(s)->outgoing && session_waitinglist_is_empty(s));
+ ctx->source.session = NULL;
session_tasklist_del(s, task);
}
session_close(session);
} else {
session_timer_stop(session);
- session_timer_start(session, on_tcp_watchdog_timeout,
+ session_timer_start(session, tcp_timeout_trigger,
MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
}
return kr_ok();
return;
}
- session_timer_stop(session);
-
if (status != 0) {
worker_del_tcp_waiting(worker, peer);
assert(session_tasklist_is_empty(session));
struct tls_client_ctx_t *tls_ctx = session_tls_get_client_ctx(session);
ret = tls_client_connect_start(tls_ctx, session, session_tls_hs_cb);
if (ret == kr_error(EAGAIN)) {
- session_timer_start(session, on_tcp_watchdog_timeout,
+ session_timer_stop(session);
+ session_timer_start(session, tcp_timeout_trigger,
MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
return;
}
struct qr_task *t = session_waitinglist_get(session);
ret = qr_task_send(t, session, NULL, NULL);
if (ret != 0) {
- assert(session_tasklist_is_empty(session));
worker_del_tcp_connected(worker, peer);
session_waitinglist_finalize(session, KR_STATE_FAIL);
+ session_tasklist_finalize(session, KR_STATE_FAIL);
session_close(session);
return;
}
session_waitinglist_pop(session, true);
}
- session_timer_start(session, on_tcp_watchdog_timeout,
+ session_timer_stop(session);
+ session_timer_start(session, tcp_timeout_trigger,
MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
}
session_close(session);
}
-static void on_tcp_watchdog_timeout(uv_timer_t *timer)
-{
- struct session *session = timer->data;
-
- assert(session_flags(session)->outgoing);
- assert(!session_flags(session)->closing);
-
- struct worker_ctx *worker = timer->loop->data;
- struct sockaddr *peer = session_get_peer(session);
-
- uv_timer_stop(timer);
-
- if (session_flags(session)->has_tls) {
- worker_del_tcp_waiting(worker, peer);
- }
-
- worker_del_tcp_connected(worker, peer);
- worker->stats.timeout += session_waitinglist_get_len(session);
- session_waitinglist_finalize(session, KR_STATE_FAIL);
- worker->stats.timeout += session_tasklist_get_len(session);
- session_tasklist_finalize(session, KR_STATE_FAIL);
- session_close(session);
-}
-
/* This is called when I/O timeouts */
static void on_udp_timeout(uv_timer_t *timer)
{
static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *pkt)
{
+ if (!task || task->finished) {
+ return;
+ }
/* Close pending timer */
ioreq_kill_pending(task);
/* Clear from outgoing table. */
worker_task_unref(t);
}
session_close(source_session);
- } else if (session_get_handle(source_session)->type == UV_TCP) {
- /* Don't try to close source session at least
- * retry_interval_for_timeout_timer milliseconds */
- session_timer_restart(source_session);
}
qr_task_unref(task);
}
} else {
assert (sock_type == SOCK_STREAM);
+ assert(task->pending_count == 0);
const struct sockaddr *addr =
packet_source ? packet_source : task->addrlist;
if (addr->sa_family == AF_UNSPEC) {
+ /* task->pending_count is zero, but there are can be followers */
subreq_finalize(task, packet_source, packet);
return qr_task_finalize(task, KR_STATE_FAIL);
}
return qr_task_finalize(task, KR_STATE_FAIL);
}
- session_timer_stop(session);
while (!session_waitinglist_is_empty(session)) {
struct qr_task *t = session_waitinglist_get(session);
ret = qr_task_send(t, session, NULL, NULL);
session_close(session);
return qr_task_finalize(task, KR_STATE_FAIL);
}
- ret = session_timer_start(session, on_tcp_watchdog_timeout,
- MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
- if (ret < 0) {
- session_tasklist_finalize(session, KR_STATE_FAIL);
- subreq_finalize(task, packet_source, packet);
- session_close(session);
- return qr_task_finalize(task, KR_STATE_FAIL);
- }
-
- assert(task->pending_count == 0);
- task->pending[task->pending_count] = session_get_handle(session);
- task->pending_count += 1;
} else {
/* Make connection */
uv_connect_t *conn = malloc(sizeof(uv_connect_t));
}
assert(uv_is_closing(session_get_handle(session)) == false);
+ /* Packet was successfully parsed.
+ * Task was created (found). */
+ session_touch(session);
/* Consume input and produce next message */
return qr_task_step(task, addr, query);
}
tls_set_hs_state(&tls_ctx->c, TLS_HS_NOT_STARTED);
}
- assert(session_tasklist_get_len(session) >= session_waitinglist_get_len(session));
while (!session_waitinglist_is_empty(session)) {
struct qr_task *task = session_waitinglist_pop(session, false);
assert(task->refs > 1);
q->id = msgid;
}
+uint64_t worker_task_creation_time(struct qr_task *task)
+{
+ return task->creation_time;
+}
+
+void worker_task_subreq_finalize(struct qr_task *task)
+{
+ subreq_finalize(task, NULL, NULL);
+}
+
+bool worker_task_finished(struct qr_task *task)
+{
+ return task->finished;
+}
/** Reserve worker buffers */
static int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen)
{
uint16_t worker_task_pkt_get_msgid(struct qr_task *task);
void worker_task_pkt_set_msgid(struct qr_task *task, uint16_t msgid);
+uint64_t worker_task_creation_time(struct qr_task *task);
+void worker_task_subreq_finalize(struct qr_task *task);
+bool worker_task_finished(struct qr_task *task);
/** @cond internal */