following keywords are supported :
- messages
- option var-prefix
- - timeout hello|idle|ack
+ - timeout hello|idle|ack|processing
- use-backend
timeout ack <timeout>
Set the maximum time to wait for an agent to receive the acknowledgement to a
- NOTIFY frame.
+ NOTIFY frame. It is applied on the stream that handle the connection with the
+ agent.
Arguments :
<timeout> is the timeout value specified in milliseconds by default, but
timeout hello <timeout>
Set the maximum time to wait for an agent to receive the AGENT-HELLO frame.
+ It is applied on the stream that handle the connection with the agent.
Arguments :
<timeout> is the timeout value specified in milliseconds by default, but
timeout idle <timeout>
- Set the maximum time to wait for an agent to close an idle connection.
+ Set the maximum time to wait for an agent to close an idle connection. It is
+ applied on the stream that handle the connection with the agent.
+
+ Arguments :
+ <timeout> is the timeout value specified in milliseconds by default, but
+ can be in any other unit if the number is suffixed by the unit,
+ as explained at the top of this document.
+
+
+timeout processing <timeout>
+ Set the maximum time to wait for a stream to process an event, i.e to acquire
+ a stream to talk with an agent, to encode all messages, to send the NOTIFY
+ frame, to receive the corrsponding acknowledgement and to process all
+ actions. It is applied on the stream that handle the client and the server
+ sessions.
Arguments :
<timeout> is the timeout value specified in milliseconds by default, but
char *name; /* Backend name used during conf parsing */
} b;
struct {
- unsigned int hello; /* Max time to receive AGENT-HELLO frame */
- unsigned int idle; /* Max Idle timeout */
- unsigned int ack; /* Max time to acknowledge a NOTIFY frame */
+ unsigned int hello; /* Max time to receive AGENT-HELLO frame (in SPOE applet) */
+ unsigned int idle; /* Max Idle timeout (in SPOE applet) */
+ unsigned int ack; /* Max time to acknowledge a NOTIFY frame (in SPOE applet)*/
+ unsigned int processing; /* Max time to process an event (in the main stream) */
} timeout;
char *var_pfx; /* Prefix used for vars set by the agent */
unsigned int stream_id; /* stream_id and frame_id are used */
unsigned int frame_id; /* to map NOTIFY and ACK frames */
-
+ unsigned int process_exp; /* expiration date to process an event */
};
/* Set if the handle on SIGUSR1 is registered */
{
struct spoe_context *ctx;
+ if (!appctx || appctx->st0 > SPOE_APPCTX_ST_PROCESSING)
+ return;
+
if (LIST_ISEMPTY(&agent->applet_wq))
LIST_ADD(&agent->cache, &APPCTX_SPOE(appctx).list);
else {
/* Reset the flag to allow next processing */
ctx->flags &= ~SPOE_CTX_FL_PROCESS;
+ /* Reset processing timer */
+ ctx->process_exp = TICK_ETERNITY;
+
/* Release the buffer if needed */
if (ctx->buffer != &buf_empty) {
b_free(&ctx->buffer);
process_spoe_event(struct stream *s, struct spoe_context *ctx,
enum spoe_event ev)
{
- int dir, ret = 1;
+ struct spoe_config *conf = FLT_CONF(ctx->filter);
+ struct spoe_agent *agent = conf->agent;
+ int dir, ret = 1;
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
" - ctx-state=%s - event=%s\n",
(int)now.tv_sec, (int)now.tv_usec,
- ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
- __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
+ agent->id, __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
spoe_event_str[ev]);
dir = ((ev < SPOE_EV_ON_SERVER_SESS) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES);
if (ctx->state == SPOE_CTX_ST_ERROR)
goto error;
+ if (tick_is_expired(ctx->process_exp, now_ms) && ctx->state != SPOE_CTX_ST_DONE) {
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
+ " - failed to process event '%s': timeout\n",
+ (int)now.tv_sec, (int)now.tv_usec,
+ agent->id, __FUNCTION__, s, spoe_event_str[ev]);
+ send_log(ctx->strm->be, LOG_WARNING,
+ "failed to process event '%s': timeout.\n",
+ spoe_event_str[ev]);
+ goto error;
+ }
+
if (ctx->state == SPOE_CTX_ST_READY) {
+ if (!tick_isset(ctx->process_exp)) {
+ ctx->process_exp = tick_add_ifset(now_ms, agent->timeout.processing);
+ s->task->expire = tick_first((tick_is_expired(s->task->expire, now_ms) ? 0 : s->task->expire),
+ ctx->process_exp);
+ }
+
ret = acquire_spoe_appctx(ctx, dir);
if (ret <= 0) {
if (!ret)
LIST_INIT(&ctx->buffer_wait);
LIST_INIT(&ctx->applet_wait);
- ctx->stream_id = 0;
- ctx->frame_id = 1;
+ ctx->stream_id = 0;
+ ctx->frame_id = 1;
+ ctx->process_exp = TICK_ETERNITY;
return ctx;
}
}
}
+
+/*
+ * Called when the stream is woken up because of expired timer.
+ */
+static void
+spoe_check_timeouts(struct stream *s, struct filter *filter)
+{
+ struct spoe_context *ctx = filter->ctx;
+
+ if (tick_is_expired(ctx->process_exp, now_ms))
+ s->task->state |= TASK_WOKEN_MSG;
+}
+
/* Called when we are ready to filter data on a channel */
static int
spoe_start_analyze(struct stream *s, struct filter *filter, struct channel *chn)
.check = spoe_check,
/* Handle start/stop of SPOE */
- .attach = spoe_start,
- .detach = spoe_stop,
+ .attach = spoe_start,
+ .detach = spoe_stop,
+ .check_timeouts = spoe_check_timeouts,
/* Handle channels activity */
.channel_start_analyze = spoe_start_analyze,
curagent->timeout.hello = TICK_ETERNITY;
curagent->timeout.ack = TICK_ETERNITY;
curagent->timeout.idle = TICK_ETERNITY;
+ curagent->timeout.processing = TICK_ETERNITY;
curagent->var_pfx = NULL;
curagent->new_applets = 0;
tv = &curagent->timeout.idle;
else if (!strcmp(args[1], "ack"))
tv = &curagent->timeout.ack;
+ else if (!strcmp(args[1], "processing"))
+ tv = &curagent->timeout.processing;
else {
- Alert("parsing [%s:%d] : 'timeout' supports 'connect', 'idle' and 'ack' (got %s).\n",
+ Alert("parsing [%s:%d] : 'timeout' supports 'connect', 'idle', 'ack' or 'processing' (got %s).\n",
file, linenum, args[1]);
err_code |= ERR_ALERT | ERR_FATAL;
goto out;
curagent->id, curagent->conf.file, curagent->conf.line);
goto error;
}
- if (curagent->timeout.hello == TICK_ETERNITY ||
- curagent->timeout.idle == TICK_ETERNITY ||
- curagent->timeout.ack == TICK_ETERNITY) {
+ if (curagent->timeout.hello == TICK_ETERNITY ||
+ curagent->timeout.idle == TICK_ETERNITY ||
+ curagent->timeout.ack == TICK_ETERNITY ||
+ curagent->timeout.processing == TICK_ETERNITY) {
+ if (curagent->timeout.ack == TICK_ETERNITY)
+ curagent->timeout.ack = curagent->timeout.idle;
+
Warning("Proxy '%s': missing timeouts for SPOE agent '%s' declare at %s:%d.\n"
" | While not properly invalid, you will certainly encounter various problems\n"
" | with such a configuration. To fix this, please ensure that all following\n"
- " | timeouts are set to a non-zero value: 'hello', 'idle', 'ack'.\n",
+ " | timeouts are set to a non-zero value: 'hello', 'idle', 'ack', 'processing'.\n",
px->id, curagent->id, curagent->conf.file, curagent->conf.line);
}
if (curagent->var_pfx == NULL) {