]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MAJOR: spoe: Add support of pipelined and asynchronous exchanges with agents
authorChristopher Faulet <cfaulet@haproxy.com>
Wed, 21 Dec 2016 07:58:06 +0000 (08:58 +0100)
committerWilly Tarreau <w@1wt.eu>
Thu, 9 Mar 2017 14:32:55 +0000 (15:32 +0100)
Now, HAProxy and agents can announce the support for "pipelining" and/or "async"
capabilities during the HELLO handshake. For now, HAProxy always announces the
support of both. In addition, in its HELLO frames. HAproxy adds the "engine-id"
key. It is a uniq string that identify a SPOE engine.

The "pipelining" capability is the ability for a peer to decouple NOTIFY and ACK
frames. This is a symmectical capability. To be used, it must be supported by
HAproxy and agents. Unlike HTTP pipelining, the ACK frames can be send in any
order, but always on the same TCP connection used for the corresponding NOTIFY
frame.

The "async" capability is similar to the pipelining, but here any TCP connection
established between HAProxy and the agent can be used to send ACK frames. if an
agent accepts connections from multiple HAProxy, it can use the "engine-id"
value to group TCP connections.

doc/SPOE.txt
include/types/applet.h
src/flt_spoe.c

index 6d2c4ae4c6095d7b1e081b5b4d183ae786e6fcbd..d716c6b6b6419083bbb530ee1632099a1e0d36a6 100644 (file)
@@ -505,8 +505,23 @@ equal to 256 bytes.
 
 Here are the list of official capabilities that HAProxy and agents can support:
 
-  * fragmentation: This is the abaility for a peer to support fragmented
-                   payload in received frames.
+  * fragmentation: This is the ability for a peer to support fragmented
+                   payload in received frames. This is an asymmectical
+                   capability, it only concerns the peer that announces
+                   it. This is the responsibility to the other peer to use it
+                   or not.
+
+  * pipelining: This is the ability for a peer to decouple NOTIFY and ACK
+                frames. This is a symmectical capability. To be used, it must
+                be supported by HAproxy and agents. Unlike HTTP pipelining, the
+                ACK frames can be send in any order, but always on the same TCP
+                connection used for the corresponding NOTIFY frame.
+
+  * async: This ability is similar to the pipelining, but here any TCP
+           connection established between HAProxy and the agent can be used to
+           send ACK frames. if an agent accepts connections from multiple
+           HAProxy, it can use the "engine-id" value to group TCP
+           connections. See details about HAPROXY-HELLO frame.
 
 Unsupported or unknown capabilities are silently ignored, when possible.
 
@@ -653,6 +668,10 @@ Following optional items can be added in the KV-LIST:
     If this item is set to TRUE, then the HAPROXY-HELLO frame is sent during a
     SPOE health check. When set to FALSE, this item can be ignored.
 
+  * "engine-id"    <STRING>
+
+    This is a uniq string that identify a SPOE engine.
+
 To finish the HELLO handshake, the agent must return an AGENT-HELLO frame with
 its supported SPOP version, the lower value between its maximum size allowed
 for a frame and the HAProxy one and capabilities it supports. If an error
@@ -834,7 +853,7 @@ Here is the list of supported actions:
     SESSION     : <1>
     TRANSACTION : <2>
     REQUEST     : <3>
