}
}
+/// Pop and return query from the specified queue, deactivate idle if not needed.
+static inline struct protolayer_iter_ctx *pop_query_queue(int priority)
+{
+ kr_assert(queue_len(queues[priority]) > 0);
+ struct protolayer_iter_ctx *ctx = queue_head(queues[priority]);
+ queue_pop(queues[priority]);
+ if (--waiting_requests <= 0) {
+ kr_assert(waiting_requests == 0);
+ uv_idle_stop(&idle_handle);
+ VERBOSE_LOG(" deactivating idle\n");
+ }
+ return ctx;
+}
+
+
/// Pop and return the query with the highest priority, UDP or non-UDP based on current phase,
/// deactivate idle if not needed.
static inline struct protolayer_iter_ctx *pop_query(void)
i = PRIORITY_UDP;
}
- struct protolayer_iter_ctx *ctx = queue_head(queues[i]);
- queue_pop(queues[i]);
- if (--waiting_requests <= 0) {
- kr_assert(waiting_requests == 0);
- uv_idle_stop(&idle_handle);
- VERBOSE_LOG(" deactivating idle\n");
- }
- return ctx;
+ return pop_query_queue(i);
}
+// Break the given query; for streams break also all follow-up queries and force-close the stream.
+static inline void break_query(struct protolayer_iter_ctx *ctx, int err)
+{
+ if (ctx->session->stream) {
+ struct pl_defer_sess_data *sdata = protolayer_sess_data_get_current(ctx);
+ if (!ctx->session->closing) {
+ session2_force_close(ctx->session); // session is not freed here as iter contexts exist
+ }
+ queue_pop(sdata->queue);
+ while (queue_len(sdata->queue) > 0) {
+ protolayer_break(ctx, kr_error(err)); // session is not freed here as other contexts exist
+ ctx = queue_head(sdata->queue);
+ queue_pop(sdata->queue);
+ }
+ }
+ protolayer_break(ctx, kr_error(err));
+}
+
/// Process a single deferred query (or defer again) if there is any.
/// Time accounting should have been just started, the stamp is used, accounted address is set.
static inline void process_single_deferred(void) {
if (ctx->session->closing) {
VERBOSE_LOG(" BREAK (session is closing)\n");
- if (ctx->session->stream) {
- queue_pop(sdata->queue);
- while (queue_len(sdata->queue) > 0) {
- protolayer_break(ctx, kr_error(ECANCELED)); // session is not freed here as other contexts exist
- ctx = queue_head(sdata->queue);
- queue_pop(sdata->queue);
- }
- }
- protolayer_break(ctx, kr_error(ECANCELED));
+ break_query(ctx, ECANCELED);
return;
}
if (age_ns >= REQ_TIMEOUT) {
VERBOSE_LOG(" BREAK (timeout)\n");
- if (ctx->session->stream) {
- while (queue_len(sdata->queue) > 0) {
- ctx = queue_head(sdata->queue);
- queue_pop(sdata->queue);
- protolayer_break(ctx, kr_error(ETIME)); // session is not freed here as it is not closing yet
- }
- session2_force_close(ctx->session);
- } else {
- protolayer_break(ctx, kr_error(ETIME));
- }
+ break_query(ctx, ETIME);
return;
}
protolayer_continue(ctx);
}
+/// Break expired requests at the beginning of queues, uses current stamp.
+static inline void cleanup_queues(void) {
+ for (int i = 0; i < QUEUES_CNT; i++) {
+ int cnt = 0;
+ while (queue_len(queues[i]) > 0) {
+ struct protolayer_iter_ctx *ctx = queue_head(queues[i]);
+ struct pl_defer_iter_data *idata = protolayer_iter_data_get_current(ctx);
+ uint64_t age_ns = defer_sample_state.stamp - idata->req_stamp;
+ if (age_ns < REQ_TIMEOUT) break;
+ pop_query_queue(i);
+ break_query(ctx, ETIME);
+ cnt++;
+ }
+ if (cnt > 0) {
+ VERBOSE_LOG(" BREAK %d queries from %d\n", cnt, i);
+ }
+ }
+}
+
/// Unwrap: defer or process the query synchronously.
/// Time accounting should have been started, the stamp is used, accounted address is set.
static enum protolayer_iter_cb_result pl_defer_unwrap(
process_single_deferred();
defer_sample_restart();
}
+ cleanup_queues();
defer_sample_stop(); // TODO skip calling and use just restart elsewhere?
udp_queue_send_all();