struct stream *strm; /* The stream that should be offloaded */
struct list *messages; /* List of messages that will be sent during the stream processing */
- struct buffer *buffer; /* Buffer used to store a NOTIFY or ACK frame */
- struct buffer_wait buffer_wait; /* position in the list of streams waiting for a buffer */
+ struct buffer *buffer; /* Buffer used to store a encoded messages */
+ struct buffer_wait buffer_wait; /* position in the list of ressources waiting for a buffer */
struct list list;
enum spoe_ctx_state state; /* SPOE_CTX_ST_* */
unsigned int flags; /* SPOE_APPCTX_FL_* */
unsigned int status_code; /* SPOE_FRM_ERR_* */
+ struct buffer *buffer; /* Buffer used to store a encoded messages */
+ struct buffer_wait buffer_wait; /* position in the list of ressources waiting for a buffer */
struct list waiting_queue; /* list of streams waiting for a ACK frame, in sync and pipelining mode */
struct list list; /* next spoe appctx for the same agent */
};
struct flt_ops spoe_ops;
static int queue_spoe_context(struct spoe_context *ctx);
-static int acquire_spoe_buffer(struct spoe_context *ctx);
-static void release_spoe_buffer(struct spoe_context *ctx);
+static int acquire_spoe_buffer(struct buffer **buf, struct buffer_wait *buffer_wait);
+static void release_spoe_buffer(struct buffer **buf, struct buffer_wait *buffer_wait);
/********************************************************************
* helper functions/globals
idx += encode_spoe_varint(ctx->stream_id, frame+idx);
idx += encode_spoe_varint(ctx->frame_id, frame+idx);
- /* Copy encoded messages */
- if (idx + ctx->buffer->i > size) {
+ /* check the buffer size */
+ if (idx + SPOE_APPCTX(appctx)->buffer->i > size) {
spoe_status_code = SPOE_FRM_ERR_TOO_BIG;
return 0;
}
/* Copy encoded messages */
- memcpy(frame+idx, ctx->buffer->p, ctx->buffer->i);
- idx += ctx->buffer->i;
+ memcpy(frame+idx, SPOE_APPCTX(appctx)->buffer->p, SPOE_APPCTX(appctx)->buffer->i);
+ idx += SPOE_APPCTX(appctx)->buffer->i;
return idx;
}
return 0;
found:
- if (!acquire_spoe_buffer(ctx))
+ if (!acquire_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait))
return 1; /* Retry later */
+ /* Transfer the buffer ownership to the SPOE context */
+ ctx->buffer = SPOE_APPCTX(appctx)->buffer;
+ SPOE_APPCTX(appctx)->buffer = &buf_empty;
+
/* Copy encoded actions */
memcpy(ctx->buffer->p, frame+idx, size-idx);
ctx->buffer->i = size-idx;
/********************************************************************
* Functions that manage the SPOE applet
********************************************************************/
+static int
+wakeup_spoe_appctx(struct appctx *appctx)
+{
+ si_applet_want_get(appctx->owner);
+ si_applet_want_put(appctx->owner);
+ appctx_wakeup(appctx);
+ return 1;
+}
+
/* Callback function that catches applet timeouts. If a timeout occurred, we set
* <appctx->st1> flag and the SPOE applet is woken up. */
static struct task *
task->expire = TICK_ETERNITY;
appctx->st1 = SPOE_APPCTX_ERR_TOUT;
}
- si_applet_want_get(appctx->owner);
- si_applet_want_put(appctx->owner);
- appctx_wakeup(appctx);
+ wakeup_spoe_appctx(appctx);
return task;
}
task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
}
+ release_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait);
pool_free2(pool2_spoe_appctx, SPOE_APPCTX(appctx));
if (!LIST_ISEMPTY(&agent->applets))
}
ctx = LIST_NEXT(&agent->sending_queue, typeof(ctx), list);
+
+ /* Transfer the buffer ownership to the SPOE appctx */
+ SPOE_APPCTX(appctx)->buffer = ctx->buffer;
+ ctx->buffer = &buf_empty;
+
ret = prepare_spoe_hanotify_frame(appctx, ctx, frame+4, SPOE_APPCTX(appctx)->max_frame_size);
if (ret > 1)
ret = send_spoe_frame(appctx, frame, ret);
agent->sending_rate++;
ctx->state = SPOE_CTX_ST_ERROR;
ctx->status_code = (spoe_status_code + 0x100);
- release_spoe_buffer(ctx);
+ release_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait);
task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
LIST_DEL(&ctx->list);
LIST_INIT(&ctx->list);
default:
agent->sending_rate++;
ctx->state = SPOE_CTX_ST_WAITING_ACK;
- release_spoe_buffer(ctx);
+ release_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait);
LIST_DEL(&ctx->list);
LIST_INIT(&ctx->list);
if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC)
SPOE_APPCTX(appctx)->max_frame_size = conf->agent->max_frame_size;
SPOE_APPCTX(appctx)->flags = 0;
SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_NONE;
+ SPOE_APPCTX(appctx)->buffer = &buf_empty;
+
+ LIST_INIT(&SPOE_APPCTX(appctx)->buffer_wait.list);
+ SPOE_APPCTX(appctx)->buffer_wait.target = appctx;
+ SPOE_APPCTX(appctx)->buffer_wait.wakeup_cb = (int (*)(void *))wakeup_spoe_appctx;
LIST_INIT(&SPOE_APPCTX(appctx)->list);
LIST_INIT(&SPOE_APPCTX(appctx)->waiting_queue);
list_for_each_entry(spoe_appctx, &agent->applets, list) {
appctx = spoe_appctx->owner;
if (appctx->st0 == SPOE_APPCTX_ST_IDLE) {
- si_applet_want_get(appctx->owner);
- si_applet_want_put(appctx->owner);
- appctx_wakeup(appctx);
+ wakeup_spoe_appctx(appctx);
LIST_DEL(&spoe_appctx->list);
LIST_ADDQ(&agent->applets, &spoe_appctx->list);
break;
if (ctx->flags & SPOE_CTX_FL_PROCESS)
goto wait;
- if (!acquire_spoe_buffer(ctx))
+ if (!acquire_spoe_buffer(&ctx->buffer, &ctx->buffer_wait))
goto wait;
/* Set the right flag to prevent request and response processing
/* Reset processing timer */
ctx->process_exp = TICK_ETERNITY;
- release_spoe_buffer(ctx);
+ release_spoe_buffer(&ctx->buffer, &ctx->buffer_wait);
if (!LIST_ISEMPTY(&ctx->list)) {
LIST_DEL(&ctx->list);
* Functions that create/destroy SPOE contexts
**************************************************************************/
static int
-acquire_spoe_buffer(struct spoe_context *ctx)
+acquire_spoe_buffer(struct buffer **buf, struct buffer_wait *buffer_wait)
{
- if (ctx->buffer != &buf_empty)
+ if (*buf != &buf_empty)
return 1;
- if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) {
- LIST_DEL(&ctx->buffer_wait.list);
- LIST_INIT(&ctx->buffer_wait.list);
+ if (!LIST_ISEMPTY(&buffer_wait->list)) {
+ LIST_DEL(&buffer_wait->list);
+ LIST_INIT(&buffer_wait->list);
}
- if (b_alloc_margin(&ctx->buffer, global.tune.reserved_bufs))
+ if (b_alloc_margin(buf, global.tune.reserved_bufs))
return 1;
- LIST_ADDQ(&buffer_wq, &ctx->buffer_wait.list);
+ LIST_ADDQ(&buffer_wq, &buffer_wait->list);
return 0;
}
static void
-release_spoe_buffer(struct spoe_context *ctx)
+release_spoe_buffer(struct buffer **buf, struct buffer_wait *buffer_wait)
{
- if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) {
- LIST_DEL(&ctx->buffer_wait.list);
- LIST_INIT(&ctx->buffer_wait.list);
+ if (!LIST_ISEMPTY(&buffer_wait->list)) {
+ LIST_DEL(&buffer_wait->list);
+ LIST_INIT(&buffer_wait->list);
}
/* Release the buffer if needed */
- if (ctx->buffer != &buf_empty) {
- b_free(&ctx->buffer);
- offer_buffers(ctx, tasks_run_queue + applets_active_queue);
+ if (*buf != &buf_empty) {
+ b_free(buf);
+ offer_buffers(buffer_wait->target,
+ tasks_run_queue + applets_active_queue);
}
}
-static int wakeup_spoe_context(struct spoe_context *ctx)
+static int
+wakeup_spoe_context(struct spoe_context *ctx)
{
task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
return 1;
list_for_each_entry(fconf, &p->filter_configs, list) {
struct spoe_config *conf;
struct spoe_agent *agent;
- struct appctx *appctx;
struct spoe_appctx *spoe_appctx;
if (fconf->id != spoe_filter_id)
agent = conf->agent;
list_for_each_entry(spoe_appctx, &agent->applets, list) {
- appctx = spoe_appctx->owner;
- si_applet_want_get(appctx->owner);
- si_applet_want_put(appctx->owner);
- appctx_wakeup(appctx);
+ wakeup_spoe_appctx(spoe_appctx->owner);
}
}
p = p->next;
if (tick_is_expired(ctx->process_exp, now_ms)) {
s->pending_events |= TASK_WOKEN_MSG;
- release_spoe_buffer(ctx);
+ release_spoe_buffer(&ctx->buffer, &ctx->buffer_wait);
}
}