-    RESERVED    : <4>
+    RESPONSE    : <4>
 
   * unset-var    unset the value for an existing variable. 2 arguments must be
                  attached to this action: the variable scope (proc, sess, txn,
@@ -851,7 +870,7 @@ Here is the list of supported actions:
     SESSION     : <1>
     TRANSACTION : <2>
     REQUEST     : <3>
-    RESERVED    : <4>
+    RESPONSE    : <4>
 
 
 NOTE: Name of the variables will be automatically prefixed by HAProxy to avoid
index 642c7931b98433e0e3a62bdb5d5f62110aff3cac..851948b4a37e2664d263d44f2919676532a58654 100644 (file)
@@ -85,10 +85,11 @@ struct appctx {
                } hlua_apphttp;                 /* used by the Lua HTTP services */
                struct {
                        struct task *task;
-                       void        *ctx;
                        void        *agent;
                        unsigned int version;
                        unsigned int max_frame_size;
+                       unsigned int flags;
+                       struct list  waiting_queue;
                        struct list  list;
                } spoe;                         /* used by SPOE filter */
                struct {
index f5918dc5f029eca0c574754dac240976c00adfcd..17290d2cc4a5674edc577fa5388bccf09c11aaeb 100644 (file)
 
 #define SPOE_CTX_FL_PROCESS (SPOE_CTX_FL_REQ_PROCESS|SPOE_CTX_FL_RSP_PROCESS)
 
+/* Flags set on the SPOE applet */
+#define SPOE_APPCTX_FL_PIPELINING 0x00000001 /* Set if pipelining is supported */
+#define SPOE_APPCTX_FL_ASYNC      0x00000002 /* Set if asynchronus frames is supported */
+#define SPOE_APPCTX_FL_PERSIST    0x00000004 /* Set if the applet is persistent */
+
 #define SPOE_APPCTX_ERR_NONE    0x00000000 /* no error yet, leave it to zero */
 #define SPOE_APPCTX_ERR_TOUT    0x00000001 /* SPOE applet timeout */
 
@@ -83,6 +88,7 @@ enum spoe_ctx_state {
 enum spoe_appctx_state {
        SPOE_APPCTX_ST_CONNECT = 0,
        SPOE_APPCTX_ST_CONNECTING,
+       SPOE_APPCTX_ST_IDLE,
        SPOE_APPCTX_ST_PROCESSING,
        SPOE_APPCTX_ST_DISCONNECT,
        SPOE_APPCTX_ST_DISCONNECTING,
@@ -162,15 +168,15 @@ struct spoe_msg_placeholder {
 /* Describe a message that will be sent in a NOTIFY frame. A message has a name,
  * an argument list (see above) and it is linked to a specific event. */
 struct spoe_message {
-       char             *id;      /* SPOE message id */
-       unsigned int      id_len;  /* The message id length */
+       char              *id;      /* SPOE message id */
+       unsigned int       id_len;  /* The message id length */
        struct spoe_agent *agent;   /* SPOE agent owning this SPOE message */
         struct {
-                char     *file;    /* file where the SPOE message appears */
-                int       line;    /* line where the SPOE message appears */
-        } conf;                    /* config information */
-       struct list       args;    /* Arguments added when the SPOE messages is sent */
-       struct list       list;    /* Used to chain SPOE messages */
+                char      *file;    /* file where the SPOE message appears */
+                int        line;    /* line where the SPOE message appears */
+        } conf;                     /* config information */
+       struct list        args;    /* Arguments added when the SPOE messages is sent */
+       struct list        list;    /* Used to chain SPOE messages */
 
        enum spoe_event    event;   /* SPOE_EV_* */
 };
@@ -192,21 +198,32 @@ struct spoe_agent {
                unsigned int  processing;     /* Max time to process an event (in the main stream) */
        } timeout;
 
+       /* Config info */
+       char                 *engine_id;      /* engine-id string */
        char                 *var_pfx;        /* Prefix used for vars set by the agent */
        char                 *var_on_error;   /* Variable to set when an error occured, in the TXN scope */
        unsigned int          flags;          /* SPOE_FL_* */
-       unsigned int          cps_max;        /* Maximum number of connections per second */
-       unsigned int          eps_max;        /* Maximum number of errors per second */
-
-       struct list           cache;          /* List used to cache SPOE streams. In
-                                              * fact, we cache the SPOE applect ctx */
+       unsigned int          cps_max;        /* Maximum # of connections per second */
+       unsigned int          eps_max;        /* Maximum # of errors per second */
+       unsigned int          max_frame_size; /* Maximum frame size for this agent, before any negotiation */
+       unsigned int          min_applets;    /* Minimum # applets alive at a time */
+       unsigned int          max_fpa;        /* Maximum # of frames handled per applet at once */
 
        struct list messages[SPOE_EV_EVENTS]; /* List of SPOE messages that will be sent
                                               * for each supported events */
 
-       struct list        applet_wq;         /* List of streams waiting for a SPOE applet */
-       struct freq_ctr    conn_per_sec;      /* connections per second */
-       struct freq_ctr    err_per_sec;       /* connetion errors per second */
+       /* running info */
+       unsigned int          applets_act;    /* # of applets alive at a time */
+       unsigned int          applets_idle;   /* # of applets in the state SPOE_APPCTX_ST_IDLE */
+       unsigned int          sending_rate;   /* the global sending rate */
+
+       struct freq_ctr       conn_per_sec;   /* connections per second */
+       struct freq_ctr       err_per_sec;    /* connetion errors per second */
+
+       struct list           applets;        /* List of available SPOE applets */
+       struct list           sending_queue;  /* Queue of streams waiting to send data */
+       struct list           waiting_queue;  /* Queue of streams waiting for a ack, in async mode */
+
 };
 
 /* SPOE filter configuration */
@@ -221,11 +238,11 @@ struct spoe_config {
 struct spoe_context {
        struct filter      *filter;       /* The SPOE filter */
        struct stream      *strm;         /* The stream that should be offloaded */
-       struct appctx      *appctx;       /* The SPOE appctx */
+
        struct list        *messages;     /* List of messages that will be sent during the stream processing */
        struct buffer      *buffer;       /* Buffer used to store a NOTIFY or ACK frame */
        struct buffer_wait  buffer_wait;  /* position in the list of streams waiting for a buffer */
-       struct list         applet_wait;  /* position in the list of streams waiting for a SPOE applet */
+       struct list         list;
 
        enum spoe_ctx_state state;        /* SPOE_CTX_ST_* */
        unsigned int        flags;        /* SPOE_CTX_FL_* */
@@ -266,9 +283,9 @@ char spoe_reason[256];
 
 struct flt_ops spoe_ops;
 
-static void offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx);
-static void on_new_spoe_appctx_failure(struct spoe_agent *agent);
-static void on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx);
+static int  queue_spoe_context(struct spoe_context *ctx);
+static int  acquire_spoe_buffer(struct spoe_context *ctx);
+static void release_spoe_buffer(struct spoe_context *ctx);
 
 /********************************************************************
  * helper functions/globals
@@ -312,6 +329,7 @@ release_spoe_agent(struct spoe_agent *agent)
        free(agent->id);
        free(agent->conf.file);
        free(agent->var_pfx);
+       free(agent->engine_id);
        free(agent->var_on_error);
        for (i = 0; i < SPOE_EV_EVENTS; ++i) {
                list_for_each_entry_safe(msg, back, &agent->messages[i], list) {
@@ -363,6 +381,7 @@ static const char *spoe_ctx_state_str[SPOE_CTX_ST_ERROR+1] = {
 static const char *spoe_appctx_state_str[SPOE_APPCTX_ST_END+1] = {
        [SPOE_APPCTX_ST_CONNECT]       = "CONNECT",
        [SPOE_APPCTX_ST_CONNECTING]    = "CONNECTING",
+       [SPOE_APPCTX_ST_IDLE]          = "IDLE",
        [SPOE_APPCTX_ST_PROCESSING]    = "PROCESSING",
        [SPOE_APPCTX_ST_DISCONNECT]    = "DISCONNECT",
        [SPOE_APPCTX_ST_DISCONNECTING] = "DISCONNECTING",
@@ -371,6 +390,49 @@ static const char *spoe_appctx_state_str[SPOE_APPCTX_ST_END+1] = {
 };
 
 #endif
+
+static char *
+generate_pseudo_uuid()
+{
+       static int init = 0;
+
+       const char uuid_fmt[] = "xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx";
+       const char uuid_chr[] = "0123456789ABCDEF-";
+       char *uuid;
+       int i;
+
+       if ((uuid = calloc(1, sizeof(uuid_fmt))) == NULL)
+               return NULL;
+
+       if (!init) {
+               srand(now_ms);
+               init = 1;
+       }
+
+       for (i = 0; i < sizeof(uuid_fmt)-1; i++) {
+               int r = rand () % 16;
+
+               switch (uuid_fmt[i]) {
+                       case 'x' : uuid[i] = uuid_chr[r]; break;
+                       case 'y' : uuid[i] = uuid_chr[(r & 0x03) | 0x08]; break;
+                       default  : uuid[i] = uuid_fmt[i]; break;
+               }
+       }
+       return uuid;
+}
+
+static inline unsigned int
+min_applets_act(struct spoe_agent *agent)
+{
+       unsigned int nbsrv;
+
+       if (agent->min_applets)
+               return agent->min_applets;
+
+       nbsrv = (agent->b.be->srv_act ? agent->b.be->srv_act : agent->b.be->srv_bck);
+       return 2*nbsrv;
+}
+
 /********************************************************************
  * Functions that encode/decode SPOE frames
  ********************************************************************/
@@ -418,6 +480,7 @@ enum spoe_data_type {
 #define VERSION_KEY                "version"
 #define MAX_FRAME_SIZE_KEY         "max-frame-size"
 #define CAPABILITIES_KEY           "capabilities"
+#define ENGINE_ID_KEY              "engine-id"
 #define HEALTHCHECK_KEY            "healthcheck"
 #define STATUS_CODE_KEY            "status-code"
 #define MSG_KEY                    "message"
@@ -438,7 +501,8 @@ static struct spoe_version supported_versions[] = {
 #define SUPPORTED_VERSIONS_VAL  "1.0"
 
 /* Comma-separated list of supported capabilities (none for now) */
-#define CAPABILITIES_VAL ""
+//#define CAPABILITIES_VAL ""
+#define CAPABILITIES_VAL "pipelining,async"
 
 static int
 decode_spoe_version(const char *str, size_t len)
@@ -707,11 +771,13 @@ skip_spoe_action(char *frame, char *end)
 static int
 prepare_spoe_hahello_frame(struct appctx *appctx, char *frame, size_t size)
 {
+       struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
        int      idx = 0;
        size_t   max = (7   /* TYPE + METADATA */
                        + 1 + SLEN(SUPPORTED_VERSIONS_KEY) + 1 + 1 + SLEN(SUPPORTED_VERSIONS_VAL)
                        + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 4
-                       + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + SLEN(CAPABILITIES_VAL));
+                       + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + SLEN(CAPABILITIES_VAL)
+                       + 1 + SLEN(ENGINE_ID_KEY) + 1 + 1 + 36);
 
        if (size < max)
                return -1;
@@ -745,6 +811,13 @@ prepare_spoe_hahello_frame(struct appctx *appctx, char *frame, size_t size)
        frame[idx++] = SPOE_DATA_T_STR;
        idx += encode_spoe_string(CAPABILITIES_VAL, SLEN(CAPABILITIES_VAL), frame+idx);
 
+       /* "engine-id" K/V item */
+       if (agent != NULL && agent->engine_id != NULL) {
+               idx += encode_spoe_string(ENGINE_ID_KEY, SLEN(ENGINE_ID_KEY), frame+idx);
+               frame[idx++] = SPOE_DATA_T_STR;
+               idx += encode_spoe_string(agent->engine_id, strlen(agent->engine_id), frame+idx);
+       }
+
        return idx;
 }
 
@@ -798,10 +871,10 @@ prepare_spoe_hadiscon_frame(struct appctx *appctx, char *frame, size_t size)
 /* Encode NOTIFY frame sent by HAProxy to an agent. It returns the frame size on
  * success, 0 if the frame can be ignored and -1 if an error occurred. */
 static int
-prepare_spoe_hanotify_frame(struct appctx *appctx, char *frame, size_t size)
+prepare_spoe_hanotify_frame(struct appctx *appctx, struct spoe_context *ctx,
+                           char *frame, size_t size)
 {
-       struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
-       int                  idx = 0;
+       int idx = 0;
 
        if (size < APPCTX_SPOE(appctx).max_frame_size)
                return -1;
@@ -816,6 +889,10 @@ prepare_spoe_hanotify_frame(struct appctx *appctx, char *frame, size_t size)
        idx += encode_spoe_varint(ctx->stream_id, frame+idx);
        idx += encode_spoe_varint(ctx->frame_id, frame+idx);
 
+       /* Copy encoded messages */
+       if (idx + ctx->buffer->i > size)
+               return 0;
+
        /* Copy encoded messages */
        memcpy(frame+idx, ctx->buffer->p, ctx->buffer->i);
        idx += ctx->buffer->i;
@@ -828,7 +905,7 @@ prepare_spoe_hanotify_frame(struct appctx *appctx, char *frame, size_t size)
 static int
 handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size)
 {
-       int    vsn, max_frame_size;
+       int    vsn, max_frame_size, flags;
        int    i, idx = 0;
        size_t min_size = (7   /* TYPE + METADATA */
                           + 1 + SLEN(VERSION_KEY) + 1 + 1 + 3
@@ -858,7 +935,7 @@ handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size)
         * "capabilities" */
 
        /* Loop on K/V items */
-       vsn = max_frame_size = 0;
+       vsn = max_frame_size = flags = 0;
        while (idx < size) {
                char     *str;
                uint64_t  sz;
@@ -921,7 +998,42 @@ handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size)
                        }
                        max_frame_size = sz;
                }
-               /* Skip "capabilities" K/V item for now */
+               /* Check "capabilities" K/V item */
+               else if (!memcmp(str, CAPABILITIES_KEY, sz)) {
+                       int i;
+
+                       /* The value must be a string */
+                       if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
+                               spoe_status_code = SPOE_FRM_ERR_INVALID;
+                               return -1;
+                       }
+                       idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
+                       if (str == NULL)
+                               continue;
+
+                       i = 0;
+                       while (i < sz) {
+                               char *delim;
+
+                               /* Skip leading spaces */
+                               for (; isspace(str[i]) && i < sz; i++);
+
+                               if (sz - i >= 10 && !strncmp(str + i, "pipelining", 10)) {
+                                       i += 10;
+                                       if (sz == i || isspace(str[i]) || str[i] == ',')
+                                               flags |= SPOE_APPCTX_FL_PIPELINING;
+                               }
+                               else if (sz - i >= 5 && !strncmp(str + i, "async", 5)) {
+                                       i += 5;
+                                       if (sz == i || isspace(str[i]) || str[i] == ',')
+                                               flags |= SPOE_APPCTX_FL_ASYNC;
+                               }
+
+                               if (sz == i || (delim = memchr(str + i, ',', sz-i)) == NULL)
+                                       break;
+                               i = (delim - str) + 1;
+                       }
+               }
                else {
                        /* Silently ignore unknown item */
                        if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) {
@@ -944,6 +1056,7 @@ handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size)
 
        APPCTX_SPOE(appctx).version        = (unsigned int)vsn;
        APPCTX_SPOE(appctx).max_frame_size = (unsigned int)max_frame_size;
+       APPCTX_SPOE(appctx).flags         |= flags;
        return idx;
 }
 
@@ -1041,14 +1154,15 @@ handle_spoe_agentdiscon_frame(struct appctx *appctx, char *frame, size_t size)
 }
 
 
