]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: thread/spoe: Make the SPOE thread-safe
authorChristopher Faulet <cfaulet@haproxy.com>
Mon, 25 Sep 2017 12:48:02 +0000 (14:48 +0200)
committerWilly Tarreau <w@1wt.eu>
Tue, 31 Oct 2017 12:58:33 +0000 (13:58 +0100)
Because there is not migration mechanism yet, all runtime information about an
SPOE agent are thread-local and async exchanges with agents are disabled when we
have serveral threads. Howerver, pipelining is still available. So for now, the
thread part of the SPOE is pretty simple.

include/common/hathreads.h
include/types/spoe.h
src/flt_spoe.c

index 774fe7b674ddf6abcbfe209d1df653b897abd659..39d82208e8ce599e4d4b3efedfe80727056e1d01 100644 (file)
@@ -167,6 +167,7 @@ enum lock_label {
        COMP_POOL_LOCK,
        LUA_LOCK,
        NOTIF_LOCK,
+       SPOE_APPLET_LOCK,
        LOCK_LABELS
 };
 struct lock_stat {
@@ -255,7 +256,7 @@ static inline void show_lock_stats()
                                           "UPDATED_SERVERS", "LBPRM", "SIGNALS", "STK_TABLE", "STK_SESS",
                                           "APPLETS", "PEER", "BUF_WQ", "STREAMS", "SSL", "SSL_GEN_CERTS",
                                           "PATREF", "PATEXP", "PATLRU", "VARS", "COMP_POOL", "LUA",
-                                          "NOTIF" };
+                                          "NOTIF", "SPOE_APPLET" };
        int lbl;
 
        for (lbl = 0; lbl < LOCK_LABELS; lbl++) {
index 108bc980ac875f26f398ee688ed9ee24e2cb47ef..aead2ba98bcc129121e2fb8795021b968f694501 100644 (file)
@@ -24,6 +24,7 @@
 
 #include <common/buffer.h>
 #include <common/mini-clist.h>
+#include <common/hathreads.h>
 
 #include <types/filters.h>
 #include <types/freq_ctr.h>
@@ -251,17 +252,23 @@ struct spoe_agent {
        struct list messages;                 /* list of all messages attached to this SPOE agent */
 
        /* running info */
-       unsigned int          frame_size;     /* current maximum frame size, only used to encode messages */
-       unsigned int          applets_act;    /* # of applets alive at a time */
-       unsigned int          applets_idle;   /* # of applets in the state SPOE_APPCTX_ST_IDLE */
-       unsigned int          sending_rate;   /* the global sending rate */
+       struct {
+               unsigned int    frame_size;     /* current maximum frame size, only used to encode messages */
+               unsigned int    applets_act;    /* # of applets alive at a time */
+               unsigned int    applets_idle;   /* # of applets in the state SPOE_APPCTX_ST_IDLE */
+               unsigned int    sending_rate;   /* the global sending rate */
+
+               struct freq_ctr conn_per_sec;   /* connections per second */
+               struct freq_ctr err_per_sec;    /* connetion errors per second */
 
-       struct freq_ctr       conn_per_sec;   /* connections per second */
-       struct freq_ctr       err_per_sec;    /* connetion errors per second */
+               struct list     applets;        /* List of available SPOE applets */
+               struct list     sending_queue;  /* Queue of streams waiting to send data */
+               struct list     waiting_queue;  /* Queue of streams waiting for a ack, in async mode */
 
-       struct list           applets;        /* List of available SPOE applets */
-       struct list           sending_queue;  /* Queue of streams waiting to send data */
-       struct list           waiting_queue;  /* Queue of streams waiting for a ack, in async mode */
+#ifdef USE_THREAD
+               HA_SPINLOCK_T   lock;
+#endif
+       } *rt;
 
 };
 
index 7fc4ed87f3370eeea757e53f237ce0c04d1dc1b4..938faabd25f7ede63fecbf5cd453c5a26787ebde 100644 (file)
@@ -153,6 +153,7 @@ spoe_release_agent(struct spoe_agent *agent)
 {
        struct spoe_message *msg, *msgback;
        struct spoe_group   *grp, *grpback;
+       int                  i;
 
        if (!agent)
                return;
@@ -169,6 +170,9 @@ spoe_release_agent(struct spoe_agent *agent)
                LIST_DEL(&grp->list);
                spoe_release_group(grp);
        }
+       for (i = 0; i < global.nbthread; ++i)
+               SPIN_DESTROY(&agent->rt[i].lock);
+       free(agent->rt);
        free(agent);
 }
 
