git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
daemon/defer: allow recursive time accounting, fix subreq accounting docs-develop-defe-x6j6qe/deployments/5754
author Lukáš Ondráček <lukas.ondracek@nic.cz>
Mon, 25 Nov 2024 18:10:20 +0000 (19:10 +0100)
committer Lukáš Ondráček <lukas.ondracek@nic.cz>
Mon, 25 Nov 2024 18:38:46 +0000 (19:38 +0100)
daemon/defer.c
daemon/defer.h
daemon/session2.c
daemon/worker.c

index e4456dfe31baf157f3c9ae00459540647ae86aba..ff65a963d8dd9feab0904849923b29922b660021 100644 (file)
@@ -102,7 +102,7 @@ static inline void phase_set(enum phase p)
                phase = p;
        }
 }
-static inline void phase_account(uint64_t nsec)
+static inline void phase_charge(uint64_t nsec)
 {
        kr_assert(phase != PHASE_ANY);
        phase_elapsed += nsec;
@@ -135,10 +135,10 @@ static bool using_avx2(void)
 }
 
 /// Increment KRU counters by given time.
-void defer_account(uint64_t nsec, union kr_sockaddr *addr, bool stream)
+void defer_charge(uint64_t nsec, union kr_sockaddr *addr, bool stream)
 {
        if (phase_accounting) {
-               phase_account(nsec);
+               phase_charge(nsec);
                phase_accounting = false;
        }
 
@@ -321,7 +321,7 @@ static inline void process_single_deferred(void)
        if (kr_fails_assert(ctx)) return;
 
        defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream);
-       phase_accounting = true;
+       phase_accounting = true;  // TODO check there are no suspensions of sampling
 
        struct pl_defer_iter_data *idata = protolayer_iter_data_get_current(ctx);
        struct pl_defer_sess_data *sdata = protolayer_sess_data_get_current(ctx);
