]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
daemon/worker: minor refactoring of qr_task_step
authorGrigorii Demidov <grigorii.demidov@nic.cz>
Tue, 23 Oct 2018 12:40:14 +0000 (14:40 +0200)
committerGrigorii Demidov <grigorii.demidov@nic.cz>
Wed, 24 Oct 2018 07:08:55 +0000 (09:08 +0200)
daemon/worker.c

index 8a885d228cf72b0c27158dc56ade0db045e374cd..fd2419caffff1e07a0c5565f1c7c70ee6f915d01 100644 (file)
@@ -771,6 +771,22 @@ static struct kr_query *task_get_last_pending_query(struct qr_task *task)
        return array_tail(task->ctx->req.rplan.pending);
 }
 
+static int send_waiting(struct session *session)
+{
+       int ret = 0;
+       while (!session_waitinglist_is_empty(session)) {
+               struct qr_task *t = session_waitinglist_get(session);
+               ret = qr_task_send(t, session, NULL, NULL);
+               if (ret != 0) {
+                       session_waitinglist_finalize(session, KR_STATE_FAIL);
+                       session_tasklist_finalize(session, KR_STATE_FAIL);
+                       session_close(session);
+                       break;
+               }
+               session_waitinglist_pop(session, true);
+       }
+       return ret;
+}
 
 static void on_connect(uv_connect_t *req, int status)
 {
@@ -840,18 +856,13 @@ static void on_connect(uv_connect_t *req, int status)
        } else {
                worker_add_tcp_connected(worker, peer, session);
        }