@@ -974,7 +978,7 @@ spoe_handle_agentack_frame(struct appctx *appctx, struct spoe_context **ctx,
 
        /* Try to find the corresponding SPOE context */
        if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC) {
-               list_for_each_entry((*ctx), &agent->waiting_queue, list) {
+               list_for_each_entry((*ctx), &agent->rt[tid].waiting_queue, list) {
                        if ((*ctx)->stream_id == (unsigned int)stream_id &&
                            (*ctx)->frame_id  == (unsigned int)frame_id)
                                goto found;
@@ -1234,7 +1238,7 @@ spoe_release_appctx(struct appctx *appctx)
                    __FUNCTION__, appctx);
 
        /* Remove applet from the list of running applets */
-       agent->applets_act--;
+       agent->rt[tid].applets_act--;
        if (!LIST_ISEMPTY(&spoe_appctx->list)) {
                LIST_DEL(&spoe_appctx->list);
                LIST_INIT(&spoe_appctx->list);
@@ -1243,7 +1247,7 @@ spoe_release_appctx(struct appctx *appctx)
        /* Shutdown the server connection, if needed */
        if (appctx->st0 != SPOE_APPCTX_ST_END) {
                if (appctx->st0 == SPOE_APPCTX_ST_IDLE)
-                       agent->applets_idle--;
+                       agent->rt[tid].applets_idle--;
 
                appctx->st0 = SPOE_APPCTX_ST_END;
                if (spoe_appctx->status_code == SPOE_FRM_ERR_NONE)
@@ -1284,18 +1288,18 @@ spoe_release_appctx(struct appctx *appctx)
                            &spoe_appctx->buffer_wait);
        pool_free2(pool2_spoe_appctx, spoe_appctx);
 
-       if (!LIST_ISEMPTY(&agent->applets))
+       if (!LIST_ISEMPTY(&agent->rt[tid].applets))
                goto end;
 
        /* If this was the last running applet, notify all waiting streams */
-       list_for_each_entry_safe(ctx, back, &agent->sending_queue, list) {
+       list_for_each_entry_safe(ctx, back, &agent->rt[tid].sending_queue, list) {
                LIST_DEL(&ctx->list);
                LIST_INIT(&ctx->list);
                ctx->state = SPOE_CTX_ST_ERROR;
                ctx->status_code = (spoe_appctx->status_code + 0x100);
                task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
        }
-       list_for_each_entry_safe(ctx, back, &agent->waiting_queue, list) {
+       list_for_each_entry_safe(ctx, back, &agent->rt[tid].waiting_queue, list) {
                LIST_DEL(&ctx->list);
                LIST_INIT(&ctx->list);
                ctx->state = SPOE_CTX_ST_ERROR;
@@ -1305,10 +1309,9 @@ spoe_release_appctx(struct appctx *appctx)
 
   end:
        /* Update runtinme agent info */
-       agent->frame_size = agent->max_frame_size;
-       list_for_each_entry(spoe_appctx, &agent->applets, list)
-               agent->frame_size = MIN(spoe_appctx->max_frame_size,
-                                       agent->frame_size);
+       agent->rt[tid].frame_size = agent->max_frame_size;
+       list_for_each_entry(spoe_appctx, &agent->rt[tid].applets, list)
+               HA_ATOMIC_UPDATE_MIN(&agent->rt[tid].frame_size, spoe_appctx->max_frame_size);
 }
 
 static int
@@ -1421,14 +1424,15 @@ spoe_handle_connecting_appctx(struct appctx *appctx)
                default:
                        /* HELLO handshake is finished, set the idle timeout and
                         * add the applet in the list of running applets. */
-                       agent->applets_idle++;
+                       agent->rt[tid].applets_idle++;
                        appctx->st0 = SPOE_APPCTX_ST_IDLE;
+                       SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
                        LIST_DEL(&SPOE_APPCTX(appctx)->list);
-                       LIST_ADD(&agent->applets, &SPOE_APPCTX(appctx)->list);
+                       LIST_ADD(&agent->rt[tid].applets, &SPOE_APPCTX(appctx)->list);
+                       SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
 
                        /* Update runtinme agent info */
-                       agent->frame_size = MIN(SPOE_APPCTX(appctx)->max_frame_size,
-                                               agent->frame_size);
+                       HA_ATOMIC_UPDATE_MIN(&agent->rt[tid].frame_size, SPOE_APPCTX(appctx)->max_frame_size);
                        goto next;
        }
 
@@ -1465,13 +1469,13 @@ spoe_handle_sending_frame_appctx(struct appctx *appctx, int *skip)
                ret = spoe_prepare_hafrag_frame(appctx, ctx, frame,
                                                SPOE_APPCTX(appctx)->max_frame_size);
        }
-       else if (LIST_ISEMPTY(&agent->sending_queue)) {
+       else if (LIST_ISEMPTY(&agent->rt[tid].sending_queue)) {
                *skip = 1;
                ret   = 1;
                goto end;
        }
        else {
-               ctx = LIST_NEXT(&agent->sending_queue, typeof(ctx), list);
+               ctx = LIST_NEXT(&agent->rt[tid].sending_queue, typeof(ctx), list);
                ret = spoe_prepare_hanotify_frame(appctx, ctx, frame,
                                                  SPOE_APPCTX(appctx)->max_frame_size);
 
@@ -1532,7 +1536,7 @@ spoe_handle_sending_frame_appctx(struct appctx *appctx, int *skip)
   no_frag_frame_sent:
        if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC) {
                appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
-               LIST_ADDQ(&agent->waiting_queue, &ctx->list);
+               LIST_ADDQ(&agent->rt[tid].waiting_queue, &ctx->list);
        }
        else if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_PIPELINING) {
                appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
@@ -1660,7 +1664,7 @@ spoe_handle_processing_appctx(struct appctx *appctx)
                        goto next;
 
                case 0: /* ignore */
-                       agent->sending_rate++;
+                       agent->rt[tid].sending_rate++;
                        fpa++;
                        break;
 
@@ -1668,7 +1672,7 @@ spoe_handle_processing_appctx(struct appctx *appctx)
                        break;
 
                default:
-                       agent->sending_rate++;
+                       agent->rt[tid].sending_rate++;
                        fpa++;
                        break;
        }
@@ -1703,11 +1707,13 @@ spoe_handle_processing_appctx(struct appctx *appctx)
   stop:
        if (appctx->st0 == SPOE_APPCTX_ST_PROCESSING) {
                appctx->st0 = SPOE_APPCTX_ST_IDLE;
-               agent->applets_idle++;
+               agent->rt[tid].applets_idle++;
        }
        if (fpa || (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_PERSIST)) {
+               SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
                LIST_DEL(&SPOE_APPCTX(appctx)->list);
-               LIST_ADD(&agent->applets, &SPOE_APPCTX(appctx)->list);
+               LIST_ADD(&agent->rt[tid].applets, &SPOE_APPCTX(appctx)->list);
+               SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
                if (fpa)
                        SPOE_APPCTX(appctx)->task->expire =
                                tick_add_ifset(now_ms, agent->timeout.idle);
@@ -1868,14 +1874,14 @@ spoe_handle_appctx(struct appctx *appctx)
 
                case SPOE_APPCTX_ST_IDLE:
                        if (stopping &&
-                           LIST_ISEMPTY(&agent->sending_queue) &&
+                           LIST_ISEMPTY(&agent->rt[tid].sending_queue) &&
                            LIST_ISEMPTY(&SPOE_APPCTX(appctx)->waiting_queue)) {
                                SPOE_APPCTX(appctx)->task->expire =
                                        tick_add_ifset(now_ms, agent->timeout.idle);
                                appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
                                goto switchstate;
                        }
-                       agent->applets_idle--;
+                       agent->rt[tid].applets_idle--;
                        appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
                        /* fall through */
 
@@ -1909,6 +1915,9 @@ spoe_handle_appctx(struct appctx *appctx)
                        return;
        }
   out:
+       if (stopping)
+               spoe_wakeup_appctx(appctx);
+
        if (SPOE_APPCTX(appctx)->task->expire != TICK_ETERNITY)
                task_queue(SPOE_APPCTX(appctx)->task);
        si_oc(si)->flags |= CF_READ_DONTWAIT;
@@ -1940,7 +1949,7 @@ spoe_create_appctx(struct spoe_config *conf)
        memset(appctx->ctx.spoe.ptr, 0, pool2_spoe_appctx->size);
 
        appctx->st0 = SPOE_APPCTX_ST_CONNECT;
-       if ((SPOE_APPCTX(appctx)->task = task_new(MAX_THREADS_MASK)) == NULL)
+       if ((SPOE_APPCTX(appctx)->task = task_new(1UL << tid)) == NULL)
                goto out_free_spoe_appctx;
 
        SPOE_APPCTX(appctx)->owner           = appctx;
@@ -1976,8 +1985,10 @@ spoe_create_appctx(struct spoe_config *conf)
        strm->do_log = NULL;
        strm->res.flags |= CF_READ_DONTWAIT;
 
-       LIST_ADDQ(&conf->agent->applets, &SPOE_APPCTX(appctx)->list);
-       conf->agent->applets_act++;
+       SPIN_LOCK(SPOE_APPLET_LOCK, &conf->agent->rt[tid].lock);
+       LIST_ADDQ(&conf->agent->rt[tid].applets, &SPOE_APPCTX(appctx)->list);
+       SPIN_UNLOCK(SPOE_APPLET_LOCK, &conf->agent->rt[tid].lock);
+       conf->agent->rt[tid].applets_act++;
 
        task_wakeup(SPOE_APPCTX(appctx)->task, TASK_WOKEN_INIT);
        task_wakeup(strm->task, TASK_WOKEN_INIT);
@@ -2008,9 +2019,9 @@ spoe_queue_context(struct spoe_context *ctx)
        min_applets = min_applets_act(agent);
 
        /* Check if we need to create a new SPOE applet or not. */
-       if (agent->applets_act >= min_applets &&
-           agent->applets_idle &&
-           agent->sending_rate)
+       if (agent->rt[tid].applets_act >= min_applets &&
+           agent->rt[tid].applets_idle &&
+           agent->rt[tid].sending_rate)
                goto end;
 
        SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
@@ -2031,7 +2042,7 @@ spoe_queue_context(struct spoe_context *ctx)
        /* Do not try to create a new applet if we have reached the maximum of
         * connection per seconds */
        if (agent->cps_max > 0) {
-               if (!freq_ctr_remain(&agent->conn_per_sec, agent->cps_max, 0)) {
+               if (!freq_ctr_remain(&agent->rt[tid].conn_per_sec, agent->cps_max, 0)) {
                        SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
                                    " - cannot create SPOE appctx: max CPS reached\n",
                                    (int)now.tv_sec, (int)now.tv_usec, agent->id,
@@ -2052,41 +2063,43 @@ spoe_queue_context(struct spoe_context *ctx)
 
                goto end;
        }
-       if (agent->applets_act <= min_applets)
+       if (agent->rt[tid].applets_act <= min_applets)
                SPOE_APPCTX(appctx)->flags |= SPOE_APPCTX_FL_PERSIST;
 
        /* Increase the per-process number of cumulated connections */
        if (agent->cps_max > 0)
-               update_freq_ctr(&agent->conn_per_sec, 1);
+               update_freq_ctr(&agent->rt[tid].conn_per_sec, 1);
 
   end:
        /* The only reason to return an error is when there is no applet */
-       if (LIST_ISEMPTY(&agent->applets)) {
+       if (LIST_ISEMPTY(&agent->rt[tid].applets)) {
                ctx->status_code = SPOE_CTX_ERR_RES;
                return -1;
        }
 
        /* Add the SPOE context in the sending queue and update all running
         * info */
-       LIST_ADDQ(&agent->sending_queue, &ctx->list);
-       if (agent->sending_rate)
-               agent->sending_rate--;
+       LIST_ADDQ(&agent->rt[tid].sending_queue, &ctx->list);
+       if (agent->rt[tid].sending_rate)
+               agent->rt[tid].sending_rate--;
 
        SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
                    " - Add stream in sending queue"
                    " - applets_act=%u - applets_idle=%u - sending_rate=%u\n",
                    (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__,
-                   ctx->strm, agent->applets_act, agent->applets_idle,
-                   agent->sending_rate);
+                   ctx->strm, agent->rt[tid].applets_act, agent->rt[tid].applets_idle,
+                   agent->rt[tid].sending_rate);
 
        /* Finally try to wakeup the first IDLE applet found and move it at the
         * end of the list. */
-       list_for_each_entry(spoe_appctx, &agent->applets, list) {
+       list_for_each_entry(spoe_appctx, &agent->rt[tid].applets, list) {
                appctx = spoe_appctx->owner;
                if (appctx->st0 == SPOE_APPCTX_ST_IDLE) {
                        spoe_wakeup_appctx(appctx);
+                       SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
                        LIST_DEL(&spoe_appctx->list);
-                       LIST_ADDQ(&agent->applets, &spoe_appctx->list);
+                       LIST_ADDQ(&agent->rt[tid].applets, &spoe_appctx->list);
+                       SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
                        break;
                }
        }
@@ -2189,7 +2202,7 @@ spoe_encode_messages(struct stream *s, struct spoe_context *ctx,
        char   *p, *end;
 
        p   = ctx->buffer->p;
-       end =  p + agent->frame_size - FRAME_HDR_SIZE;
+       end =  p + agent->rt[tid].frame_size - FRAME_HDR_SIZE;
 
        if (type == SPOE_MSGS_BY_EVENT) { /* Loop on messages by event */
                /* Resume encoding of a SPOE message */
@@ -2239,7 +2252,7 @@ spoe_encode_messages(struct stream *s, struct spoe_context *ctx,
                    (int)now.tv_sec, (int)now.tv_usec,
                    agent->id, __FUNCTION__, s,
                    ((ctx->flags & SPOE_CTX_FL_FRAGMENTED) ? "last fragment of" : "unfragmented"),
-                   ctx->frag_ctx.spoe_appctx, (agent->frame_size - FRAME_HDR_SIZE),
+                   ctx->frag_ctx.spoe_appctx, (agent->rt[tid].frame_size - FRAME_HDR_SIZE),
                    p - ctx->buffer->p);
 
        ctx->buffer->i = p - ctx->buffer->p;
@@ -2263,7 +2276,7 @@ spoe_encode_messages(struct stream *s, struct spoe_context *ctx,
                    (int)now.tv_sec, (int)now.tv_usec,
                    agent->id, __FUNCTION__, s, ctx->frag_ctx.spoe_appctx,
                    ctx->frag_ctx.curmsg, ctx->frag_ctx.curarg, ctx->frag_ctx.curoff,
-                   (agent->frame_size - FRAME_HDR_SIZE), p - ctx->buffer->p);
+                   (agent->rt[tid].frame_size - FRAME_HDR_SIZE), p - ctx->buffer->p);
 
        ctx->buffer->i = p - ctx->buffer->p;
        ctx->flags |= SPOE_CTX_FL_FRAGMENTED;
@@ -2504,7 +2517,7 @@ spoe_handle_processing_error(struct stream *s, struct spoe_agent *agent,
                             struct spoe_context *ctx, int dir)
 {
        if (agent->eps_max > 0)
-               update_freq_ctr(&agent->err_per_sec, 1);
+               update_freq_ctr(&agent->rt[tid].err_per_sec, 1);
 
        if (agent->var_on_error) {
                struct sample smp;
@@ -2557,7 +2570,7 @@ spoe_process_messages(struct stream *s, struct spoe_context *ctx,
 
        if (ctx->state == SPOE_CTX_ST_READY) {
                if (agent->eps_max > 0) {
-                       if (!freq_ctr_remain(&agent->err_per_sec, agent->eps_max, 0)) {
+                       if (!freq_ctr_remain(&agent->rt[tid].err_per_sec, agent->eps_max, 0)) {
                                SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
                                            " - skip processing of messages: max EPS reached\n",
                                            (int)now.tv_sec, (int)now.tv_usec,
@@ -2791,6 +2804,7 @@ spoe_sig_stop(struct sig_handler *sh)
                        struct spoe_config *conf;
                        struct spoe_agent  *agent;
                        struct spoe_appctx *spoe_appctx;
+                       int i;
 
                        if (fconf->id != spoe_filter_id)
                                continue;
@@ -2798,8 +2812,11 @@ spoe_sig_stop(struct sig_handler *sh)
                        conf  = fconf->conf;
                        agent = conf->agent;
 
-                       list_for_each_entry(spoe_appctx, &agent->applets, list) {
-                               spoe_wakeup_appctx(spoe_appctx->owner);
+                       for (i = 0; i < global.nbthread; ++i) {
+                               SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[i].lock);
+                               list_for_each_entry(spoe_appctx, &agent->rt[i].applets, list)
+                                       spoe_wakeup_appctx(spoe_appctx->owner);
+                               SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[i].lock);
                        }
                }
                p = p->next;
@@ -3177,7 +3194,9 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm)
                curagent->engine_id      = NULL;
                curagent->var_pfx        = NULL;
                curagent->var_on_error   = NULL;
-               curagent->flags          = (SPOE_FL_PIPELINING | SPOE_FL_ASYNC | SPOE_FL_SND_FRAGMENTATION);
+               curagent->flags          = (SPOE_FL_PIPELINING | SPOE_FL_SND_FRAGMENTATION);
+               if (global.nbthread == 1)
+                       curagent->flags |= SPOE_FL_ASYNC;
                curagent->cps_max        = 0;
                curagent->eps_max        = 0;
                curagent->max_frame_size = MAX_FRAME_SIZE;
@@ -3189,14 +3208,21 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm)
                LIST_INIT(&curagent->groups);
                LIST_INIT(&curagent->messages);
 
-               curagent->frame_size   = curagent->max_frame_size;
-               curagent->applets_act  = 0;
-               curagent->applets_idle = 0;
-               curagent->sending_rate = 0;
-
-               LIST_INIT(&curagent->applets);
-               LIST_INIT(&curagent->sending_queue);
-               LIST_INIT(&curagent->waiting_queue);
+               if ((curagent->rt = calloc(global.nbthread, sizeof(*curagent->rt))) == NULL) {
+                       Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
+                       err_code |= ERR_ALERT | ERR_ABORT;
+                       goto out;
+               }
+               for (i = 0; i < global.nbthread; ++i) {
+                       curagent->rt[i].frame_size   = curagent->max_frame_size;
+                       curagent->rt[i].applets_act  = 0;
+                       curagent->rt[i].applets_idle = 0;
+                       curagent->rt[i].sending_rate = 0;
+                       LIST_INIT(&curagent->rt[i].applets);
+                       LIST_INIT(&curagent->rt[i].sending_queue);
+                       LIST_INIT(&curagent->rt[i].waiting_queue);
+                       SPIN_INIT(&curagent->rt[i].lock);
+               }
        }
        else if (!strcmp(args[0], "use-backend")) {
                if (!*args[1]) {
@@ -3320,8 +3346,15 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm)
                                goto out;
                        if (kwm == 1)
                                curagent->flags &= ~SPOE_FL_ASYNC;
-                       else
-                               curagent->flags |= SPOE_FL_ASYNC;
+                       else {
+                               if (global.nbthread == 1)
+                                       curagent->flags |= SPOE_FL_ASYNC;
+                               else {
+                                       Warning("parsing [%s:%d] Async option is not supported with threads.\n",
+                                               file, linenum);
+                                       err_code |= ERR_WARN;
+                               }
+                       }
                        goto out;
                }
                else if (!strcmp(args[1], "send-frag-payload")) {