-/* Decode ACK frame sent by an agent. It returns the number of by read bytes on
+/* Decode ACK frame sent by an agent. It returns the number of read bytes on
  * success, 0 if the frame can be ignored and -1 if an error occurred. */
 static int
 handle_spoe_agentack_frame(struct appctx *appctx, char *frame, size_t size)
 {
-       struct spoe_context  *ctx = APPCTX_SPOE(appctx).ctx;
+       struct spoe_agent    *agent = APPCTX_SPOE(appctx).agent;
+       struct spoe_context  *ctx, *back;
        uint64_t              stream_id, frame_id;
-       int                   idx = 0;
+       int                   i, idx = 0;
        size_t                min_size = (7  /* TYPE + METADATA */);
 
        /* Check frame type */
@@ -1064,19 +1178,45 @@ handle_spoe_agentack_frame(struct appctx *appctx, char *frame, size_t size)
        idx += 4;
 
        /* Get the stream-id and the frame-id */
-       idx += decode_spoe_varint(frame+idx, frame+size, &stream_id);
-       idx += decode_spoe_varint(frame+idx, frame+size, &frame_id);
-
-       /* Check stream-id and frame-id */
-       if (ctx->stream_id != (unsigned int)stream_id ||
-           ctx->frame_id  != (unsigned int)frame_id)
+       if ((i = decode_spoe_varint(frame+idx, frame+size, &stream_id)) == -1)
+               return 0;
+       idx += i;
+       if ((i= decode_spoe_varint(frame+idx, frame+size, &frame_id)) == -1)
                return 0;
+       idx += i;
+
+       if (APPCTX_SPOE(appctx).flags & SPOE_APPCTX_FL_ASYNC) {
+               list_for_each_entry_safe(ctx, back, &agent->waiting_queue, list) {
+                       if (ctx->stream_id == (unsigned int)stream_id &&
+                           ctx->frame_id  == (unsigned int)frame_id)
+                               goto found;
+               }
+       }
+       else {
+               list_for_each_entry_safe(ctx, back, &APPCTX_SPOE(appctx).waiting_queue, list) {
+                       if (ctx->stream_id == (unsigned int)stream_id &&
+                           ctx->frame_id  == (unsigned int)frame_id)
+                               goto found;
+               }
+       }
+
+       /* No Stream found, ignore the frame */
+       return 0;
+
+  found:
+       if (acquire_spoe_buffer(ctx) <= 0)
+               return 1; /* Retry later */
 
        /* Copy encoded actions */
-       b_reset(ctx->buffer);
        memcpy(ctx->buffer->p, frame+idx, size-idx);
        ctx->buffer->i = size-idx;
 
+       /* Notify the stream */
+       LIST_DEL(&ctx->list);
+       LIST_INIT(&ctx->list);
+       ctx->state = SPOE_CTX_ST_DONE;
+       task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
+
        return idx;
 }
 
@@ -1093,7 +1233,7 @@ prepare_spoe_healthcheck_request(char **req, int *len)
 
        memset(&a, 0, sizeof(a));
        memset(buf, 0, sizeof(buf));
-       APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize;
+       APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize-4;
 
        frame = buf+4;
        idx = prepare_spoe_hahello_frame(&a, frame, global.tune.bufsize-4);
@@ -1126,7 +1266,7 @@ handle_spoe_healthcheck_response(char *frame, size_t size, char *err, int errlen
        int           r;
 
        memset(&a, 0, sizeof(a));
-       APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize;
+       APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize-4;
 
        if (handle_spoe_agentdiscon_frame(&a, frame, size) != 0)
                goto error;
@@ -1145,6 +1285,62 @@ handle_spoe_healthcheck_response(char *frame, size_t size, char *err, int errlen
        return -1;
 }
 
+/* Send a SPOE frame to an agent. It returns -1 when an error occurred, 0 when
+ * the frame can be ignored, 1 to retry later, and the frame legnth on
+ * success. */
+static int
+send_spoe_frame(struct appctx *appctx, char *buf, size_t framesz)
+{
+       struct stream_interface *si = appctx->owner;
+       int                      ret;
+       uint32_t                 netint;
+
+       if (si_ic(si)->buf == &buf_empty)
+               return 1;
+
+       netint = htonl(framesz);
+       memcpy(buf, (char *)&netint, 4);
+       ret = bi_putblk(si_ic(si), buf, framesz+4);
+
+       if (ret <= 0) {
+               if (ret == -1)
+                       return 1; /* retry */
+               return -1; /* error */
+       }
+       return framesz;
+}
+
+/* Receive a SPOE frame from an agent. It return -1 when an error occurred, 0
+ * when the frame can be ignored, 1 to retry later and the frame length on
+ * success. */
+static int
+recv_spoe_frame(struct appctx *appctx, char *buf, size_t framesz)
+{
+       struct stream_interface *si = appctx->owner;
+       int                      ret;
+       uint32_t                 netint;
+
+       if (si_oc(si)->buf == &buf_empty)
+               return 1;
+
+       ret = bo_getblk(si_oc(si), (char *)&netint, 4, 0);
+       if (ret > 0) {
+               framesz = ntohl(netint);
+               if (framesz > APPCTX_SPOE(appctx).max_frame_size) {
+                       spoe_status_code = SPOE_FRM_ERR_TOO_BIG;
+                       return -1;
+               }
+               ret = bo_getblk(si_oc(si), trash.str, framesz, 4);
+       }
+       if (ret <= 0) {
+               if (ret == 0)
+                       return 1; /* retry */
+               spoe_status_code = SPOE_FRM_ERR_IO;
+               return -1; /* error */
+       }
+       return framesz;
+}
+
 /********************************************************************
  * Functions that manage the SPOE applet
  ********************************************************************/
@@ -1161,29 +1357,11 @@ process_spoe_applet(struct task * task)
                appctx->st1 = SPOE_APPCTX_ERR_TOUT;
        }
        si_applet_want_get(appctx->owner);
+       si_applet_want_put(appctx->owner);
        appctx_wakeup(appctx);
        return task;
 }
 
-/* Remove a SPOE applet from the agent cache */
-static void
-remove_spoe_applet_from_cache(struct appctx *appctx)
-{
-       struct appctx     *a, *back;
-       struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
-
-       if (LIST_ISEMPTY(&agent->cache))
-               return;
-
-       list_for_each_entry_safe(a, back, &agent->cache, ctx.spoe.list) {
-               if (a == appctx) {
-                       LIST_DEL(&APPCTX_SPOE(appctx).list);
-                       break;
-               }
-       }
-}
-
-
 /* Callback function that releases a SPOE applet. This happens when the
  * connection with the agent is closed. */
 static void
@@ -1191,143 +1369,426 @@ release_spoe_applet(struct appctx *appctx)
 {
        struct stream_interface *si    = appctx->owner;
        struct spoe_agent       *agent = APPCTX_SPOE(appctx).agent;
-       struct spoe_context     *ctx   = APPCTX_SPOE(appctx).ctx;
+       struct spoe_context     *ctx, *back;
+
+       SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p\n",
+                   (int)now.tv_sec, (int)now.tv_usec, agent->id,
+                   __FUNCTION__, appctx);
 
-       if (appctx->st0 == SPOE_APPCTX_ST_CONNECT ||
-           appctx->st0 == SPOE_APPCTX_ST_CONNECTING)
-               on_new_spoe_appctx_failure(agent);
+       agent->applets_act--;
+       if (!LIST_ISEMPTY(&APPCTX_SPOE(appctx).list)) {
+               LIST_DEL(&APPCTX_SPOE(appctx).list);
+               LIST_INIT(&APPCTX_SPOE(appctx).list);
+       }
 
        if (appctx->st0 != SPOE_APPCTX_ST_END) {
+               if (appctx->st0 == SPOE_APPCTX_ST_IDLE)
+                       agent->applets_idle--;
+
                si_shutw(si);
                si_shutr(si);
                si_ic(si)->flags |= CF_READ_NULL;
                appctx->st0 = SPOE_APPCTX_ST_END;
        }
 
-       if (ctx != NULL) {
+       if (APPCTX_SPOE(appctx).task) {
+               task_delete(APPCTX_SPOE(appctx).task);
+               task_free(APPCTX_SPOE(appctx).task);
+       }
+
+       list_for_each_entry_safe(ctx, back, &APPCTX_SPOE(appctx).waiting_queue, list) {
+               LIST_DEL(&ctx->list);
+               LIST_INIT(&ctx->list);
+               ctx->state = SPOE_CTX_ST_ERROR;
                task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
-               ctx->appctx = NULL;
        }
 
-       SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p\n",
-                   (int)now.tv_sec, (int)now.tv_usec, agent->id,
-                   __FUNCTION__, appctx);
+       if (!LIST_ISEMPTY(&agent->applets))
+               return;
 
-       /* Release the task attached to the SPOE applet */
-       if (APPCTX_SPOE(appctx).task) {
-               task_delete(APPCTX_SPOE(appctx).task);
-               task_free(APPCTX_SPOE(appctx).task);
+       list_for_each_entry_safe(ctx, back, &agent->sending_queue, list) {
+               LIST_DEL(&ctx->list);
+               LIST_INIT(&ctx->list);
+               ctx->state = SPOE_CTX_ST_ERROR;
+               task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
        }
 
-       /* And remove it from the agent cache */
-       remove_spoe_applet_from_cache(appctx);
-       APPCTX_SPOE(appctx).ctx = NULL;
+       list_for_each_entry_safe(ctx, back, &agent->waiting_queue, list) {
+               LIST_DEL(&ctx->list);
+               LIST_INIT(&ctx->list);
+               ctx->state = SPOE_CTX_ST_ERROR;
+               task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
+       }
 }
 
-/* Send a SPOE frame to an agent. It return -2 when an error occurred, -1 when
- * the frame can be ignored, 0 to retry later and 1 on success. The frame is
- * encoded using the callback function <prepare>. */
 static int
