From: Grigorii Demidov Date: Tue, 23 Oct 2018 12:40:14 +0000 (+0200) Subject: daemon/worker: minor refactoring of qr_task_step X-Git-Tag: v3.1.0~6^2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=fd0b801514171c4cb10b0376a18c9fef601320a9;p=thirdparty%2Fknot-resolver.git daemon/worker: minor refactoring of qr_task_step --- diff --git a/daemon/worker.c b/daemon/worker.c index 8a885d228..fd2419caf 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -771,6 +771,22 @@ static struct kr_query *task_get_last_pending_query(struct qr_task *task) return array_tail(task->ctx->req.rplan.pending); } +static int send_waiting(struct session *session) +{ + int ret = 0; + while (!session_waitinglist_is_empty(session)) { + struct qr_task *t = session_waitinglist_get(session); + ret = qr_task_send(t, session, NULL, NULL); + if (ret != 0) { + session_waitinglist_finalize(session, KR_STATE_FAIL); + session_tasklist_finalize(session, KR_STATE_FAIL); + session_close(session); + break; + } + session_waitinglist_pop(session, true); + } + return ret; +} static void on_connect(uv_connect_t *req, int status) { @@ -840,18 +856,13 @@ static void on_connect(uv_connect_t *req, int status) } else { worker_add_tcp_connected(worker, peer, session); } - while (!session_waitinglist_is_empty(session)) { - struct qr_task *t = session_waitinglist_get(session); - ret = qr_task_send(t, session, NULL, NULL); - if (ret != 0) { - worker_del_tcp_connected(worker, peer); - session_waitinglist_finalize(session, KR_STATE_FAIL); - session_tasklist_finalize(session, KR_STATE_FAIL); - session_close(session); - return; - } - session_waitinglist_pop(session, true); + + ret = send_waiting(session); + if (ret != 0) { + worker_del_tcp_connected(worker, peer); + return; } + session_timer_stop(session); session_timer_start(session, tcp_timeout_trigger, MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY); @@ -1091,6 +1102,235 @@ static int qr_task_finalize(struct qr_task *task, int state) return state == KR_STATE_DONE ? 0 : kr_error(EIO); } +static int udp_task_step(struct qr_task *task, + const struct sockaddr *packet_source, knot_pkt_t *packet) +{ + struct request_ctx *ctx = task->ctx; + struct kr_request *req = &ctx->req; + + /* If there is already outgoing query, enqueue to it. */ + if (subreq_enqueue(task)) { + return kr_ok(); /* Will be notified when outgoing query finishes. */ + } + /* Start transmitting */ + uv_handle_t *handle = retransmit(task); + if (handle == NULL) { + subreq_finalize(task, packet_source, packet); + return qr_task_finalize(task, KR_STATE_FAIL); + } + /* Check current query NSLIST */ + struct kr_query *qry = array_tail(req->rplan.pending); + assert(qry != NULL); + /* Retransmit at default interval, or more frequently if the mean + * RTT of the server is better. If the server is glued, use default rate. */ + size_t timeout = qry->ns.score; + if (timeout > KR_NS_GLUED) { + /* We don't have information about variance in RTT, expect +10ms */ + timeout = MIN(qry->ns.score + 10, KR_CONN_RETRY); + } else { + timeout = KR_CONN_RETRY; + } + /* Announce and start subrequest. + * @note Only UDP can lead I/O as it doesn't touch 'task->pktbuf' for reassembly. + */ + subreq_lead(task); + struct session *session = handle->data; + assert(session_get_handle(session) == handle && (handle->type == UV_UDP)); + int ret = session_timer_start(session, on_retransmit, timeout, 0); + /* Start next step with timeout, fatal if can't start a timer. */ + if (ret != 0) { + subreq_finalize(task, packet_source, packet); + return qr_task_finalize(task, KR_STATE_FAIL); + } + return kr_ok(); +} + +static int tcp_task_waiting_connection(struct session *session, struct qr_task *task) +{ + assert(session_flags(session)->outgoing); + if (session_flags(session)->closing) { + /* Something went wrong. Better answer with KR_STATE_FAIL. + * TODO: normally should not happen, + * consider possibility to transform this into + * assert(!session_flags(session)->closing). */ + return kr_error(EINVAL); + } + /* Add task to the end of list of waiting tasks. + * It will be notified in on_connect() or qr_task_on_send(). */ + int ret = session_waitinglist_push(session, task); + if (ret < 0) { + return kr_error(EINVAL); + } + return kr_ok(); +} + +static int tcp_task_existing_connection(struct session *session, struct qr_task *task) +{ + assert(session_flags(session)->outgoing); + struct request_ctx *ctx = task->ctx; + struct worker_ctx *worker = ctx->worker; + + if (session_flags(session)->closing) { + /* Something went wrong. Better answer with KR_STATE_FAIL. + * TODO: normally should not happen, + * consider possibility to transform this into + * assert(!session_flags(session)->closing). */ + return kr_error(EINVAL); + } + + /* If there are any unsent queries, send it first. */ + int ret = send_waiting(session); + if (ret != 0) { + return kr_error(EINVAL); + } + + /* No unsent queries at that point. */ + if (session_tasklist_get_len(session) >= worker->tcp_pipeline_max) { + /* Too many outstanding queries, answer with SERFVAIL, */ + return kr_error(EINVAL); + } + + /* Send query to upstream. */ + ret = qr_task_send(task, session, NULL, NULL); + if (ret != 0) { + /* Error, finalize task with SERVFAIL and + * close connection to upstream. */ + session_tasklist_finalize(session, KR_STATE_FAIL); + session_close(session); + return kr_error(EINVAL); + } + + return kr_ok(); +} + +static int tcp_task_make_connection(struct session *session, struct qr_task *task, + const struct sockaddr *addr /* , knot_pkt_t *packet */) +{ + struct request_ctx *ctx = task->ctx; + struct worker_ctx *worker = ctx->worker; + + uv_connect_t *conn = malloc(sizeof(uv_connect_t)); + if (!conn) { + return kr_error(EINVAL); + } + uv_handle_t *client = ioreq_spawn(worker, SOCK_STREAM, + addr->sa_family); + if (!client) { + free(conn); + return kr_error(EINVAL); + } + session = client->data; + + /* Add address to the waiting list. + * Now it "is waiting to be connected to." */ + int ret = worker_add_tcp_waiting(ctx->worker, addr, session); + if (ret < 0) { + free(conn); + return kr_error(EINVAL); + } + + /* Check if there must be TLS */ + struct engine *engine = ctx->worker->engine; + struct network *net = &engine->net; + const char *key = tcpsess_key(addr); + struct tls_client_paramlist_entry *entry = map_get(&net->tls_client_params, key); + if (entry) { + /* Address is configured to be used with TLS. + * We need to allocate auxiliary data structure. */ + assert(session_tls_get_client_ctx(session) == NULL); + struct tls_client_ctx_t *tls_ctx = tls_client_ctx_new(entry, worker); + if (!tls_ctx) { + worker_del_tcp_waiting(ctx->worker, addr); + free(conn); + return kr_error(EINVAL); + } + tls_client_ctx_set_session(tls_ctx, session); + session_tls_set_client_ctx(session, tls_ctx); + session_flags(session)->has_tls = true; + } + + conn->data = session; + /* Store peer address for the session. */ + struct sockaddr *peer = session_get_peer(session); + memcpy(peer, addr, kr_sockaddr_len(addr)); + + /* Start watchdog to catch eventual connection timeout. */ + ret = session_timer_start(session, on_tcp_connect_timeout, + KR_CONN_RTT_MAX, 0); + if (ret != 0) { + worker_del_tcp_waiting(ctx->worker, addr); + free(conn); + return kr_error(EINVAL); + } + + struct kr_query *qry = task_get_last_pending_query(task); + WITH_VERBOSE (qry) { + const char *peer_str = kr_straddr(peer); + VERBOSE_MSG(qry, "=> connecting to: '%s'\n", peer_str ? peer_str : ""); + } + + /* Start connection process to upstream. */ + if (uv_tcp_connect(conn, (uv_tcp_t *)client, addr , on_connect) != 0) { + session_timer_stop(session); + worker_del_tcp_waiting(ctx->worker, addr); + free(conn); + return kr_error(EAGAIN); + } + + /* Add task to the end of list of waiting tasks. + * Will be notified either in on_connect() or in qr_task_on_send(). */ + ret = session_waitinglist_push(session, task); + if (ret < 0) { + session_timer_stop(session); + worker_del_tcp_waiting(ctx->worker, addr); + free(conn); + return kr_error(EINVAL); + } + + return kr_ok(); +} + +static int tcp_task_step(struct qr_task *task, + const struct sockaddr *packet_source, knot_pkt_t *packet) +{ + assert(task->pending_count == 0); + struct request_ctx *ctx = task->ctx; + + const struct sockaddr *addr = + packet_source ? packet_source : task->addrlist; + if (addr->sa_family == AF_UNSPEC) { + /* Target isn't defined. Finalize task with SERVFAIL. + * Although task->pending_count is zero, there are can be followers, + * so we need to call subreq_finalize() to handle them properly. */ + subreq_finalize(task, packet_source, packet); + return qr_task_finalize(task, KR_STATE_FAIL); + } + int ret = kr_error(EINVAL); + struct session* session = NULL; + if ((session = worker_find_tcp_waiting(ctx->worker, addr)) != NULL) { + /* Connection is in the list of waiting connections. + * It means that connection establishing is coming right now. */ + ret = tcp_task_waiting_connection(session, task); + } else if ((session = worker_find_tcp_connected(ctx->worker, addr)) != NULL) { + /* Connection has been already established. */ + ret = tcp_task_existing_connection(session, task); + } else { + /* Make connection. */ + ret = tcp_task_make_connection(session, task, addr); + } + + if (ret != kr_ok()) { + subreq_finalize(task, packet_source, packet); + if (ret == kr_error(EAGAIN)) { + ret = qr_task_step(task, NULL, NULL); + } else { + ret = qr_task_finalize(task, KR_STATE_FAIL); + } + } + + return ret; +} + static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet) { @@ -1113,6 +1353,7 @@ static int qr_task_step(struct qr_task *task, req->has_tls = (ctx->source.session && session_flags(ctx->source.session)->has_tls); if (worker->too_many_open) { + /* */ struct kr_rplan *rplan = &req->rplan; if (worker->stats.rconcurrent < worker->rconcurrent_highwatermark - 10) { @@ -1150,180 +1391,16 @@ static int qr_task_step(struct qr_task *task, choice += 1; } - /* Start fast retransmit with UDP, otherwise connect. */ int ret = 0; if (sock_type == SOCK_DGRAM) { - /* If there is already outgoing query, enqueue to it. */ - if (subreq_enqueue(task)) { - return kr_ok(); /* Will be notified when outgoing query finishes. */ - } - /* Start transmitting */ - uv_handle_t *handle = retransmit(task); - if (handle == NULL) { - subreq_finalize(task, packet_source, packet); - return qr_task_finalize(task, KR_STATE_FAIL); - } - /* Check current query NSLIST */ - struct kr_query *qry = array_tail(req->rplan.pending); - assert(qry != NULL); - /* Retransmit at default interval, or more frequently if the mean - * RTT of the server is better. If the server is glued, use default rate. */ - size_t timeout = qry->ns.score; - if (timeout > KR_NS_GLUED) { - /* We don't have information about variance in RTT, expect +10ms */ - timeout = MIN(qry->ns.score + 10, KR_CONN_RETRY); - } else { - timeout = KR_CONN_RETRY; - } - /* Announce and start subrequest. - * @note Only UDP can lead I/O as it doesn't touch 'task->pktbuf' for reassembly. - */ - subreq_lead(task); - struct session *session = handle->data; - assert(session_get_handle(session) == handle && (handle->type == UV_UDP)); - ret = session_timer_start(session, on_retransmit, timeout, 0); - /* Start next step with timeout, fatal if can't start a timer. */ - if (ret != 0) { - subreq_finalize(task, packet_source, packet); - return qr_task_finalize(task, KR_STATE_FAIL); - } + /* Start fast retransmit with UDP. */ + ret = udp_task_step(task, packet_source, packet); } else { + /* TCP. Connect to upstream or send the query if connection already exists. */ assert (sock_type == SOCK_STREAM); - assert(task->pending_count == 0); - const struct sockaddr *addr = - packet_source ? packet_source : task->addrlist; - if (addr->sa_family == AF_UNSPEC) { - /* task->pending_count is zero, but there are can be followers */ - subreq_finalize(task, packet_source, packet); - return qr_task_finalize(task, KR_STATE_FAIL); - } - struct session* session = NULL; - if ((session = worker_find_tcp_waiting(ctx->worker, addr)) != NULL) { - assert(session_flags(session)->outgoing); - if (session_flags(session)->closing) { - subreq_finalize(task, packet_source, packet); - return qr_task_finalize(task, KR_STATE_FAIL); - } - /* Connection is in the list of waiting connections. - * It means that connection establishing is coming right now. - * Add task to the end of list of waiting tasks.. - * It will be notified in on_connect() or qr_task_on_send(). */ - ret = session_waitinglist_push(session, task); - if (ret < 0) { - subreq_finalize(task, packet_source, packet); - return qr_task_finalize(task, KR_STATE_FAIL); - } - } else if ((session = worker_find_tcp_connected(ctx->worker, addr)) != NULL) { - /* Connection has been already established */ - assert(session_flags(session)->outgoing); - if (session_flags(session)->closing) { - subreq_finalize(task, packet_source, packet); - return qr_task_finalize(task, KR_STATE_FAIL); - } - - while (!session_waitinglist_is_empty(session)) { - struct qr_task *t = session_waitinglist_get(session); - ret = qr_task_send(t, session, NULL, NULL); - if (ret != 0) { - session_waitinglist_finalize(session, KR_STATE_FAIL); - session_tasklist_finalize(session, KR_STATE_FAIL); - subreq_finalize(task, packet_source, packet); - session_close(session); - return qr_task_finalize(task, KR_STATE_FAIL); - } - session_waitinglist_pop(session, true); - } - - if (session_tasklist_get_len(session) >= worker->tcp_pipeline_max) { - subreq_finalize(task, packet_source, packet); - return qr_task_finalize(task, KR_STATE_FAIL); - } - - ret = qr_task_send(task, session, NULL, NULL); - if (ret != 0 /* && ret != kr_error(EMFILE) */) { - session_tasklist_finalize(session, KR_STATE_FAIL); - subreq_finalize(task, packet_source, packet); - session_close(session); - return qr_task_finalize(task, KR_STATE_FAIL); - } - } else { - /* Make connection */ - uv_connect_t *conn = malloc(sizeof(uv_connect_t)); - if (!conn) { - return qr_task_step(task, NULL, NULL); - } - uv_handle_t *client = ioreq_spawn(worker, sock_type, - addr->sa_family); - if (!client) { - free(conn); - subreq_finalize(task, packet_source, packet); - return qr_task_finalize(task, KR_STATE_FAIL); - } - session = client->data; - ret = worker_add_tcp_waiting(ctx->worker, addr, session); - if (ret < 0) { - free(conn); - subreq_finalize(task, packet_source, packet); - return qr_task_finalize(task, KR_STATE_FAIL); - } - - /* Check if there must be TLS */ - struct engine *engine = ctx->worker->engine; - struct network *net = &engine->net; - const char *key = tcpsess_key(addr); - struct tls_client_paramlist_entry *entry = map_get(&net->tls_client_params, key); - if (entry) { - assert(session_tls_get_client_ctx(session) == NULL); - struct tls_client_ctx_t *tls_ctx = tls_client_ctx_new(entry, worker); - if (!tls_ctx) { - worker_del_tcp_waiting(ctx->worker, addr); - free(conn); - subreq_finalize(task, packet_source, packet); - return qr_task_step(task, NULL, NULL); - } - tls_client_ctx_set_session(tls_ctx, session); - session_tls_set_client_ctx(session, tls_ctx); - session_flags(session)->has_tls = true; - } - - conn->data = session; - struct sockaddr *peer = session_get_peer(session); - memcpy(peer, addr, kr_sockaddr_len(addr)); - - ret = session_timer_start(session, on_tcp_connect_timeout, - KR_CONN_RTT_MAX, 0); - if (ret != 0) { - worker_del_tcp_waiting(ctx->worker, addr); - free(conn); - subreq_finalize(task, packet_source, packet); - return qr_task_finalize(task, KR_STATE_FAIL); - } - - struct kr_query *qry = task_get_last_pending_query(task); - WITH_VERBOSE (qry) { - char peer_str[INET6_ADDRSTRLEN]; - inet_ntop(peer->sa_family, kr_inaddr(peer), peer_str, sizeof(peer_str)); - VERBOSE_MSG(qry, "=> connecting to: '%s'\n", peer_str); - } - - if (uv_tcp_connect(conn, (uv_tcp_t *)client, - addr , on_connect) != 0) { - session_timer_stop(session); - worker_del_tcp_waiting(ctx->worker, addr); - free(conn); - subreq_finalize(task, packet_source, packet); - return qr_task_step(task, NULL, NULL); - } - - /* will be removed in on_connect() or qr_task_on_send() */ - ret = session_waitinglist_push(session, task); - if (ret < 0) { - subreq_finalize(task, packet_source, packet); - return qr_task_finalize(task, KR_STATE_FAIL); - } - } + ret = tcp_task_step(task, packet_source, packet); } - return kr_ok(); + return ret; } static int parse_packet(knot_pkt_t *query)