{
if (phase_accounting) {
phase_charge(nsec);
- phase_accounting = false;
}
if (!stream) return; // UDP is not accounted in KRU
if (kr_fails_assert(ctx)) return;
defer_sample_addr((const union kr_sockaddr *)ctx->comm->src_addr, ctx->session->stream);
- phase_accounting = true; // TODO check there are no suspensions of sampling
struct pl_defer_iter_data *idata = protolayer_iter_data_get_current(ctx);
struct pl_defer_sess_data *sdata = protolayer_sess_data_get_current(ctx);
void *sess_data, void *iter_data,
struct protolayer_iter_ctx *ctx)
{
+ phase_accounting = false;
if (!defer || ctx->session->outgoing)
return protolayer_continue(ctx);
int priority = classify((const union kr_sockaddr *)ctx->comm->src_addr, ctx->session->stream);
- if (priority == -1) {
+ if (priority == PRIORITY_SYNC) {
VERBOSE_LOG(" CONTINUE\n");
phase_accounting = true;
return protolayer_continue(ctx);
if (waiting_requests_size > MAX_WAITING_REQS_SIZE) {
defer_sample_state_t prev_sample_state;
defer_sample_start(&prev_sample_state);
+ phase_accounting = true;
do {
process_single_deferred(); // possibly defers again without decreasing waiting_requests_size
// If the unwrapped query is to be processed here,
// it is the last iteration and the query is processed after returning.
defer_sample_restart();
} while (waiting_requests_size > MAX_WAITING_REQS_SIZE);
+ phase_accounting = false;
defer_sample_stop(&prev_sample_state, true);
}
enum protolayer_event_type event, void **baton,
struct session2 *session, void *sess_data)
{
+ if ((event == PROTOLAYER_EVENT_EOF) || (event == PROTOLAYER_EVENT_GENERAL_TIMEOUT)) {
+ // disable accounting only for events that cannot occur during incoming data processing
+ phase_accounting = false;
+ }
if (!defer || session->outgoing)
return PROTOLAYER_EVENT_PROPAGATE;
VERBOSE_LOG(" %d waiting\n", waiting_requests);
defer_sample_start(NULL);
uint64_t idle_stamp = defer_sample_state.stamp;
+ phase_accounting = true;
do {
process_single_deferred();
defer_sample_restart();
} while ((waiting_requests > 0) && (defer_sample_state.stamp < idle_stamp + IDLE_TIMEOUT));
+ phase_accounting = false;
defer_sample_stop(NULL, true);
cleanup_queues();
udp_queue_send_all();