-send_spoe_frame(struct appctx *appctx,
-               int (*prepare)(struct appctx *, char *, size_t))
+handle_connect_spoe_applet(struct appctx *appctx)
 {
-       struct stream_interface *si  = appctx->owner;
-       int                      framesz, ret;
-       uint32_t                 netint;
+       struct stream_interface *si    = appctx->owner;
+       struct spoe_agent       *agent = APPCTX_SPOE(appctx).agent;
+       char *frame = trash.str;
+       int   ret;
 
-       if (si_ic(si)->buf->size == 0)
-               return -1;
+       if (si->state <= SI_ST_CON) {
+               si_applet_want_put(si);
+               task_wakeup(si_strm(si)->task, TASK_WOKEN_MSG);
+               goto stop;
+       }
+       if (si->state != SI_ST_EST)
+               goto exit;
 
-       ret = prepare(appctx, trash.str, APPCTX_SPOE(appctx).max_frame_size);
-       if (ret <= 0)
-               goto skip_or_error;
-       framesz = ret;
-       netint  = htonl(framesz);
-       ret = bi_putblk(si_ic(si), (char *)&netint, sizeof(netint));
-       if (ret > 0)
-               ret = bi_putblk(si_ic(si), trash.str, framesz);
-       if (ret <= 0) {
-               if (ret == -1)
-                       return -1;
-               return -2;
+       if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
+               SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p - Connection timed out\n",
+                           (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, appctx);
+               goto exit;
+       }
+
+       if (APPCTX_SPOE(appctx).task->expire == TICK_ETERNITY)
+               APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.hello);
+
+       ret = prepare_spoe_hahello_frame(appctx, frame+4, APPCTX_SPOE(appctx).max_frame_size);
+       if (ret > 1)
+               ret = send_spoe_frame(appctx, frame, ret);
+
+       switch (ret) {
+               case -1: /* error */
+                       goto exit;
+
+               case  0: /* ignore => an error, cannot be ignored */
+                       goto exit;
+
+               case  1: /* retry later */
+                       si_applet_cant_put(si);
+                       goto stop;
+
+               default: /* CONNECT frame successfully sent */
+                       appctx->st0 = SPOE_APPCTX_ST_CONNECTING;
+                       goto next;
        }
+
+  next:
+       return 0;
+  stop:
        return 1;
+  exit:
+       appctx->st0 = SPOE_APPCTX_ST_EXIT;
+       return 0;
+}
 
- skip_or_error:
-       if (!ret)
-               return -1;
-       return -2;
+static int
+handle_connecting_spoe_applet(struct appctx *appctx)
+{
+       struct stream_interface *si     = appctx->owner;
+       struct spoe_agent       *agent  = APPCTX_SPOE(appctx).agent;
+       char *frame = trash.str;
+       int   ret, framesz = 0;
+
+
+       if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO)
+               goto exit;
+
+       if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
+               SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p - Connection timed out\n",
+                           (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, appctx);
+               goto exit;
+       }
+
+       ret = recv_spoe_frame(appctx, frame, APPCTX_SPOE(appctx).max_frame_size);
+       if (ret > 1) {
+               if (*frame == SPOE_FRM_T_AGENT_DISCON) {
+                       appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
+                       goto next;
+               }
+               framesz = ret;
+               ret = handle_spoe_agenthello_frame(appctx, frame, framesz);
+       }
+
+       switch (ret) {
+               case -1: /* error */
+                       if (framesz)
+                               bo_skip(si_oc(si), framesz+4);
+                       appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
+                       goto next;
+
+               case 0: /* ignore */
+                       if (framesz)
+                               bo_skip(si_oc(si), framesz+4);
+                       appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
+                       goto next;
+
+               case 1: /* retry later */
+                       goto stop;
+
+               default:
+                       /* hello handshake is finished, set the idle timeout,
+                        * Add the appctx in the agent cache, decrease the
+                        * number of new applets and wake up waiting streams. */
+                       if (framesz)
+                               bo_skip(si_oc(si), framesz+4);
+                       agent->applets_idle++;
+                       appctx->st0 = SPOE_APPCTX_ST_IDLE;
+                       goto next;
+       }
+
+  next:
+       APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
+       return 0;
+  stop:
+       return 1;
+  exit:
+       appctx->st0 = SPOE_APPCTX_ST_EXIT;
+       return 0;
 }
 
-/* Receive a SPOE frame from an agent. It return -2 when an error occurred, -1
- * when the frame can be ignored, 0 to retry later and 1 on success. The frame
- * is decoded using the callback function <handle>. */
 static int
-recv_spoe_frame(struct appctx *appctx,
-               int (*handle)(struct appctx *, char *, size_t))
+handle_processing_spoe_applet(struct appctx *appctx)
 {
-       struct stream_interface *si  = appctx->owner;
-       int                      framesz, ret;
-       uint32_t                 netint;
+       struct stream_interface *si    = appctx->owner;
+       struct spoe_agent       *agent = APPCTX_SPOE(appctx).agent;
+       struct spoe_context     *ctx;
+       char         *frame = trash.str;
+       unsigned int  fpa = 0;
+       int           ret, framesz = 0, skip_sending = 0, skip_receiving = 0;
+
+       if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO)
+               goto exit;
+
+       if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
+               spoe_status_code = SPOE_FRM_ERR_TOUT;
+               appctx->st0      = SPOE_APPCTX_ST_DISCONNECT;
+               appctx->st1      = SPOE_APPCTX_ERR_NONE;
+               goto next;
+       }
+
+  process:
+       if (fpa > agent->max_fpa || (skip_sending && skip_receiving))
+               goto stop;
+
+       /* Frames must be handled synchronously and a the applet is waiting for
+        * a ACK frame */
+       if (!(APPCTX_SPOE(appctx).flags & (SPOE_APPCTX_FL_ASYNC|SPOE_APPCTX_FL_PIPELINING)) &&
+           !LIST_ISEMPTY(&APPCTX_SPOE(appctx).waiting_queue)) {
+               if (skip_receiving)
+                       goto stop;
+               goto recv_frame;
+       }
+
+       if (LIST_ISEMPTY(&agent->sending_queue) || skip_sending) {
+               skip_sending = 1;
+               goto recv_frame;
+       }
+
+       ctx = LIST_NEXT(&agent->sending_queue, typeof(ctx), list);
+       ret = prepare_spoe_hanotify_frame(appctx, ctx, frame+4, APPCTX_SPOE(appctx).max_frame_size);
+       if (ret > 1)
+               ret = send_spoe_frame(appctx, frame, ret);
+
+       switch (ret) {
+               case -1: /* error */
+                       appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
+                       goto next;
+
+               case 0: /* ignore */
+                       agent->sending_rate++;
+                       ctx->state = SPOE_CTX_ST_ERROR;
+                       release_spoe_buffer(ctx);
+                       task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
+                       LIST_DEL(&ctx->list);
+                       LIST_INIT(&ctx->list);
+                       fpa++;
+                       break;
 
-       ret = bo_getblk(si_oc(si), (char *)&netint, sizeof(netint), 0);
-       if (ret <= 0)
-               goto empty_or_error;
-       framesz = ntohl(netint);
-       if (framesz > APPCTX_SPOE(appctx).max_frame_size) {
-               spoe_status_code = SPOE_FRM_ERR_TOO_BIG;
-               return -2;
+               case 1: /* retry */
+                       si_applet_cant_put(si);
+                       skip_sending = 1;
+                       break;
+
+               default:
+                       agent->sending_rate++;
+                       ctx->state = SPOE_CTX_ST_WAITING_ACK;
+                       release_spoe_buffer(ctx);
+                       LIST_DEL(&ctx->list);
+                       LIST_INIT(&ctx->list);
+                       if (APPCTX_SPOE(appctx).flags & SPOE_APPCTX_FL_ASYNC)
+                               LIST_ADDQ(&agent->waiting_queue, &ctx->list);
+                       else
+                               LIST_ADDQ(&APPCTX_SPOE(appctx).waiting_queue, &ctx->list);
+                       fpa++;
+       }
+
+       if (fpa > agent->max_fpa)
+               goto stop;
+
+  recv_frame:
+       if (skip_receiving)
+               goto process;
+
+       framesz = 0;
+       ret = recv_spoe_frame(appctx, frame, APPCTX_SPOE(appctx).max_frame_size);
+       if (ret > 1) {
+               if (*frame == SPOE_FRM_T_AGENT_DISCON) {
+                       appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
+                       goto next;
+               }
+               framesz = ret;
+               ret = handle_spoe_agentack_frame(appctx, frame, framesz);
        }
 
-       ret = bo_getblk(si_oc(si), trash.str, framesz, sizeof(netint));
-       if (ret <= 0)
-               goto empty_or_error;
-       bo_skip(si_oc(si), ret+sizeof(netint));
+       switch (ret) {
+               case -1: /* error */
+                       if (framesz)
+                               bo_skip(si_oc(si), framesz+4);
+                       appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
+                       goto next;
+
+               case 0: /* ignore */
+                       if (framesz)
+                               bo_skip(si_oc(si), framesz+4);
+                       fpa++;
+                       break;
+
+               case 1: /* retry */
+                       skip_receiving = 1;
+                       break;
+
+               default:
+                       if (framesz)
+                               bo_skip(si_oc(si), framesz+4);
+                       fpa++;
+       }
+       goto process;
+
+  next:
+       APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
+       return 0;
+  stop:
+       if ((APPCTX_SPOE(appctx).flags & (SPOE_APPCTX_FL_ASYNC|SPOE_APPCTX_FL_PIPELINING)) ||
+           LIST_ISEMPTY(&APPCTX_SPOE(appctx).waiting_queue)) {
+               agent->applets_idle++;
+               appctx->st0 = SPOE_APPCTX_ST_IDLE;
+       }
+       if (fpa || (APPCTX_SPOE(appctx).flags & SPOE_APPCTX_FL_PERSIST)) {
+               LIST_DEL(&APPCTX_SPOE(appctx).list);
+               LIST_ADD(&agent->applets, &APPCTX_SPOE(appctx).list);
+               if (fpa)
+                       APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
+       }
+       return 1;
 
-       /* First check if the received frame is a DISCONNECT frame */
-       ret = handle_spoe_agentdiscon_frame(appctx, trash.str, framesz);
-       if (ret != 0) {
-               if (ret > 0) {
+  exit:
+       appctx->st0 = SPOE_APPCTX_ST_EXIT;
+       return 0;
+}
+
+static int
+handle_disconnect_spoe_applet(struct appctx *appctx)
+{
+       struct stream_interface *si    = appctx->owner;
+       struct spoe_agent       *agent = APPCTX_SPOE(appctx).agent;
+       char *frame = trash.str;
+       int ret;
+
+       if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO)
+               goto exit;
+
+       if (appctx->st1 == SPOE_APPCTX_ERR_TOUT)
+               goto exit;
+
+       ret = prepare_spoe_hadiscon_frame(appctx, frame+4, APPCTX_SPOE(appctx).max_frame_size);
+       if (ret > 1)
+               ret = send_spoe_frame(appctx, frame, ret);
+
+       switch (ret) {
+               case -1: /* error */
+                       goto exit;
+
+               case  0: /* ignore */
+                       goto exit;
+
+               case 1: /* retry */
+                       si_applet_cant_put(si);
+                       goto stop;
+
+               default:
+                       SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
+                                   " - disconnected by HAProxy (%d): %s\n",
+                                   (int)now.tv_sec, (int)now.tv_usec, agent->id,
+                                   __FUNCTION__, appctx, spoe_status_code,
+                                   spoe_frm_err_reasons[spoe_status_code]);
+
+                       appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
+                       goto next;
+       }
+
+  next:
+       APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
+       return 0;
+  stop:
+       return 1;
+  exit:
+       appctx->st0 = SPOE_APPCTX_ST_EXIT;
+       return 0;
+}
+
+static int
+handle_disconnecting_spoe_applet(struct appctx *appctx)
+{
+       struct stream_interface *si = appctx->owner;
+       char *frame = trash.str;
+       int   ret, framesz = 0;
+
+       if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO)
+               goto exit;
+
+       if (appctx->st1 == SPOE_APPCTX_ERR_TOUT)
+               goto exit;
+
+       framesz = 0;
+       ret = recv_spoe_frame(appctx, frame, APPCTX_SPOE(appctx).max_frame_size);
+       if (ret > 1) {
+               framesz = ret;
+               ret = handle_spoe_agentdiscon_frame(appctx, frame, framesz);
+       }
+
+       switch (ret) {
+               case -1: /* error  */
+                       if (framesz)
+                               bo_skip(si_oc(si), framesz+4);
+                       SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
+                                   " - error on frame (%s)\n",
+                                   (int)now.tv_sec, (int)now.tv_usec,
+                                   ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
+                                   __FUNCTION__, appctx,
+                                   spoe_frm_err_reasons[spoe_status_code]);
+                       goto exit;
+
+               case  0: /* ignore */
+                       if (framesz)
+                               bo_skip(si_oc(si), framesz+4);
+                       goto next;
+
+               case  1: /* retry */
+                       goto stop;
+
+               default:
+                       if (framesz)
+                               bo_skip(si_oc(si), framesz+4);
                        SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
                                    " - disconnected by peer (%d): %s\n",
                                    (int)now.tv_sec, (int)now.tv_usec,
                                    ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
                                    __FUNCTION__, appctx, spoe_status_code,
                                    spoe_reason);
