}
VERBOSE_LOG(" %s ADD %4.3f ms -> load: %d on /%d\n",
- kr_straddr(&defer_sample_state.addr.ip), nsec / 1000000.0, max_load, prefix);
+ kr_straddr(&addr->ip), nsec / 1000000.0, max_load, prefix);
}
/// Determine priority of the request in [-1, QUEUES_CNT - 1].
static inline void process_single_deferred(void)
{
struct protolayer_iter_ctx *ctx = pop_query();
- if (ctx == NULL) return;
+ if (kr_fails_assert(ctx)) return;
defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream);
phase_accounting = true;
struct pl_defer_iter_data *idata = protolayer_iter_data_get_current(ctx);
struct pl_defer_sess_data *sdata = protolayer_sess_data_get_current(ctx);
+ struct session2 *session = ctx->session;
uint64_t age_ns = defer_sample_state.stamp - idata->req_stamp;
VERBOSE_LOG(" %s POP from %d after %4.3f ms\n",
return;
}
+ bool eof = false;
if (ctx->session->stream) {
kr_assert(queue_head(sdata->queue) == ctx);
queue_pop(sdata->queue);
+ while ((queue_len(sdata->queue) > 0) && (queue_head(sdata->queue) == NULL)) { // EOF event
+ eof = true;
+ queue_pop(sdata->queue);
+ }
if (queue_len(sdata->queue) > 0) {
VERBOSE_LOG(" PUSH follow-up to head of %d\n", priority);
push_query(queue_head(sdata->queue), priority, true);
}
}
+ if (eof) {
+ // Keep session alive even if it is somehow force-closed during continuation.
+ // TODO Is it needed?
+ session->ref_count++;
+ }
+
VERBOSE_LOG(" CONTINUE\n");
protolayer_continue(ctx);
+
+ if (eof) {
+ VERBOSE_LOG(" CONTINUE EOF event\n");
+ session2_event_after(session, PROTOLAYER_TYPE_DEFER, PROTOLAYER_EVENT_EOF, NULL);
+ session2_unhandle(session); // decrease ref_count
+ }
}
/// Break expired requests at the beginning of queues, uses current stamp.
void *sess_data, void *iter_data,
struct protolayer_iter_ctx *ctx)
{
- if (!defer)
- return protolayer_continue(ctx);
-
- if (ctx->session->outgoing)
+ if (!defer || ctx->session->outgoing)
return protolayer_continue(ctx);
defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream);
return protolayer_async();
}
+/// Unwrap event: EOF event may be deferred here, other events pass synchronously.
+static enum protolayer_event_cb_result pl_defer_event_unwrap(
+ enum protolayer_event_type event, void **baton,
+ struct session2 *session, void *sess_data)
+{
+ if (!defer || session->outgoing)
+ return PROTOLAYER_EVENT_PROPAGATE;
+
+ struct pl_defer_sess_data *sdata = sess_data;
+ if ((event == PROTOLAYER_EVENT_EOF) && (queue_len(sdata->queue) > 0)) {
+ // defer EOF event if unprocessed data remain, baton is dropped if any
+ queue_push(sdata->queue, NULL);
+ VERBOSE_LOG(" %s event %s deferred\n",
+ session->comm_storage.src_addr ? kr_straddr(session->comm_storage.src_addr) : "(null)",
+ protolayer_event_name(event));
+ return PROTOLAYER_EVENT_CONSUME;
+ }
+
+ VERBOSE_LOG(" %s event %s passes through synchronously%s%s\n",
+ session->comm_storage.src_addr ? kr_straddr(session->comm_storage.src_addr) : "(null)",
+ protolayer_event_name(event),
+ queue_len(sdata->queue) > 0 ? " ahead of deferred data" : "",
+ *baton ? " (with baton)" : "");
+ return PROTOLAYER_EVENT_PROPAGATE;
+}
+
/// Idle: continue processing deferred requests.
static void defer_queues_idle(uv_idle_t *handle)
{
.sess_size = sizeof(struct pl_defer_sess_data),
.sess_init = pl_defer_sess_init,
.unwrap = pl_defer_unwrap,
+ .event_unwrap = pl_defer_event_unwrap,
};
}