static enum protolayer_cb_result pl_tcp_unwrap_timeout(
struct protolayer_data *layer, struct protolayer_cb_ctx *ctx)
{
- /* TODO - connecting timeout? */
struct session2 *s = ctx->manager->session;
if (kr_fails_assert(!s->closing))
session2_timer_stop(s);
session2_timer_start(s,
KR_RESOLVE_TIME_LIMIT / 2,
- KR_RESOLVE_TIME_LIMIT / 2,
- PROTOLAYER_UNWRAP);
+ KR_RESOLVE_TIME_LIMIT / 2);
} else {
/* Normally it should not happen,
* but better to check if there anything in this list. */
if (idle_time < idle_in_timeout) {
idle_in_timeout -= idle_time;
session2_timer_stop(s);
- session2_timer_start(s,
- idle_in_timeout, idle_in_timeout,
- PROTOLAYER_UNWRAP);
+ session2_timer_start(s, idle_in_timeout, idle_in_timeout);
} else {
struct sockaddr *peer = session2_get_peer(s);
char *peer_str = kr_straddr(peer);
worker_del_tcp_waiting(peer);
worker_del_tcp_connected(peer);
}
- session2_unwrap(s, protolayer_event_nd(PROTOLAYER_EVENT_CLOSE), NULL, NULL, NULL);
+ session2_event(s, PROTOLAYER_EVENT_CLOSE, NULL);
}
}
uv_strerror(nread));
}
worker_end_tcp(s);
- session2_unwrap(s, protolayer_event_nd(PROTOLAYER_EVENT_FORCE_CLOSE),
- NULL, NULL, NULL);
+ session2_event(s, PROTOLAYER_EVENT_FORCE_CLOSE, NULL);
return;
}
if (uv_accept(master, (uv_stream_t *)client) != 0) {
/* close session, close underlying uv handles and
* deallocate (or return to memory pool) memory. */
- session2_unwrap(s,
- protolayer_event_nd(PROTOLAYER_EVENT_CLOSE),
- NULL, NULL, NULL);
+ session2_event(s, PROTOLAYER_EVENT_CLOSE, NULL);
return;
}
int sa_len = sizeof(struct sockaddr_in6);
int ret = uv_tcp_getpeername(client, sa, &sa_len);
if (ret || sa->sa_family == AF_UNSPEC) {
- session2_unwrap(s,
- protolayer_event_nd(PROTOLAYER_EVENT_CLOSE),
- NULL, NULL, NULL);
+ session2_event(s, PROTOLAYER_EVENT_CLOSE, NULL);
return;
}
sa = session2_get_sockname(s);
sa_len = sizeof(struct sockaddr_in6);
ret = uv_tcp_getsockname(client, sa, &sa_len);
if (ret || sa->sa_family == AF_UNSPEC) {
- session2_unwrap(s,
- protolayer_event_nd(PROTOLAYER_EVENT_CLOSE),
- NULL, NULL, NULL);
+ session2_event(s, PROTOLAYER_EVENT_CLOSE, NULL);
return;
}
// }
// }
//#endif
- session2_timer_start(s, timeout, idle_in_timeout, PROTOLAYER_UNWRAP);
+ session2_timer_start(s, timeout, idle_in_timeout);
io_start_read((uv_handle_t *)client);
}
* session - i.e. layers SHOULD NOT add
* any disconnection ceremony, if
* avoidable. */\
- XX(TIMEOUT) /**< Signal that the session has timed out. */
+ XX(TIMEOUT) /**< Signal that the session has timed out. */\
+ XX(CONNECT) /**< Signal that a connection has been established. */
/** Event type, to be interpreted by a layer. */
enum protolayer_event_type {
struct protolayer_manager *layers; /**< Protocol layers of this session. */
knot_mm_t pool;
- uv_timer_t timer;
- enum protolayer_direction timer_direction; /**< Timeout event direction. */
+ uv_timer_t timer; /**< For session-wide timeout events. */
- trie_t *tasks; /**< list of tasks associated with given session. */
- queue_t(struct qr_task *) waiting; /**< list of tasks waiting for sending to upstream. */
+ trie_t *tasks; /**< List of tasks associated with given session. */
+ queue_t(struct qr_task *) waiting; /**< List of tasks waiting for
+ * sending to upstream. */
struct wire_buf wire_buf;
uint64_t last_activity; /**< Time of last IO activity (if any occurs).
* Otherwise session creation time. */
- bool closing : 1;
- bool throttled : 1;
- bool outgoing : 1;
- bool secure : 1; /**< Whether encryption takes place in this session.
+ void *data; /**< Pointer to arbitrary data for callbacks. */
+
+ bool outgoing : 1; /**< True: session's transport is towards an upstream
+ * server. Otherwise, it is towards a client
+ * connected to the resolver. */
+ bool closing : 1; /**< True: session is at the end of its lifecycle and
+ * is going to close. */
+ bool connected : 1; /**< For connection-based sessions. True: connection
+ * is established. */
+ bool throttled : 1; /**< True: session is being rate-limited. */
+ bool secure : 1; /**< True: encryption takes place in this session.
* Layers may use this to determine whether padding
* should be applied. */
};
* needed.
*
* May return `NULL` if no peer is set. */
-uv_handle_t *session2_get_handle(struct session2 *s);
+KR_EXPORT uv_handle_t *session2_get_handle(struct session2 *s);
/** Start the session timer. When the timer ends, a `_TIMEOUT` event is sent
- * in the specified `direction`. */
-int session2_timer_start(struct session2 *s, uint64_t timeout, uint64_t repeat,
- enum protolayer_direction direction);
+ * in the `_UNWRAP` direction. */
+int session2_timer_start(struct session2 *s, uint64_t timeout, uint64_t repeat);
/** Restart the session timer without changing any of its parameters. */
int session2_timer_restart(struct session2 *s);
int session2_wrap(struct session2 *s, struct protolayer_payload payload,
const void *target, protolayer_finished_cb cb, void *baton);
+/** Convenience function to send the specified event to be processed in the
+ * `unwrap` direction. `data` may be `NULL`.
+ *
+ * See `session2_unwrap` for more information. */
+static inline int session2_event(struct session2 *s,
+ enum protolayer_event_type type, void *data)
+{
+ struct protolayer_event event = {
+ .type = type,
+ .data = { .ptr = data }
+ };
+ return session2_unwrap(s, protolayer_event(event), NULL, NULL, NULL);
+}
+
/** Removes the specified request task from the session's tasklist. The session
* must be outgoing. If the session is UDP, a signal to close is also sent to it. */
void session2_kill_ioreq(struct session2 *session, struct qr_task *task);
static int qr_task_finalize(struct qr_task *task, int state);
static void qr_task_complete(struct qr_task *task);
static int worker_add_tcp_waiting(const struct sockaddr* addr, struct session2 *session);
-static void on_tcp_connect_timeout(uv_timer_t *timer);
-static void on_udp_timeout(uv_timer_t *timer);
static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *pkt);
/* Create connection for iterative query */
uv_handle_t *handle = malloc(socktype == SOCK_DGRAM
? sizeof(uv_udp_t) : sizeof(uv_tcp_t));
- if (!handle) {
- return NULL;
- }
+ kr_require(handle);
+
int ret = io_create(the_worker->loop, handle, socktype, family, grp, true);
if (ret) {
if (ret == UV_EMFILE) {
req->qsource.comm_flags.tcp = session2_get_handle(session)->type == UV_TCP;
req->qsource.comm_flags.tls = session->secure;
// req->qsource.comm_flags.http = session->has_http; /* TODO */
+ req->qsource.comm_flags.http = false;
req->qsource.flags = req->qsource.comm_flags;
if (proxy) {
if (kr_fails_assert(qry && task->transport))
return status;
size_t timeout = task->transport->timeout;
- int ret = session2_timer_start(s, timeout, 0, PROTOLAYER_UNWRAP);
+ int ret = session2_timer_start(s, timeout, 0);
/* Start next step with timeout, fatal if can't start a timer. */
if (ret != 0) {
subreq_finalize(task, &task->transport->address.ip, task->pktbuf);
peer_str, uv_strerror(status));
}
worker_end_tcp(s);
+ session2_event(s, PROTOLAYER_EVENT_FORCE_CLOSE, NULL);
return status;
}
return status;
}
-static void on_send(uv_udp_send_t *req, int status)
-{
- struct qr_task *task = req->data;
- uv_handle_t *h = (uv_handle_t *)req->handle;
- qr_task_on_send(task, h, status);
- qr_task_unref(task);
- free(req);
-}
-
-static void on_write(uv_write_t *req, int status)
-{
- struct qr_task *task = req->data;
- uv_handle_t *h = (uv_handle_t *)req->handle;
- qr_task_on_send(task, h, status);
- qr_task_unref(task);
- free(req);
-}
-
static void qr_task_wrap_finished(int status, struct session2 *session, const void *target, void *baton)
{
struct qr_task *task = baton;
session2_waitinglist_finalize(session, KR_STATE_FAIL);
session2_tasklist_finalize(session, KR_STATE_FAIL);
worker_del_tcp_connected(peer);
- session2_unwrap(session,
- protolayer_event_nd(PROTOLAYER_EVENT_CLOSE),
- NULL, NULL, NULL);
+ session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL);
break;
}
session2_waitinglist_pop(session, true);
}
kr_assert(session2_tasklist_is_empty(session));
session2_waitinglist_retry(session, false);
- session2_unwrap(session,
- protolayer_event_nd(PROTOLAYER_EVENT_CLOSE),
- NULL, NULL, NULL);
+ session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL);
return;
}
}
kr_assert(session2_tasklist_is_empty(session));
session2_waitinglist_retry(session, false);
- session2_unwrap(session,
- protolayer_event_nd(PROTOLAYER_EVENT_CLOSE),
- NULL, NULL, NULL);
+ session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL);
return;
}
}
kr_assert(session2_tasklist_is_empty(session));
session2_waitinglist_retry(session, false);
- session2_unwrap(session,
- protolayer_event_nd(PROTOLAYER_EVENT_CLOSE),
- NULL, NULL, NULL);
+ session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL);
return;
}
* something gone wrong */
session2_waitinglist_finalize(session, KR_STATE_FAIL);
kr_assert(session2_tasklist_is_empty(session));
- session2_unwrap(session,
- protolayer_event_nd(PROTOLAYER_EVENT_CLOSE),
- NULL, NULL, NULL);
+ session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL);
return;
}
}
}
/* TODO */
-// session->connected = true;
+ session->connected = true;
session2_start_read(session);
int ret = kr_ok();
}
session2_timer_stop(session);
- session2_timer_start(session,
- MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY,
- PROTOLAYER_UNWRAP);
-}
-
-static void on_tcp_connect_timeout(uv_timer_t *timer)
-{
- struct session2 *session = timer->data;
-
- uv_timer_stop(timer);
- kr_require(the_worker);
-
- kr_assert(session2_tasklist_is_empty(session));
-
- struct sockaddr *peer = session2_get_peer(session);
- worker_del_tcp_waiting(peer);
-
- struct qr_task *task = session2_waitinglist_get(session);
- if (!task) {
- /* Normally shouldn't happen. */
- const char *peer_str = kr_straddr(peer);
- VERBOSE_MSG(NULL, "=> connection to '%s' failed (internal timeout), empty waitinglist\n",
- peer_str ? peer_str : "");
- return;
- }
-
- struct kr_query *qry = task_get_last_pending_query(task);
- if (kr_log_is_debug_qry(WORKER, qry)) {
- const char *peer_str = kr_straddr(peer);
- VERBOSE_MSG(qry, "=> connection to '%s' failed (internal timeout)\n",
- peer_str ? peer_str : "");
- }
-
- qry->server_selection.error(qry, task->transport, KR_SELECTION_TCP_CONNECT_TIMEOUT);
-
- the_worker->stats.timeout += session2_waitinglist_get_len(session);
- session2_waitinglist_retry(session, true);
- kr_assert(session2_tasklist_is_empty(session));
- /* uv_cancel() doesn't support uv_connect_t request,
- * so that we can't cancel it.
- * There still exists possibility of successful connection
- * for this request.
- * So connection callback (on_connect()) must check
- * if connection is in the list of waiting connection.
- * If no, most likely this is timed out connection even if
- * it was successful. */
-}
-
-/* This is called when I/O timeouts */
-static void on_udp_timeout(uv_timer_t *timer)
-{
- struct session2 *session = timer->data;
- kr_assert(session2_get_handle(session)->data == session);
- kr_assert(session2_tasklist_get_len(session) == 1);
- kr_assert(session2_waitinglist_is_empty(session));
-
- uv_timer_stop(timer);
-
- struct qr_task *task = session2_tasklist_get_first(session);
- if (!task)
- return;
-
- if (task->leading && task->pending_count > 0) {
- struct kr_query *qry = array_tail(task->ctx->req.rplan.pending);
- qry->server_selection.error(qry, task->transport, KR_SELECTION_QUERY_TIMEOUT);
- }
-
- task->timeouts += 1;
- the_worker->stats.timeout += 1;
- qr_task_step(task, NULL, NULL);
+ session2_timer_start(session, MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
}
static uv_handle_t *transmit(struct qr_task *task)
memcpy(peer, addr, kr_sockaddr_len(addr));
if (qr_task_send(task, session, (struct sockaddr *)choice,
task->pktbuf) != 0) {
- session2_unwrap(session,
- protolayer_event_nd(PROTOLAYER_EVENT_CLOSE),
- NULL, NULL, NULL);
+ session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL);
ret = NULL;
} else {
task->pending[task->pending_count] = session;
* (ie. task->leading is true) */
worker_task_unref(t);
}
- session2_unwrap(source_session,
- protolayer_event_nd(PROTOLAYER_EVENT_CLOSE),
- NULL, NULL, NULL);
+ session2_event(source_session, PROTOLAYER_EVENT_CLOSE, NULL);
}
qr_task_unref(task);
* close connection to upstream. */
session2_tasklist_finalize(session, KR_STATE_FAIL);
worker_del_tcp_connected(session2_get_peer(session));
- session2_unwrap(session,
- protolayer_event_nd(PROTOLAYER_EVENT_CLOSE),
- NULL, NULL, NULL);
+ session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL);
return kr_error(EINVAL);
}
static int tcp_task_make_connection(struct qr_task *task, const struct sockaddr *addr)
{
/* Check if there must be TLS */
+ /* TODO: tls */
// struct tls_client_ctx *tls_ctx = NULL;
// tls_client_param_t *entry = tls_client_param_get(
// the_network->tls_client_params, addr);
int ret = worker_add_tcp_waiting(addr, session);
if (ret < 0) {
free(conn);
- session2_unwrap(session,
- protolayer_event_nd(PROTOLAYER_EVENT_CLOSE),
- NULL, NULL, NULL);
+ session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL);
return kr_error(EINVAL);
}
memcpy(peer, addr, kr_sockaddr_len(addr));
/* Start watchdog to catch eventual connection timeout. */
- ret = session2_timer_start(session,
- KR_CONN_RTT_MAX, 0, PROTOLAYER_UNWRAP);
+ ret = session2_timer_start(session, KR_CONN_RTT_MAX, 0);
if (ret != 0) {
worker_del_tcp_waiting(addr);
free(conn);
- session2_unwrap(session,
- protolayer_event_nd(PROTOLAYER_EVENT_CLOSE),
- NULL, NULL, NULL);
+ session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL);
return kr_error(EINVAL);
}
session2_timer_stop(session);
worker_del_tcp_waiting(addr);
free(conn);
- session2_unwrap(session,
- protolayer_event_nd(PROTOLAYER_EVENT_CLOSE),
- NULL, NULL, NULL);
+ session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL);
qry->server_selection.error(qry, task->transport, KR_SELECTION_TCP_CONNECT_FAILED);
return kr_error(EAGAIN);
}
session2_timer_stop(session);
worker_del_tcp_waiting(addr);
free(conn);
- session2_unwrap(session,
- protolayer_event_nd(PROTOLAYER_EVENT_CLOSE),
- NULL, NULL, NULL);
+ session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL);
return kr_error(EINVAL);
}
struct http_ctx *http_ctx = NULL;
#if ENABLE_DOH2
- /* TODO: devise a way to do this... don't know yet */
+ /* TODO: http. Devise a way to do this... don't know yet */
// http_ctx = session_http_get_server_ctx(session);
//
// /* Badly formed query when using DoH leads to a Bad Request */
worker_del_tcp_waiting(peer);
worker_del_tcp_connected(peer);
+ session->connected = false;
while (!session2_waitinglist_is_empty(session)) {
struct qr_task *task = session2_waitinglist_pop(session, false);
knot_pkt_t *pktbuf = worker_task_get_pktbuf(task);
knot_wire_set_id(pktbuf->wire, msgid);
struct kr_query *q = task_get_last_pending_query(task);
- q->id = msgid;
+ if (q)
+ q->id = msgid;
}
uint64_t worker_task_creation_time(struct qr_task *task)
return knot_pkt_new(buf, buf_len, &the_worker->pkt_pool);
}
-static enum protolayer_cb_result pl_dns_dgram_unwrap(
+static enum protolayer_cb_result pl_dns_dgram_unwrap_timeout(
struct protolayer_data *layer, struct protolayer_cb_ctx *ctx)
{
struct session2 *session = ctx->manager->session;
+ kr_assert(session2_get_handle(session)->data == session);
+ kr_assert(session2_tasklist_get_len(session) == 1);
+ kr_assert(session2_waitinglist_is_empty(session));
+
+ session2_timer_stop(session);
+
+ struct qr_task *task = session2_tasklist_get_first(session);
+ if (!task)
+ return protolayer_continue(ctx);
+
+ if (task->leading && task->pending_count > 0) {
+ struct kr_query *qry = array_tail(task->ctx->req.rplan.pending);
+ qry->server_selection.error(qry, task->transport, KR_SELECTION_QUERY_TIMEOUT);
+ }
+
+ task->timeouts += 1;
+ the_worker->stats.timeout += 1;
+ qr_task_step(task, NULL, NULL);
+
+ return protolayer_continue(ctx);
+}
+
+static enum protolayer_cb_result pl_dns_dgram_unwrap(
+ struct protolayer_data *layer, struct protolayer_cb_ctx *ctx)
+{
if (ctx->payload.type == PROTOLAYER_PAYLOAD_EVENT) {
+ if (ctx->payload.event.type == PROTOLAYER_EVENT_TIMEOUT)
+ return pl_dns_dgram_unwrap_timeout(layer, ctx);
+
/* pass thru */
return protolayer_continue(ctx);
}
+ struct session2 *session = ctx->manager->session;
+
if (ctx->payload.type == PROTOLAYER_PAYLOAD_IOVEC) {
int ret = kr_ok();
for (int i = 0; i < ctx->payload.iovec.cnt; i++) {
return kr_ok();
}
+static enum protolayer_cb_result pl_dns_stream_unwrap_timeout(
+ struct protolayer_data *layer, struct protolayer_cb_ctx *ctx)
+{
+ struct session2 *session = ctx->manager->session;
+ if (session->connected || session->closing)
+ return protolayer_continue(ctx);
+
+ /* Connection timeout */
+ session2_timer_stop(session);
+
+ kr_assert(session2_tasklist_is_empty(session));
+
+ struct sockaddr *peer = session2_get_peer(session);
+ worker_del_tcp_waiting(peer);
+
+ struct qr_task *task = session2_waitinglist_get(session);
+ if (!task) {
+ /* Normally shouldn't happen. */
+ const char *peer_str = kr_straddr(peer);
+ VERBOSE_MSG(NULL, "=> connection to '%s' failed (internal timeout), empty waitinglist\n",
+ peer_str ? peer_str : "");
+ return protolayer_continue(ctx);
+ }
+
+ struct kr_query *qry = task_get_last_pending_query(task);
+ if (kr_log_is_debug_qry(WORKER, qry)) {
+ const char *peer_str = kr_straddr(peer);
+ VERBOSE_MSG(qry, "=> connection to '%s' failed (internal timeout)\n",
+ peer_str ? peer_str : "");
+ }
+
+ qry->server_selection.error(qry, task->transport, KR_SELECTION_TCP_CONNECT_TIMEOUT);
+
+ the_worker->stats.timeout += session2_waitinglist_get_len(session);
+ session2_waitinglist_retry(session, true);
+ kr_assert(session2_tasklist_is_empty(session));
+ /* uv_cancel() doesn't support uv_connect_t request,
+ * so that we can't cancel it.
+ * There still exists possibility of successful connection
+ * for this request.
+ * So connection callback (on_connect()) must check
+ * if connection is in the list of waiting connection.
+ * If no, most likely this is timed out connection even if
+ * it was successful. */
+
+ return protolayer_continue(ctx);
+}
+
static enum protolayer_cb_result pl_dns_stream_unwrap(
struct protolayer_data *layer, struct protolayer_cb_ctx *ctx)
{
if (ctx->payload.type == PROTOLAYER_PAYLOAD_EVENT) {
+ if (ctx->payload.event.type == PROTOLAYER_EVENT_TIMEOUT)
+ return pl_dns_stream_unwrap_timeout(layer, ctx);
+
/* pass thru */
return protolayer_continue(ctx);
}