-                       return 2;
-               }
-               SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
-                           " - error on frame (%s)\n",
-                           (int)now.tv_sec, (int)now.tv_usec,
-                           ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
-                           __FUNCTION__, appctx,
-                           spoe_frm_err_reasons[spoe_status_code]);
-               return -2;
+                       goto exit;
        }
-       if (handle == NULL)
-               goto out;
 
-       /* If not, try to decode it */
-       ret = handle(appctx, trash.str, framesz);
-       if (ret <= 0) {
-               if (!ret)
-                       return -1;
-               SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
-                           " - error on frame (%s)\n",
-                           (int)now.tv_sec, (int)now.tv_usec,
-                           ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
-                           __FUNCTION__, appctx,
-                           spoe_frm_err_reasons[spoe_status_code]);
-               return -2;
-       }
-  out:
+  next:
+       return 0;
+  stop:
        return 1;
-
-  empty_or_error:
-       if (!ret)
-               return 0;
-       spoe_status_code = SPOE_FRM_ERR_IO;
-       return -2;
+  exit:
+       appctx->st0 = SPOE_APPCTX_ST_EXIT;
+       return 0;
 }
 
 /* I/O Handler processing messages exchanged with the agent */
@@ -1335,12 +1796,9 @@ static void
 handle_spoe_applet(struct appctx *appctx)
 {
        struct stream_interface *si    = appctx->owner;
-       struct stream           *s     = si_strm(si);
        struct spoe_agent       *agent = APPCTX_SPOE(appctx).agent;
-       struct spoe_context     *ctx   = APPCTX_SPOE(appctx).ctx;
-       int                      ret;
 
- switchstate:
 switchstate:
        SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
                    " - appctx-state=%s\n",
                    (int)now.tv_sec, (int)now.tv_usec, agent->id,
@@ -1349,177 +1807,41 @@ handle_spoe_applet(struct appctx *appctx)
        switch (appctx->st0) {
                case SPOE_APPCTX_ST_CONNECT:
                        spoe_status_code = SPOE_FRM_ERR_NONE;
-                       if (si->state <= SI_ST_CON) {
-                               si_applet_want_put(si);
-                               task_wakeup(s->task, TASK_WOKEN_MSG);
-                               break;
-                       }
-                       else if (si->state != SI_ST_EST) {
-                               appctx->st0 = SPOE_APPCTX_ST_EXIT;
-                               on_new_spoe_appctx_failure(agent);
-                               goto switchstate;
-                       }
-                       ret = send_spoe_frame(appctx, &prepare_spoe_hahello_frame);
-                       if (ret < 0) {
-                               appctx->st0 = SPOE_APPCTX_ST_EXIT;
-                               on_new_spoe_appctx_failure(agent);
-                               goto switchstate;
-                       }
-                       else if (!ret)
-                               goto full;
-
-                       /* Hello frame was sent. Set the hello timeout and
-                        * wait for the reply. */
-                       APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.hello);
-                       appctx->st0 = SPOE_APPCTX_ST_CONNECTING;
-                       /* fall through */
+                       if (handle_connect_spoe_applet(appctx))
+                               goto out;
+                       goto switchstate;
 
                case SPOE_APPCTX_ST_CONNECTING:
-                       if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
-                               appctx->st0 = SPOE_APPCTX_ST_EXIT;
-                               on_new_spoe_appctx_failure(agent);
-                               goto switchstate;
-                       }
-                       if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
-                               SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
-                                           " - Connection timed out\n",
-                                           (int)now.tv_sec, (int)now.tv_usec,
-                                           ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
-                                           __FUNCTION__, appctx);
-                               appctx->st0 = SPOE_APPCTX_ST_EXIT;
-                               on_new_spoe_appctx_failure(agent);
-                               goto switchstate;
-                       }
-                       ret = recv_spoe_frame(appctx, &handle_spoe_agenthello_frame);
-                       if (ret < 0) {
-                               appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
-                               on_new_spoe_appctx_failure(agent);
-                               goto switchstate;
-                       }
-                       if (ret == 2) {
-                               appctx->st0 = SPOE_APPCTX_ST_EXIT;
-                               on_new_spoe_appctx_failure(agent);
-                               goto switchstate;
-                       }
-                       if (!ret)
+                       if (handle_connecting_spoe_applet(appctx))
                                goto out;
+                       goto switchstate;
 
-                       /* hello handshake is finished, set the idle timeout,
-                        * Add the appctx in the agent cache, decrease the
-                        * number of new applets and wake up waiting streams. */
-                       APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
-                       appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
-                       on_new_spoe_appctx_success(agent, appctx);
-                       break;
-
-               case SPOE_APPCTX_ST_PROCESSING:
-                       if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
-                               appctx->st0 = SPOE_APPCTX_ST_EXIT;
-                               goto switchstate;
-                       }
-                       if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
-                               spoe_status_code = SPOE_FRM_ERR_TOUT;
+               case SPOE_APPCTX_ST_IDLE:
+                       if (stopping &&
+                           LIST_ISEMPTY(&agent->sending_queue) &&
+                           LIST_ISEMPTY(&APPCTX_SPOE(appctx).waiting_queue)) {
+                               APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
                                appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
-                               appctx->st1 = SPOE_APPCTX_ERR_NONE;
                                goto switchstate;
                        }
