#define SPOE_CTX_FL_PROCESS (SPOE_CTX_FL_REQ_PROCESS|SPOE_CTX_FL_RSP_PROCESS)
+/* Flags set on the SPOE applet */
+#define SPOE_APPCTX_FL_PIPELINING 0x00000001 /* Set if pipelining is supported */
+#define SPOE_APPCTX_FL_ASYNC 0x00000002 /* Set if asynchronus frames is supported */
+#define SPOE_APPCTX_FL_PERSIST 0x00000004 /* Set if the applet is persistent */
+
#define SPOE_APPCTX_ERR_NONE 0x00000000 /* no error yet, leave it to zero */
#define SPOE_APPCTX_ERR_TOUT 0x00000001 /* SPOE applet timeout */
enum spoe_appctx_state {
SPOE_APPCTX_ST_CONNECT = 0,
SPOE_APPCTX_ST_CONNECTING,
+ SPOE_APPCTX_ST_IDLE,
SPOE_APPCTX_ST_PROCESSING,
SPOE_APPCTX_ST_DISCONNECT,
SPOE_APPCTX_ST_DISCONNECTING,
/* Describe a message that will be sent in a NOTIFY frame. A message has a name,
* an argument list (see above) and it is linked to a specific event. */
struct spoe_message {
- char *id; /* SPOE message id */
- unsigned int id_len; /* The message id length */
+ char *id; /* SPOE message id */
+ unsigned int id_len; /* The message id length */
struct spoe_agent *agent; /* SPOE agent owning this SPOE message */
struct {
- char *file; /* file where the SPOE message appears */
- int line; /* line where the SPOE message appears */
- } conf; /* config information */
- struct list args; /* Arguments added when the SPOE messages is sent */
- struct list list; /* Used to chain SPOE messages */
+ char *file; /* file where the SPOE message appears */
+ int line; /* line where the SPOE message appears */
+ } conf; /* config information */
+ struct list args; /* Arguments added when the SPOE messages is sent */
+ struct list list; /* Used to chain SPOE messages */
enum spoe_event event; /* SPOE_EV_* */
};
unsigned int processing; /* Max time to process an event (in the main stream) */
} timeout;
+ /* Config info */
+ char *engine_id; /* engine-id string */
char *var_pfx; /* Prefix used for vars set by the agent */
char *var_on_error; /* Variable to set when an error occured, in the TXN scope */
unsigned int flags; /* SPOE_FL_* */
- unsigned int cps_max; /* Maximum number of connections per second */
- unsigned int eps_max; /* Maximum number of errors per second */
-
- struct list cache; /* List used to cache SPOE streams. In
- * fact, we cache the SPOE applect ctx */
+ unsigned int cps_max; /* Maximum # of connections per second */
+ unsigned int eps_max; /* Maximum # of errors per second */
+ unsigned int max_frame_size; /* Maximum frame size for this agent, before any negotiation */
+ unsigned int min_applets; /* Minimum # applets alive at a time */
+ unsigned int max_fpa; /* Maximum # of frames handled per applet at once */
struct list messages[SPOE_EV_EVENTS]; /* List of SPOE messages that will be sent
* for each supported events */
- struct list applet_wq; /* List of streams waiting for a SPOE applet */
- struct freq_ctr conn_per_sec; /* connections per second */
- struct freq_ctr err_per_sec; /* connetion errors per second */
+ /* running info */
+ unsigned int applets_act; /* # of applets alive at a time */
+ unsigned int applets_idle; /* # of applets in the state SPOE_APPCTX_ST_IDLE */
+ unsigned int sending_rate; /* the global sending rate */
+
+ struct freq_ctr conn_per_sec; /* connections per second */
+ struct freq_ctr err_per_sec; /* connetion errors per second */
+
+ struct list applets; /* List of available SPOE applets */
+ struct list sending_queue; /* Queue of streams waiting to send data */
+ struct list waiting_queue; /* Queue of streams waiting for a ack, in async mode */
+
};
/* SPOE filter configuration */
struct spoe_context {
struct filter *filter; /* The SPOE filter */
struct stream *strm; /* The stream that should be offloaded */
- struct appctx *appctx; /* The SPOE appctx */
+
struct list *messages; /* List of messages that will be sent during the stream processing */
struct buffer *buffer; /* Buffer used to store a NOTIFY or ACK frame */
struct buffer_wait buffer_wait; /* position in the list of streams waiting for a buffer */
- struct list applet_wait; /* position in the list of streams waiting for a SPOE applet */
+ struct list list;
enum spoe_ctx_state state; /* SPOE_CTX_ST_* */
unsigned int flags; /* SPOE_CTX_FL_* */
struct flt_ops spoe_ops;
-static void offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx);
-static void on_new_spoe_appctx_failure(struct spoe_agent *agent);
-static void on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx);
+static int queue_spoe_context(struct spoe_context *ctx);
+static int acquire_spoe_buffer(struct spoe_context *ctx);
+static void release_spoe_buffer(struct spoe_context *ctx);
/********************************************************************
* helper functions/globals
free(agent->id);
free(agent->conf.file);
free(agent->var_pfx);
+ free(agent->engine_id);
free(agent->var_on_error);
for (i = 0; i < SPOE_EV_EVENTS; ++i) {
list_for_each_entry_safe(msg, back, &agent->messages[i], list) {
static const char *spoe_appctx_state_str[SPOE_APPCTX_ST_END+1] = {
[SPOE_APPCTX_ST_CONNECT] = "CONNECT",
[SPOE_APPCTX_ST_CONNECTING] = "CONNECTING",
+ [SPOE_APPCTX_ST_IDLE] = "IDLE",
[SPOE_APPCTX_ST_PROCESSING] = "PROCESSING",
[SPOE_APPCTX_ST_DISCONNECT] = "DISCONNECT",
[SPOE_APPCTX_ST_DISCONNECTING] = "DISCONNECTING",
};
#endif
+
+static char *
+generate_pseudo_uuid()
+{
+ static int init = 0;
+
+ const char uuid_fmt[] = "xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx";
+ const char uuid_chr[] = "0123456789ABCDEF-";
+ char *uuid;
+ int i;
+
+ if ((uuid = calloc(1, sizeof(uuid_fmt))) == NULL)
+ return NULL;
+
+ if (!init) {
+ srand(now_ms);
+ init = 1;
+ }
+
+ for (i = 0; i < sizeof(uuid_fmt)-1; i++) {
+ int r = rand () % 16;
+
+ switch (uuid_fmt[i]) {
+ case 'x' : uuid[i] = uuid_chr[r]; break;
+ case 'y' : uuid[i] = uuid_chr[(r & 0x03) | 0x08]; break;
+ default : uuid[i] = uuid_fmt[i]; break;
+ }
+ }
+ return uuid;
+}
+
+static inline unsigned int
+min_applets_act(struct spoe_agent *agent)
+{
+ unsigned int nbsrv;
+
+ if (agent->min_applets)
+ return agent->min_applets;
+
+ nbsrv = (agent->b.be->srv_act ? agent->b.be->srv_act : agent->b.be->srv_bck);
+ return 2*nbsrv;
+}
+
/********************************************************************
* Functions that encode/decode SPOE frames
********************************************************************/
#define VERSION_KEY "version"
#define MAX_FRAME_SIZE_KEY "max-frame-size"
#define CAPABILITIES_KEY "capabilities"
+#define ENGINE_ID_KEY "engine-id"
#define HEALTHCHECK_KEY "healthcheck"
#define STATUS_CODE_KEY "status-code"
#define MSG_KEY "message"
#define SUPPORTED_VERSIONS_VAL "1.0"
/* Comma-separated list of supported capabilities (none for now) */
-#define CAPABILITIES_VAL ""
+//#define CAPABILITIES_VAL ""
+#define CAPABILITIES_VAL "pipelining,async"
static int
decode_spoe_version(const char *str, size_t len)
static int
prepare_spoe_hahello_frame(struct appctx *appctx, char *frame, size_t size)
{
+ struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
int idx = 0;
size_t max = (7 /* TYPE + METADATA */
+ 1 + SLEN(SUPPORTED_VERSIONS_KEY) + 1 + 1 + SLEN(SUPPORTED_VERSIONS_VAL)
+ 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 4
- + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + SLEN(CAPABILITIES_VAL));
+ + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + SLEN(CAPABILITIES_VAL)
+ + 1 + SLEN(ENGINE_ID_KEY) + 1 + 1 + 36);
if (size < max)
return -1;
frame[idx++] = SPOE_DATA_T_STR;
idx += encode_spoe_string(CAPABILITIES_VAL, SLEN(CAPABILITIES_VAL), frame+idx);
+ /* "engine-id" K/V item */
+ if (agent != NULL && agent->engine_id != NULL) {
+ idx += encode_spoe_string(ENGINE_ID_KEY, SLEN(ENGINE_ID_KEY), frame+idx);
+ frame[idx++] = SPOE_DATA_T_STR;
+ idx += encode_spoe_string(agent->engine_id, strlen(agent->engine_id), frame+idx);
+ }
+
return idx;
}
/* Encode NOTIFY frame sent by HAProxy to an agent. It returns the frame size on
* success, 0 if the frame can be ignored and -1 if an error occurred. */
static int
-prepare_spoe_hanotify_frame(struct appctx *appctx, char *frame, size_t size)
+prepare_spoe_hanotify_frame(struct appctx *appctx, struct spoe_context *ctx,
+ char *frame, size_t size)
{
- struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
- int idx = 0;
+ int idx = 0;
if (size < APPCTX_SPOE(appctx).max_frame_size)
return -1;
idx += encode_spoe_varint(ctx->stream_id, frame+idx);
idx += encode_spoe_varint(ctx->frame_id, frame+idx);
+ /* Copy encoded messages */
+ if (idx + ctx->buffer->i > size)
+ return 0;
+
/* Copy encoded messages */
memcpy(frame+idx, ctx->buffer->p, ctx->buffer->i);
idx += ctx->buffer->i;
static int
handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size)
{
- int vsn, max_frame_size;
+ int vsn, max_frame_size, flags;
int i, idx = 0;
size_t min_size = (7 /* TYPE + METADATA */
+ 1 + SLEN(VERSION_KEY) + 1 + 1 + 3
* "capabilities" */
/* Loop on K/V items */
- vsn = max_frame_size = 0;
+ vsn = max_frame_size = flags = 0;
while (idx < size) {
char *str;
uint64_t sz;
}
max_frame_size = sz;
}
- /* Skip "capabilities" K/V item for now */
+ /* Check "capabilities" K/V item */
+ else if (!memcmp(str, CAPABILITIES_KEY, sz)) {
+ int i;
+
+ /* The value must be a string */
+ if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
+ spoe_status_code = SPOE_FRM_ERR_INVALID;
+ return -1;
+ }
+ idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
+ if (str == NULL)
+ continue;
+
+ i = 0;
+ while (i < sz) {
+ char *delim;
+
+ /* Skip leading spaces */
+ for (; isspace(str[i]) && i < sz; i++);
+
+ if (sz - i >= 10 && !strncmp(str + i, "pipelining", 10)) {
+ i += 10;
+ if (sz == i || isspace(str[i]) || str[i] == ',')
+ flags |= SPOE_APPCTX_FL_PIPELINING;
+ }
+ else if (sz - i >= 5 && !strncmp(str + i, "async", 5)) {
+ i += 5;
+ if (sz == i || isspace(str[i]) || str[i] == ',')
+ flags |= SPOE_APPCTX_FL_ASYNC;
+ }
+
+ if (sz == i || (delim = memchr(str + i, ',', sz-i)) == NULL)
+ break;
+ i = (delim - str) + 1;
+ }
+ }
else {
/* Silently ignore unknown item */
if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) {
APPCTX_SPOE(appctx).version = (unsigned int)vsn;
APPCTX_SPOE(appctx).max_frame_size = (unsigned int)max_frame_size;
+ APPCTX_SPOE(appctx).flags |= flags;
return idx;
}
}
-/* Decode ACK frame sent by an agent. It returns the number of by read bytes on
+/* Decode ACK frame sent by an agent. It returns the number of read bytes on
* success, 0 if the frame can be ignored and -1 if an error occurred. */
static int
handle_spoe_agentack_frame(struct appctx *appctx, char *frame, size_t size)
{
- struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
+ struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
+ struct spoe_context *ctx, *back;
uint64_t stream_id, frame_id;
- int idx = 0;
+ int i, idx = 0;
size_t min_size = (7 /* TYPE + METADATA */);
/* Check frame type */
idx += 4;
/* Get the stream-id and the frame-id */
- idx += decode_spoe_varint(frame+idx, frame+size, &stream_id);
- idx += decode_spoe_varint(frame+idx, frame+size, &frame_id);
-
- /* Check stream-id and frame-id */
- if (ctx->stream_id != (unsigned int)stream_id ||
- ctx->frame_id != (unsigned int)frame_id)
+ if ((i = decode_spoe_varint(frame+idx, frame+size, &stream_id)) == -1)
+ return 0;
+ idx += i;
+ if ((i= decode_spoe_varint(frame+idx, frame+size, &frame_id)) == -1)
return 0;
+ idx += i;
+
+ if (APPCTX_SPOE(appctx).flags & SPOE_APPCTX_FL_ASYNC) {
+ list_for_each_entry_safe(ctx, back, &agent->waiting_queue, list) {
+ if (ctx->stream_id == (unsigned int)stream_id &&
+ ctx->frame_id == (unsigned int)frame_id)
+ goto found;
+ }
+ }
+ else {
+ list_for_each_entry_safe(ctx, back, &APPCTX_SPOE(appctx).waiting_queue, list) {
+ if (ctx->stream_id == (unsigned int)stream_id &&
+ ctx->frame_id == (unsigned int)frame_id)
+ goto found;
+ }
+ }
+
+ /* No Stream found, ignore the frame */
+ return 0;
+
+ found:
+ if (acquire_spoe_buffer(ctx) <= 0)
+ return 1; /* Retry later */
/* Copy encoded actions */
- b_reset(ctx->buffer);
memcpy(ctx->buffer->p, frame+idx, size-idx);
ctx->buffer->i = size-idx;
+ /* Notify the stream */
+ LIST_DEL(&ctx->list);
+ LIST_INIT(&ctx->list);
+ ctx->state = SPOE_CTX_ST_DONE;
+ task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
+
return idx;
}
memset(&a, 0, sizeof(a));
memset(buf, 0, sizeof(buf));
- APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize;
+ APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize-4;
frame = buf+4;
idx = prepare_spoe_hahello_frame(&a, frame, global.tune.bufsize-4);
int r;
memset(&a, 0, sizeof(a));
- APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize;
+ APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize-4;
if (handle_spoe_agentdiscon_frame(&a, frame, size) != 0)
goto error;
return -1;
}
+/* Send a SPOE frame to an agent. It returns -1 when an error occurred, 0 when
+ * the frame can be ignored, 1 to retry later, and the frame legnth on
+ * success. */
+static int
+send_spoe_frame(struct appctx *appctx, char *buf, size_t framesz)
+{
+ struct stream_interface *si = appctx->owner;
+ int ret;
+ uint32_t netint;
+
+ if (si_ic(si)->buf == &buf_empty)
+ return 1;
+
+ netint = htonl(framesz);
+ memcpy(buf, (char *)&netint, 4);
+ ret = bi_putblk(si_ic(si), buf, framesz+4);
+
+ if (ret <= 0) {
+ if (ret == -1)
+ return 1; /* retry */
+ return -1; /* error */
+ }
+ return framesz;
+}
+
+/* Receive a SPOE frame from an agent. It return -1 when an error occurred, 0
+ * when the frame can be ignored, 1 to retry later and the frame length on
+ * success. */
+static int
+recv_spoe_frame(struct appctx *appctx, char *buf, size_t framesz)
+{
+ struct stream_interface *si = appctx->owner;
+ int ret;
+ uint32_t netint;
+
+ if (si_oc(si)->buf == &buf_empty)
+ return 1;
+
+ ret = bo_getblk(si_oc(si), (char *)&netint, 4, 0);
+ if (ret > 0) {
+ framesz = ntohl(netint);
+ if (framesz > APPCTX_SPOE(appctx).max_frame_size) {
+ spoe_status_code = SPOE_FRM_ERR_TOO_BIG;
+ return -1;
+ }
+ ret = bo_getblk(si_oc(si), trash.str, framesz, 4);
+ }
+ if (ret <= 0) {
+ if (ret == 0)
+ return 1; /* retry */
+ spoe_status_code = SPOE_FRM_ERR_IO;
+ return -1; /* error */
+ }
+ return framesz;
+}
+
/********************************************************************
* Functions that manage the SPOE applet
********************************************************************/
appctx->st1 = SPOE_APPCTX_ERR_TOUT;
}
si_applet_want_get(appctx->owner);
+ si_applet_want_put(appctx->owner);
appctx_wakeup(appctx);
return task;
}
-/* Remove a SPOE applet from the agent cache */
-static void
-remove_spoe_applet_from_cache(struct appctx *appctx)
-{
- struct appctx *a, *back;
- struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
-
- if (LIST_ISEMPTY(&agent->cache))
- return;
-
- list_for_each_entry_safe(a, back, &agent->cache, ctx.spoe.list) {
- if (a == appctx) {
- LIST_DEL(&APPCTX_SPOE(appctx).list);
- break;
- }
- }
-}
-
-
/* Callback function that releases a SPOE applet. This happens when the
* connection with the agent is closed. */
static void
{
struct stream_interface *si = appctx->owner;
struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
- struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
+ struct spoe_context *ctx, *back;
+
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p\n",
+ (int)now.tv_sec, (int)now.tv_usec, agent->id,
+ __FUNCTION__, appctx);
- if (appctx->st0 == SPOE_APPCTX_ST_CONNECT ||
- appctx->st0 == SPOE_APPCTX_ST_CONNECTING)
- on_new_spoe_appctx_failure(agent);
+ agent->applets_act--;
+ if (!LIST_ISEMPTY(&APPCTX_SPOE(appctx).list)) {
+ LIST_DEL(&APPCTX_SPOE(appctx).list);
+ LIST_INIT(&APPCTX_SPOE(appctx).list);
+ }
if (appctx->st0 != SPOE_APPCTX_ST_END) {
+ if (appctx->st0 == SPOE_APPCTX_ST_IDLE)
+ agent->applets_idle--;
+
si_shutw(si);
si_shutr(si);
si_ic(si)->flags |= CF_READ_NULL;
appctx->st0 = SPOE_APPCTX_ST_END;
}
- if (ctx != NULL) {
+ if (APPCTX_SPOE(appctx).task) {
+ task_delete(APPCTX_SPOE(appctx).task);
+ task_free(APPCTX_SPOE(appctx).task);
+ }
+
+ list_for_each_entry_safe(ctx, back, &APPCTX_SPOE(appctx).waiting_queue, list) {
+ LIST_DEL(&ctx->list);
+ LIST_INIT(&ctx->list);
+ ctx->state = SPOE_CTX_ST_ERROR;
task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
- ctx->appctx = NULL;
}
- SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p\n",
- (int)now.tv_sec, (int)now.tv_usec, agent->id,
- __FUNCTION__, appctx);
+ if (!LIST_ISEMPTY(&agent->applets))
+ return;
- /* Release the task attached to the SPOE applet */
- if (APPCTX_SPOE(appctx).task) {
- task_delete(APPCTX_SPOE(appctx).task);
- task_free(APPCTX_SPOE(appctx).task);
+ list_for_each_entry_safe(ctx, back, &agent->sending_queue, list) {
+ LIST_DEL(&ctx->list);
+ LIST_INIT(&ctx->list);
+ ctx->state = SPOE_CTX_ST_ERROR;
+ task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
}
- /* And remove it from the agent cache */
- remove_spoe_applet_from_cache(appctx);
- APPCTX_SPOE(appctx).ctx = NULL;
+ list_for_each_entry_safe(ctx, back, &agent->waiting_queue, list) {
+ LIST_DEL(&ctx->list);
+ LIST_INIT(&ctx->list);
+ ctx->state = SPOE_CTX_ST_ERROR;
+ task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
+ }
}
-/* Send a SPOE frame to an agent. It return -2 when an error occurred, -1 when
- * the frame can be ignored, 0 to retry later and 1 on success. The frame is
- * encoded using the callback function <prepare>. */
static int
-send_spoe_frame(struct appctx *appctx,
- int (*prepare)(struct appctx *, char *, size_t))
+handle_connect_spoe_applet(struct appctx *appctx)
{
- struct stream_interface *si = appctx->owner;
- int framesz, ret;
- uint32_t netint;
+ struct stream_interface *si = appctx->owner;
+ struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
+ char *frame = trash.str;
+ int ret;
- if (si_ic(si)->buf->size == 0)
- return -1;
+ if (si->state <= SI_ST_CON) {
+ si_applet_want_put(si);
+ task_wakeup(si_strm(si)->task, TASK_WOKEN_MSG);
+ goto stop;
+ }
+ if (si->state != SI_ST_EST)
+ goto exit;
- ret = prepare(appctx, trash.str, APPCTX_SPOE(appctx).max_frame_size);
- if (ret <= 0)
- goto skip_or_error;
- framesz = ret;
- netint = htonl(framesz);
- ret = bi_putblk(si_ic(si), (char *)&netint, sizeof(netint));
- if (ret > 0)
- ret = bi_putblk(si_ic(si), trash.str, framesz);
- if (ret <= 0) {
- if (ret == -1)
- return -1;
- return -2;
+ if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p - Connection timed out\n",
+ (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, appctx);
+ goto exit;
+ }
+
+ if (APPCTX_SPOE(appctx).task->expire == TICK_ETERNITY)
+ APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.hello);
+
+ ret = prepare_spoe_hahello_frame(appctx, frame+4, APPCTX_SPOE(appctx).max_frame_size);
+ if (ret > 1)
+ ret = send_spoe_frame(appctx, frame, ret);
+
+ switch (ret) {
+ case -1: /* error */
+ goto exit;
+
+ case 0: /* ignore => an error, cannot be ignored */
+ goto exit;
+
+ case 1: /* retry later */
+ si_applet_cant_put(si);
+ goto stop;
+
+ default: /* CONNECT frame successfully sent */
+ appctx->st0 = SPOE_APPCTX_ST_CONNECTING;
+ goto next;
}
+
+ next:
+ return 0;
+ stop:
return 1;
+ exit:
+ appctx->st0 = SPOE_APPCTX_ST_EXIT;
+ return 0;
+}
- skip_or_error:
- if (!ret)
- return -1;
- return -2;
+static int
+handle_connecting_spoe_applet(struct appctx *appctx)
+{
+ struct stream_interface *si = appctx->owner;
+ struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
+ char *frame = trash.str;
+ int ret, framesz = 0;
+
+
+ if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO)
+ goto exit;
+
+ if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p - Connection timed out\n",
+ (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, appctx);
+ goto exit;
+ }
+
+ ret = recv_spoe_frame(appctx, frame, APPCTX_SPOE(appctx).max_frame_size);
+ if (ret > 1) {
+ if (*frame == SPOE_FRM_T_AGENT_DISCON) {
+ appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
+ goto next;
+ }
+ framesz = ret;
+ ret = handle_spoe_agenthello_frame(appctx, frame, framesz);
+ }
+
+ switch (ret) {
+ case -1: /* error */
+ if (framesz)
+ bo_skip(si_oc(si), framesz+4);
+ appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
+ goto next;
+
+ case 0: /* ignore */
+ if (framesz)
+ bo_skip(si_oc(si), framesz+4);
+ appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
+ goto next;
+
+ case 1: /* retry later */
+ goto stop;
+
+ default:
+ /* hello handshake is finished, set the idle timeout,
+ * Add the appctx in the agent cache, decrease the
+ * number of new applets and wake up waiting streams. */
+ if (framesz)
+ bo_skip(si_oc(si), framesz+4);
+ agent->applets_idle++;
+ appctx->st0 = SPOE_APPCTX_ST_IDLE;
+ goto next;
+ }
+
+ next:
+ APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
+ return 0;
+ stop:
+ return 1;
+ exit:
+ appctx->st0 = SPOE_APPCTX_ST_EXIT;
+ return 0;
}
-/* Receive a SPOE frame from an agent. It return -2 when an error occurred, -1
- * when the frame can be ignored, 0 to retry later and 1 on success. The frame
- * is decoded using the callback function <handle>. */
static int
-recv_spoe_frame(struct appctx *appctx,
- int (*handle)(struct appctx *, char *, size_t))
+handle_processing_spoe_applet(struct appctx *appctx)
{
- struct stream_interface *si = appctx->owner;
- int framesz, ret;
- uint32_t netint;
+ struct stream_interface *si = appctx->owner;
+ struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
+ struct spoe_context *ctx;
+ char *frame = trash.str;
+ unsigned int fpa = 0;
+ int ret, framesz = 0, skip_sending = 0, skip_receiving = 0;
+
+ if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO)
+ goto exit;
+
+ if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
+ spoe_status_code = SPOE_FRM_ERR_TOUT;
+ appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
+ appctx->st1 = SPOE_APPCTX_ERR_NONE;
+ goto next;
+ }
+
+ process:
+ if (fpa > agent->max_fpa || (skip_sending && skip_receiving))
+ goto stop;
+
+ /* Frames must be handled synchronously and a the applet is waiting for
+ * a ACK frame */
+ if (!(APPCTX_SPOE(appctx).flags & (SPOE_APPCTX_FL_ASYNC|SPOE_APPCTX_FL_PIPELINING)) &&
+ !LIST_ISEMPTY(&APPCTX_SPOE(appctx).waiting_queue)) {
+ if (skip_receiving)
+ goto stop;
+ goto recv_frame;
+ }
+
+ if (LIST_ISEMPTY(&agent->sending_queue) || skip_sending) {
+ skip_sending = 1;
+ goto recv_frame;
+ }
+
+ ctx = LIST_NEXT(&agent->sending_queue, typeof(ctx), list);
+ ret = prepare_spoe_hanotify_frame(appctx, ctx, frame+4, APPCTX_SPOE(appctx).max_frame_size);
+ if (ret > 1)
+ ret = send_spoe_frame(appctx, frame, ret);
+
+ switch (ret) {
+ case -1: /* error */
+ appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
+ goto next;
+
+ case 0: /* ignore */
+ agent->sending_rate++;
+ ctx->state = SPOE_CTX_ST_ERROR;
+ release_spoe_buffer(ctx);
+ task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
+ LIST_DEL(&ctx->list);
+ LIST_INIT(&ctx->list);
+ fpa++;
+ break;
- ret = bo_getblk(si_oc(si), (char *)&netint, sizeof(netint), 0);
- if (ret <= 0)
- goto empty_or_error;
- framesz = ntohl(netint);
- if (framesz > APPCTX_SPOE(appctx).max_frame_size) {
- spoe_status_code = SPOE_FRM_ERR_TOO_BIG;
- return -2;
+ case 1: /* retry */
+ si_applet_cant_put(si);
+ skip_sending = 1;
+ break;
+
+ default:
+ agent->sending_rate++;
+ ctx->state = SPOE_CTX_ST_WAITING_ACK;
+ release_spoe_buffer(ctx);
+ LIST_DEL(&ctx->list);
+ LIST_INIT(&ctx->list);
+ if (APPCTX_SPOE(appctx).flags & SPOE_APPCTX_FL_ASYNC)
+ LIST_ADDQ(&agent->waiting_queue, &ctx->list);
+ else
+ LIST_ADDQ(&APPCTX_SPOE(appctx).waiting_queue, &ctx->list);
+ fpa++;
+ }
+
+ if (fpa > agent->max_fpa)
+ goto stop;
+
+ recv_frame:
+ if (skip_receiving)
+ goto process;
+
+ framesz = 0;
+ ret = recv_spoe_frame(appctx, frame, APPCTX_SPOE(appctx).max_frame_size);
+ if (ret > 1) {
+ if (*frame == SPOE_FRM_T_AGENT_DISCON) {
+ appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
+ goto next;
+ }
+ framesz = ret;
+ ret = handle_spoe_agentack_frame(appctx, frame, framesz);
}
- ret = bo_getblk(si_oc(si), trash.str, framesz, sizeof(netint));
- if (ret <= 0)
- goto empty_or_error;
- bo_skip(si_oc(si), ret+sizeof(netint));
+ switch (ret) {
+ case -1: /* error */
+ if (framesz)
+ bo_skip(si_oc(si), framesz+4);
+ appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
+ goto next;
+
+ case 0: /* ignore */
+ if (framesz)
+ bo_skip(si_oc(si), framesz+4);
+ fpa++;
+ break;
+
+ case 1: /* retry */
+ skip_receiving = 1;
+ break;
+
+ default:
+ if (framesz)
+ bo_skip(si_oc(si), framesz+4);
+ fpa++;
+ }
+ goto process;
+
+ next:
+ APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
+ return 0;
+ stop:
+ if ((APPCTX_SPOE(appctx).flags & (SPOE_APPCTX_FL_ASYNC|SPOE_APPCTX_FL_PIPELINING)) ||
+ LIST_ISEMPTY(&APPCTX_SPOE(appctx).waiting_queue)) {
+ agent->applets_idle++;
+ appctx->st0 = SPOE_APPCTX_ST_IDLE;
+ }
+ if (fpa || (APPCTX_SPOE(appctx).flags & SPOE_APPCTX_FL_PERSIST)) {
+ LIST_DEL(&APPCTX_SPOE(appctx).list);
+ LIST_ADD(&agent->applets, &APPCTX_SPOE(appctx).list);
+ if (fpa)
+ APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
+ }
+ return 1;
- /* First check if the received frame is a DISCONNECT frame */
- ret = handle_spoe_agentdiscon_frame(appctx, trash.str, framesz);
- if (ret != 0) {
- if (ret > 0) {
+ exit:
+ appctx->st0 = SPOE_APPCTX_ST_EXIT;
+ return 0;
+}
+
+static int
+handle_disconnect_spoe_applet(struct appctx *appctx)
+{
+ struct stream_interface *si = appctx->owner;
+ struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
+ char *frame = trash.str;
+ int ret;
+
+ if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO)
+ goto exit;
+
+ if (appctx->st1 == SPOE_APPCTX_ERR_TOUT)
+ goto exit;
+
+ ret = prepare_spoe_hadiscon_frame(appctx, frame+4, APPCTX_SPOE(appctx).max_frame_size);
+ if (ret > 1)
+ ret = send_spoe_frame(appctx, frame, ret);
+
+ switch (ret) {
+ case -1: /* error */
+ goto exit;
+
+ case 0: /* ignore */
+ goto exit;
+
+ case 1: /* retry */
+ si_applet_cant_put(si);
+ goto stop;
+
+ default:
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
+ " - disconnected by HAProxy (%d): %s\n",
+ (int)now.tv_sec, (int)now.tv_usec, agent->id,
+ __FUNCTION__, appctx, spoe_status_code,
+ spoe_frm_err_reasons[spoe_status_code]);
+
+ appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
+ goto next;
+ }
+
+ next:
+ APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
+ return 0;
+ stop:
+ return 1;
+ exit:
+ appctx->st0 = SPOE_APPCTX_ST_EXIT;
+ return 0;
+}
+
+static int
+handle_disconnecting_spoe_applet(struct appctx *appctx)
+{
+ struct stream_interface *si = appctx->owner;
+ char *frame = trash.str;
+ int ret, framesz = 0;
+
+ if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO)
+ goto exit;
+
+ if (appctx->st1 == SPOE_APPCTX_ERR_TOUT)
+ goto exit;
+
+ framesz = 0;
+ ret = recv_spoe_frame(appctx, frame, APPCTX_SPOE(appctx).max_frame_size);
+ if (ret > 1) {
+ framesz = ret;
+ ret = handle_spoe_agentdiscon_frame(appctx, frame, framesz);
+ }
+
+ switch (ret) {
+ case -1: /* error */
+ if (framesz)
+ bo_skip(si_oc(si), framesz+4);
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
+ " - error on frame (%s)\n",
+ (int)now.tv_sec, (int)now.tv_usec,
+ ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
+ __FUNCTION__, appctx,
+ spoe_frm_err_reasons[spoe_status_code]);
+ goto exit;
+
+ case 0: /* ignore */
+ if (framesz)
+ bo_skip(si_oc(si), framesz+4);
+ goto next;
+
+ case 1: /* retry */
+ goto stop;
+
+ default:
+ if (framesz)
+ bo_skip(si_oc(si), framesz+4);
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
" - disconnected by peer (%d): %s\n",
(int)now.tv_sec, (int)now.tv_usec,
((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
__FUNCTION__, appctx, spoe_status_code,
spoe_reason);
- return 2;
- }
- SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
- " - error on frame (%s)\n",
- (int)now.tv_sec, (int)now.tv_usec,
- ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
- __FUNCTION__, appctx,
- spoe_frm_err_reasons[spoe_status_code]);
- return -2;
+ goto exit;
}
- if (handle == NULL)
- goto out;
- /* If not, try to decode it */
- ret = handle(appctx, trash.str, framesz);
- if (ret <= 0) {
- if (!ret)
- return -1;
- SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
- " - error on frame (%s)\n",
- (int)now.tv_sec, (int)now.tv_usec,
- ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
- __FUNCTION__, appctx,
- spoe_frm_err_reasons[spoe_status_code]);
- return -2;
- }
- out:
+ next:
+ return 0;
+ stop:
return 1;
-
- empty_or_error:
- if (!ret)
- return 0;
- spoe_status_code = SPOE_FRM_ERR_IO;
- return -2;
+ exit:
+ appctx->st0 = SPOE_APPCTX_ST_EXIT;
+ return 0;
}
/* I/O Handler processing messages exchanged with the agent */
handle_spoe_applet(struct appctx *appctx)
{
struct stream_interface *si = appctx->owner;
- struct stream *s = si_strm(si);
struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
- struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
- int ret;
- switchstate:
+ switchstate:
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
" - appctx-state=%s\n",
(int)now.tv_sec, (int)now.tv_usec, agent->id,
switch (appctx->st0) {
case SPOE_APPCTX_ST_CONNECT:
spoe_status_code = SPOE_FRM_ERR_NONE;
- if (si->state <= SI_ST_CON) {
- si_applet_want_put(si);
- task_wakeup(s->task, TASK_WOKEN_MSG);
- break;
- }
- else if (si->state != SI_ST_EST) {
- appctx->st0 = SPOE_APPCTX_ST_EXIT;
- on_new_spoe_appctx_failure(agent);
- goto switchstate;
- }
- ret = send_spoe_frame(appctx, &prepare_spoe_hahello_frame);
- if (ret < 0) {
- appctx->st0 = SPOE_APPCTX_ST_EXIT;
- on_new_spoe_appctx_failure(agent);
- goto switchstate;
- }
- else if (!ret)
- goto full;
-
- /* Hello frame was sent. Set the hello timeout and
- * wait for the reply. */
- APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.hello);
- appctx->st0 = SPOE_APPCTX_ST_CONNECTING;
- /* fall through */
+ if (handle_connect_spoe_applet(appctx))
+ goto out;
+ goto switchstate;
case SPOE_APPCTX_ST_CONNECTING:
- if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
- appctx->st0 = SPOE_APPCTX_ST_EXIT;
- on_new_spoe_appctx_failure(agent);
- goto switchstate;
- }
- if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
- SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
- " - Connection timed out\n",
- (int)now.tv_sec, (int)now.tv_usec,
- ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
- __FUNCTION__, appctx);
- appctx->st0 = SPOE_APPCTX_ST_EXIT;
- on_new_spoe_appctx_failure(agent);
- goto switchstate;
- }
- ret = recv_spoe_frame(appctx, &handle_spoe_agenthello_frame);
- if (ret < 0) {
- appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
- on_new_spoe_appctx_failure(agent);
- goto switchstate;
- }
- if (ret == 2) {
- appctx->st0 = SPOE_APPCTX_ST_EXIT;
- on_new_spoe_appctx_failure(agent);
- goto switchstate;
- }
- if (!ret)
+ if (handle_connecting_spoe_applet(appctx))
goto out;
+ goto switchstate;
- /* hello handshake is finished, set the idle timeout,
- * Add the appctx in the agent cache, decrease the
- * number of new applets and wake up waiting streams. */
- APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
- appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
- on_new_spoe_appctx_success(agent, appctx);
- break;
-
- case SPOE_APPCTX_ST_PROCESSING:
- if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
- appctx->st0 = SPOE_APPCTX_ST_EXIT;
- goto switchstate;
- }
- if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
- spoe_status_code = SPOE_FRM_ERR_TOUT;
+ case SPOE_APPCTX_ST_IDLE:
+ if (stopping &&
+ LIST_ISEMPTY(&agent->sending_queue) &&
+ LIST_ISEMPTY(&APPCTX_SPOE(appctx).waiting_queue)) {
+ APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
- appctx->st1 = SPOE_APPCTX_ERR_NONE;
goto switchstate;
}
- if (ctx != NULL && ctx->state == SPOE_CTX_ST_SENDING_MSGS) {
- ret = send_spoe_frame(appctx, &prepare_spoe_hanotify_frame);
- if (ret < 0) {
- if (ret == -1) {
- ctx->state = SPOE_CTX_ST_ERROR;
- task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
- goto skip_notify_frame;
- }
- appctx->st0 = SPOE_APPCTX_ST_EXIT;
- goto switchstate;
- }
- else if (!ret)
- goto full;
- ctx->state = SPOE_CTX_ST_WAITING_ACK;
- APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
- }
-
- skip_notify_frame:
- if (ctx != NULL && ctx->state == SPOE_CTX_ST_WAITING_ACK) {
- ret = recv_spoe_frame(appctx, &handle_spoe_agentack_frame);
- if (ret < 0) {
- if (ret == -1)
- goto skip_notify_frame;
- ctx->state = SPOE_CTX_ST_ERROR;
- task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
- appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
- goto switchstate;
- }
- if (!ret)
- goto out;
- if (ret == 2) {
- ctx->state = SPOE_CTX_ST_ERROR;
- task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
- appctx->st0 = SPOE_APPCTX_ST_EXIT;
- goto switchstate;
- }
- ctx->state = SPOE_CTX_ST_DONE;
- task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
- APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
- }
- else {
- if (stopping) {
- appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
- goto switchstate;
- }
+ agent->applets_idle--;
+ appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
+ /* fall through */
- ret = recv_spoe_frame(appctx, NULL);
- if (ret < 0) {
- if (ret == -1)
- goto skip_notify_frame;
- appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
- goto switchstate;
- }
- if (!ret)
- goto out;
- if (ret == 2) {
- appctx->st0 = SPOE_APPCTX_ST_EXIT;
- goto switchstate;
- }
- APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
- }
- break;
+ case SPOE_APPCTX_ST_PROCESSING:
+ if (handle_processing_spoe_applet(appctx))
+ goto out;
+ goto switchstate;
case SPOE_APPCTX_ST_DISCONNECT:
- ret = send_spoe_frame(appctx, &prepare_spoe_hadiscon_frame);
- if (ret < 0) {
- appctx->st0 = SPOE_APPCTX_ST_EXIT;
- goto switchstate;
- }
- else if (!ret)
- goto full;
- SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
- " - disconnected by HAProxy (%d): %s\n",
- (int)now.tv_sec, (int)now.tv_usec,
- ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
- __FUNCTION__, appctx, spoe_status_code,
- spoe_frm_err_reasons[spoe_status_code]);
-
- APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
- appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
- /* fall through */
+ if (handle_disconnect_spoe_applet(appctx))
+ goto out;
+ goto switchstate;
case SPOE_APPCTX_ST_DISCONNECTING:
- if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
- appctx->st0 = SPOE_APPCTX_ST_EXIT;
- goto switchstate;
- }
- if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
- appctx->st0 = SPOE_APPCTX_ST_EXIT;
- goto switchstate;
- }
- ret = recv_spoe_frame(appctx, NULL);
- if (ret < 0 || ret == 2) {
- appctx->st0 = SPOE_APPCTX_ST_EXIT;
- goto switchstate;
- }
- break;
+ if (handle_disconnecting_spoe_applet(appctx))
+ goto out;
+ goto switchstate;
case SPOE_APPCTX_ST_EXIT:
si_shutw(si);
case SPOE_APPCTX_ST_END:
return;
}
-
- out:
+ out:
if (APPCTX_SPOE(appctx).task->expire != TICK_ETERNITY)
task_queue(APPCTX_SPOE(appctx).task);
si_oc(si)->flags |= CF_READ_DONTWAIT;
task_wakeup(si_strm(si)->task, TASK_WOKEN_IO);
- return;
- full:
- si_applet_cant_put(si);
- goto out;
}
struct applet spoe_applet = {
if ((APPCTX_SPOE(appctx).task = task_new()) == NULL)
goto out_free_appctx;
APPCTX_SPOE(appctx).task->process = process_spoe_applet;
- APPCTX_SPOE(appctx).task->expire = TICK_ETERNITY;
+ APPCTX_SPOE(appctx).task->expire = TICK_ETERNITY;//tick_add_ifset(now_ms, conf->agent->timeout.hello);
APPCTX_SPOE(appctx).task->context = appctx;
APPCTX_SPOE(appctx).agent = conf->agent;
- APPCTX_SPOE(appctx).ctx = NULL;
APPCTX_SPOE(appctx).version = 0;
- APPCTX_SPOE(appctx).max_frame_size = global.tune.bufsize;
- task_wakeup(APPCTX_SPOE(appctx).task, TASK_WOKEN_INIT);
+ APPCTX_SPOE(appctx).max_frame_size = conf->agent->max_frame_size;
+ APPCTX_SPOE(appctx).flags = 0;
+
+ LIST_INIT(&APPCTX_SPOE(appctx).list);
+ LIST_INIT(&APPCTX_SPOE(appctx).waiting_queue);
sess = session_new(&conf->agent_fe, NULL, &appctx->obj_type);
if (!sess)
si_applet_cant_get(&strm->si[0]);
appctx_wakeup(appctx);
- /* Increase the per-process number of cumulated connections */
- if (conf->agent->cps_max > 0)
- update_freq_ctr(&conf->agent->conn_per_sec, 1);
-
strm->do_log = NULL;
strm->res.flags |= CF_READ_DONTWAIT;
jobs++;
totalconn++;
+ task_wakeup(APPCTX_SPOE(appctx).task, TASK_WOKEN_INIT);
+ LIST_ADDQ(&conf->agent->applets, &APPCTX_SPOE(appctx).list);
+ conf->agent->applets_act++;
return appctx;
/* Error unrolling */
return NULL;
}
-/* Wake up a SPOE applet attached to a SPOE context. */
-static void
-wakeup_spoe_appctx(struct spoe_context *ctx)
-{
- if (ctx->appctx == NULL)
- return;
- if (ctx->appctx->st0 < SPOE_APPCTX_ST_EXIT) {
- si_applet_want_get(ctx->appctx->owner);
- si_applet_want_put(ctx->appctx->owner);
- appctx_wakeup(ctx->appctx);
- }
-}
-
-
-/* Run across the list of pending streams waiting for a SPOE applet and wake the
- * first. */
-static void
-offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx)
-{
- struct spoe_context *ctx;
-
- if (!appctx || appctx->st0 > SPOE_APPCTX_ST_PROCESSING)
- return;
-
- if (LIST_ISEMPTY(&agent->applet_wq))
- LIST_ADD(&agent->cache, &APPCTX_SPOE(appctx).list);
- else {
- ctx = LIST_NEXT(&agent->applet_wq, typeof(ctx), applet_wait);
- APPCTX_SPOE(appctx).ctx = ctx;
- ctx->appctx = appctx;
- LIST_DEL(&ctx->applet_wait);
- LIST_INIT(&ctx->applet_wait);
- task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
- SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
- " - wake up stream to get available SPOE applet\n",
- (int)now.tv_sec, (int)now.tv_usec, agent->id,
- __FUNCTION__, ctx->strm);
- }
-}
-
-/* A failure occurred during SPOE applet creation. */
-static void
-on_new_spoe_appctx_failure(struct spoe_agent *agent)
-{
- struct spoe_context *ctx;
-
- list_for_each_entry(ctx, &agent->applet_wq, applet_wait) {
- task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
- SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
- " - wake up stream because to SPOE applet connection failed\n",
- (int)now.tv_sec, (int)now.tv_usec, agent->id,
- __FUNCTION__, ctx->strm);
- }
-}
-
-static void
-on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx)
-{
- offer_spoe_appctx(agent, appctx);
-}
-/* Retrieve a SPOE applet from the agent cache if possible, else create it. It
- * returns 1 on success, 0 to retry later and -1 if an error occurred. */
static int
-acquire_spoe_appctx(struct spoe_context *ctx, int dir)
+queue_spoe_context(struct spoe_context *ctx)
{
struct spoe_config *conf = FLT_CONF(ctx->filter);
struct spoe_agent *agent = conf->agent;
struct appctx *appctx;
+ unsigned int min_applets;
- /* If a process is already started for this SPOE context, retry
- * later. */
- if (ctx->flags & SPOE_CTX_FL_PROCESS)
- goto wait;
-
- /* If needed, initialize the buffer that will be used to encode messages
- * and decode actions. */
- if (ctx->buffer == &buf_empty) {
- if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) {
- LIST_DEL(&ctx->buffer_wait.list);
- LIST_INIT(&ctx->buffer_wait.list);
- }
-
- if (!b_alloc_margin(&ctx->buffer, global.tune.reserved_bufs)) {
- LIST_ADDQ(&buffer_wq, &ctx->buffer_wait.list);
- goto wait;
- }
- }
-
- /* If the SPOE applet was already set, all is done. */
- if (ctx->appctx)
- goto success;
+ min_applets = min_applets_act(agent);
- /* Else try to retrieve it from the agent cache */
- if (!LIST_ISEMPTY(&agent->cache)) {
- appctx = LIST_NEXT(&agent->cache, typeof(appctx), ctx.spoe.list);
- LIST_DEL(&APPCTX_SPOE(appctx).list);
- APPCTX_SPOE(appctx).ctx = ctx;
- ctx->appctx = appctx;
- goto success;
- }
-
- /* If there is no server up for the agent's backend, this is an
- * error. */
- if (!agent->b.be->srv_act && !agent->b.be->srv_bck)
- goto error;
+ /* Check if we need to create a new SPOE applet or not. */
+ if (agent->applets_act >= min_applets && agent->applets_idle && agent->sending_rate)
+ goto end;
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
- " - waiting for available SPOE appctx\n",
+ " - try to create new SPOE appctx\n",
(int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__,
ctx->strm);
- /* Else add the stream in the waiting queue. */
- if (LIST_ISEMPTY(&ctx->applet_wait))
- LIST_ADDQ(&agent->applet_wq, &ctx->applet_wait);
+ /* Do not try to create a new applet if there is no server up for the
+ * agent's backend. */
+ if (!agent->b.be->srv_act && !agent->b.be->srv_bck) {
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
+ " - cannot create SPOE appctx: no server up\n",
+ (int)now.tv_sec, (int)now.tv_usec, agent->id,
+ __FUNCTION__, ctx->strm);
+ goto end;
+ }
- /* Finally, create new SPOE applet if we can */
+ /* Do not try to create a new applet if we have reached the maximum of
+ * connection per seconds */
if (agent->cps_max > 0) {
- if (!freq_ctr_remain(&agent->conn_per_sec, agent->cps_max, 0))
- goto wait;
+ if (!freq_ctr_remain(&agent->conn_per_sec, agent->cps_max, 0)) {
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
+ " - cannot create SPOE appctx: max CPS reached\n",
+ (int)now.tv_sec, (int)now.tv_usec, agent->id,
+ __FUNCTION__, ctx->strm);
+ goto end;
+ }
}
- if (create_spoe_appctx(conf) == NULL)
- goto error;
- wait:
- return 0;
-
- success:
- /* Remove the stream from the waiting queue */
- if (!LIST_ISEMPTY(&ctx->applet_wait)) {
- LIST_DEL(&ctx->applet_wait);
- LIST_INIT(&ctx->applet_wait);
+ appctx = create_spoe_appctx(conf);
+ if (appctx == NULL) {
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
+ " - failed to create SPOE appctx\n",
+ (int)now.tv_sec, (int)now.tv_usec, agent->id,
+ __FUNCTION__, ctx->strm);
+ goto end;
}
+ if (agent->applets_act <= min_applets)
+ APPCTX_SPOE(appctx).flags |= SPOE_APPCTX_FL_PERSIST;
- /* Set the right flag to prevent request and response processing
- * in same time. */
- ctx->flags |= ((dir == SMP_OPT_DIR_REQ)
- ? SPOE_CTX_FL_REQ_PROCESS
- : SPOE_CTX_FL_RSP_PROCESS);
+ /* Increase the per-process number of cumulated connections */
+ if (agent->cps_max > 0)
+ update_freq_ctr(&agent->conn_per_sec, 1);
- SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
- " - acquire SPOE appctx %p from cache\n",
- (int)now.tv_sec, (int)now.tv_usec, agent->id,
- __FUNCTION__, ctx->strm, ctx->appctx);
- return 1;
+ end:
+ /* The only reason to return an error is when there is no applet */
+ if (LIST_ISEMPTY(&agent->applets))
+ return 0;
- error:
- /* Remove the stream from the waiting queue */
- if (!LIST_ISEMPTY(&ctx->applet_wait)) {
- LIST_DEL(&ctx->applet_wait);
- LIST_INIT(&ctx->applet_wait);
- }
+ /* Add the SPOE context in the sending queue and update all running
+ * info */
+ LIST_ADDQ(&agent->sending_queue, &ctx->list);
+ if (agent->sending_rate)
+ agent->sending_rate--;
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
- " - failed to acquire SPOE appctx\n",
- (int)now.tv_sec, (int)now.tv_usec, agent->id,
- __FUNCTION__, ctx->strm);
- send_log(ctx->strm->be, LOG_WARNING, "failed to acquire SPOE applet.\n");
-
- return -1;
-}
-
-/* Release a SPOE applet and push it in the agent cache. */
-static void
-release_spoe_appctx(struct spoe_context *ctx)
-{
- struct spoe_config *conf = FLT_CONF(ctx->filter);
- struct spoe_agent *agent = conf->agent;
- struct appctx *appctx = ctx->appctx;
-
- /* Reset the flag to allow next processing */
- ctx->flags &= ~SPOE_CTX_FL_PROCESS;
-
- /* Reset processing timer */
- ctx->process_exp = TICK_ETERNITY;
-
- /* Release the buffer if needed */
- if (ctx->buffer != &buf_empty) {
- b_free(&ctx->buffer);
- offer_buffers(ctx, tasks_run_queue + applets_active_queue);
+ " - Add stream in sending queue - applets_act=%u - applets_idle=%u"
+ " - sending_rate=%u\n",
+ (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__,
+ ctx->strm, agent->applets_act, agent->applets_idle, agent->sending_rate);
+
+ /* Finally try to wakeup the first IDLE applet found and move it at the
+ * end of the list. */
+ list_for_each_entry(appctx, &agent->applets, ctx.spoe.list) {
+ if (appctx->st0 == SPOE_APPCTX_ST_IDLE) {
+ si_applet_want_get(appctx->owner);
+ si_applet_want_put(appctx->owner);
+ appctx_wakeup(appctx);
+ LIST_DEL(&APPCTX_SPOE(appctx).list);
+ LIST_ADDQ(&agent->applets, &APPCTX_SPOE(appctx).list);
+ break;
+ }
}
-
- /* If there is no SPOE applet, all is done */
- if (!appctx)
- return;
-
- /* Else, reassign it or push it in the agent cache */
- SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
- " - release SPOE appctx %p\n",
- (int)now.tv_sec, (int)now.tv_usec, agent->id,
- __FUNCTION__, ctx->strm, appctx);
-
- APPCTX_SPOE(appctx).ctx = NULL;
- ctx->appctx = NULL;
- offer_spoe_appctx(agent, appctx);
+ return 1;
}
/***************************************************************************
process_spoe_messages(struct stream *s, struct spoe_context *ctx,
struct list *messages, int dir)
{
+ struct spoe_config *conf = FLT_CONF(ctx->filter);
+ struct spoe_agent *agent = conf->agent;
struct spoe_message *msg;
struct sample *smp;
struct spoe_arg *arg;
int off, flag, idx = 0;
/* Reserve 32 bytes from the frame Metadata */
- max_size = APPCTX_SPOE(ctx->appctx).max_frame_size - 32;
+ max_size = agent->max_frame_size - 32;
- b_reset(ctx->buffer);
p = ctx->buffer->p;
/* Loop on messages */
return 1;
skip:
- b_reset(ctx->buffer);
return 0;
}
return 0;
}
+static int
+start_event_processing(struct spoe_context *ctx, int dir)
+{
+ int ret;
+ /* If a process is already started for this SPOE context, retry
+ * later. */
+ if (ctx->flags & SPOE_CTX_FL_PROCESS)
+ goto wait;
+
+ ret = acquire_spoe_buffer(ctx);
+ if (ret <= 0)
+ return ret;
+
+ /* Set the right flag to prevent request and response processing
+ * in same time. */
+ ctx->flags |= ((dir == SMP_OPT_DIR_REQ)
+ ? SPOE_CTX_FL_REQ_PROCESS
+ : SPOE_CTX_FL_RSP_PROCESS);
+
+ return 1;
+
+ wait:
+ return 0;
+}
+
+static void
+stop_event_processing(struct spoe_context *ctx)
+{
+ /* Reset the flag to allow next processing */
+ ctx->flags &= ~SPOE_CTX_FL_PROCESS;
+
+ /* Reset processing timer */
+ ctx->process_exp = TICK_ETERNITY;
+
+ release_spoe_buffer(ctx);
+
+ if (!LIST_ISEMPTY(&ctx->list)) {
+ LIST_DEL(&ctx->list);
+ LIST_INIT(&ctx->list);
+ }
+}
/* Process a SPOE event. First, this functions will process messages attached to
* this event and send them to an agent in a NOTIFY frame. Then, it will wait a
agent->id, __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
spoe_event_str[ev]);
- if (agent->eps_max > 0) {
- if (!freq_ctr_remain(&agent->err_per_sec, agent->eps_max, 0)) {
- SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
- " - skip event '%s': max EPS reached\n",
- (int)now.tv_sec, (int)now.tv_usec,
- agent->id, __FUNCTION__, s, spoe_event_str[ev]);
- goto skip;
- }
- }
dir = ((ev < SPOE_EV_ON_SERVER_SESS) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES);
}
if (ctx->state == SPOE_CTX_ST_READY) {
+ if (agent->eps_max > 0) {
+ if (!freq_ctr_remain(&agent->err_per_sec, agent->eps_max, 0)) {
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
+ " - skip event '%s': max EPS reached\n",
+ (int)now.tv_sec, (int)now.tv_usec,
+ agent->id, __FUNCTION__, s, spoe_event_str[ev]);
+ goto skip;
+ }
+ }
+
if (!tick_isset(ctx->process_exp)) {
ctx->process_exp = tick_add_ifset(now_ms, agent->timeout.processing);
s->task->expire = tick_first((tick_is_expired(s->task->expire, now_ms) ? 0 : s->task->expire),
ctx->process_exp);
}
-
- ret = acquire_spoe_appctx(ctx, dir);
+ ret = start_event_processing(ctx, dir);
if (ret <= 0) {
if (!ret)
goto out;
goto error;
}
- ctx->state = SPOE_CTX_ST_SENDING_MSGS;
- }
-
- if (ctx->appctx == NULL)
- goto error;
-
- if (ctx->state == SPOE_CTX_ST_SENDING_MSGS) {
ret = process_spoe_messages(s, ctx, &(ctx->messages[ev]), dir);
if (ret <= 0) {
if (!ret)
goto skip;
goto error;
}
- wakeup_spoe_appctx(ctx);
- ret = 0;
- goto out;
+
+ if (!queue_spoe_context(ctx))
+ goto error;
+
+ ctx->state = SPOE_CTX_ST_SENDING_MSGS;
+ /* fall through */
}
- if (ctx->state == SPOE_CTX_ST_WAITING_ACK) {
- wakeup_spoe_appctx(ctx);
+ if (ctx->state == SPOE_CTX_ST_SENDING_MSGS ||
+ ctx->state == SPOE_CTX_ST_WAITING_ACK) {
ret = 0;
goto out;
}
goto error;
}
ctx->frame_id++;
- release_spoe_appctx(ctx);
ctx->state = SPOE_CTX_ST_READY;
+ goto end;
}
out:
return ret;
- skip:
- release_spoe_appctx(ctx);
- ctx->state = SPOE_CTX_ST_READY;
- return 1;
-
error:
if (agent->eps_max > 0)
update_freq_ctr(&agent->err_per_sec, 1);
if (agent->var_on_error) {
struct sample smp;
+ // FIXME: Get the error code here
memset(&smp, 0, sizeof(smp));
smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
smp.data.u.sint = 1;
strlen(agent->var_on_error), &smp);
}
- release_spoe_appctx(ctx);
ctx->state = ((agent->flags & SPOE_FL_CONT_ON_ERR)
? SPOE_CTX_ST_READY
: SPOE_CTX_ST_ERROR);
- return 1;
-}
+ ret = 1;
+ goto end;
+ skip:
+ ctx->state = SPOE_CTX_ST_READY;
+ ret = 1;
+
+ end:
+ stop_event_processing(ctx);
+ return ret;
+}
/***************************************************************************
* Functions that create/destroy SPOE contexts
**************************************************************************/
+static int
+acquire_spoe_buffer(struct spoe_context *ctx)
+{
+ if (ctx->buffer != &buf_empty)
+ return 1;
+
+ if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) {
+ LIST_DEL(&ctx->buffer_wait.list);
+ LIST_INIT(&ctx->buffer_wait.list);
+ }
+
+ if (b_alloc_margin(&ctx->buffer, global.tune.reserved_bufs))
+ return 1;
+
+ LIST_ADDQ(&buffer_wq, &ctx->buffer_wait.list);
+ return 0;
+}
+
+static void
+release_spoe_buffer(struct spoe_context *ctx)
+{
+ if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) {
+ LIST_DEL(&ctx->buffer_wait.list);
+ LIST_INIT(&ctx->buffer_wait.list);
+ }
+
+ /* Release the buffer if needed */
+ if (ctx->buffer != &buf_empty) {
+ b_free(&ctx->buffer);
+ offer_buffers(ctx, tasks_run_queue + applets_active_queue);
+ }
+}
+
static int wakeup_spoe_context(struct spoe_context *ctx)
{
task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
LIST_INIT(&ctx->buffer_wait.list);
ctx->buffer_wait.target = ctx;
ctx->buffer_wait.wakeup_cb = (int (*)(void *))wakeup_spoe_context;
- LIST_INIT(&ctx->applet_wait);
+ LIST_INIT(&ctx->list);
ctx->stream_id = 0;
ctx->frame_id = 1;
if (!ctx)
return;
- if (ctx->appctx)
- APPCTX_SPOE(ctx->appctx).ctx = NULL;
if (!LIST_ISEMPTY(&ctx->buffer_wait.list))
LIST_DEL(&ctx->buffer_wait.list);
- if (!LIST_ISEMPTY(&ctx->applet_wait))
- LIST_DEL(&ctx->applet_wait);
+ if (!LIST_ISEMPTY(&ctx->list))
+ LIST_DEL(&ctx->list);
pool_free2(pool2_spoe_ctx, ctx);
}
conf = fconf->conf;
agent = conf->agent;
- list_for_each_entry(appctx, &agent->cache, ctx.spoe.list) {
+ list_for_each_entry(appctx, &agent->applets, ctx.spoe.list) {
si_applet_want_get(appctx->owner);
si_applet_want_put(appctx->owner);
appctx_wakeup(appctx);
static void
spoe_stop(struct stream *s, struct filter *filter)
{
- struct spoe_context *ctx = filter->ctx;
-
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n",
(int)now.tv_sec, (int)now.tv_usec,
((struct spoe_config *)FLT_CONF(filter))->agent->id,
__FUNCTION__, s);
-
- if (ctx) {
- release_spoe_appctx(ctx);
- destroy_spoe_context(ctx);
- }
+ destroy_spoe_context(filter->ctx);
}
if (tick_is_expired(ctx->process_exp, now_ms)) {
s->pending_events |= TASK_WOKEN_MSG;
- if (ctx->buffer != &buf_empty) {
- b_free(&ctx->buffer);
- offer_buffers(ctx, tasks_run_queue + applets_active_queue);
- }
+ release_spoe_buffer(ctx);
}
}
goto out;
}
ctx->flags |= SPOE_CTX_FL_SRV_CONNECTED;
+ if (!ret) {
+ channel_dont_read(chn);
+ channel_dont_close(chn);
+ }
}
out:
- if (!ret) {
- channel_dont_read(chn);
- channel_dont_close(chn);
- }
return ret;
}
}
curagent->id = strdup(args[1]);
+
curagent->conf.file = strdup(file);
curagent->conf.line = linenum;
- curagent->timeout.hello = TICK_ETERNITY;
- curagent->timeout.idle = TICK_ETERNITY;
+
+ curagent->timeout.hello = TICK_ETERNITY;
+ curagent->timeout.idle = TICK_ETERNITY;
curagent->timeout.processing = TICK_ETERNITY;
- curagent->var_pfx = NULL;
- curagent->var_on_error = NULL;
- curagent->flags = 0;
- curagent->cps_max = 0;
- curagent->eps_max = 0;
+
+ curagent->engine_id = NULL;
+ curagent->var_pfx = NULL;
+ curagent->var_on_error = NULL;
+ curagent->flags = 0;
+ curagent->cps_max = 0;
+ curagent->eps_max = 0;
+ curagent->max_frame_size = global.tune.bufsize - 4;
+ curagent->min_applets = 0;
+ curagent->max_fpa = 100;
for (i = 0; i < SPOE_EV_EVENTS; ++i)
LIST_INIT(&curagent->messages[i]);
- LIST_INIT(&curagent->cache);
- LIST_INIT(&curagent->applet_wq);
+
+ curagent->applets_act = 0;
+ curagent->applets_idle = 0;
+ curagent->sending_rate = 0;
+
+ LIST_INIT(&curagent->applets);
+ LIST_INIT(&curagent->sending_queue);
+ LIST_INIT(&curagent->waiting_queue);
}
else if (!strcmp(args[0], "use-backend")) {
if (!*args[1]) {
}
curagent->var_pfx = strdup(curagent->id);
}
+ if (curagent->engine_id == NULL)
+ curagent->engine_id = generate_pseudo_uuid();
if (LIST_ISEMPTY(&curmps)) {
Warning("Proxy '%s': No message used by SPOE agent '%s' declared at %s:%d.\n",