]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
daemon: replace a few libuv references with generic flags and move stats handling
authorOto Šťáva <oto.stava@nic.cz>
Wed, 7 Sep 2022 07:25:38 +0000 (09:25 +0200)
committerOto Šťáva <oto.stava@nic.cz>
Thu, 26 Jan 2023 11:56:08 +0000 (12:56 +0100)
daemon/io.c
daemon/session2.c
daemon/session2.h
daemon/tls.c
daemon/worker.c
daemon/worker.h

index 0319a58a7bd9550d0ec6c305f8dd590ece904c62..2dc060e39edc21eb24bb0376e03974dda9cb072a 100644 (file)
@@ -209,6 +209,22 @@ static enum protolayer_cb_result pl_udp_wrap(struct protolayer_data *layer, stru
        return protolayer_push(ctx);
 }
 
+static bool pl_udp_event_wrap(enum protolayer_event_type event,
+                              void **baton,
+                              struct protolayer_manager *manager,
+                              struct protolayer_data *layer)
+{
+       if (event == PROTOLAYER_EVENT_STATS_SEND_ERR) {
+               the_worker->stats.err_udp += 1;
+               return false;
+       } else if (event == PROTOLAYER_EVENT_STATS_QRY_OUT) {
+               the_worker->stats.udp += 1;
+               return false;
+       }
+
+       return true;
+}
+
 
 struct pl_tcp_sess_data {
        struct proxy_result proxy;
@@ -333,6 +349,21 @@ static enum protolayer_cb_result pl_tcp_wrap(struct protolayer_data *layer, stru
        return protolayer_push(ctx);
 }
 
+static bool pl_tcp_event_wrap(enum protolayer_event_type event,
+                              void **baton,
+                              struct protolayer_manager *manager,
+                              struct protolayer_data *layer)
+{
+       if (event == PROTOLAYER_EVENT_STATS_SEND_ERR) {
+               the_worker->stats.err_tcp += 1;
+               return false;
+       } else if (event == PROTOLAYER_EVENT_STATS_QRY_OUT) {
+               the_worker->stats.tcp += 1;
+               return false;
+       }
+
+       return true;
+}
 
 void io_protolayers_init(void)
 {
@@ -340,7 +371,8 @@ void io_protolayers_init(void)
                .iter_size = sizeof(struct pl_udp_iter_data),
                .iter_init = pl_udp_iter_init,
                .unwrap = pl_udp_unwrap,
-               .wrap = pl_udp_wrap
+               .wrap = pl_udp_wrap,
+               .event_wrap = pl_udp_event_wrap,
        };
 
        protolayer_globals[PROTOLAYER_TCP] = (struct protolayer_globals){
@@ -348,7 +380,8 @@ void io_protolayers_init(void)
                .sess_init = pl_tcp_sess_init,
                .sess_deinit = pl_tcp_sess_deinit,
                .unwrap = pl_tcp_unwrap,
-               .wrap = pl_tcp_wrap
+               .wrap = pl_tcp_wrap,
+               .event_wrap = pl_tcp_event_wrap,
        };
 }
 
index f5c4de3c532494caa03ea0ff0d5d9ec16d3067c8..8c01374ea9c156583767f86cc6143fe191c38303 100644 (file)
@@ -318,7 +318,8 @@ static void *get_init_param(enum protolayer_protocol p,
        return NULL;
 }
 