@@ -448,10 +448,15 @@ static enum protolayer_iter_cb_result pl_defer_unwrap(
        push_query(ctx, priority, false);
        waiting_requests_size += data->size = protolayer_iter_size_est(ctx, !ctx->session->stream);
                // for stream, payload is counted in session wire buffer
-       while (waiting_requests_size > MAX_WAITING_REQS_SIZE) {
-               defer_sample_restart();
-               process_single_deferred();  // possibly defers again without decreasing waiting_requests_size
-               // defer_sample_stop should be called soon outside
+
+       if (waiting_requests_size > MAX_WAITING_REQS_SIZE) {
+               defer_sample_state_t prev_sample_state;
+               defer_sample_start(&prev_sample_state);
+               do {
+                       process_single_deferred();  // possibly defers again without decreasing waiting_requests_size
+                       defer_sample_restart();
+               } while (waiting_requests_size > MAX_WAITING_REQS_SIZE);
+               defer_sample_stop(&prev_sample_state, true);
        }
 
        return protolayer_async();
@@ -489,14 +494,14 @@ static void defer_queues_idle(uv_idle_t *handle)
        kr_assert(waiting_requests > 0);
        VERBOSE_LOG("IDLE\n");
        VERBOSE_LOG("  %d waiting\n", waiting_requests);
-       defer_sample_start();
+       defer_sample_start(NULL);
        uint64_t idle_stamp = defer_sample_state.stamp;
-       while ((waiting_requests > 0) && (defer_sample_state.stamp < idle_stamp + IDLE_TIMEOUT)) {
+       do {
                process_single_deferred();
                defer_sample_restart();
-       }
+       } while ((waiting_requests > 0) && (defer_sample_state.stamp < idle_stamp + IDLE_TIMEOUT));
+       defer_sample_stop(NULL, true);
        cleanup_queues();
-       defer_sample_stop();  // TODO skip calling and use just restart elsewhere?
        udp_queue_send_all();
 
        if (waiting_requests > 0) {
index cd899fa185e01abb2a19ebb9209099a9fd022278..20e4ac92b3d56b871d558815cb6ba763c9e6ee8b 100644 (file)
@@ -18,10 +18,10 @@ int defer_init_idle(uv_loop_t *loop);
 void defer_deinit(void);
 
 /// Increment KRU counters by the given time.
-void defer_account(uint64_t nsec, union kr_sockaddr *addr, bool stream);
+void defer_charge(uint64_t nsec, union kr_sockaddr *addr, bool stream);
 
 typedef struct {
-       int8_t is_accounting; /// whether currently accounting the time to someone; should be 0/1
+       bool is_accounting; /// whether currently accounting the time to someone
        union kr_sockaddr addr; /// request source (to which we account) or AF_UNSPEC if unknown yet
        bool stream;
        uint64_t stamp; /// monotonic nanoseconds, probably won't wrap
@@ -35,23 +35,13 @@ extern bool defer_initialized; /// defer_init was called, possibly keeping defer
 // TODO: reconsider `static inline` cases below
 
 #include <time.h>
-static inline uint64_t get_stamp(void)
+static inline uint64_t defer_get_stamp(void)
 {
        struct timespec now_ts = {0};
        clock_gettime(CLOCK_THREAD_CPUTIME_ID, &now_ts);
        return now_ts.tv_nsec + 1000*1000*1000 * (uint64_t)now_ts.tv_sec;
 }
 
-/// Start accounting work, if not doing it already.
-static inline void defer_sample_start(void)
-{
-       if (!defer) return;
-       kr_assert(!defer_sample_state.is_accounting);
-       ++defer_sample_state.is_accounting;
-       defer_sample_state.stamp = get_stamp();
-       defer_sample_state.addr.ip.sa_family = AF_UNSPEC;
-}
-
 /// Annotate the work currently being accounted by an IP address.
 static inline void defer_sample_addr(const union kr_sockaddr *addr, bool stream)
 {
@@ -78,38 +68,74 @@ static inline void defer_sample_addr(const union kr_sockaddr *addr, bool stream)
        defer_sample_state.stream = stream;
 }
 
-/// Stop accounting work - and change the source if applicable.
-static inline void defer_sample_stop(void)
+/// Internal; start accounting work at specified timestamp.
+static inline void defer_sample_start_stamp(uint64_t stamp)
 {
        if (!defer) return;
+       kr_assert(!defer_sample_state.is_accounting);
+       defer_sample_state.is_accounting = true;
+       defer_sample_state.stamp = stamp;
+       defer_sample_state.addr.ip.sa_family = AF_UNSPEC;
+}
 
-       if (kr_fails_assert(defer_sample_state.is_accounting > 0)) return; // weird
-       if (--defer_sample_state.is_accounting) return;
-       if (defer_sample_state.addr.ip.sa_family == AF_UNSPEC) return;
+/// Internal; stop accounting work at specified timestamp and charge the source if applicable.
+static inline void defer_sample_stop_stamp(uint64_t stamp)
+{
+       if (!defer) return;
+       kr_assert(defer_sample_state.is_accounting);
+       defer_sample_state.is_accounting = false;
 
-       const uint64_t elapsed = get_stamp() - defer_sample_state.stamp;
+       if (defer_sample_state.addr.ip.sa_family == AF_UNSPEC) return;
 
-       // we accounted something
+       const uint64_t elapsed = stamp - defer_sample_state.stamp;
+       if (elapsed == 0) return;
 
        // TODO: some queries of internal origin have suspicioiusly high numbers.
        // We won't be really accounting those, but it might suggest some other issue.
 
-       defer_account(elapsed, &defer_sample_state.addr, defer_sample_state.stream);
+       defer_charge(elapsed, &defer_sample_state.addr, defer_sample_state.stream);
 }
 
-/// Stop accounting if active, then start again. Uses just one stamp.
-static inline void defer_sample_restart(void)
-{
+/// Start accounting work; optionally save state of current accounting.
+/// Current state can be saved only after having an address assigned.
+static inline void defer_sample_start(defer_sample_state_t *prev_state_out) {
        if (!defer) return;
+       uint64_t stamp = defer_get_stamp();
 
-       uint64_t stamp = get_stamp();
-
-       if (defer_sample_state.is_accounting > 0) {
-               const uint64_t elapsed = stamp - defer_sample_state.stamp;
-               defer_account(elapsed, &defer_sample_state.addr, defer_sample_state.stream);
+       // suspend
+       if (prev_state_out) {
+               *prev_state_out = defer_sample_state;  // TODO stamp is not needed
+               if (defer_sample_state.is_accounting)
+                       defer_sample_stop_stamp(stamp);
        }
 
-       defer_sample_state.stamp = stamp;
-       defer_sample_state.addr.ip.sa_family = AF_UNSPEC;
-       defer_sample_state.is_accounting = 1;
+       // start
+       defer_sample_start_stamp(stamp);
+}
+
+/// Stop accounting and start it again.
+static inline void defer_sample_restart(void) {
+       if (!defer) return;
+       uint64_t stamp = defer_get_stamp();
+
+       // stop
+       defer_sample_stop_stamp(stamp);
+
+       // start
+       defer_sample_start_stamp(stamp);
+}
+
+/// Stop accounting and charge the source if applicable; optionally resume previous accounting.
+static inline void defer_sample_stop(defer_sample_state_t *prev_state, bool reuse_last_stamp) {
+       if (!defer) return;
+       uint64_t stamp = reuse_last_stamp ? defer_sample_state.stamp : defer_get_stamp();
+
+       // stop
+       defer_sample_stop_stamp(stamp);
+
+       // resume
+       if (prev_state) {
+               defer_sample_state = *prev_state;
+               defer_sample_state.stamp = stamp;
+       }
 }
index eb3bedb3c98911987a645b8a4a5f342bc9d878a8..d16112b1fcb2aa5c882c093e039ad18dca8aa45c 100644 (file)
@@ -630,7 +630,7 @@ static int session2_submit(
        // Note two cases: incoming session (new request)
        // vs. outgoing session (resuming work on some request)
        if ((direction == PROTOLAYER_UNWRAP) && (layer_ix == 0))
-               defer_sample_start();
+               defer_sample_start(NULL);
 
        struct protolayer_iter_ctx *ctx = malloc(session->iter_ctx_size);
        kr_require(ctx);
@@ -692,7 +692,7 @@ static int session2_submit(
 
        int ret = protolayer_step(ctx);
        if ((direction == PROTOLAYER_UNWRAP) && (layer_ix == 0))
-               defer_sample_stop();
+               defer_sample_stop(NULL, false);
        return ret;
 }
 
@@ -980,10 +980,10 @@ uv_handle_t *session2_get_handle(struct session2 *s)
 
 static void session2_on_timeout(uv_timer_t *timer)
 {
-       defer_sample_start();
+       defer_sample_start(NULL);
        struct session2 *s = timer->data;
        session2_event(s, s->timer_event, NULL);
-       defer_sample_stop();
+       defer_sample_stop(NULL, false);
 }
 
 int session2_timer_start(struct session2 *s, enum protolayer_event_type event, uint64_t timeout, uint64_t repeat)
index 8596e93bfd4597549daafa76f8413907610e8d07..b4516d2e5c5ca335c24f22b6051e5274c97f0c0c 100644 (file)
@@ -879,26 +879,32 @@ static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_
                kr_assert(ret == KNOT_EOK && val_deleted == task);
        }
        /* Notify waiting tasks. */
-       struct kr_query *leader_qry = array_tail(task->ctx->req.rplan.pending);
-       for (size_t i = task->waiting.len; i > 0; i--) {
-               struct qr_task *follower = task->waiting.at[i - 1];
-               /* Reuse MSGID and 0x20 secret */
-               if (follower->ctx->req.rplan.pending.len > 0) {
-                       struct kr_query *qry = array_tail(follower->ctx->req.rplan.pending);
-                       qry->id = leader_qry->id;
-                       qry->secret = leader_qry->secret;
-
-                       // Note that this transport may not be present in `leader_qry`'s server selection
-                       follower->transport = task->transport;
-                       if(follower->transport) {
-                               follower->transport->deduplicated = true;
+       if (task->waiting.len > 0) {
+               struct kr_query *leader_qry = array_tail(task->ctx->req.rplan.pending);
+               defer_sample_state_t defer_prev_sample_state;
+               defer_sample_start(&defer_prev_sample_state);
+               for (size_t i = task->waiting.len; i > 0; i--) {
+                       struct qr_task *follower = task->waiting.at[i - 1];
+                       /* Reuse MSGID and 0x20 secret */
+                       if (follower->ctx->req.rplan.pending.len > 0) {
+                               struct kr_query *qry = array_tail(follower->ctx->req.rplan.pending);
+                               qry->id = leader_qry->id;
+                               qry->secret = leader_qry->secret;
+
+                               // Note that this transport may not be present in `leader_qry`'s server selection
+                               follower->transport = task->transport;
+                               if(follower->transport) {
+                                       follower->transport->deduplicated = true;
+                               }
+                               leader_qry->secret = 0; /* Next will be already decoded */
                        }
-                       leader_qry->secret = 0; /* Next will be already decoded */
+                       qr_task_step(follower, packet_source, pkt);
+                       qr_task_unref(follower);
+                       defer_sample_restart();
                }
-               qr_task_step(follower, packet_source, pkt);
-               qr_task_unref(follower);
+               defer_sample_stop(&defer_prev_sample_state, true);
+               task->waiting.len = 0;
        }
-       task->waiting.len = 0;
        task->leading = false;
 }