-                       if (ctx != NULL && ctx->state == SPOE_CTX_ST_SENDING_MSGS) {
-                               ret = send_spoe_frame(appctx, &prepare_spoe_hanotify_frame);
-                               if (ret < 0) {
-                                       if (ret == -1) {
-                                               ctx->state = SPOE_CTX_ST_ERROR;
-                                               task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
-                                               goto skip_notify_frame;
-                                       }
-                                       appctx->st0 = SPOE_APPCTX_ST_EXIT;
-                                       goto switchstate;
-                               }
-                               else if (!ret)
-                                       goto full;
-                               ctx->state = SPOE_CTX_ST_WAITING_ACK;
-                               APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
-                       }
-
-                 skip_notify_frame:
-                       if (ctx != NULL && ctx->state == SPOE_CTX_ST_WAITING_ACK) {
-                               ret = recv_spoe_frame(appctx, &handle_spoe_agentack_frame);
-                               if (ret < 0) {
-                                       if (ret == -1)
-                                               goto skip_notify_frame;
-                                       ctx->state = SPOE_CTX_ST_ERROR;
-                                       task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
-                                       appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
-                                       goto switchstate;
-                               }
-                               if (!ret)
-                                       goto out;
-                               if (ret == 2) {
-                                       ctx->state = SPOE_CTX_ST_ERROR;
-                                       task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
-                                       appctx->st0 = SPOE_APPCTX_ST_EXIT;
-                                       goto switchstate;
-                               }
-                               ctx->state = SPOE_CTX_ST_DONE;
-                               task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
-                               APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
-                       }
-                       else {
-                               if (stopping) {
-                                       appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
-                                       goto switchstate;
-                               }
+                       agent->applets_idle--;
+                       appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
+                       /* fall through */
 
-                               ret = recv_spoe_frame(appctx, NULL);
-                               if (ret < 0) {
-                                       if (ret == -1)
-                                               goto skip_notify_frame;
-                                       appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
-                                       goto switchstate;
-                               }
-                               if (!ret)
-                                       goto out;
-                               if (ret == 2) {
-                                       appctx->st0 = SPOE_APPCTX_ST_EXIT;
-                                       goto switchstate;
-                               }
-                               APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
-                       }
-                       break;
+               case SPOE_APPCTX_ST_PROCESSING:
+                       if (handle_processing_spoe_applet(appctx))
+                               goto out;
+                       goto switchstate;
 
                case SPOE_APPCTX_ST_DISCONNECT:
-                       ret = send_spoe_frame(appctx, &prepare_spoe_hadiscon_frame);
-                       if (ret < 0) {
-                               appctx->st0 = SPOE_APPCTX_ST_EXIT;
-                               goto switchstate;
-                       }
-                       else if (!ret)
-                               goto full;
-                       SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
-                                   " - disconnected by HAProxy (%d): %s\n",
-                                   (int)now.tv_sec, (int)now.tv_usec,
-                                   ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
-                                   __FUNCTION__, appctx, spoe_status_code,
-                                   spoe_frm_err_reasons[spoe_status_code]);
-
-                       APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
-                       appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
-                       /* fall through */
+                       if (handle_disconnect_spoe_applet(appctx))
+                               goto out;
+                       goto switchstate;
 
                case SPOE_APPCTX_ST_DISCONNECTING:
-                       if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
-                               appctx->st0 = SPOE_APPCTX_ST_EXIT;
-                               goto switchstate;
-                       }
-                       if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
-                               appctx->st0 = SPOE_APPCTX_ST_EXIT;
-                               goto switchstate;
-                       }
-                       ret = recv_spoe_frame(appctx, NULL);
-                       if (ret < 0 || ret == 2) {
-                               appctx->st0 = SPOE_APPCTX_ST_EXIT;
-                               goto switchstate;
-                       }
-                       break;
+                       if (handle_disconnecting_spoe_applet(appctx))
+                               goto out;
+                       goto switchstate;
 
                case SPOE_APPCTX_ST_EXIT:
                        si_shutw(si);
@@ -1532,16 +1854,11 @@ handle_spoe_applet(struct appctx *appctx)
                case SPOE_APPCTX_ST_END:
                        return;
        }
-
- out:
+  out:
        if (APPCTX_SPOE(appctx).task->expire != TICK_ETERNITY)
                task_queue(APPCTX_SPOE(appctx).task);
        si_oc(si)->flags |= CF_READ_DONTWAIT;
        task_wakeup(si_strm(si)->task, TASK_WOKEN_IO);
-       return;
- full:
-       si_applet_cant_put(si);
-       goto out;
 }
 
 struct applet spoe_applet = {
@@ -1568,13 +1885,15 @@ create_spoe_appctx(struct spoe_config *conf)
        if ((APPCTX_SPOE(appctx).task = task_new()) == NULL)
                goto out_free_appctx;
        APPCTX_SPOE(appctx).task->process   = process_spoe_applet;
-       APPCTX_SPOE(appctx).task->expire    = TICK_ETERNITY;
+       APPCTX_SPOE(appctx).task->expire    = TICK_ETERNITY;//tick_add_ifset(now_ms, conf->agent->timeout.hello);
        APPCTX_SPOE(appctx).task->context   = appctx;
        APPCTX_SPOE(appctx).agent           = conf->agent;
-       APPCTX_SPOE(appctx).ctx             = NULL;
        APPCTX_SPOE(appctx).version         = 0;
-       APPCTX_SPOE(appctx).max_frame_size  = global.tune.bufsize;
-       task_wakeup(APPCTX_SPOE(appctx).task, TASK_WOKEN_INIT);
+       APPCTX_SPOE(appctx).max_frame_size  = conf->agent->max_frame_size;
+       APPCTX_SPOE(appctx).flags           = 0;
+
+       LIST_INIT(&APPCTX_SPOE(appctx).list);
+       LIST_INIT(&APPCTX_SPOE(appctx).waiting_queue);
 
        sess = session_new(&conf->agent_fe, NULL, &appctx->obj_type);
        if (!sess)
@@ -1592,10 +1911,6 @@ create_spoe_appctx(struct spoe_config *conf)
        si_applet_cant_get(&strm->si[0]);
        appctx_wakeup(appctx);
 
-       /* Increase the per-process number of cumulated connections */
-       if (conf->agent->cps_max > 0)
-               update_freq_ctr(&conf->agent->conn_per_sec, 1);
-
        strm->do_log = NULL;
        strm->res.flags |= CF_READ_DONTWAIT;
 
@@ -1603,6 +1918,9 @@ create_spoe_appctx(struct spoe_config *conf)
        jobs++;
        totalconn++;
 
+       task_wakeup(APPCTX_SPOE(appctx).task, TASK_WOKEN_INIT);
+       LIST_ADDQ(&conf->agent->applets, &APPCTX_SPOE(appctx).list);
+       conf->agent->applets_act++;
        return appctx;
 
        /* Error unrolling */
@@ -1618,200 +1936,92 @@ create_spoe_appctx(struct spoe_config *conf)
        return NULL;
 }
 
-/* Wake up a SPOE applet attached to a SPOE context. */
-static void
-wakeup_spoe_appctx(struct spoe_context *ctx)
-{
-       if (ctx->appctx == NULL)
-               return;
-       if (ctx->appctx->st0 < SPOE_APPCTX_ST_EXIT) {
-               si_applet_want_get(ctx->appctx->owner);
-               si_applet_want_put(ctx->appctx->owner);
-               appctx_wakeup(ctx->appctx);
-       }
-}
-
-
-/* Run across the list of pending streams waiting for a SPOE applet and wake the
- * first. */
-static void
-offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx)
-{
-       struct spoe_context *ctx;
-
-       if  (!appctx || appctx->st0 > SPOE_APPCTX_ST_PROCESSING)
-               return;
-
-       if (LIST_ISEMPTY(&agent->applet_wq))
-               LIST_ADD(&agent->cache, &APPCTX_SPOE(appctx).list);
-       else {
-               ctx = LIST_NEXT(&agent->applet_wq, typeof(ctx), applet_wait);
-               APPCTX_SPOE(appctx).ctx = ctx;
-               ctx->appctx = appctx;
-               LIST_DEL(&ctx->applet_wait);
-               LIST_INIT(&ctx->applet_wait);
-               task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
-               SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
-                           " - wake up stream to get available SPOE applet\n",
-                           (int)now.tv_sec, (int)now.tv_usec, agent->id,
-                           __FUNCTION__, ctx->strm);
-       }
-}
-
-/* A failure occurred during SPOE applet creation. */
-static void
-on_new_spoe_appctx_failure(struct spoe_agent *agent)
-{
-       struct spoe_context *ctx;
-
-       list_for_each_entry(ctx, &agent->applet_wq, applet_wait) {
-               task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
-               SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
-                           " - wake up stream because to SPOE applet connection failed\n",
-                           (int)now.tv_sec, (int)now.tv_usec, agent->id,
-                           __FUNCTION__, ctx->strm);
-       }
-}
-
-static void
-on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx)
-{
-       offer_spoe_appctx(agent, appctx);
-}
-/* Retrieve a SPOE applet from the agent cache if possible, else create it. It
- * returns 1 on success, 0 to retry later and -1 if an error occurred. */
 static int
-acquire_spoe_appctx(struct spoe_context *ctx, int dir)
+queue_spoe_context(struct spoe_context *ctx)
 {
        struct spoe_config *conf = FLT_CONF(ctx->filter);
        struct spoe_agent  *agent = conf->agent;
        struct appctx      *appctx;
+       unsigned int        min_applets;
 
-       /* If a process is already started for this SPOE context, retry
-        * later. */
-       if (ctx->flags & SPOE_CTX_FL_PROCESS)
-               goto wait;
-
-       /* If needed, initialize the buffer that will be used to encode messages
-        * and decode actions. */
-       if (ctx->buffer == &buf_empty) {
-               if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) {
-                       LIST_DEL(&ctx->buffer_wait.list);
-                       LIST_INIT(&ctx->buffer_wait.list);
-               }
-
-               if (!b_alloc_margin(&ctx->buffer, global.tune.reserved_bufs)) {
-                       LIST_ADDQ(&buffer_wq, &ctx->buffer_wait.list);
-                       goto wait;
-               }
-       }
-
-       /* If the SPOE applet was already set, all is done. */
-       if (ctx->appctx)
-               goto success;
+       min_applets = min_applets_act(agent);
 
-       /* Else try to retrieve it from the agent cache */
-       if (!LIST_ISEMPTY(&agent->cache)) {
-               appctx = LIST_NEXT(&agent->cache, typeof(appctx), ctx.spoe.list);
-               LIST_DEL(&APPCTX_SPOE(appctx).list);
-               APPCTX_SPOE(appctx).ctx = ctx;
-               ctx->appctx = appctx;
-               goto success;
-       }
-
-       /* If there is no server up for the agent's backend, this is an
-        * error. */
-       if (!agent->b.be->srv_act && !agent->b.be->srv_bck)
-               goto error;
+       /* Check if we need to create a new SPOE applet or not. */
+       if (agent->applets_act >= min_applets && agent->applets_idle && agent->sending_rate)
+               goto end;
 
        SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