-struct protolayer_manager *protolayer_manager_new(
+/** Allocates and initializes a new manager. */
+static struct protolayer_manager *protolayer_manager_new(
                struct session2 *s,
                enum protolayer_grp grp,
                struct protolayer_data_param *layer_param,
@@ -378,7 +379,8 @@ struct protolayer_manager *protolayer_manager_new(
        return m;
 }
 
-void protolayer_manager_free(struct protolayer_manager *m)
+/** Deinitializes all layer data in the manager and deallocates it. */
+static void protolayer_manager_free(struct protolayer_manager *m)
 {
        if (!m) return;
 
index 2dfe53fac1fd85bb5c7b82f04a5e4945cb4eb7ee..d7b1aa6831b2e0e97bec2967869bf097fd80a96d 100644 (file)
@@ -153,7 +153,9 @@ typedef void (*protolayer_finished_cb)(int status, struct session2 *session,
        XX(TIMEOUT) /**< Signal that the session has timed out. */\
        XX(CONNECT) /**< Signal that a connection has been established. */\
        XX(CONNECT_FAIL) /**< Signal that a connection could not have been established. */\
-       XX(DISCONNECT) /**< Signal that a connection has ended. */
+       XX(DISCONNECT) /**< Signal that a connection has ended. */\
+       XX(STATS_SEND_ERR) /**< Failed task send - update stats. */\
+       XX(STATS_QRY_OUT) /**< Outgoing query submission - update stats. */
 
 /** Event type, to be interpreted by a layer. */
 enum protolayer_event_type {
@@ -371,17 +373,6 @@ struct protolayer_data_param {
                      * Only needs to be valid for session initialization. */
 };
 
-/** Allocates and initializes a new manager. */
-struct protolayer_manager *protolayer_manager_new(
-               struct session2 *s,
-               enum protolayer_grp grp,
-               struct protolayer_data_param *layer_param,
-               size_t layer_param_count);
-
-/** Deinitializes all layer data in the manager and deallocates it. */
-void protolayer_manager_free(struct protolayer_manager *m);
-
-
 /** Global data for a specific layered protocol. This is to be initialized in
  * the `protolayer_globals` global array (below) during the start of the
  * resolver. It contains pointers to the specific protocol's functions. */
@@ -595,22 +586,24 @@ struct session2 {
         * to close. */
        bool closing : 1;
 
-       /** If true, a connection is established. Only applicable to sessions
-        * using connection-based protocols.
-        *
-        * TODO: move to `worker`? */
-       bool connected : 1;
-
-       /** If true, session is being rate-limited.
-        *
-        * TODO: move to `worker`? */
-       bool throttled : 1;
-
        /** If true, encryption takes place in this session. Layers may use
         * this to determine whether padding should be applied. A layer that
         * provides security shall set this to `true` during session
         * initialization. */
        bool secure : 1;
+
+       /** If true, the session contains a stream-based protocol layer.
+        * Set during protocol layer initialization by the stream-based layer. */
+       bool stream : 1;
+
+       /** If true, a connection is established. Only applicable to sessions
+        * using connection-based protocols. One of the stream-based protocol
+        * layers is going to be the writer for this flag. */
+       bool connected : 1;
+
+       /** If true, session is being rate-limited. One of the protocol layers
+        * is going to be the writer for this flag. */
+       bool throttled : 1;
 };
 
 /** Allocates and initializes a new session with the specified protocol layer
index 0cfb469670d4f456eee5d7eb5e9247609501af01..0ec2d4e739a1faaaba7d38c2f55d77130ae3892b 100644 (file)
@@ -1303,6 +1303,22 @@ static bool pl_tls_event_unwrap(enum protolayer_event_type event,
        return true;
 }
 
+static bool pl_tls_event_wrap(enum protolayer_event_type event,
+                              void **baton,
+                              struct protolayer_manager *manager,
+                              struct protolayer_data *layer)
+{
+       if (event == PROTOLAYER_EVENT_STATS_SEND_ERR) {
+               the_worker->stats.err_tls += 1;
+               return false;
+       } else if (event == PROTOLAYER_EVENT_STATS_QRY_OUT) {
+               the_worker->stats.tls += 1;
+               return false;
+       }
+
+       return true;
+}
+
 void tls_protolayers_init(void)
 {
        protolayer_globals[PROTOLAYER_TLS] = (struct protolayer_globals){
@@ -1312,6 +1328,7 @@ void tls_protolayers_init(void)
                .unwrap = pl_tls_unwrap,
                .wrap = pl_tls_wrap,
                .event_unwrap = pl_tls_event_unwrap,
+               .event_wrap = pl_tls_event_wrap,
        };
 }
 
index a6f7acc3304d9e8e137f9e1d85634235abc94272..b348f22014c57ebe007c9a5a1d24ead816b4b563 100644 (file)
@@ -347,7 +347,7 @@ static struct request_ctx *request_create(struct session2 *session,
                const struct sockaddr *dst_addr = comm->dst_addr;
                const struct proxy_result *proxy = comm->proxy;
 
-               req->qsource.comm_flags.tcp = session2_get_handle(session)->type == UV_TCP;
+               req->qsource.comm_flags.tcp = session->stream;
                req->qsource.comm_flags.tls = session->secure;
 //             req->qsource.comm_flags.http = session->has_http; /* TODO */
                req->qsource.comm_flags.http = false;
@@ -521,7 +521,7 @@ static void qr_task_free(struct qr_task *task)
 /*@ Register new qr_task within session. */
 static int qr_task_register(struct qr_task *task, struct session2 *session)
 {
-       if (kr_fails_assert(!session->outgoing && session2_get_handle(session)->type == UV_TCP))
+       if (kr_fails_assert(!session->outgoing && session->stream))
                return kr_error(EINVAL);
 
        session2_tasklist_add(session, task);
@@ -568,18 +568,17 @@ static void qr_task_complete(struct qr_task *task)
 }
 
 /* This is called when we send subrequest / answer */
-int qr_task_on_send(struct qr_task *task, const uv_handle_t *handle, int status)
+int qr_task_on_send(struct qr_task *task, struct session2 *s, int status)
 {
        if (task->finished) {
                kr_require(task->leading == false);
                qr_task_complete(task);
        }
 
-       if (!handle || kr_fails_assert(handle->data))
+       if (!s)
                return status;
-       struct session2* s = handle->data;
 
-       if (handle->type == UV_UDP && s->outgoing) {
+       if (!s->stream && s->outgoing) {
                // This should ensure that we are only dealing with our question to upstream
                if (kr_fails_assert(!knot_wire_get_qr(task->pktbuf->wire)))
                        return status;
@@ -596,7 +595,7 @@ int qr_task_on_send(struct qr_task *task, const uv_handle_t *handle, int status)
                }
        }
 
-       if (handle->type == UV_TCP) {
+       if (s->stream) {
                if (status != 0) { // session probably not usable anymore; typically: ECONNRESET
                        const struct kr_request *req = &task->ctx->req;
                        if (kr_log_is_debug(WORKER, req)) {
@@ -635,8 +634,7 @@ static void qr_task_wrap_finished(int status, struct session2 *session,
                                   const void *target, void *baton)
 {
        struct qr_task *task = baton;
-       uv_handle_t *handle = session2_get_handle(session);
-       qr_task_on_send(task, handle, status);
+       qr_task_on_send(task, session, status);
        qr_task_unref(task);
        wire_buf_reset(&session->wire_buf);
 }
@@ -650,19 +648,13 @@ static int qr_task_send(struct qr_task *task, struct session2 *session,
        int ret = 0;
 //     struct request_ctx *ctx = task->ctx; /* TODO: used with doh below */
 
-       uv_handle_t *handle = session2_get_handle(session);
-       if (kr_fails_assert(handle && handle->data == session))
-               return qr_task_on_send(task, NULL, kr_error(EINVAL));
-       const bool is_stream = handle->type == UV_TCP;
-       kr_require(is_stream || handle->type == UV_UDP);
-
        if (addr == NULL)
                addr = session2_get_peer(session);
 
        if (pkt == NULL)
                pkt = worker_task_get_pktbuf(task);
 
-       if (session->outgoing && handle->type == UV_TCP) {
+       if (session->outgoing && session->stream) {
                size_t try_limit = session2_tasklist_get_len(session) + 1;
                uint16_t msg_id = knot_wire_get_id(pkt->wire);
                size_t try_count = 0;
@@ -720,26 +712,12 @@ static int qr_task_send(struct qr_task *task, struct session2 *session,
                        ret = kr_error(UV_EMFILE);
                }
 
-               /* TODO: doh */
-//             if (session_flags(session)->has_http)
-//                     the_worker->stats.err_http += 1;
-//             else
-               if (session->secure)
-                       the_worker->stats.err_tls += 1;
-               else if (handle->type == UV_UDP)
-                       the_worker->stats.err_udp += 1;
-               else
-                       the_worker->stats.err_tcp += 1;
+               session2_event(session, PROTOLAYER_EVENT_STATS_SEND_ERR, NULL);
        }
 
        /* Update outgoing query statistics */
        if (session->outgoing && addr) {
-               if (session->secure)
-                       the_worker->stats.tls += 1;
-               else if (handle->type == UV_UDP)
-                       the_worker->stats.udp += 1;
-               else
-                       the_worker->stats.tcp += 1;
+               session2_event(session, PROTOLAYER_EVENT_STATS_QRY_OUT, NULL);
 
                if (addr->sa_family == AF_INET6)
                        the_worker->stats.ipv6 += 1;
@@ -976,48 +954,46 @@ static void on_connect(uv_connect_t *req, int status)
        session2_timer_start(session, MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
 }
 
-static uv_handle_t *transmit(struct qr_task *task)
+static int transmit(struct qr_task *task)
 {
-       uv_handle_t *ret = NULL;
+       if (!task)
+               return kr_error(EINVAL);
 
-       if (task) {
-               struct kr_transport* transport = task->transport;
+       struct kr_transport* transport = task->transport;
+       struct sockaddr_in6 *choice = (struct sockaddr_in6 *)&transport->address;
 
-               struct sockaddr_in6 *choice = (struct sockaddr_in6 *)&transport->address;
+       if (!choice)
+               return kr_error(EINVAL);
+       if (task->pending_count >= MAX_PENDING)
+               return kr_error(EBUSY);
+       /* Checkout answer before sending it */
+       struct request_ctx *ctx = task->ctx;
+       int ret = kr_resolve_checkout(&ctx->req, NULL, transport, task->pktbuf);
+       if (ret)
+               return ret;
 
-               if (!choice) {
-                       return ret;
-               }
-               if (task->pending_count >= MAX_PENDING) {
-                       return ret;
-               }
-               /* Checkout answer before sending it */
-               struct request_ctx *ctx = task->ctx;
-               if (kr_resolve_checkout(&ctx->req, NULL, transport, task->pktbuf) != 0) {
-                       return ret;
-               }
-               ret = ioreq_spawn(SOCK_DGRAM, choice->sin6_family,
-                               PROTOLAYER_GRP_DOUDP, NULL, 0);
-               if (!ret) {
-                       return ret;
-               }
-               struct sockaddr *addr = (struct sockaddr *)choice;
-               struct session2 *session = ret->data;
-               struct sockaddr *peer = session2_get_peer(session);
-               kr_assert(peer->sa_family == AF_UNSPEC && session->outgoing);
-               kr_require(addr->sa_family == AF_INET || addr->sa_family == AF_INET6);
-               memcpy(peer, addr, kr_sockaddr_len(addr));
-               if (qr_task_send(task, session, (struct sockaddr *)choice,
-                                task->pktbuf) != 0) {
-                       session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL);
-                       ret = NULL;
-               } else {
-                       task->pending[task->pending_count] = session;
-                       task->pending_count += 1;
-                       session2_start_read(session); /* Start reading answer */
-               }
+       uv_handle_t *handle = ioreq_spawn(SOCK_DGRAM, choice->sin6_family,
+                       PROTOLAYER_GRP_DOUDP, NULL, 0);
+       if (!handle)
+               return kr_error(EINVAL);
+
+       struct sockaddr *addr = (struct sockaddr *)choice;
+       struct session2 *session = handle->data;
+       struct sockaddr *peer = session2_get_peer(session);
+       kr_assert(peer->sa_family == AF_UNSPEC && session->outgoing);
+       kr_require(addr->sa_family == AF_INET || addr->sa_family == AF_INET6);
+       memcpy(peer, addr, kr_sockaddr_len(addr));
+
+       ret = qr_task_send(task, session, (struct sockaddr *)choice, task->pktbuf);
+       if (ret) {
+               session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL);
+               return ret;
        }
-       return ret;
+
+       task->pending[task->pending_count] = session;
+       task->pending_count += 1;
+       session2_start_read(session); /* Start reading answer */
+       return kr_ok();
 }
 
 
@@ -1123,7 +1099,7 @@ static int xdp_push(struct qr_task *task, const uv_handle_t *src_handle)
        struct request_ctx *ctx = task->ctx;
        xdp_handle_data_t *xhd = src_handle->data;
        if (kr_fails_assert(xhd && xhd->socket && xhd->session == ctx->source.session))
-               return qr_task_on_send(task, src_handle, kr_error(EINVAL));
+               return qr_task_on_send(task, NULL, kr_error(EINVAL));
 
        knot_xdp_msg_t msg;
 #if KNOT_VERSION_HEX >= 0x030100
@@ -1149,7 +1125,7 @@ static int xdp_push(struct qr_task *task, const uv_handle_t *src_handle)
        uv_idle_start(&xhd->tx_waker, xdp_tx_waker);
        kr_log_debug(XDP, "pushed a packet, ret = %d\n", ret);
 
-       return qr_task_on_send(task, src_handle, ret);
+       return qr_task_on_send(task, xhd->session, ret);
 #else
        kr_assert(!EINVAL);
        return kr_error(EINVAL);
@@ -1222,9 +1198,10 @@ static int udp_task_step(struct qr_task *task,
        if (subreq_enqueue(task)) {
                return kr_ok(); /* Will be notified when outgoing query finishes. */
        }
+
        /* Start transmitting */
-       uv_handle_t *handle = transmit(task);
-       if (handle == NULL) {
+       int err = transmit(task);
+       if (err) {
                subreq_finalize(task, packet_source, packet);
                return qr_task_finalize(task, KR_STATE_FAIL);
        }
@@ -1508,37 +1485,12 @@ static int qr_task_step(struct qr_task *task,
        }
 }
 
-static int parse_packet(knot_pkt_t *query)
-{
-       if (!query){
-               return kr_error(EINVAL);
-       }
-
-       /* Parse query packet. */
-       int ret = knot_pkt_parse(query, 0);
-       if (ret == KNOT_ETRAIL) {
-               /* Extra data after message end. */
-               ret = kr_error(EMSGSIZE);
-       } else if (ret != KNOT_EOK) {
-               /* Malformed query. */
-               ret = kr_error(EPROTO);
-       } else {
-               ret = kr_ok();
-       }
-
-       return ret;
-}
-
 int worker_submit(struct session2 *session, struct comm_info *comm,
                   const uint8_t *eth_from, const uint8_t *eth_to, knot_pkt_t *pkt)
 {
        if (!session || !pkt)
                return kr_error(EINVAL);
 
-       uv_handle_t *handle = session2_get_handle(session);
-       if (!handle || !handle->loop->data)
-               return kr_error(EINVAL);
-
        const bool is_query = (knot_wire_get_qr(pkt->wire) == 0);
        const bool is_outgoing = session->outgoing;
 
@@ -1597,7 +1549,7 @@ int worker_submit(struct session2 *session, struct comm_info *comm,
                        return kr_error(ENOMEM);
                }
 
-               if (handle->type == UV_TCP && qr_task_register(task, session)) {
+               if (session->stream && qr_task_register(task, session)) {
                        return kr_error(ENOMEM);
                }
        } else { /* response from upstream */
@@ -1614,7 +1566,7 @@ int worker_submit(struct session2 *session, struct comm_info *comm,
                /* Note receive time for RTT calculation */
                task->recv_time = kr_now();
        }
-       if (kr_fails_assert(!uv_is_closing(session2_get_handle(session))))
+       if (kr_fails_assert(!session->closing))
                return kr_error(EINVAL);
 
        /* Packet was successfully parsed.
@@ -2019,9 +1971,11 @@ struct pl_dns_stream_iter_data {
        } sent;
 };
 
-static void pl_dns_stream_sess_init_common(struct pl_dns_stream_sess_data *stream,
+static void pl_dns_stream_sess_init_common(struct session2 *session,
+                                           struct pl_dns_stream_sess_data *stream,
                                            bool single)
 {
+       session->stream = true;
        *stream = (struct pl_dns_stream_sess_data){
                .single = single
        };
@@ -2032,7 +1986,7 @@ static int pl_dns_mstream_sess_init(struct protolayer_manager *manager,
                                     void *param)
 {
        struct pl_dns_stream_sess_data *stream = protolayer_sess_data(layer);
-       pl_dns_stream_sess_init_common(stream, false);
+       pl_dns_stream_sess_init_common(manager->session, stream, false);
        return kr_ok();
 }
 
@@ -2041,12 +1995,12 @@ static int pl_dns_sstream_sess_init(struct protolayer_manager *manager,
                                     void *param)
 {
        struct pl_dns_stream_sess_data *stream = protolayer_sess_data(layer);
-       pl_dns_stream_sess_init_common(stream, true);
+       pl_dns_stream_sess_init_common(manager->session, stream, true);
        return kr_ok();
 }
 
 static int pl_dns_stream_iter_init(struct protolayer_manager *manager,
-                                     struct protolayer_data *layer)
+                                   struct protolayer_data *layer)
 {
        struct pl_dns_stream_iter_data *stream = protolayer_iter_data(layer);
        *stream = (struct pl_dns_stream_iter_data){0};
index 40e1df0c2cc65cba37299ac4d61c700dc6a4b2dd..233aa290d1b95e4f1a3c04a771089485e8c6b855 100644 (file)
@@ -112,7 +112,7 @@ void worker_task_subreq_finalize(struct qr_task *task);
 bool worker_task_finished(struct qr_task *task);
 
 /** To be called after sending a DNS message.  It mainly deals with cleanups. */
-int qr_task_on_send(struct qr_task *task, const uv_handle_t *handle, int status);
+int qr_task_on_send(struct qr_task *task, struct session2 *s, int status);
 
 /** Various worker statistics.  Sync with wrk_stats() */
 struct worker_stats {