{
struct spoe_message *msg, *msgback;
struct spoe_group *grp, *grpback;
+ int i;
if (!agent)
return;
LIST_DEL(&grp->list);
spoe_release_group(grp);
}
+ for (i = 0; i < global.nbthread; ++i)
+ SPIN_DESTROY(&agent->rt[i].lock);
+ free(agent->rt);
free(agent);
}
/* Try to find the corresponding SPOE context */
if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC) {
- list_for_each_entry((*ctx), &agent->waiting_queue, list) {
+ list_for_each_entry((*ctx), &agent->rt[tid].waiting_queue, list) {
if ((*ctx)->stream_id == (unsigned int)stream_id &&
(*ctx)->frame_id == (unsigned int)frame_id)
goto found;
__FUNCTION__, appctx);
/* Remove applet from the list of running applets */
- agent->applets_act--;
+ agent->rt[tid].applets_act--;
if (!LIST_ISEMPTY(&spoe_appctx->list)) {
LIST_DEL(&spoe_appctx->list);
LIST_INIT(&spoe_appctx->list);
/* Shutdown the server connection, if needed */
if (appctx->st0 != SPOE_APPCTX_ST_END) {
if (appctx->st0 == SPOE_APPCTX_ST_IDLE)
- agent->applets_idle--;
+ agent->rt[tid].applets_idle--;
appctx->st0 = SPOE_APPCTX_ST_END;
if (spoe_appctx->status_code == SPOE_FRM_ERR_NONE)
&spoe_appctx->buffer_wait);
pool_free2(pool2_spoe_appctx, spoe_appctx);
- if (!LIST_ISEMPTY(&agent->applets))
+ if (!LIST_ISEMPTY(&agent->rt[tid].applets))
goto end;
/* If this was the last running applet, notify all waiting streams */
- list_for_each_entry_safe(ctx, back, &agent->sending_queue, list) {
+ list_for_each_entry_safe(ctx, back, &agent->rt[tid].sending_queue, list) {
LIST_DEL(&ctx->list);
LIST_INIT(&ctx->list);
ctx->state = SPOE_CTX_ST_ERROR;
ctx->status_code = (spoe_appctx->status_code + 0x100);
task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
}
- list_for_each_entry_safe(ctx, back, &agent->waiting_queue, list) {
+ list_for_each_entry_safe(ctx, back, &agent->rt[tid].waiting_queue, list) {
LIST_DEL(&ctx->list);
LIST_INIT(&ctx->list);
ctx->state = SPOE_CTX_ST_ERROR;
end:
/* Update runtinme agent info */
- agent->frame_size = agent->max_frame_size;
- list_for_each_entry(spoe_appctx, &agent->applets, list)
- agent->frame_size = MIN(spoe_appctx->max_frame_size,
- agent->frame_size);
+ agent->rt[tid].frame_size = agent->max_frame_size;
+ list_for_each_entry(spoe_appctx, &agent->rt[tid].applets, list)
+ HA_ATOMIC_UPDATE_MIN(&agent->rt[tid].frame_size, spoe_appctx->max_frame_size);
}
static int
default:
/* HELLO handshake is finished, set the idle timeout and
* add the applet in the list of running applets. */
- agent->applets_idle++;
+ agent->rt[tid].applets_idle++;
appctx->st0 = SPOE_APPCTX_ST_IDLE;
+ SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
LIST_DEL(&SPOE_APPCTX(appctx)->list);
- LIST_ADD(&agent->applets, &SPOE_APPCTX(appctx)->list);
+ LIST_ADD(&agent->rt[tid].applets, &SPOE_APPCTX(appctx)->list);
+ SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
/* Update runtinme agent info */
- agent->frame_size = MIN(SPOE_APPCTX(appctx)->max_frame_size,
- agent->frame_size);
+ HA_ATOMIC_UPDATE_MIN(&agent->rt[tid].frame_size, SPOE_APPCTX(appctx)->max_frame_size);
goto next;
}
ret = spoe_prepare_hafrag_frame(appctx, ctx, frame,
SPOE_APPCTX(appctx)->max_frame_size);
}
- else if (LIST_ISEMPTY(&agent->sending_queue)) {
+ else if (LIST_ISEMPTY(&agent->rt[tid].sending_queue)) {
*skip = 1;
ret = 1;
goto end;
}
else {
- ctx = LIST_NEXT(&agent->sending_queue, typeof(ctx), list);
+ ctx = LIST_NEXT(&agent->rt[tid].sending_queue, typeof(ctx), list);
ret = spoe_prepare_hanotify_frame(appctx, ctx, frame,
SPOE_APPCTX(appctx)->max_frame_size);
no_frag_frame_sent:
if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC) {
appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
- LIST_ADDQ(&agent->waiting_queue, &ctx->list);
+ LIST_ADDQ(&agent->rt[tid].waiting_queue, &ctx->list);
}
else if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_PIPELINING) {
appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
goto next;
case 0: /* ignore */
- agent->sending_rate++;
+ agent->rt[tid].sending_rate++;
fpa++;
break;
break;
default:
- agent->sending_rate++;
+ agent->rt[tid].sending_rate++;
fpa++;
break;
}
stop:
if (appctx->st0 == SPOE_APPCTX_ST_PROCESSING) {
appctx->st0 = SPOE_APPCTX_ST_IDLE;
- agent->applets_idle++;
+ agent->rt[tid].applets_idle++;
}
if (fpa || (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_PERSIST)) {
+ SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
LIST_DEL(&SPOE_APPCTX(appctx)->list);
- LIST_ADD(&agent->applets, &SPOE_APPCTX(appctx)->list);
+ LIST_ADD(&agent->rt[tid].applets, &SPOE_APPCTX(appctx)->list);
+ SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
if (fpa)
SPOE_APPCTX(appctx)->task->expire =
tick_add_ifset(now_ms, agent->timeout.idle);
case SPOE_APPCTX_ST_IDLE:
if (stopping &&
- LIST_ISEMPTY(&agent->sending_queue) &&
+ LIST_ISEMPTY(&agent->rt[tid].sending_queue) &&
LIST_ISEMPTY(&SPOE_APPCTX(appctx)->waiting_queue)) {
SPOE_APPCTX(appctx)->task->expire =
tick_add_ifset(now_ms, agent->timeout.idle);
appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
goto switchstate;
}
- agent->applets_idle--;
+ agent->rt[tid].applets_idle--;
appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
/* fall through */
return;
}
out:
+ if (stopping)
+ spoe_wakeup_appctx(appctx);
+
if (SPOE_APPCTX(appctx)->task->expire != TICK_ETERNITY)
task_queue(SPOE_APPCTX(appctx)->task);
si_oc(si)->flags |= CF_READ_DONTWAIT;
memset(appctx->ctx.spoe.ptr, 0, pool2_spoe_appctx->size);
appctx->st0 = SPOE_APPCTX_ST_CONNECT;
- if ((SPOE_APPCTX(appctx)->task = task_new(MAX_THREADS_MASK)) == NULL)
+ if ((SPOE_APPCTX(appctx)->task = task_new(1UL << tid)) == NULL)
goto out_free_spoe_appctx;
SPOE_APPCTX(appctx)->owner = appctx;
strm->do_log = NULL;
strm->res.flags |= CF_READ_DONTWAIT;
- LIST_ADDQ(&conf->agent->applets, &SPOE_APPCTX(appctx)->list);
- conf->agent->applets_act++;
+ SPIN_LOCK(SPOE_APPLET_LOCK, &conf->agent->rt[tid].lock);
+ LIST_ADDQ(&conf->agent->rt[tid].applets, &SPOE_APPCTX(appctx)->list);
+ SPIN_UNLOCK(SPOE_APPLET_LOCK, &conf->agent->rt[tid].lock);
+ conf->agent->rt[tid].applets_act++;
task_wakeup(SPOE_APPCTX(appctx)->task, TASK_WOKEN_INIT);
task_wakeup(strm->task, TASK_WOKEN_INIT);
min_applets = min_applets_act(agent);
/* Check if we need to create a new SPOE applet or not. */
- if (agent->applets_act >= min_applets &&
- agent->applets_idle &&
- agent->sending_rate)
+ if (agent->rt[tid].applets_act >= min_applets &&
+ agent->rt[tid].applets_idle &&
+ agent->rt[tid].sending_rate)
goto end;
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
/* Do not try to create a new applet if we have reached the maximum of
* connection per seconds */
if (agent->cps_max > 0) {
- if (!freq_ctr_remain(&agent->conn_per_sec, agent->cps_max, 0)) {
+ if (!freq_ctr_remain(&agent->rt[tid].conn_per_sec, agent->cps_max, 0)) {
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
" - cannot create SPOE appctx: max CPS reached\n",
(int)now.tv_sec, (int)now.tv_usec, agent->id,
goto end;
}
- if (agent->applets_act <= min_applets)
+ if (agent->rt[tid].applets_act <= min_applets)
SPOE_APPCTX(appctx)->flags |= SPOE_APPCTX_FL_PERSIST;
/* Increase the per-process number of cumulated connections */
if (agent->cps_max > 0)
- update_freq_ctr(&agent->conn_per_sec, 1);
+ update_freq_ctr(&agent->rt[tid].conn_per_sec, 1);
end:
/* The only reason to return an error is when there is no applet */
- if (LIST_ISEMPTY(&agent->applets)) {
+ if (LIST_ISEMPTY(&agent->rt[tid].applets)) {
ctx->status_code = SPOE_CTX_ERR_RES;
return -1;
}
/* Add the SPOE context in the sending queue and update all running
* info */
- LIST_ADDQ(&agent->sending_queue, &ctx->list);
- if (agent->sending_rate)
- agent->sending_rate--;
+ LIST_ADDQ(&agent->rt[tid].sending_queue, &ctx->list);
+ if (agent->rt[tid].sending_rate)
+ agent->rt[tid].sending_rate--;
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
" - Add stream in sending queue"
" - applets_act=%u - applets_idle=%u - sending_rate=%u\n",
(int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__,
- ctx->strm, agent->applets_act, agent->applets_idle,
- agent->sending_rate);
+ ctx->strm, agent->rt[tid].applets_act, agent->rt[tid].applets_idle,
+ agent->rt[tid].sending_rate);
/* Finally try to wakeup the first IDLE applet found and move it at the
* end of the list. */
- list_for_each_entry(spoe_appctx, &agent->applets, list) {
+ list_for_each_entry(spoe_appctx, &agent->rt[tid].applets, list) {
appctx = spoe_appctx->owner;
if (appctx->st0 == SPOE_APPCTX_ST_IDLE) {
spoe_wakeup_appctx(appctx);
+ SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
LIST_DEL(&spoe_appctx->list);
- LIST_ADDQ(&agent->applets, &spoe_appctx->list);
+ LIST_ADDQ(&agent->rt[tid].applets, &spoe_appctx->list);
+ SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
break;
}
}
char *p, *end;
p = ctx->buffer->p;
- end = p + agent->frame_size - FRAME_HDR_SIZE;
+ end = p + agent->rt[tid].frame_size - FRAME_HDR_SIZE;
if (type == SPOE_MSGS_BY_EVENT) { /* Loop on messages by event */
/* Resume encoding of a SPOE message */
(int)now.tv_sec, (int)now.tv_usec,
agent->id, __FUNCTION__, s,
((ctx->flags & SPOE_CTX_FL_FRAGMENTED) ? "last fragment of" : "unfragmented"),
- ctx->frag_ctx.spoe_appctx, (agent->frame_size - FRAME_HDR_SIZE),
+ ctx->frag_ctx.spoe_appctx, (agent->rt[tid].frame_size - FRAME_HDR_SIZE),
p - ctx->buffer->p);
ctx->buffer->i = p - ctx->buffer->p;
(int)now.tv_sec, (int)now.tv_usec,
agent->id, __FUNCTION__, s, ctx->frag_ctx.spoe_appctx,
ctx->frag_ctx.curmsg, ctx->frag_ctx.curarg, ctx->frag_ctx.curoff,
- (agent->frame_size - FRAME_HDR_SIZE), p - ctx->buffer->p);
+ (agent->rt[tid].frame_size - FRAME_HDR_SIZE), p - ctx->buffer->p);
ctx->buffer->i = p - ctx->buffer->p;
ctx->flags |= SPOE_CTX_FL_FRAGMENTED;
struct spoe_context *ctx, int dir)
{
if (agent->eps_max > 0)
- update_freq_ctr(&agent->err_per_sec, 1);
+ update_freq_ctr(&agent->rt[tid].err_per_sec, 1);
if (agent->var_on_error) {
struct sample smp;
if (ctx->state == SPOE_CTX_ST_READY) {
if (agent->eps_max > 0) {
- if (!freq_ctr_remain(&agent->err_per_sec, agent->eps_max, 0)) {
+ if (!freq_ctr_remain(&agent->rt[tid].err_per_sec, agent->eps_max, 0)) {
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
" - skip processing of messages: max EPS reached\n",
(int)now.tv_sec, (int)now.tv_usec,
struct spoe_config *conf;
struct spoe_agent *agent;
struct spoe_appctx *spoe_appctx;
+ int i;
if (fconf->id != spoe_filter_id)
continue;
conf = fconf->conf;
agent = conf->agent;
- list_for_each_entry(spoe_appctx, &agent->applets, list) {
- spoe_wakeup_appctx(spoe_appctx->owner);
+ for (i = 0; i < global.nbthread; ++i) {
+ SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[i].lock);
+ list_for_each_entry(spoe_appctx, &agent->rt[i].applets, list)
+ spoe_wakeup_appctx(spoe_appctx->owner);
+ SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[i].lock);
}
}
p = p->next;
curagent->engine_id = NULL;
curagent->var_pfx = NULL;
curagent->var_on_error = NULL;
- curagent->flags = (SPOE_FL_PIPELINING | SPOE_FL_ASYNC | SPOE_FL_SND_FRAGMENTATION);
+ curagent->flags = (SPOE_FL_PIPELINING | SPOE_FL_SND_FRAGMENTATION);
+ if (global.nbthread == 1)
+ curagent->flags |= SPOE_FL_ASYNC;
curagent->cps_max = 0;
curagent->eps_max = 0;
curagent->max_frame_size = MAX_FRAME_SIZE;
LIST_INIT(&curagent->groups);
LIST_INIT(&curagent->messages);
- curagent->frame_size = curagent->max_frame_size;
- curagent->applets_act = 0;
- curagent->applets_idle = 0;
- curagent->sending_rate = 0;
-
- LIST_INIT(&curagent->applets);
- LIST_INIT(&curagent->sending_queue);
- LIST_INIT(&curagent->waiting_queue);
+ if ((curagent->rt = calloc(global.nbthread, sizeof(*curagent->rt))) == NULL) {
+ Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
+ err_code |= ERR_ALERT | ERR_ABORT;
+ goto out;
+ }
+ for (i = 0; i < global.nbthread; ++i) {
+ curagent->rt[i].frame_size = curagent->max_frame_size;
+ curagent->rt[i].applets_act = 0;
+ curagent->rt[i].applets_idle = 0;
+ curagent->rt[i].sending_rate = 0;
+ LIST_INIT(&curagent->rt[i].applets);
+ LIST_INIT(&curagent->rt[i].sending_queue);
+ LIST_INIT(&curagent->rt[i].waiting_queue);
+ SPIN_INIT(&curagent->rt[i].lock);
+ }
}
else if (!strcmp(args[0], "use-backend")) {
if (!*args[1]) {
goto out;
if (kwm == 1)
curagent->flags &= ~SPOE_FL_ASYNC;
- else
- curagent->flags |= SPOE_FL_ASYNC;
+ else {
+ if (global.nbthread == 1)
+ curagent->flags |= SPOE_FL_ASYNC;
+ else {
+ Warning("parsing [%s:%d] Async option is not supported with threads.\n",
+ file, linenum);
+ err_code |= ERR_WARN;
+ }
+ }
goto out;
}
else if (!strcmp(args[1], "send-frag-payload")) {