-                   " - waiting for available SPOE appctx\n",
+                   " - try to create new SPOE appctx\n",
                    (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__,
                    ctx->strm);
 
-       /* Else add the stream in the waiting queue. */
-       if (LIST_ISEMPTY(&ctx->applet_wait))
-               LIST_ADDQ(&agent->applet_wq, &ctx->applet_wait);
+       /* Do not try to create a new applet if there is no server up for the
+        * agent's backend. */
+       if (!agent->b.be->srv_act && !agent->b.be->srv_bck) {
+               SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
+                           " - cannot create SPOE appctx: no server up\n",
+                           (int)now.tv_sec, (int)now.tv_usec, agent->id,
+                           __FUNCTION__, ctx->strm);
+               goto end;
+       }
 
-       /* Finally, create new SPOE applet if we can */
+       /* Do not try to create a new applet if we have reached the maximum of
+        * connection per seconds */
        if (agent->cps_max > 0) {
-               if (!freq_ctr_remain(&agent->conn_per_sec, agent->cps_max, 0))
-                       goto wait;
+               if (!freq_ctr_remain(&agent->conn_per_sec, agent->cps_max, 0)) {
+                       SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
+                                   " - cannot create SPOE appctx: max CPS reached\n",
+                                   (int)now.tv_sec, (int)now.tv_usec, agent->id,
+                                   __FUNCTION__, ctx->strm);
+                       goto end;
+               }
        }
-       if (create_spoe_appctx(conf) == NULL)
-               goto error;
 
-  wait:
-       return 0;
-
-  success:
-       /* Remove the stream from the waiting queue */
-       if (!LIST_ISEMPTY(&ctx->applet_wait)) {
-               LIST_DEL(&ctx->applet_wait);
-               LIST_INIT(&ctx->applet_wait);
+       appctx = create_spoe_appctx(conf);
+       if (appctx == NULL) {
+               SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
+                           " - failed to create SPOE appctx\n",
+                           (int)now.tv_sec, (int)now.tv_usec, agent->id,
+                           __FUNCTION__, ctx->strm);
+               goto end;
        }
+       if (agent->applets_act <= min_applets)
+               APPCTX_SPOE(appctx).flags |= SPOE_APPCTX_FL_PERSIST;
 
-       /* Set the right flag to prevent request and response processing
-        * in same time. */
-       ctx->flags |= ((dir == SMP_OPT_DIR_REQ)
-                      ? SPOE_CTX_FL_REQ_PROCESS
-                      : SPOE_CTX_FL_RSP_PROCESS);
+       /* Increase the per-process number of cumulated connections */
+       if (agent->cps_max > 0)
+               update_freq_ctr(&agent->conn_per_sec, 1);
 
-       SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
-                   " - acquire SPOE appctx %p from cache\n",
-                   (int)now.tv_sec, (int)now.tv_usec, agent->id,
-                   __FUNCTION__, ctx->strm, ctx->appctx);
-       return 1;
+  end:
+       /* The only reason to return an error is when there is no applet */
+       if (LIST_ISEMPTY(&agent->applets))
+               return 0;
 
-  error:
-       /* Remove the stream from the waiting queue */
-       if (!LIST_ISEMPTY(&ctx->applet_wait)) {
-               LIST_DEL(&ctx->applet_wait);
-               LIST_INIT(&ctx->applet_wait);
-       }
+       /* Add the SPOE context in the sending queue and update all running
+        * info */
+       LIST_ADDQ(&agent->sending_queue, &ctx->list);
+       if (agent->sending_rate)
+               agent->sending_rate--;
 
        SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
-                   " - failed to acquire SPOE appctx\n",
-                   (int)now.tv_sec, (int)now.tv_usec, agent->id,
-                   __FUNCTION__, ctx->strm);
-       send_log(ctx->strm->be, LOG_WARNING, "failed to acquire SPOE applet.\n");
-
-       return -1;
-}
-
-/* Release a SPOE applet and push it in the agent cache. */
-static void
-release_spoe_appctx(struct spoe_context *ctx)
-{
-       struct spoe_config *conf = FLT_CONF(ctx->filter);
-       struct spoe_agent  *agent = conf->agent;
-       struct appctx      *appctx = ctx->appctx;
-
-       /* Reset the flag to allow next processing */
-       ctx->flags &= ~SPOE_CTX_FL_PROCESS;
-
-       /* Reset processing timer */
-       ctx->process_exp = TICK_ETERNITY;
-
-       /* Release the buffer if needed */
-       if (ctx->buffer != &buf_empty) {
-               b_free(&ctx->buffer);
-               offer_buffers(ctx, tasks_run_queue + applets_active_queue);
+                   " - Add stream in sending queue - applets_act=%u - applets_idle=%u"
+                   " - sending_rate=%u\n",
+                   (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__,
+                   ctx->strm, agent->applets_act, agent->applets_idle, agent->sending_rate);
+
+       /* Finally try to wakeup the first IDLE applet found and move it at the
+        * end of the list. */
+       list_for_each_entry(appctx, &agent->applets, ctx.spoe.list) {
+               if (appctx->st0 == SPOE_APPCTX_ST_IDLE) {
+                       si_applet_want_get(appctx->owner);
+                       si_applet_want_put(appctx->owner);
+                       appctx_wakeup(appctx);
+                       LIST_DEL(&APPCTX_SPOE(appctx).list);
+                       LIST_ADDQ(&agent->applets, &APPCTX_SPOE(appctx).list);
+                       break;
+               }
        }
-
-       /* If there is no SPOE applet, all is done */
-       if (!appctx)
-               return;
-
-       /* Else, reassign it or push it in the agent cache */
-       SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
-                   " - release SPOE appctx %p\n",
-                   (int)now.tv_sec, (int)now.tv_usec, agent->id,
-                   __FUNCTION__, ctx->strm, appctx);
-
-       APPCTX_SPOE(appctx).ctx = NULL;
-       ctx->appctx = NULL;
-       offer_spoe_appctx(agent, appctx);
+       return 1;
 }
 
 /***************************************************************************
@@ -1824,6 +2034,8 @@ static int
 process_spoe_messages(struct stream *s, struct spoe_context *ctx,
                      struct list *messages, int dir)
 {
+       struct spoe_config  *conf = FLT_CONF(ctx->filter);
+       struct spoe_agent   *agent = conf->agent;
        struct spoe_message *msg;
        struct sample       *smp;
        struct spoe_arg     *arg;
@@ -1832,9 +2044,8 @@ process_spoe_messages(struct stream *s, struct spoe_context *ctx,
        int     off, flag, idx = 0;
 
        /* Reserve 32 bytes from the frame Metadata */
-       max_size = APPCTX_SPOE(ctx->appctx).max_frame_size - 32;
+       max_size = agent->max_frame_size - 32;
 
-       b_reset(ctx->buffer);
        p = ctx->buffer->p;
 
        /* Loop on messages */
@@ -1937,7 +2148,6 @@ process_spoe_messages(struct stream *s, struct spoe_context *ctx,
        return 1;
 
   skip:
-       b_reset(ctx->buffer);
        return 0;
 }
 
@@ -2081,6 +2291,47 @@ process_spoe_actions(struct stream *s, struct spoe_context *ctx,
        return 0;
 }
 
+static int
+start_event_processing(struct spoe_context *ctx, int dir)
+{
+       int ret;
+       /* If a process is already started for this SPOE context, retry
+        * later. */
+       if (ctx->flags & SPOE_CTX_FL_PROCESS)
+               goto wait;
+
+       ret = acquire_spoe_buffer(ctx);
+       if (ret <= 0)
+               return ret;
+
+       /* Set the right flag to prevent request and response processing
+        * in same time. */
+       ctx->flags |= ((dir == SMP_OPT_DIR_REQ)
+                      ? SPOE_CTX_FL_REQ_PROCESS
+                      : SPOE_CTX_FL_RSP_PROCESS);
+
+       return 1;
+
+  wait:
+       return 0;
+}
+
+static void
+stop_event_processing(struct spoe_context *ctx)
+{
+       /* Reset the flag to allow next processing */
+       ctx->flags &= ~SPOE_CTX_FL_PROCESS;
+
+       /* Reset processing timer */
+       ctx->process_exp = TICK_ETERNITY;
+
+       release_spoe_buffer(ctx);
+
+       if (!LIST_ISEMPTY(&ctx->list)) {
+               LIST_DEL(&ctx->list);
+               LIST_INIT(&ctx->list);
+       }
+}
 
 /* Process a SPOE event. First, this functions will process messages attached to
  * this event and send them to an agent in a NOTIFY frame. Then, it will wait a
@@ -2101,15 +2352,6 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx,
                    agent->id, __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
                    spoe_event_str[ev]);
 
-       if (agent->eps_max > 0) {
-               if (!freq_ctr_remain(&agent->err_per_sec, agent->eps_max, 0)) {
-                       SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
-                                   " - skip event '%s': max EPS reached\n",
-                                   (int)now.tv_sec, (int)now.tv_usec,
-                                   agent->id, __FUNCTION__, s, spoe_event_str[ev]);
-                       goto skip;
-               }
-       }
 
        dir = ((ev < SPOE_EV_ON_SERVER_SESS) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES);
 
@@ -2131,38 +2373,43 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx,
        }
 
        if (ctx->state == SPOE_CTX_ST_READY) {
+               if (agent->eps_max > 0) {
+                       if (!freq_ctr_remain(&agent->err_per_sec, agent->eps_max, 0)) {
+                               SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
+                                           " - skip event '%s': max EPS reached\n",
+                                           (int)now.tv_sec, (int)now.tv_usec,
+                                           agent->id, __FUNCTION__, s, spoe_event_str[ev]);
+                               goto skip;
+                       }
+               }
+
                if (!tick_isset(ctx->process_exp)) {
                        ctx->process_exp = tick_add_ifset(now_ms, agent->timeout.processing);
                        s->task->expire  = tick_first((tick_is_expired(s->task->expire, now_ms) ? 0 : s->task->expire),
                                                      ctx->process_exp);
                }
-
-               ret = acquire_spoe_appctx(ctx, dir);
+               ret = start_event_processing(ctx, dir);
                if (ret <= 0) {
                        if (!ret)
                                goto out;
                        goto error;
                }
-               ctx->state = SPOE_CTX_ST_SENDING_MSGS;
-       }
-
-       if (ctx->appctx == NULL)
-               goto error;
-
-       if (ctx->state == SPOE_CTX_ST_SENDING_MSGS) {
                ret = process_spoe_messages(s, ctx, &(ctx->messages[ev]), dir);
                if (ret <= 0) {
                        if (!ret)
                                goto skip;
                        goto error;
                }
-               wakeup_spoe_appctx(ctx);
-               ret = 0;
-               goto out;
+
+               if (!queue_spoe_context(ctx))
+                       goto error;
+
+               ctx->state = SPOE_CTX_ST_SENDING_MSGS;
+               /* fall through */
        }
 
