}
-
/// Push query to a queue according to its priority and activate idle.
static inline void push_query(struct protolayer_iter_ctx *ctx, int priority, bool to_head_end)
{
if (age_ns >= REQ_TIMEOUT) {
VERBOSE_LOG(" BREAK (timeout)\n");
+ kr_log_warning(DEFER, "Data from %s too long in queue, dropping.\n",
+ kr_straddr(ctx->comm->src_addr)); // TODO make it notice as it's intended behavior of defer?
break_query(ctx, ETIME);
return;
}
return protolayer_continue(ctx);
defer_sample_addr((const union kr_sockaddr *)ctx->comm->src_addr, ctx->session->stream);
- struct pl_defer_iter_data *data = iter_data;
+ struct pl_defer_iter_data *idata = iter_data;
struct pl_defer_sess_data *sdata = sess_data;
- data->req_stamp = defer_sample_state.stamp;
+ idata->req_stamp = defer_sample_state.stamp;
VERBOSE_LOG(" %s UNWRAP\n",
kr_straddr(ctx->comm->src_addr));
if (queue_len(sdata->queue) > 0) { // stream with preceding packet already deferred
queue_push(sdata->queue, ctx);
- waiting_requests_size += data->size = protolayer_iter_size_est(ctx, false);
+ waiting_requests_size += idata->size = protolayer_iter_size_est(ctx, false);
// payload counted in session wire buffer
VERBOSE_LOG(" PUSH as follow-up\n");
return protolayer_async();
waiting_requests_size += sdata->size = protolayer_sess_size_est(ctx->session);
}
push_query(ctx, priority, false);
- waiting_requests_size += data->size = protolayer_iter_size_est(ctx, !ctx->session->stream);
+ waiting_requests_size += idata->size = protolayer_iter_size_est(ctx, !ctx->session->stream);
// for stream, payload is counted in session wire buffer
if (waiting_requests_size > MAX_WAITING_REQS_SIZE) {
defer_sample_start(&prev_sample_state);
do {
process_single_deferred(); // possibly defers again without decreasing waiting_requests_size
+ // If the unwrapped query is to be processed here,
+ // it is the last iteration and the query is processed after returning.
defer_sample_restart();
} while (waiting_requests_size > MAX_WAITING_REQS_SIZE);
defer_sample_stop(&prev_sample_state, true);
if (defer_sample_state.addr.ip.sa_family != AF_UNSPEC) {
// TODO: this costs performance, so only in some debug mode?
- kr_assert(kr_sockaddr_cmp(&addr->ip, &defer_sample_state.addr.ip) == kr_ok());
- return;
+ if (kr_fails_assert(kr_sockaddr_cmp(&addr->ip, &defer_sample_state.addr.ip) == kr_ok())) {
+ kr_log_error(DEFER, "%s != %s\n",
+ kr_straddr(&addr->ip),
+ kr_straddr(&defer_sample_state.addr.ip));
+ abort(); // TODO change this to warning or remove before releasing
+ return;
+ }
+
}
switch (addr->ip.sa_family) {