-       while (!session_waitinglist_is_empty(session)) {
-               struct qr_task *t = session_waitinglist_get(session);
-               ret = qr_task_send(t, session, NULL, NULL);
-               if (ret != 0) {
-                       worker_del_tcp_connected(worker, peer);
-                       session_waitinglist_finalize(session, KR_STATE_FAIL);
-                       session_tasklist_finalize(session, KR_STATE_FAIL);
-                       session_close(session);
-                       return;
-               }
-               session_waitinglist_pop(session, true);
+
+       ret = send_waiting(session);
+       if (ret != 0) {
+               worker_del_tcp_connected(worker, peer);
+               return;
        }
+
        session_timer_stop(session);
        session_timer_start(session, tcp_timeout_trigger,
                            MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
@@ -1091,6 +1102,235 @@ static int qr_task_finalize(struct qr_task *task, int state)
        return state == KR_STATE_DONE ? 0 : kr_error(EIO);
 }
 
+static int udp_task_step(struct qr_task *task,
+                        const struct sockaddr *packet_source, knot_pkt_t *packet)
+{
+       struct request_ctx *ctx = task->ctx;
+       struct kr_request *req = &ctx->req;
+
+       /* If there is already outgoing query, enqueue to it. */
+       if (subreq_enqueue(task)) {
+               return kr_ok(); /* Will be notified when outgoing query finishes. */
+       }
+       /* Start transmitting */
+       uv_handle_t *handle = retransmit(task);
+       if (handle == NULL) {
+               subreq_finalize(task, packet_source, packet);
+               return qr_task_finalize(task, KR_STATE_FAIL);
+       }
+       /* Check current query NSLIST */
+       struct kr_query *qry = array_tail(req->rplan.pending);
+       assert(qry != NULL);
+       /* Retransmit at default interval, or more frequently if the mean
+        * RTT of the server is better. If the server is glued, use default rate. */
+       size_t timeout = qry->ns.score;
+       if (timeout > KR_NS_GLUED) {
+               /* We don't have information about variance in RTT, expect +10ms */
+               timeout = MIN(qry->ns.score + 10, KR_CONN_RETRY);
+       } else {
+               timeout = KR_CONN_RETRY;
+       }
+       /* Announce and start subrequest.
+        * @note Only UDP can lead I/O as it doesn't touch 'task->pktbuf' for reassembly.
+        */
+       subreq_lead(task);
+       struct session *session = handle->data;
+       assert(session_get_handle(session) == handle && (handle->type == UV_UDP));
+       int ret = session_timer_start(session, on_retransmit, timeout, 0);
+       /* Start next step with timeout, fatal if can't start a timer. */
+       if (ret != 0) {
+               subreq_finalize(task, packet_source, packet);
+               return qr_task_finalize(task, KR_STATE_FAIL);
+       }
+       return kr_ok();
+}
+
+static int tcp_task_waiting_connection(struct session *session, struct qr_task *task)
+{
+       assert(session_flags(session)->outgoing);
+       if (session_flags(session)->closing) {
+               /* Something went wrong. Better answer with KR_STATE_FAIL.
+                * TODO: normally should not happen,
+                * consider possibility to transform this into
+                * assert(!session_flags(session)->closing). */
+               return kr_error(EINVAL);
+       }
+       /* Add task to the end of list of waiting tasks.
+        * It will be notified in on_connect() or qr_task_on_send(). */
+       int ret = session_waitinglist_push(session, task);
+       if (ret < 0) {
+               return kr_error(EINVAL);
+       }
+       return kr_ok();
+}
+
+static int tcp_task_existing_connection(struct session *session, struct qr_task *task)
+{
+       assert(session_flags(session)->outgoing);
+       struct request_ctx *ctx = task->ctx;
+       struct worker_ctx *worker = ctx->worker;
+
+       if (session_flags(session)->closing) {
+               /* Something went wrong. Better answer with KR_STATE_FAIL.
+                * TODO: normally should not happen,
+                * consider possibility to transform this into
+                * assert(!session_flags(session)->closing). */
+               return kr_error(EINVAL);
+       }
+
+       /* If there are any unsent queries, send it first. */
+       int ret = send_waiting(session);
+       if (ret != 0) {
+               return kr_error(EINVAL);
+       }
+
+       /* No unsent queries at that point. */
+       if (session_tasklist_get_len(session) >= worker->tcp_pipeline_max) {
+               /* Too many outstanding queries, answer with SERFVAIL, */
+               return kr_error(EINVAL);
+       }
+
+       /* Send query to upstream. */
+       ret = qr_task_send(task, session, NULL, NULL);
+       if (ret != 0) {
+               /* Error, finalize task with SERVFAIL and
+                * close connection to upstream. */
+               session_tasklist_finalize(session, KR_STATE_FAIL);
+               session_close(session);
+               return kr_error(EINVAL);
+       }
+
+       return kr_ok();
+}
+
+static int tcp_task_make_connection(struct session *session, struct qr_task *task,
+                                   const struct sockaddr *addr /* , knot_pkt_t *packet */)
+{
+       struct request_ctx *ctx = task->ctx;
+       struct worker_ctx *worker = ctx->worker;
+
+       uv_connect_t *conn = malloc(sizeof(uv_connect_t));
+       if (!conn) {
+               return kr_error(EINVAL);
+       }
+       uv_handle_t *client = ioreq_spawn(worker, SOCK_STREAM,
+                                                 addr->sa_family);
+       if (!client) {
+               free(conn);
+               return kr_error(EINVAL);
+       }
+       session = client->data;
+
+       /* Add address to the waiting list.
+        * Now it "is waiting to be connected to." */
+       int ret = worker_add_tcp_waiting(ctx->worker, addr, session);
+       if (ret < 0) {
+               free(conn);
+               return kr_error(EINVAL);
+       }
+
+       /* Check if there must be TLS */
+       struct engine *engine = ctx->worker->engine;
+       struct network *net = &engine->net;
+       const char *key = tcpsess_key(addr);
+       struct tls_client_paramlist_entry *entry = map_get(&net->tls_client_params, key);
+       if (entry) {
+               /* Address is configured to be used with TLS.
+                * We need to allocate auxiliary data structure. */
+               assert(session_tls_get_client_ctx(session) == NULL);
+               struct tls_client_ctx_t *tls_ctx = tls_client_ctx_new(entry, worker);
+               if (!tls_ctx) {
+                       worker_del_tcp_waiting(ctx->worker, addr);
+                       free(conn);
+                       return kr_error(EINVAL);
+               }
+               tls_client_ctx_set_session(tls_ctx, session);
+               session_tls_set_client_ctx(session, tls_ctx);
+               session_flags(session)->has_tls = true;
+       }
+
+       conn->data = session;
+       /*  Store peer address for the session. */
+       struct sockaddr *peer = session_get_peer(session);
+       memcpy(peer, addr, kr_sockaddr_len(addr));
+
+       /*  Start watchdog to catch eventual connection timeout. */
+       ret = session_timer_start(session, on_tcp_connect_timeout,
+                                 KR_CONN_RTT_MAX, 0);
+       if (ret != 0) {
+               worker_del_tcp_waiting(ctx->worker, addr);
+               free(conn);
+               return kr_error(EINVAL);
+       }
+
+       struct kr_query *qry = task_get_last_pending_query(task);
+       WITH_VERBOSE (qry) {
+               const char *peer_str = kr_straddr(peer);
+               VERBOSE_MSG(qry, "=> connecting to: '%s'\n", peer_str ? peer_str : "");
+       }
+
+       /*  Start connection process to upstream. */
+       if (uv_tcp_connect(conn, (uv_tcp_t *)client, addr , on_connect) != 0) {
+               session_timer_stop(session);
+               worker_del_tcp_waiting(ctx->worker, addr);
+               free(conn);
+               return kr_error(EAGAIN);
+       }
+
+       /* Add task to the end of list of waiting tasks.
+        * Will be notified either in on_connect() or in qr_task_on_send(). */
+       ret = session_waitinglist_push(session, task);
+       if (ret < 0) {
+               session_timer_stop(session);
+               worker_del_tcp_waiting(ctx->worker, addr);
+               free(conn);
+               return kr_error(EINVAL);
+       }
+
+       return kr_ok();
+}
+
+static int tcp_task_step(struct qr_task *task,
+                        const struct sockaddr *packet_source, knot_pkt_t *packet)
+{
+       assert(task->pending_count == 0);
+       struct request_ctx *ctx = task->ctx;
+
+       const struct sockaddr *addr =
+               packet_source ? packet_source : task->addrlist;
+       if (addr->sa_family == AF_UNSPEC) {
+               /* Target isn't defined. Finalize task with SERVFAIL.
+                * Although task->pending_count is zero, there are can be followers,
+                * so we need to call subreq_finalize() to handle them properly. */
+               subreq_finalize(task, packet_source, packet);
+               return qr_task_finalize(task, KR_STATE_FAIL);
+       }
+       int ret = kr_error(EINVAL);
+       struct session* session = NULL;
+       if ((session = worker_find_tcp_waiting(ctx->worker, addr)) != NULL) {
+               /* Connection is in the list of waiting connections.
+                * It means that connection establishing is coming right now. */
+               ret = tcp_task_waiting_connection(session, task);
+       } else if ((session = worker_find_tcp_connected(ctx->worker, addr)) != NULL) {
+               /* Connection has been already established. */
+               ret = tcp_task_existing_connection(session, task);
+       } else {
+               /* Make connection. */
+               ret = tcp_task_make_connection(session, task, addr);
+       }
+
+       if (ret != kr_ok()) {
+               subreq_finalize(task, packet_source, packet);
+               if (ret == kr_error(EAGAIN)) {
+                       ret = qr_task_step(task, NULL, NULL);
+               } else {
+                       ret = qr_task_finalize(task, KR_STATE_FAIL);
+               }
+       }
+
+       return ret;
+}
+
 static int qr_task_step(struct qr_task *task,
                        const struct sockaddr *packet_source, knot_pkt_t *packet)
 {
@@ -1113,6 +1353,7 @@ static int qr_task_step(struct qr_task *task,
        req->has_tls = (ctx->source.session && session_flags(ctx->source.session)->has_tls);
 
        if (worker->too_many_open) {
+               /* */
                struct kr_rplan *rplan = &req->rplan;
                if (worker->stats.rconcurrent <
                        worker->rconcurrent_highwatermark - 10) {
@@ -1150,180 +1391,16 @@ static int qr_task_step(struct qr_task *task,
                choice += 1;
        }
 
-       /* Start fast retransmit with UDP, otherwise connect. */
        int ret = 0;
        if (sock_type == SOCK_DGRAM) {
-               /* If there is already outgoing query, enqueue to it. */
-               if (subreq_enqueue(task)) {
-                       return kr_ok(); /* Will be notified when outgoing query finishes. */
-               }
-               /* Start transmitting */
-               uv_handle_t *handle = retransmit(task);
-               if (handle == NULL) {
-                       subreq_finalize(task, packet_source, packet);
-                       return qr_task_finalize(task, KR_STATE_FAIL);
-               }
-               /* Check current query NSLIST */
-               struct kr_query *qry = array_tail(req->rplan.pending);
-               assert(qry != NULL);
-               /* Retransmit at default interval, or more frequently if the mean
-                * RTT of the server is better. If the server is glued, use default rate. */
-               size_t timeout = qry->ns.score;
-               if (timeout > KR_NS_GLUED) {
-                       /* We don't have information about variance in RTT, expect +10ms */
-                       timeout = MIN(qry->ns.score + 10, KR_CONN_RETRY);
-               } else {
-                       timeout = KR_CONN_RETRY;
-               }
-               /* Announce and start subrequest.
-                * @note Only UDP can lead I/O as it doesn't touch 'task->pktbuf' for reassembly.
-                */
-               subreq_lead(task);
-               struct session *session = handle->data;
-               assert(session_get_handle(session) == handle && (handle->type == UV_UDP));
-               ret = session_timer_start(session, on_retransmit, timeout, 0);
-               /* Start next step with timeout, fatal if can't start a timer. */
-               if (ret != 0) {
-                       subreq_finalize(task, packet_source, packet);
-                       return qr_task_finalize(task, KR_STATE_FAIL);
-               }
+               /* Start fast retransmit with UDP. */
+               ret = udp_task_step(task, packet_source, packet);
        } else {
+               /* TCP. Connect to upstream or send the query if connection already exists. */
                assert (sock_type == SOCK_STREAM);
-               assert(task->pending_count == 0);
-               const struct sockaddr *addr =
-                       packet_source ? packet_source : task->addrlist;
-               if (addr->sa_family == AF_UNSPEC) {
-                       /* task->pending_count is zero, but there are can be followers */
-                       subreq_finalize(task, packet_source, packet);
-                       return qr_task_finalize(task, KR_STATE_FAIL);
-               }
-               struct session* session = NULL;
-               if ((session = worker_find_tcp_waiting(ctx->worker, addr)) != NULL) {
-                       assert(session_flags(session)->outgoing);
-                       if (session_flags(session)->closing) {
-                               subreq_finalize(task, packet_source, packet);
-                               return qr_task_finalize(task, KR_STATE_FAIL);
-                       }
-                       /* Connection is in the list of waiting connections.
-                        * It means that connection establishing is coming right now.
-                        * Add task to the end of list of waiting tasks..
-                        * It will be notified in on_connect() or qr_task_on_send(). */
-                       ret = session_waitinglist_push(session, task);
-                       if (ret < 0) {
-                               subreq_finalize(task, packet_source, packet);
-                               return qr_task_finalize(task, KR_STATE_FAIL);
-                       }
-               } else if ((session = worker_find_tcp_connected(ctx->worker, addr)) != NULL) {
-                       /* Connection has been already established */
-                       assert(session_flags(session)->outgoing);
-                       if (session_flags(session)->closing) {
-                               subreq_finalize(task, packet_source, packet);
-                               return qr_task_finalize(task, KR_STATE_FAIL);
-                       }
-
-                       while (!session_waitinglist_is_empty(session)) {
-                               struct qr_task *t = session_waitinglist_get(session);
-                               ret = qr_task_send(t, session, NULL, NULL);
-                               if (ret != 0) {
-                                       session_waitinglist_finalize(session, KR_STATE_FAIL);
-                                       session_tasklist_finalize(session, KR_STATE_FAIL);
-                                       subreq_finalize(task, packet_source, packet);
-                                       session_close(session);
-                                       return qr_task_finalize(task, KR_STATE_FAIL);
-                               }
-                               session_waitinglist_pop(session, true);
-                       }
-
-                       if (session_tasklist_get_len(session) >= worker->tcp_pipeline_max) {
-                               subreq_finalize(task, packet_source, packet);
-                               return qr_task_finalize(task, KR_STATE_FAIL);
-                       }
-
-                       ret = qr_task_send(task, session, NULL, NULL);
-                       if (ret != 0 /* && ret != kr_error(EMFILE) */) {
-                               session_tasklist_finalize(session, KR_STATE_FAIL);
-                               subreq_finalize(task, packet_source, packet);
-                               session_close(session);
-                               return qr_task_finalize(task, KR_STATE_FAIL);
-                       }
-               } else {
-                       /* Make connection */
-                       uv_connect_t *conn = malloc(sizeof(uv_connect_t));
-                       if (!conn) {
-                               return qr_task_step(task, NULL, NULL);
-                       }
-                       uv_handle_t *client = ioreq_spawn(worker, sock_type,
-                                                         addr->sa_family);
-                       if (!client) {
-                               free(conn);
-                               subreq_finalize(task, packet_source, packet);
-                               return qr_task_finalize(task, KR_STATE_FAIL);
-                       }
-                       session = client->data;
-                       ret = worker_add_tcp_waiting(ctx->worker, addr, session);
-                       if (ret < 0) {
-                               free(conn);
-                               subreq_finalize(task, packet_source, packet);
-                               return qr_task_finalize(task, KR_STATE_FAIL);
-                       }
-
-                       /* Check if there must be TLS */
-                       struct engine *engine = ctx->worker->engine;
-                       struct network *net = &engine->net;
-                       const char *key = tcpsess_key(addr);
-                       struct tls_client_paramlist_entry *entry = map_get(&net->tls_client_params, key);
-                       if (entry) {
-                               assert(session_tls_get_client_ctx(session) == NULL);
-                               struct tls_client_ctx_t *tls_ctx = tls_client_ctx_new(entry, worker);
-                               if (!tls_ctx) {
-                                       worker_del_tcp_waiting(ctx->worker, addr);
-                                       free(conn);
-                                       subreq_finalize(task, packet_source, packet);
-                                       return qr_task_step(task, NULL, NULL);
-                               }
-                               tls_client_ctx_set_session(tls_ctx, session);
-                               session_tls_set_client_ctx(session, tls_ctx);
-                               session_flags(session)->has_tls = true;
-                       }
-
-                       conn->data = session;
-                       struct sockaddr *peer = session_get_peer(session);
-                       memcpy(peer, addr, kr_sockaddr_len(addr));
-
-                       ret = session_timer_start(session, on_tcp_connect_timeout,
-                                                 KR_CONN_RTT_MAX, 0);
-                       if (ret != 0) {
-                               worker_del_tcp_waiting(ctx->worker, addr);
-                               free(conn);
-                               subreq_finalize(task, packet_source, packet);
-                               return qr_task_finalize(task, KR_STATE_FAIL);
-                       }
-
-                       struct kr_query *qry = task_get_last_pending_query(task);
-                       WITH_VERBOSE (qry) {
-                               char peer_str[INET6_ADDRSTRLEN];
-                               inet_ntop(peer->sa_family, kr_inaddr(peer), peer_str, sizeof(peer_str));
-                               VERBOSE_MSG(qry, "=> connecting to: '%s'\n", peer_str);
-                       }
-
-                       if (uv_tcp_connect(conn, (uv_tcp_t *)client,
-                                          addr , on_connect) != 0) {
-                               session_timer_stop(session);
-                               worker_del_tcp_waiting(ctx->worker, addr);
-                               free(conn);
-                               subreq_finalize(task, packet_source, packet);
-                               return qr_task_step(task, NULL, NULL);
-                       }
-
-                       /* will be removed in on_connect() or qr_task_on_send() */
-                       ret = session_waitinglist_push(session, task);
-                       if (ret < 0) {
-                               subreq_finalize(task, packet_source, packet);
-                               return qr_task_finalize(task, KR_STATE_FAIL);
-                       }
-               }
+               ret = tcp_task_step(task, packet_source, packet);
        }
-       return kr_ok();
+       return ret;
 }
 
 static int parse_packet(knot_pkt_t *query)