phase = p;
}
}
-static inline void phase_account(uint64_t nsec)
+static inline void phase_charge(uint64_t nsec)
{
kr_assert(phase != PHASE_ANY);
phase_elapsed += nsec;
}
/// Increment KRU counters by given time.
-void defer_account(uint64_t nsec, union kr_sockaddr *addr, bool stream)
+void defer_charge(uint64_t nsec, union kr_sockaddr *addr, bool stream)
{
if (phase_accounting) {
- phase_account(nsec);
+ phase_charge(nsec);
phase_accounting = false;
}
if (kr_fails_assert(ctx)) return;
defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream);
- phase_accounting = true;
+ phase_accounting = true; // TODO check there are no suspensions of sampling
struct pl_defer_iter_data *idata = protolayer_iter_data_get_current(ctx);
struct pl_defer_sess_data *sdata = protolayer_sess_data_get_current(ctx);
push_query(ctx, priority, false);
waiting_requests_size += data->size = protolayer_iter_size_est(ctx, !ctx->session->stream);
// for stream, payload is counted in session wire buffer
- while (waiting_requests_size > MAX_WAITING_REQS_SIZE) {
- defer_sample_restart();
- process_single_deferred(); // possibly defers again without decreasing waiting_requests_size
- // defer_sample_stop should be called soon outside
+
+ if (waiting_requests_size > MAX_WAITING_REQS_SIZE) {
+ defer_sample_state_t prev_sample_state;
+ defer_sample_start(&prev_sample_state);
+ do {
+ process_single_deferred(); // possibly defers again without decreasing waiting_requests_size
+ defer_sample_restart();
+ } while (waiting_requests_size > MAX_WAITING_REQS_SIZE);
+ defer_sample_stop(&prev_sample_state, true);
}
return protolayer_async();
kr_assert(waiting_requests > 0);
VERBOSE_LOG("IDLE\n");
VERBOSE_LOG(" %d waiting\n", waiting_requests);
- defer_sample_start();
+ defer_sample_start(NULL);
uint64_t idle_stamp = defer_sample_state.stamp;
- while ((waiting_requests > 0) && (defer_sample_state.stamp < idle_stamp + IDLE_TIMEOUT)) {
+ do {
process_single_deferred();
defer_sample_restart();
- }
+ } while ((waiting_requests > 0) && (defer_sample_state.stamp < idle_stamp + IDLE_TIMEOUT));
+ defer_sample_stop(NULL, true);
cleanup_queues();
- defer_sample_stop(); // TODO skip calling and use just restart elsewhere?
udp_queue_send_all();
if (waiting_requests > 0) {
void defer_deinit(void);
/// Increment KRU counters by the given time.
-void defer_account(uint64_t nsec, union kr_sockaddr *addr, bool stream);
+void defer_charge(uint64_t nsec, union kr_sockaddr *addr, bool stream);
typedef struct {
- int8_t is_accounting; /// whether currently accounting the time to someone; should be 0/1
+ bool is_accounting; /// whether currently accounting the time to someone
union kr_sockaddr addr; /// request source (to which we account) or AF_UNSPEC if unknown yet
bool stream;
uint64_t stamp; /// monotonic nanoseconds, probably won't wrap
// TODO: reconsider `static inline` cases below
#include <time.h>
-static inline uint64_t get_stamp(void)
+static inline uint64_t defer_get_stamp(void)
{
struct timespec now_ts = {0};
clock_gettime(CLOCK_THREAD_CPUTIME_ID, &now_ts);
return now_ts.tv_nsec + 1000*1000*1000 * (uint64_t)now_ts.tv_sec;
}
-/// Start accounting work, if not doing it already.
-static inline void defer_sample_start(void)
-{
- if (!defer) return;
- kr_assert(!defer_sample_state.is_accounting);
- ++defer_sample_state.is_accounting;
- defer_sample_state.stamp = get_stamp();
- defer_sample_state.addr.ip.sa_family = AF_UNSPEC;
-}
-
/// Annotate the work currently being accounted by an IP address.
static inline void defer_sample_addr(const union kr_sockaddr *addr, bool stream)
{
defer_sample_state.stream = stream;
}
-/// Stop accounting work - and change the source if applicable.
-static inline void defer_sample_stop(void)
+/// Internal; start accounting work at specified timestamp.
+static inline void defer_sample_start_stamp(uint64_t stamp)
{
if (!defer) return;
+ kr_assert(!defer_sample_state.is_accounting);
+ defer_sample_state.is_accounting = true;
+ defer_sample_state.stamp = stamp;
+ defer_sample_state.addr.ip.sa_family = AF_UNSPEC;
+}
- if (kr_fails_assert(defer_sample_state.is_accounting > 0)) return; // weird
- if (--defer_sample_state.is_accounting) return;
- if (defer_sample_state.addr.ip.sa_family == AF_UNSPEC) return;
+/// Internal; stop accounting work at specified timestamp and charge the source if applicable.
+static inline void defer_sample_stop_stamp(uint64_t stamp)
+{
+ if (!defer) return;
+ kr_assert(defer_sample_state.is_accounting);
+ defer_sample_state.is_accounting = false;
- const uint64_t elapsed = get_stamp() - defer_sample_state.stamp;
+ if (defer_sample_state.addr.ip.sa_family == AF_UNSPEC) return;
- // we accounted something
+ const uint64_t elapsed = stamp - defer_sample_state.stamp;
+ if (elapsed == 0) return;
// TODO: some queries of internal origin have suspicioiusly high numbers.
// We won't be really accounting those, but it might suggest some other issue.
- defer_account(elapsed, &defer_sample_state.addr, defer_sample_state.stream);
+ defer_charge(elapsed, &defer_sample_state.addr, defer_sample_state.stream);
}
-/// Stop accounting if active, then start again. Uses just one stamp.
-static inline void defer_sample_restart(void)
-{
+/// Start accounting work; optionally save state of current accounting.
+/// Current state can be saved only after having an address assigned.
+static inline void defer_sample_start(defer_sample_state_t *prev_state_out) {
if (!defer) return;
+ uint64_t stamp = defer_get_stamp();
- uint64_t stamp = get_stamp();
-
- if (defer_sample_state.is_accounting > 0) {
- const uint64_t elapsed = stamp - defer_sample_state.stamp;
- defer_account(elapsed, &defer_sample_state.addr, defer_sample_state.stream);
+ // suspend
+ if (prev_state_out) {
+ *prev_state_out = defer_sample_state; // TODO stamp is not needed
+ if (defer_sample_state.is_accounting)
+ defer_sample_stop_stamp(stamp);
}
- defer_sample_state.stamp = stamp;
- defer_sample_state.addr.ip.sa_family = AF_UNSPEC;
- defer_sample_state.is_accounting = 1;
+ // start
+ defer_sample_start_stamp(stamp);
+}
+
+/// Stop accounting and start it again.
+static inline void defer_sample_restart(void) {
+ if (!defer) return;
+ uint64_t stamp = defer_get_stamp();
+
+ // stop
+ defer_sample_stop_stamp(stamp);
+
+ // start
+ defer_sample_start_stamp(stamp);
+}
+
+/// Stop accounting and charge the source if applicable; optionally resume previous accounting.
+static inline void defer_sample_stop(defer_sample_state_t *prev_state, bool reuse_last_stamp) {
+ if (!defer) return;
+ uint64_t stamp = reuse_last_stamp ? defer_sample_state.stamp : defer_get_stamp();
+
+ // stop
+ defer_sample_stop_stamp(stamp);
+
+ // resume
+ if (prev_state) {
+ defer_sample_state = *prev_state;
+ defer_sample_state.stamp = stamp;
+ }
}
// Note two cases: incoming session (new request)
// vs. outgoing session (resuming work on some request)
if ((direction == PROTOLAYER_UNWRAP) && (layer_ix == 0))
- defer_sample_start();
+ defer_sample_start(NULL);
struct protolayer_iter_ctx *ctx = malloc(session->iter_ctx_size);
kr_require(ctx);
int ret = protolayer_step(ctx);
if ((direction == PROTOLAYER_UNWRAP) && (layer_ix == 0))
- defer_sample_stop();
+ defer_sample_stop(NULL, false);
return ret;
}
static void session2_on_timeout(uv_timer_t *timer)
{
- defer_sample_start();
+ defer_sample_start(NULL);
struct session2 *s = timer->data;
session2_event(s, s->timer_event, NULL);
- defer_sample_stop();
+ defer_sample_stop(NULL, false);
}
int session2_timer_start(struct session2 *s, enum protolayer_event_type event, uint64_t timeout, uint64_t repeat)
kr_assert(ret == KNOT_EOK && val_deleted == task);
}
/* Notify waiting tasks. */
- struct kr_query *leader_qry = array_tail(task->ctx->req.rplan.pending);
- for (size_t i = task->waiting.len; i > 0; i--) {
- struct qr_task *follower = task->waiting.at[i - 1];
- /* Reuse MSGID and 0x20 secret */
- if (follower->ctx->req.rplan.pending.len > 0) {
- struct kr_query *qry = array_tail(follower->ctx->req.rplan.pending);
- qry->id = leader_qry->id;
- qry->secret = leader_qry->secret;
-
- // Note that this transport may not be present in `leader_qry`'s server selection
- follower->transport = task->transport;
- if(follower->transport) {
- follower->transport->deduplicated = true;
+ if (task->waiting.len > 0) {
+ struct kr_query *leader_qry = array_tail(task->ctx->req.rplan.pending);
+ defer_sample_state_t defer_prev_sample_state;
+ defer_sample_start(&defer_prev_sample_state);
+ for (size_t i = task->waiting.len; i > 0; i--) {
+ struct qr_task *follower = task->waiting.at[i - 1];
+ /* Reuse MSGID and 0x20 secret */
+ if (follower->ctx->req.rplan.pending.len > 0) {
+ struct kr_query *qry = array_tail(follower->ctx->req.rplan.pending);
+ qry->id = leader_qry->id;
+ qry->secret = leader_qry->secret;
+
+ // Note that this transport may not be present in `leader_qry`'s server selection
+ follower->transport = task->transport;
+ if(follower->transport) {
+ follower->transport->deduplicated = true;
+ }
+ leader_qry->secret = 0; /* Next will be already decoded */
}
- leader_qry->secret = 0; /* Next will be already decoded */
+ qr_task_step(follower, packet_source, pkt);
+ qr_task_unref(follower);
+ defer_sample_restart();
}
- qr_task_step(follower, packet_source, pkt);
- qr_task_unref(follower);
+ defer_sample_stop(&defer_prev_sample_state, true);
+ task->waiting.len = 0;
}
- task->waiting.len = 0;
task->leading = false;
}