static inline void break_query(struct protolayer_iter_ctx *ctx, int err)
{
if (ctx->session->stream) {
+ struct session2 *s = ctx->session;
struct pl_defer_sess_data *sdata = protolayer_sess_data_get_current(ctx);
+ s->ref_count++; // keep session and sdata alive for a while
waiting_requests_size -= sdata->size;
if (!ctx->session->closing) {
- session2_force_close(ctx->session); // session is not freed here as iter contexts exist
+ session2_force_close(ctx->session);
}
- queue_pop(sdata->queue);
- while (queue_len(sdata->queue) > 0) {
- struct pl_defer_iter_data *idata = protolayer_iter_data_get_current(ctx);
- waiting_requests_size -= idata->size;
- protolayer_break(ctx, kr_error(err)); // session is not freed here as other contexts exist
- ctx = queue_head(sdata->queue);
+ kr_assert(ctx == queue_head(sdata->queue));
+ while (true) {
queue_pop(sdata->queue);
+ if (ctx) {
+ struct pl_defer_iter_data *idata = protolayer_iter_data_get_current(ctx);
+ waiting_requests_size -= idata->size;
+ protolayer_break(ctx, kr_error(err));
+ }
+ if (queue_len(sdata->queue) == 0) break;
+ ctx = queue_head(sdata->queue);
}
+ session2_unhandle(s); // decrease ref_count
+ } else {
+ struct pl_defer_iter_data *idata = protolayer_iter_data_get_current(ctx);
+ waiting_requests_size -= idata->size;
+ protolayer_break(ctx, kr_error(err));
}
- struct pl_defer_iter_data *idata = protolayer_iter_data_get_current(ctx);
- waiting_requests_size -= idata->size;
- protolayer_break(ctx, kr_error(err));
kr_assert(waiting_requests ? waiting_requests_size > 0 : waiting_requests_size == 0);
}
if (eof) {
// Keep session alive even if it is somehow force-closed during continuation.
- // TODO Is it needed?
+ // TODO Is it possible?
session->ref_count++;
}