-       if (ctx->state == SPOE_CTX_ST_WAITING_ACK) {
-               wakeup_spoe_appctx(ctx);
+       if (ctx->state == SPOE_CTX_ST_SENDING_MSGS ||
+           ctx->state == SPOE_CTX_ST_WAITING_ACK) {
                ret = 0;
                goto out;
        }
@@ -2175,18 +2422,13 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx,
                        goto error;
                }
                ctx->frame_id++;
-               release_spoe_appctx(ctx);
                ctx->state = SPOE_CTX_ST_READY;
+               goto end;
        }
 
   out:
        return ret;
 
-  skip:
-       release_spoe_appctx(ctx);
-       ctx->state = SPOE_CTX_ST_READY;
-       return 1;
-
   error:
        if (agent->eps_max > 0)
                update_freq_ctr(&agent->err_per_sec, 1);
@@ -2194,6 +2436,7 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx,
        if (agent->var_on_error) {
                struct sample smp;
 
+               // FIXME: Get the error code here
                memset(&smp, 0, sizeof(smp));
                smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
                smp.data.u.sint = 1;
@@ -2203,17 +2446,57 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx,
                             strlen(agent->var_on_error), &smp);
        }
 
-       release_spoe_appctx(ctx);
        ctx->state = ((agent->flags & SPOE_FL_CONT_ON_ERR)
                      ? SPOE_CTX_ST_READY
                      : SPOE_CTX_ST_ERROR);
-       return 1;
-}
+       ret = 1;
+       goto end;
 
+  skip:
+       ctx->state = SPOE_CTX_ST_READY;
+       ret = 1;
+
+  end:
+       stop_event_processing(ctx);
+       return ret;
+}
 
 /***************************************************************************
  * Functions that create/destroy SPOE contexts
  **************************************************************************/
+static int
+acquire_spoe_buffer(struct spoe_context *ctx)
+{
+       if (ctx->buffer != &buf_empty)
+               return 1;
+
+       if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) {
+               LIST_DEL(&ctx->buffer_wait.list);
+               LIST_INIT(&ctx->buffer_wait.list);
+       }
+
+       if (b_alloc_margin(&ctx->buffer, global.tune.reserved_bufs))
+               return 1;
+
+       LIST_ADDQ(&buffer_wq, &ctx->buffer_wait.list);
+       return 0;
+}
+
+static void
+release_spoe_buffer(struct spoe_context *ctx)
+{
+       if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) {
+               LIST_DEL(&ctx->buffer_wait.list);
+               LIST_INIT(&ctx->buffer_wait.list);
+       }
+
+       /* Release the buffer if needed */
+       if (ctx->buffer != &buf_empty) {
+               b_free(&ctx->buffer);
+               offer_buffers(ctx, tasks_run_queue + applets_active_queue);
+       }
+}
+
 static int wakeup_spoe_context(struct spoe_context *ctx)
 {
        task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
@@ -2239,7 +2522,7 @@ create_spoe_context(struct filter *filter)
        LIST_INIT(&ctx->buffer_wait.list);
        ctx->buffer_wait.target = ctx;
        ctx->buffer_wait.wakeup_cb = (int (*)(void *))wakeup_spoe_context;
-       LIST_INIT(&ctx->applet_wait);
+       LIST_INIT(&ctx->list);
 
        ctx->stream_id   = 0;
        ctx->frame_id    = 1;
@@ -2254,12 +2537,10 @@ destroy_spoe_context(struct spoe_context *ctx)
        if (!ctx)
                return;
 
-       if (ctx->appctx)
-               APPCTX_SPOE(ctx->appctx).ctx = NULL;
        if (!LIST_ISEMPTY(&ctx->buffer_wait.list))
                LIST_DEL(&ctx->buffer_wait.list);
-       if (!LIST_ISEMPTY(&ctx->applet_wait))
-               LIST_DEL(&ctx->applet_wait);
+       if (!LIST_ISEMPTY(&ctx->list))
+               LIST_DEL(&ctx->list);
        pool_free2(pool2_spoe_ctx, ctx);
 }
 
@@ -2295,7 +2576,7 @@ sig_stop_spoe(struct sig_handler *sh)
                        conf  = fconf->conf;
                        agent = conf->agent;
 
-                       list_for_each_entry(appctx, &agent->cache, ctx.spoe.list) {
+                       list_for_each_entry(appctx, &agent->applets, ctx.spoe.list) {
                                si_applet_want_get(appctx->owner);
                                si_applet_want_put(appctx->owner);
                                appctx_wakeup(appctx);
@@ -2437,17 +2718,11 @@ spoe_start(struct stream *s, struct filter *filter)
 static void
 spoe_stop(struct stream *s, struct filter *filter)
 {
-       struct spoe_context *ctx = filter->ctx;
-
        SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n",
                    (int)now.tv_sec, (int)now.tv_usec,
                    ((struct spoe_config *)FLT_CONF(filter))->agent->id,
                    __FUNCTION__, s);
-
-       if (ctx) {
-               release_spoe_appctx(ctx);
-               destroy_spoe_context(ctx);
-       }
+       destroy_spoe_context(filter->ctx);
 }
 
 
@@ -2461,10 +2736,7 @@ spoe_check_timeouts(struct stream *s, struct filter *filter)
 
        if (tick_is_expired(ctx->process_exp, now_ms)) {
                s->pending_events |= TASK_WOKEN_MSG;
-               if (ctx->buffer != &buf_empty) {
-                       b_free(&ctx->buffer);
-                       offer_buffers(ctx, tasks_run_queue + applets_active_queue);
-               }
+               release_spoe_buffer(ctx);
        }
 }
 
@@ -2511,13 +2783,13 @@ spoe_start_analyze(struct stream *s, struct filter *filter, struct channel *chn)
                                goto out;
                }
                ctx->flags |= SPOE_CTX_FL_SRV_CONNECTED;
+               if (!ret) {
+                       channel_dont_read(chn);
+                       channel_dont_close(chn);
+               }
        }
 
   out:
-       if (!ret) {
-                channel_dont_read(chn);
-                channel_dont_close(chn);
-       }
        return ret;
 }
 
@@ -2654,21 +2926,34 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm)
                }
 
                curagent->id              = strdup(args[1]);
+
                curagent->conf.file       = strdup(file);
                curagent->conf.line       = linenum;
-               curagent->timeout.hello   = TICK_ETERNITY;
-               curagent->timeout.idle    = TICK_ETERNITY;
+
+               curagent->timeout.hello      = TICK_ETERNITY;
+               curagent->timeout.idle       = TICK_ETERNITY;
                curagent->timeout.processing = TICK_ETERNITY;
-               curagent->var_pfx         = NULL;
-               curagent->var_on_error    = NULL;
-               curagent->flags           = 0;
-               curagent->cps_max         = 0;
-               curagent->eps_max         = 0;
+
+               curagent->engine_id      = NULL;
+               curagent->var_pfx        = NULL;
+               curagent->var_on_error   = NULL;
+               curagent->flags          = 0;
+               curagent->cps_max        = 0;
+               curagent->eps_max        = 0;
+               curagent->max_frame_size = global.tune.bufsize - 4;
+               curagent->min_applets    = 0;
+               curagent->max_fpa        = 100;
 
                for (i = 0; i < SPOE_EV_EVENTS; ++i)
                        LIST_INIT(&curagent->messages[i]);
-               LIST_INIT(&curagent->cache);
-               LIST_INIT(&curagent->applet_wq);
+
+               curagent->applets_act  = 0;
+               curagent->applets_idle = 0;
+               curagent->sending_rate = 0;
+
+               LIST_INIT(&curagent->applets);
+               LIST_INIT(&curagent->sending_queue);
+               LIST_INIT(&curagent->waiting_queue);
        }
        else if (!strcmp(args[0], "use-backend")) {
                if (!*args[1]) {
@@ -3114,6 +3399,8 @@ parse_spoe_flt(char **args, int *cur_arg, struct proxy *px,
                }
                curagent->var_pfx = strdup(curagent->id);
        }
+       if (curagent->engine_id == NULL)
+               curagent->engine_id = generate_pseudo_uuid();
 
        if (LIST_ISEMPTY(&curmps)) {
                Warning("Proxy '%s': No message used by SPOE agent '%s' declared at %s:%d.\n",