}
}
+struct pl_defer_sess_data {
+ struct protolayer_data h;
+ protolayer_iter_ctx_queue_t queue; // properly ordered sequence of deferred packets, for stream only
+ // the first ctx in the queue is also in a defer queue
+};
+
struct pl_defer_iter_data {
struct protolayer_data h;
uint64_t req_stamp; // time when request was received, uses get_stamp()
/// Push query to a queue according to its priority and activate idle.
-static inline void push_query(struct protolayer_iter_ctx *ctx, int priority)
+static inline void push_query(struct protolayer_iter_ctx *ctx, int priority, bool to_head_end)
{
- queue_push(queues[priority], ctx);
+ if (to_head_end) {
+ queue_push_head(queues[priority], ctx);
+ } else {
+ queue_push(queues[priority], ctx);
+ }
queue_ix = MIN(queue_ix, priority);
if (waiting_requests++ <= 0) {
kr_assert(waiting_requests == 1);
defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream);
phase_accounting = true;
- struct pl_defer_iter_data *iter_data = protolayer_iter_data_get_current(ctx);
- uint64_t age_ns = defer_sample_state.stamp - iter_data->req_stamp;
+ struct pl_defer_iter_data *idata = protolayer_iter_data_get_current(ctx);
+ struct pl_defer_sess_data *sdata = protolayer_sess_data_get_current(ctx);
+ uint64_t age_ns = defer_sample_state.stamp - idata->req_stamp;
VERBOSE_LOG(" %s POP from %d after %4.3f ms\n",
kr_straddr(ctx->comm->comm_addr),
if (ctx->session->closing) {
VERBOSE_LOG(" BREAK (session is closing)\n");
+ if (ctx->session->stream) {
+ queue_pop(sdata->queue);
+ while (queue_len(sdata->queue) > 0) {
+ protolayer_break(ctx, kr_error(ECANCELED)); // session is not freed here as other contexts exist
+ ctx = queue_head(sdata->queue);
+ queue_pop(sdata->queue);
+ }
+ }
protolayer_break(ctx, kr_error(ECANCELED));
return;
}
+
if (age_ns >= REQ_TIMEOUT) {
VERBOSE_LOG(" BREAK (timeout)\n");
- protolayer_break(ctx, kr_error(ETIME));
+ if (ctx->session->stream) {
+ while (queue_len(sdata->queue) > 0) {
+ ctx = queue_head(sdata->queue);
+ queue_pop(sdata->queue);
+ protolayer_break(ctx, kr_error(ETIME)); // session is not freed here as it is not closing yet
+ }
+ session2_force_close(ctx->session);
+ } else {
+ protolayer_break(ctx, kr_error(ETIME));
+ }
return;
}
int priority = classify((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream);
if (priority > queue_ix) { // priority dropped (got higher value)
VERBOSE_LOG(" PUSH to %d\n", priority);
- push_query(ctx, priority);
+ push_query(ctx, priority, false);
return;
}
+ if (ctx->session->stream) {
+ kr_assert(queue_head(sdata->queue) == ctx);
+ queue_pop(sdata->queue);
+ if (queue_len(sdata->queue) > 0) {
+ VERBOSE_LOG(" PUSH follow-up to head of %d\n", priority);
+ push_query(queue_head(sdata->queue), priority, true);
+ }
+ }
+
VERBOSE_LOG(" CONTINUE\n");
protolayer_continue(ctx);
}
defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream);
struct pl_defer_iter_data *data = iter_data;
+ struct pl_defer_sess_data *sdata = sess_data;
data->req_stamp = defer_sample_state.stamp;
VERBOSE_LOG(" %s UNWRAP\n",
kr_straddr(ctx->comm->comm_addr));
+
+ if (queue_len(sdata->queue) > 0) { // stream with preceding packet already deferred
+ queue_push(sdata->queue, ctx);
+ VERBOSE_LOG(" PUSH as follow-up\n");
+ return protolayer_async();
+ }
+
int priority = classify((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream);
if (priority == -1) {
}
VERBOSE_LOG(" PUSH to %d\n", priority);
- push_query(ctx, priority);
- while (waiting_requests > MAX_WAITING_REQS) {
+ if (ctx->session->stream) {
+ queue_push(sdata->queue, ctx);
+ }
+ push_query(ctx, priority, false);
+ while (waiting_requests > MAX_WAITING_REQS) { // TODO follow-up stream packets are not counted here
defer_sample_restart();
process_single_deferred(); // possibly defers again without decreasing waiting_requests
// defer_sample_stop should be called soon outside
return ret;
}
+/// Initialize session queue
+int pl_defer_sess_init(struct session2 *session, void *data, void *param) {
+ struct pl_defer_sess_data *sdata = data;
+ queue_init(sdata->queue);
+ return 0;
+}
+
/// Deinitialize shared memory.
void defer_deinit(void)
{
{
protolayer_globals[PROTOLAYER_TYPE_DEFER] = (struct protolayer_globals){
.iter_size = sizeof(struct pl_defer_iter_data),
+ .sess_size = sizeof(struct pl_defer_sess_data),
+ .sess_init = pl_defer_sess_init,
.unwrap = pl_defer_unwrap,
};
}