return array_tail(task->ctx->req.rplan.pending);
}
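+/* Send all queries queued on the session's waiting list to the upstream.
+ * On the first failure, finalize both the waiting list and the task list
+ * with KR_STATE_FAIL and close the session. Returns 0 on success. */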
+static int send_waiting(struct session *session)
+{
+ int ret = 0;
+ while (!session_waitinglist_is_empty(session)) {
+ struct qr_task *t = session_waitinglist_get(session);
+ ret = qr_task_send(t, session, NULL, NULL);
+ if (ret != 0) {
+ session_waitinglist_finalize(session, KR_STATE_FAIL);
+ session_tasklist_finalize(session, KR_STATE_FAIL);
+ session_close(session);
+ break;
+ }
+ session_waitinglist_pop(session, true);
+ }
+ return ret;
+}
static void on_connect(uv_connect_t *req, int status)
{
} else {
worker_add_tcp_connected(worker, peer, session);
}
- while (!session_waitinglist_is_empty(session)) {
- struct qr_task *t = session_waitinglist_get(session);
- ret = qr_task_send(t, session, NULL, NULL);
- if (ret != 0) {
- worker_del_tcp_connected(worker, peer);
- session_waitinglist_finalize(session, KR_STATE_FAIL);
- session_tasklist_finalize(session, KR_STATE_FAIL);
- session_close(session);
- return;
- }
- session_waitinglist_pop(session, true);
+
+ ret = send_waiting(session);
+ if (ret != 0) {
+ worker_del_tcp_connected(worker, peer);
+ return;
}
+
session_timer_stop(session);
session_timer_start(session, tcp_timeout_trigger,
MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
return state == KR_STATE_DONE ? 0 : kr_error(EIO);
}
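+/* Perform one resolution step over UDP: join an existing in-flight
+ * sub-request if possible, otherwise start transmitting and arm the
+ * retransmission timer based on the server's estimated RTT. */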
+static int udp_task_step(struct qr_task *task,
+ const struct sockaddr *packet_source, knot_pkt_t *packet)
+{
+ struct request_ctx *ctx = task->ctx;
+ struct kr_request *req = &ctx->req;
+
+ /* If there is already an outgoing query, enqueue to it. */
+ if (subreq_enqueue(task)) {
+ return kr_ok(); /* Will be notified when the outgoing query finishes. */
+ }
+ /* Start transmitting */
+ uv_handle_t *handle = retransmit(task);
+ if (handle == NULL) {
+ subreq_finalize(task, packet_source, packet);
+ return qr_task_finalize(task, KR_STATE_FAIL);
+ }
+ /* Check current query NSLIST */
+ struct kr_query *qry = array_tail(req->rplan.pending);
+ assert(qry != NULL);
+ /* Retransmit at default interval, or more frequently if the mean
+ * RTT of the server is better. If the server is glued, use default rate. */
+ size_t timeout = qry->ns.score;
+ if (timeout > KR_NS_GLUED) {
+ /* We don't have information about variance in RTT, expect +10ms */
+ timeout = MIN(qry->ns.score + 10, KR_CONN_RETRY);
+ } else {
+ timeout = KR_CONN_RETRY;
+ }
+ /* Announce and start subrequest.
+ * @note Only UDP can lead I/O as it doesn't touch 'task->pktbuf' for reassembly.
+ */
+ subreq_lead(task);
+ struct session *session = handle->data;
+ assert(session_get_handle(session) == handle && (handle->type == UV_UDP));
+ int ret = session_timer_start(session, on_retransmit, timeout, 0);
+ /* Start next step with timeout, fatal if can't start a timer. */
+ if (ret != 0) {
+ subreq_finalize(task, packet_source, packet);
+ return qr_task_finalize(task, KR_STATE_FAIL);
+ }
+ return kr_ok();
+}
+
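+/* The TCP connection to this upstream is still being established:
+ * queue the task on the session's waiting list until it completes. */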
+static int tcp_task_waiting_connection(struct session *session, struct qr_task *task)
+{
+ assert(session_flags(session)->outgoing);
+ if (session_flags(session)->closing) {
+ /* Something went wrong; answer with KR_STATE_FAIL instead.
+ * TODO: this normally should not happen; consider turning
+ * this check into assert(!session_flags(session)->closing). */
+ return kr_error(EINVAL);
+ }
+ /* Add the task to the end of the session's waiting list.
+ * It will be notified in on_connect() or qr_task_on_send(). */
+ int ret = session_waitinglist_push(session, task);
+ if (ret < 0) {
+ return kr_error(EINVAL);
+ }
+ return kr_ok();
+}
+
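+/* The TCP connection to this upstream is already established:
+ * flush any queued queries first, then send this task's query,
+ * respecting the tcp_pipeline_max limit. */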
+static int tcp_task_existing_connection(struct session *session, struct qr_task *task)
+{
+ assert(session_flags(session)->outgoing);
+ struct request_ctx *ctx = task->ctx;
+ struct worker_ctx *worker = ctx->worker;
+
+ if (session_flags(session)->closing) {
+ /* Something went wrong; answer with KR_STATE_FAIL instead.
+ * TODO: this normally should not happen; consider turning
+ * this check into assert(!session_flags(session)->closing). */
+ return kr_error(EINVAL);
+ }
+
+ /* If there are any unsent queries, send them first. */
+ int ret = send_waiting(session);
+ if (ret != 0) {
+ return kr_error(EINVAL);
+ }
+
+ /* No unsent queries at this point. */
+ if (session_tasklist_get_len(session) >= worker->tcp_pipeline_max) {
+ /* Too many outstanding queries; answer with SERVFAIL. */
+ return kr_error(EINVAL);
+ }
+
+ /* Send query to upstream. */
+ ret = qr_task_send(task, session, NULL, NULL);
+ if (ret != 0) {
+ /* Error, finalize task with SERVFAIL and
+ * close connection to upstream. */
+ session_tasklist_finalize(session, KR_STATE_FAIL);
+ session_close(session);
+ return kr_error(EINVAL);
+ }
+
+ return kr_ok();
+}
+
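+/* No session exists for this upstream yet: spawn a TCP (optionally TLS)
+ * socket, register it in the map of waiting connections, start the
+ * connection watchdog and initiate the connect. */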
+static int tcp_task_make_connection(struct qr_task *task,
+ const struct sockaddr *addr)
+{
+ struct request_ctx *ctx = task->ctx;
+ struct worker_ctx *worker = ctx->worker;
+
+ uv_connect_t *conn = malloc(sizeof(uv_connect_t));
+ if (!conn) {
+ return kr_error(EINVAL);
+ }
+ uv_handle_t *client = ioreq_spawn(worker, SOCK_STREAM,
+ addr->sa_family);
+ if (!client) {
+ free(conn);
+ return kr_error(EINVAL);
+ }
+ struct session *session = client->data;
+
+ /* Add the session to the map of connections waiting to be established,
+ * keyed by the upstream address. */
+ int ret = worker_add_tcp_waiting(ctx->worker, addr, session);
+ if (ret < 0) {
+ free(conn);
+ return kr_error(EINVAL);
+ }
+
+ /* Check whether TLS is required for this address. */
+ struct engine *engine = ctx->worker->engine;
+ struct network *net = &engine->net;
+ const char *key = tcpsess_key(addr);
+ struct tls_client_paramlist_entry *entry = map_get(&net->tls_client_params, key);
+ if (entry) {
+ /* The address is configured to use TLS.
+ * Allocate the auxiliary TLS client context. */
+ assert(session_tls_get_client_ctx(session) == NULL);
+ struct tls_client_ctx_t *tls_ctx = tls_client_ctx_new(entry, worker);
+ if (!tls_ctx) {
+ worker_del_tcp_waiting(ctx->worker, addr);
+ free(conn);
+ return kr_error(EINVAL);
+ }
+ tls_client_ctx_set_session(tls_ctx, session);
+ session_tls_set_client_ctx(session, tls_ctx);
+ session_flags(session)->has_tls = true;
+ }
+
+ conn->data = session;
+ /* Store peer address for the session. */
+ struct sockaddr *peer = session_get_peer(session);
+ memcpy(peer, addr, kr_sockaddr_len(addr));
+
+ /* Start a watchdog timer to catch a possible connection timeout. */
+ ret = session_timer_start(session, on_tcp_connect_timeout,
+ KR_CONN_RTT_MAX, 0);
+ if (ret != 0) {
+ worker_del_tcp_waiting(ctx->worker, addr);
+ free(conn);
+ return kr_error(EINVAL);
+ }
+
+ struct kr_query *qry = task_get_last_pending_query(task);
+ WITH_VERBOSE (qry) {
+ const char *peer_str = kr_straddr(peer);
+ VERBOSE_MSG(qry, "=> connecting to: '%s'\n", peer_str ? peer_str : "");
+ }
+
+ /* Start connection process to upstream. */
+ if (uv_tcp_connect(conn, (uv_tcp_t *)client, addr, on_connect) != 0) {
+ session_timer_stop(session);
+ worker_del_tcp_waiting(ctx->worker, addr);
+ free(conn);
+ return kr_error(EAGAIN);
+ }
+
+ /* Add the task to the end of the session's waiting list.
+ * It will be notified either in on_connect() or in qr_task_on_send(). */
+ ret = session_waitinglist_push(session, task);
+ if (ret < 0) {
+ session_timer_stop(session);
+ worker_del_tcp_waiting(ctx->worker, addr);
+ free(conn);
+ return kr_error(EINVAL);
+ }
+
+ return kr_ok();
+}
+
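+/* Perform one resolution step over TCP: reuse a connecting or already
+ * connected session for the chosen address, or open a new connection. */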
+static int tcp_task_step(struct qr_task *task,
+ const struct sockaddr *packet_source, knot_pkt_t *packet)
+{
+ assert(task->pending_count == 0);
+ struct request_ctx *ctx = task->ctx;
+
+ const struct sockaddr *addr =
+ packet_source ? packet_source : task->addrlist;
+ if (addr->sa_family == AF_UNSPEC) {
+ /* The target isn't defined; finalize the task with SERVFAIL.
+ * Although task->pending_count is zero, there can still be followers,
+ * so subreq_finalize() must be called to handle them properly. */
+ subreq_finalize(task, packet_source, packet);
+ return qr_task_finalize(task, KR_STATE_FAIL);
+ }
+ int ret = kr_error(EINVAL);
+ struct session *session = NULL;
+ if ((session = worker_find_tcp_waiting(ctx->worker, addr)) != NULL) {
+ /* The connection is in the list of waiting connections,
+ * i.e. it is being established right now. */
+ ret = tcp_task_waiting_connection(session, task);
+ } else if ((session = worker_find_tcp_connected(ctx->worker, addr)) != NULL) {
+ /* The connection has already been established. */
+ ret = tcp_task_existing_connection(session, task);
+ } else {
+ /* Make connection. */
+ ret = tcp_task_make_connection(task, addr);
+ }
+
+ if (ret != kr_ok()) {
+ subreq_finalize(task, packet_source, packet);
+ if (ret == kr_error(EAGAIN)) {
+ ret = qr_task_step(task, NULL, NULL);
+ } else {
+ ret = qr_task_finalize(task, KR_STATE_FAIL);
+ }
+ }
+
+ return ret;
+}
+
static int qr_task_step(struct qr_task *task,
const struct sockaddr *packet_source, knot_pkt_t *packet)
{
req->has_tls = (ctx->source.session && session_flags(ctx->source.session)->has_tls);
if (worker->too_many_open) {
struct kr_rplan *rplan = &req->rplan;
if (worker->stats.rconcurrent <
worker->rconcurrent_highwatermark - 10) {
choice += 1;
}
- /* Start fast retransmit with UDP, otherwise connect. */
int ret = 0;
if (sock_type == SOCK_DGRAM) {
- /* If there is already outgoing query, enqueue to it. */
- if (subreq_enqueue(task)) {
- return kr_ok(); /* Will be notified when outgoing query finishes. */
- }
- /* Start transmitting */
- uv_handle_t *handle = retransmit(task);
- if (handle == NULL) {
- subreq_finalize(task, packet_source, packet);
- return qr_task_finalize(task, KR_STATE_FAIL);
- }
- /* Check current query NSLIST */
- struct kr_query *qry = array_tail(req->rplan.pending);
- assert(qry != NULL);
- /* Retransmit at default interval, or more frequently if the mean
- * RTT of the server is better. If the server is glued, use default rate. */
- size_t timeout = qry->ns.score;
- if (timeout > KR_NS_GLUED) {
- /* We don't have information about variance in RTT, expect +10ms */
- timeout = MIN(qry->ns.score + 10, KR_CONN_RETRY);
- } else {
- timeout = KR_CONN_RETRY;
- }
- /* Announce and start subrequest.
- * @note Only UDP can lead I/O as it doesn't touch 'task->pktbuf' for reassembly.
- */
- subreq_lead(task);
- struct session *session = handle->data;
- assert(session_get_handle(session) == handle && (handle->type == UV_UDP));
- ret = session_timer_start(session, on_retransmit, timeout, 0);
- /* Start next step with timeout, fatal if can't start a timer. */
- if (ret != 0) {
- subreq_finalize(task, packet_source, packet);
- return qr_task_finalize(task, KR_STATE_FAIL);
- }
+ /* Start fast retransmit with UDP. */
+ ret = udp_task_step(task, packet_source, packet);
} else {
+ /* TCP: connect to the upstream, or send the query over an existing connection. */
assert (sock_type == SOCK_STREAM);
- assert(task->pending_count == 0);
- const struct sockaddr *addr =
- packet_source ? packet_source : task->addrlist;
- if (addr->sa_family == AF_UNSPEC) {
- /* task->pending_count is zero, but there are can be followers */
- subreq_finalize(task, packet_source, packet);
- return qr_task_finalize(task, KR_STATE_FAIL);
- }
- struct session* session = NULL;
- if ((session = worker_find_tcp_waiting(ctx->worker, addr)) != NULL) {
- assert(session_flags(session)->outgoing);
- if (session_flags(session)->closing) {
- subreq_finalize(task, packet_source, packet);
- return qr_task_finalize(task, KR_STATE_FAIL);
- }
- /* Connection is in the list of waiting connections.
- * It means that connection establishing is coming right now.
- * Add task to the end of list of waiting tasks..
- * It will be notified in on_connect() or qr_task_on_send(). */
- ret = session_waitinglist_push(session, task);
- if (ret < 0) {
- subreq_finalize(task, packet_source, packet);
- return qr_task_finalize(task, KR_STATE_FAIL);
- }
- } else if ((session = worker_find_tcp_connected(ctx->worker, addr)) != NULL) {
- /* Connection has been already established */
- assert(session_flags(session)->outgoing);
- if (session_flags(session)->closing) {
- subreq_finalize(task, packet_source, packet);
- return qr_task_finalize(task, KR_STATE_FAIL);
- }
-
- while (!session_waitinglist_is_empty(session)) {
- struct qr_task *t = session_waitinglist_get(session);
- ret = qr_task_send(t, session, NULL, NULL);
- if (ret != 0) {
- session_waitinglist_finalize(session, KR_STATE_FAIL);
- session_tasklist_finalize(session, KR_STATE_FAIL);
- subreq_finalize(task, packet_source, packet);
- session_close(session);
- return qr_task_finalize(task, KR_STATE_FAIL);
- }
- session_waitinglist_pop(session, true);
- }
-
- if (session_tasklist_get_len(session) >= worker->tcp_pipeline_max) {
- subreq_finalize(task, packet_source, packet);
- return qr_task_finalize(task, KR_STATE_FAIL);
- }
-
- ret = qr_task_send(task, session, NULL, NULL);
- if (ret != 0 /* && ret != kr_error(EMFILE) */) {
- session_tasklist_finalize(session, KR_STATE_FAIL);
- subreq_finalize(task, packet_source, packet);
- session_close(session);
- return qr_task_finalize(task, KR_STATE_FAIL);
- }
- } else {
- /* Make connection */
- uv_connect_t *conn = malloc(sizeof(uv_connect_t));
- if (!conn) {
- return qr_task_step(task, NULL, NULL);
- }
- uv_handle_t *client = ioreq_spawn(worker, sock_type,
- addr->sa_family);
- if (!client) {
- free(conn);
- subreq_finalize(task, packet_source, packet);
- return qr_task_finalize(task, KR_STATE_FAIL);
- }
- session = client->data;
- ret = worker_add_tcp_waiting(ctx->worker, addr, session);
- if (ret < 0) {
- free(conn);
- subreq_finalize(task, packet_source, packet);
- return qr_task_finalize(task, KR_STATE_FAIL);
- }
-
- /* Check if there must be TLS */
- struct engine *engine = ctx->worker->engine;
- struct network *net = &engine->net;
- const char *key = tcpsess_key(addr);
- struct tls_client_paramlist_entry *entry = map_get(&net->tls_client_params, key);
- if (entry) {
- assert(session_tls_get_client_ctx(session) == NULL);
- struct tls_client_ctx_t *tls_ctx = tls_client_ctx_new(entry, worker);
- if (!tls_ctx) {
- worker_del_tcp_waiting(ctx->worker, addr);
- free(conn);
- subreq_finalize(task, packet_source, packet);
- return qr_task_step(task, NULL, NULL);
- }
- tls_client_ctx_set_session(tls_ctx, session);
- session_tls_set_client_ctx(session, tls_ctx);
- session_flags(session)->has_tls = true;
- }
-
- conn->data = session;
- struct sockaddr *peer = session_get_peer(session);
- memcpy(peer, addr, kr_sockaddr_len(addr));
-
- ret = session_timer_start(session, on_tcp_connect_timeout,
- KR_CONN_RTT_MAX, 0);
- if (ret != 0) {
- worker_del_tcp_waiting(ctx->worker, addr);
- free(conn);
- subreq_finalize(task, packet_source, packet);
- return qr_task_finalize(task, KR_STATE_FAIL);
- }
-
- struct kr_query *qry = task_get_last_pending_query(task);
- WITH_VERBOSE (qry) {
- char peer_str[INET6_ADDRSTRLEN];
- inet_ntop(peer->sa_family, kr_inaddr(peer), peer_str, sizeof(peer_str));
- VERBOSE_MSG(qry, "=> connecting to: '%s'\n", peer_str);
- }
-
- if (uv_tcp_connect(conn, (uv_tcp_t *)client,
- addr , on_connect) != 0) {
- session_timer_stop(session);
- worker_del_tcp_waiting(ctx->worker, addr);
- free(conn);
- subreq_finalize(task, packet_source, packet);
- return qr_task_step(task, NULL, NULL);
- }
-
- /* will be removed in on_connect() or qr_task_on_send() */
- ret = session_waitinglist_push(session, task);
- if (ret < 0) {
- subreq_finalize(task, packet_source, packet);
- return qr_task_finalize(task, KR_STATE_FAIL);
- }
- }
+ ret = tcp_task_step(task, packet_source, packet);
}
- return kr_ok();
+ return ret;
}
static int parse_packet(knot_pkt_t *query)