From: Lukáš Ondráček Date: Mon, 25 Nov 2024 18:10:20 +0000 (+0100) Subject: daemon/defer: allow recursive time accounting, fix subreq accounting X-Git-Tag: v6.0.10~6^2~33 X-Git-Url: http://git.ipfire.org/gitweb/gitweb.cgi?a=commitdiff_plain;h=refs%2Fenvironments%2Fdocs-develop-defe-x6j6qe%2Fdeployments%2F5754;p=thirdparty%2Fknot-resolver.git daemon/defer: allow recursive time accounting, fix subreq accounting --- diff --git a/daemon/defer.c b/daemon/defer.c index e4456dfe3..ff65a963d 100644 --- a/daemon/defer.c +++ b/daemon/defer.c @@ -102,7 +102,7 @@ static inline void phase_set(enum phase p) phase = p; } } -static inline void phase_account(uint64_t nsec) +static inline void phase_charge(uint64_t nsec) { kr_assert(phase != PHASE_ANY); phase_elapsed += nsec; @@ -135,10 +135,10 @@ static bool using_avx2(void) } /// Increment KRU counters by given time. -void defer_account(uint64_t nsec, union kr_sockaddr *addr, bool stream) +void defer_charge(uint64_t nsec, union kr_sockaddr *addr, bool stream) { if (phase_accounting) { - phase_account(nsec); + phase_charge(nsec); phase_accounting = false; } @@ -321,7 +321,7 @@ static inline void process_single_deferred(void) if (kr_fails_assert(ctx)) return; defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream); - phase_accounting = true; + phase_accounting = true; // TODO check there are no suspensions of sampling struct pl_defer_iter_data *idata = protolayer_iter_data_get_current(ctx); struct pl_defer_sess_data *sdata = protolayer_sess_data_get_current(ctx); @@ -448,10 +448,15 @@ static enum protolayer_iter_cb_result pl_defer_unwrap( push_query(ctx, priority, false); waiting_requests_size += data->size = protolayer_iter_size_est(ctx, !ctx->session->stream); // for stream, payload is counted in session wire buffer - while (waiting_requests_size > MAX_WAITING_REQS_SIZE) { - defer_sample_restart(); - process_single_deferred(); // possibly defers again without decreasing waiting_requests_size - // defer_sample_stop should be called soon outside + + if (waiting_requests_size > MAX_WAITING_REQS_SIZE) { + defer_sample_state_t prev_sample_state; + defer_sample_start(&prev_sample_state); + do { + process_single_deferred(); // possibly defers again without decreasing waiting_requests_size + defer_sample_restart(); + } while (waiting_requests_size > MAX_WAITING_REQS_SIZE); + defer_sample_stop(&prev_sample_state, true); } return protolayer_async(); @@ -489,14 +494,14 @@ static void defer_queues_idle(uv_idle_t *handle) kr_assert(waiting_requests > 0); VERBOSE_LOG("IDLE\n"); VERBOSE_LOG(" %d waiting\n", waiting_requests); - defer_sample_start(); + defer_sample_start(NULL); uint64_t idle_stamp = defer_sample_state.stamp; - while ((waiting_requests > 0) && (defer_sample_state.stamp < idle_stamp + IDLE_TIMEOUT)) { + do { process_single_deferred(); defer_sample_restart(); - } + } while ((waiting_requests > 0) && (defer_sample_state.stamp < idle_stamp + IDLE_TIMEOUT)); + defer_sample_stop(NULL, true); cleanup_queues(); - defer_sample_stop(); // TODO skip calling and use just restart elsewhere? udp_queue_send_all(); if (waiting_requests > 0) { diff --git a/daemon/defer.h b/daemon/defer.h index cd899fa18..20e4ac92b 100644 --- a/daemon/defer.h +++ b/daemon/defer.h @@ -18,10 +18,10 @@ int defer_init_idle(uv_loop_t *loop); void defer_deinit(void); /// Increment KRU counters by the given time. -void defer_account(uint64_t nsec, union kr_sockaddr *addr, bool stream); +void defer_charge(uint64_t nsec, union kr_sockaddr *addr, bool stream); typedef struct { - int8_t is_accounting; /// whether currently accounting the time to someone; should be 0/1 + bool is_accounting; /// whether currently accounting the time to someone union kr_sockaddr addr; /// request source (to which we account) or AF_UNSPEC if unknown yet bool stream; uint64_t stamp; /// monotonic nanoseconds, probably won't wrap @@ -35,23 +35,13 @@ extern bool defer_initialized; /// defer_init was called, possibly keeping defer // TODO: reconsider `static inline` cases below #include -static inline uint64_t get_stamp(void) +static inline uint64_t defer_get_stamp(void) { struct timespec now_ts = {0}; clock_gettime(CLOCK_THREAD_CPUTIME_ID, &now_ts); return now_ts.tv_nsec + 1000*1000*1000 * (uint64_t)now_ts.tv_sec; } -/// Start accounting work, if not doing it already. -static inline void defer_sample_start(void) -{ - if (!defer) return; - kr_assert(!defer_sample_state.is_accounting); - ++defer_sample_state.is_accounting; - defer_sample_state.stamp = get_stamp(); - defer_sample_state.addr.ip.sa_family = AF_UNSPEC; -} - /// Annotate the work currently being accounted by an IP address. static inline void defer_sample_addr(const union kr_sockaddr *addr, bool stream) { @@ -78,38 +68,74 @@ static inline void defer_sample_addr(const union kr_sockaddr *addr, bool stream) defer_sample_state.stream = stream; } -/// Stop accounting work - and change the source if applicable. -static inline void defer_sample_stop(void) +/// Internal; start accounting work at specified timestamp. +static inline void defer_sample_start_stamp(uint64_t stamp) { if (!defer) return; + kr_assert(!defer_sample_state.is_accounting); + defer_sample_state.is_accounting = true; + defer_sample_state.stamp = stamp; + defer_sample_state.addr.ip.sa_family = AF_UNSPEC; +} - if (kr_fails_assert(defer_sample_state.is_accounting > 0)) return; // weird - if (--defer_sample_state.is_accounting) return; - if (defer_sample_state.addr.ip.sa_family == AF_UNSPEC) return; +/// Internal; stop accounting work at specified timestamp and charge the source if applicable. +static inline void defer_sample_stop_stamp(uint64_t stamp) +{ + if (!defer) return; + kr_assert(defer_sample_state.is_accounting); + defer_sample_state.is_accounting = false; - const uint64_t elapsed = get_stamp() - defer_sample_state.stamp; + if (defer_sample_state.addr.ip.sa_family == AF_UNSPEC) return; - // we accounted something + const uint64_t elapsed = stamp - defer_sample_state.stamp; + if (elapsed == 0) return; // TODO: some queries of internal origin have suspicioiusly high numbers. // We won't be really accounting those, but it might suggest some other issue. - defer_account(elapsed, &defer_sample_state.addr, defer_sample_state.stream); + defer_charge(elapsed, &defer_sample_state.addr, defer_sample_state.stream); } -/// Stop accounting if active, then start again. Uses just one stamp. -static inline void defer_sample_restart(void) -{ +/// Start accounting work; optionally save state of current accounting. +/// Current state can be saved only after having an address assigned. +static inline void defer_sample_start(defer_sample_state_t *prev_state_out) { if (!defer) return; + uint64_t stamp = defer_get_stamp(); - uint64_t stamp = get_stamp(); - - if (defer_sample_state.is_accounting > 0) { - const uint64_t elapsed = stamp - defer_sample_state.stamp; - defer_account(elapsed, &defer_sample_state.addr, defer_sample_state.stream); + // suspend + if (prev_state_out) { + *prev_state_out = defer_sample_state; // TODO stamp is not needed + if (defer_sample_state.is_accounting) + defer_sample_stop_stamp(stamp); } - defer_sample_state.stamp = stamp; - defer_sample_state.addr.ip.sa_family = AF_UNSPEC; - defer_sample_state.is_accounting = 1; + // start + defer_sample_start_stamp(stamp); +} + +/// Stop accounting and start it again. +static inline void defer_sample_restart(void) { + if (!defer) return; + uint64_t stamp = defer_get_stamp(); + + // stop + defer_sample_stop_stamp(stamp); + + // start + defer_sample_start_stamp(stamp); +} + +/// Stop accounting and charge the source if applicable; optionally resume previous accounting. +static inline void defer_sample_stop(defer_sample_state_t *prev_state, bool reuse_last_stamp) { + if (!defer) return; + uint64_t stamp = reuse_last_stamp ? defer_sample_state.stamp : defer_get_stamp(); + + // stop + defer_sample_stop_stamp(stamp); + + // resume + if (prev_state) { + defer_sample_state = *prev_state; + defer_sample_state.stamp = stamp; + } } diff --git a/daemon/session2.c b/daemon/session2.c index eb3bedb3c..d16112b1f 100644 --- a/daemon/session2.c +++ b/daemon/session2.c @@ -630,7 +630,7 @@ static int session2_submit( // Note two cases: incoming session (new request) // vs. outgoing session (resuming work on some request) if ((direction == PROTOLAYER_UNWRAP) && (layer_ix == 0)) - defer_sample_start(); + defer_sample_start(NULL); struct protolayer_iter_ctx *ctx = malloc(session->iter_ctx_size); kr_require(ctx); @@ -692,7 +692,7 @@ static int session2_submit( int ret = protolayer_step(ctx); if ((direction == PROTOLAYER_UNWRAP) && (layer_ix == 0)) - defer_sample_stop(); + defer_sample_stop(NULL, false); return ret; } @@ -980,10 +980,10 @@ uv_handle_t *session2_get_handle(struct session2 *s) static void session2_on_timeout(uv_timer_t *timer) { - defer_sample_start(); + defer_sample_start(NULL); struct session2 *s = timer->data; session2_event(s, s->timer_event, NULL); - defer_sample_stop(); + defer_sample_stop(NULL, false); } int session2_timer_start(struct session2 *s, enum protolayer_event_type event, uint64_t timeout, uint64_t repeat) diff --git a/daemon/worker.c b/daemon/worker.c index 8596e93bf..b4516d2e5 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -879,26 +879,32 @@ static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_ kr_assert(ret == KNOT_EOK && val_deleted == task); } /* Notify waiting tasks. */ - struct kr_query *leader_qry = array_tail(task->ctx->req.rplan.pending); - for (size_t i = task->waiting.len; i > 0; i--) { - struct qr_task *follower = task->waiting.at[i - 1]; - /* Reuse MSGID and 0x20 secret */ - if (follower->ctx->req.rplan.pending.len > 0) { - struct kr_query *qry = array_tail(follower->ctx->req.rplan.pending); - qry->id = leader_qry->id; - qry->secret = leader_qry->secret; - - // Note that this transport may not be present in `leader_qry`'s server selection - follower->transport = task->transport; - if(follower->transport) { - follower->transport->deduplicated = true; + if (task->waiting.len > 0) { + struct kr_query *leader_qry = array_tail(task->ctx->req.rplan.pending); + defer_sample_state_t defer_prev_sample_state; + defer_sample_start(&defer_prev_sample_state); + for (size_t i = task->waiting.len; i > 0; i--) { + struct qr_task *follower = task->waiting.at[i - 1]; + /* Reuse MSGID and 0x20 secret */ + if (follower->ctx->req.rplan.pending.len > 0) { + struct kr_query *qry = array_tail(follower->ctx->req.rplan.pending); + qry->id = leader_qry->id; + qry->secret = leader_qry->secret; + + // Note that this transport may not be present in `leader_qry`'s server selection + follower->transport = task->transport; + if(follower->transport) { + follower->transport->deduplicated = true; + } + leader_qry->secret = 0; /* Next will be already decoded */ } - leader_qry->secret = 0; /* Next will be already decoded */ + qr_task_step(follower, packet_source, pkt); + qr_task_unref(follower); + defer_sample_restart(); } - qr_task_step(follower, packet_source, pkt); - qr_task_unref(follower); + defer_sample_stop(&defer_prev_sample_state, true); + task->waiting.len = 0; } - task->waiting.len = 0; task->leading = false; }