/* The minimum size for a frame */
#define MIN_FRAME_SIZE 256
-/* Reserved for the metadata and the frame type. So <MAX_FRAME_SIZE> -
- * <FRAME_HDR_SIZE> is the maximum payload size */
+/* Reserved for the metadata and the frame type.
+ * So <MAX_FRAME_SIZE> - <FRAME_HDR_SIZE> is the maximum payload size */
#define FRAME_HDR_SIZE 32
/* Flags set on the SPOE agent */
#define SPOE_CTX_FL_SRV_CONNECTED 0x00000002 /* Set after that on-server-session event was processed */
#define SPOE_CTX_FL_REQ_PROCESS 0x00000004 /* Set when SPOE is processing the request */
#define SPOE_CTX_FL_RSP_PROCESS 0x00000008 /* Set when SPOE is processing the response */
+#define SPOE_CTX_FL_FRAGMENTED 0x00000010 /* Set when a fragmented frame is processing */
#define SPOE_CTX_FL_PROCESS (SPOE_CTX_FL_REQ_PROCESS|SPOE_CTX_FL_RSP_PROCESS)
/* Flags set on the SPOE applet */
-#define SPOE_APPCTX_FL_PIPELINING 0x00000001 /* Set if pipelining is supported */
-#define SPOE_APPCTX_FL_ASYNC 0x00000002 /* Set if asynchronus frames is supported */
-#define SPOE_APPCTX_FL_PERSIST 0x00000004 /* Set if the applet is persistent */
+#define SPOE_APPCTX_FL_PIPELINING 0x00000001 /* Set if pipelining is supported */
+#define SPOE_APPCTX_FL_ASYNC 0x00000002 /* Set if asynchronus frames is supported */
+#define SPOE_APPCTX_FL_FRAGMENTATION 0x00000004 /* Set if fragmentation is supported */
+#define SPOE_APPCTX_FL_PERSIST 0x00000008 /* Set if the applet is persistent */
#define SPOE_APPCTX_ERR_NONE 0x00000000 /* no error yet, leave it to zero */
#define SPOE_APPCTX_ERR_TOUT 0x00000001 /* SPOE applet timeout */
+/* Flags set on the SPOE frame */
+#define SPOE_FRM_FL_FIN 0x00000001
+#define SPOE_FRM_FL_ABRT 0x00000002
+
/* All possible states for a SPOE context */
enum spoe_ctx_state {
SPOE_CTX_ST_NONE = 0,
SPOE_CTX_ST_READY,
+ SPOE_CTX_ST_ENCODING_MSGS,
SPOE_CTX_ST_SENDING_MSGS,
SPOE_CTX_ST_WAITING_ACK,
SPOE_CTX_ST_DONE,
SPOE_APPCTX_ST_CONNECTING,
SPOE_APPCTX_ST_IDLE,
SPOE_APPCTX_ST_PROCESSING,
+ SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY,
+ SPOE_APPCTX_ST_WAITING_SYNC_ACK,
SPOE_APPCTX_ST_DISCONNECT,
SPOE_APPCTX_ST_DISCONNECTING,
SPOE_APPCTX_ST_EXIT,
SPOE_CTX_ERR_NONE = 0,
SPOE_CTX_ERR_TOUT,
SPOE_CTX_ERR_RES,
+ SPOE_CTX_ERR_TOO_BIG,
SPOE_CTX_ERR_UNKNOWN = 255,
SPOE_CTX_ERRS,
};
SPOE_FRM_ERR_NO_CAP,
SPOE_FRM_ERR_BAD_VSN,
SPOE_FRM_ERR_BAD_FRAME_SIZE,
+ SPOE_FRM_ERR_FRAG_NOT_SUPPORTED,
+ SPOE_FRM_ERR_INTERLACED_FRAMES,
+ SPOE_FRM_ERR_RES,
SPOE_FRM_ERR_UNKNOWN = 99,
SPOE_FRM_ERRS,
};
char *file; /* file where the SPOE message appears */
int line; /* line where the SPOE message appears */
} conf; /* config information */
+ unsigned int nargs; /* # of arguments */
struct list args; /* Arguments added when the SPOE messages is sent */
struct list list; /* Used to chain SPOE messages */
unsigned int stream_id; /* stream_id and frame_id are used */
unsigned int frame_id; /* to map NOTIFY and ACK frames */
unsigned int process_exp; /* expiration date to process an event */
+
+ struct {
+ struct spoe_appctx *spoe_appctx; /* SPOE appctx sending the fragmented frame */
+ struct spoe_message *curmsg; /* SPOE message from which to resume encoding */
+ struct spoe_arg *curarg; /* SPOE arg in <curmsg> from which to resume encoding */
+ unsigned int curoff; /* offset in <curarg> from which to resume encoding */
+ unsigned int flags; /* SPOE_FRM_FL_* */
+ } frag_ctx; /* Info about fragmented frames, valid on if SPOE_CTX_FL_FRAGMENTED is set */
};
/* SPOE context inside a appctx */
struct buffer_wait buffer_wait; /* position in the list of ressources waiting for a buffer */
struct list waiting_queue; /* list of streams waiting for a ACK frame, in sync and pipelining mode */
struct list list; /* next spoe appctx for the same agent */
+
+ struct {
+ struct spoe_context *ctx; /* SPOE context owning the fragmented frame */
+ unsigned int cursid; /* stream-id of the fragmented frame. used if the processing is aborted */
+ unsigned int curfid; /* frame-id of the fragmented frame. used if the processing is aborted */
+ } frag_ctx; /* Info about fragmented frames, unused for unfragmented frames */
};
+
+/* Helper to get SPOE ctx inside an appctx */
#define SPOE_APPCTX(appctx) ((struct spoe_appctx *)((appctx)->ctx.spoe.ptr))
/* SPOE filter id. Used to identify SPOE filters */
}
static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
- [SPOE_FRM_ERR_NONE] = "normal",
- [SPOE_FRM_ERR_IO] = "I/O error",
- [SPOE_FRM_ERR_TOUT] = "a timeout occurred",
- [SPOE_FRM_ERR_TOO_BIG] = "frame is too big",
- [SPOE_FRM_ERR_INVALID] = "invalid frame received",
- [SPOE_FRM_ERR_NO_VSN] = "version value not found",
- [SPOE_FRM_ERR_NO_FRAME_SIZE] = "max-frame-size value not found",
- [SPOE_FRM_ERR_NO_CAP] = "capabilities value not found",
- [SPOE_FRM_ERR_BAD_VSN] = "unsupported version",
- [SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small",
- [SPOE_FRM_ERR_UNKNOWN] = "an unknown error occurred",
+ [SPOE_FRM_ERR_NONE] = "normal",
+ [SPOE_FRM_ERR_IO] = "I/O error",
+ [SPOE_FRM_ERR_TOUT] = "a timeout occurred",
+ [SPOE_FRM_ERR_TOO_BIG] = "frame is too big",
+ [SPOE_FRM_ERR_INVALID] = "invalid frame received",
+ [SPOE_FRM_ERR_NO_VSN] = "version value not found",
+ [SPOE_FRM_ERR_NO_FRAME_SIZE] = "max-frame-size value not found",
+ [SPOE_FRM_ERR_NO_CAP] = "capabilities value not found",
+ [SPOE_FRM_ERR_BAD_VSN] = "unsupported version",
+ [SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small",
+ [SPOE_FRM_ERR_FRAG_NOT_SUPPORTED] = "fragmentation not supported",
+ [SPOE_FRM_ERR_INTERLACED_FRAMES] = "invalid interlaced frames",
+ [SPOE_FRM_ERR_RES] = "resource allocation error",
+ [SPOE_FRM_ERR_UNKNOWN] = "an unknown error occurred",
};
static const char *spoe_event_str[SPOE_EV_EVENTS] = {
#if defined(DEBUG_SPOE) || defined(DEBUG_FULL)
static const char *spoe_ctx_state_str[SPOE_CTX_ST_ERROR+1] = {
- [SPOE_CTX_ST_NONE] = "NONE",
- [SPOE_CTX_ST_READY] = "READY",
- [SPOE_CTX_ST_SENDING_MSGS] = "SENDING_MSGS",
- [SPOE_CTX_ST_WAITING_ACK] = "WAITING_ACK",
- [SPOE_CTX_ST_DONE] = "DONE",
- [SPOE_CTX_ST_ERROR] = "ERROR",
+ [SPOE_CTX_ST_NONE] = "NONE",
+ [SPOE_CTX_ST_READY] = "READY",
+ [SPOE_CTX_ST_ENCODING_MSGS] = "ENCODING_MSGS",
+ [SPOE_CTX_ST_SENDING_MSGS] = "SENDING_MSGS",
+ [SPOE_CTX_ST_WAITING_ACK] = "WAITING_ACK",
+ [SPOE_CTX_ST_DONE] = "DONE",
+ [SPOE_CTX_ST_ERROR] = "ERROR",
};
static const char *spoe_appctx_state_str[SPOE_APPCTX_ST_END+1] = {
- [SPOE_APPCTX_ST_CONNECT] = "CONNECT",
- [SPOE_APPCTX_ST_CONNECTING] = "CONNECTING",
- [SPOE_APPCTX_ST_IDLE] = "IDLE",
- [SPOE_APPCTX_ST_PROCESSING] = "PROCESSING",
- [SPOE_APPCTX_ST_DISCONNECT] = "DISCONNECT",
- [SPOE_APPCTX_ST_DISCONNECTING] = "DISCONNECTING",
- [SPOE_APPCTX_ST_EXIT] = "EXIT",
- [SPOE_APPCTX_ST_END] = "END",
+ [SPOE_APPCTX_ST_CONNECT] = "CONNECT",
+ [SPOE_APPCTX_ST_CONNECTING] = "CONNECTING",
+ [SPOE_APPCTX_ST_IDLE] = "IDLE",
+ [SPOE_APPCTX_ST_PROCESSING] = "PROCESSING",
+ [SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY] = "SENDING_FRAG_NOTIFY",
+ [SPOE_APPCTX_ST_WAITING_SYNC_ACK] = "WAITING_SYNC_ACK",
+ [SPOE_APPCTX_ST_DISCONNECT] = "DISCONNECT",
+ [SPOE_APPCTX_ST_DISCONNECTING] = "DISCONNECTING",
+ [SPOE_APPCTX_ST_EXIT] = "EXIT",
+ [SPOE_APPCTX_ST_END] = "END",
};
#endif
double d;
int vsn = -1;
- memset(tmp, 0, len+1);
memcpy(tmp, str, len);
+ tmp[len] = 0;
start = tmp;
while (isspace(*start))
unsigned char *msg = (unsigned char *)buf;
int idx = 0;
- if (msg > (unsigned char *)end)
+ if (msg >= (unsigned char *)end)
return -1;
if (msg[0] < 240) {
*i = msg[0];
do {
++idx;
- if (msg+idx > (unsigned char *)end)
+ if (msg+idx >= (unsigned char *)end)
return -1;
*i += (uint64_t)msg[idx] << (4 + 7 * (idx-1));
} while (msg[idx] >= 128);
return (idx + len);
}
+/* Encode first part of a fragmented string. The string will be prefix by its
+ * length, encoded as a variable-length integer. This function never fails and
+ * returns the number of written bytes. */
+static int
+encode_frag_spoe_string(const char *str, size_t sz, size_t len, char *dst)
+{
+ int idx = 0;
+
+ if (!sz) {
+ dst[0] = 0;
+ return 1;
+ }
+
+ idx += encode_spoe_varint(sz, dst);
+ memcpy(dst+idx, str, len);
+ return (idx + len);
+}
+
/* Decode a string. Its length is decoded first as a variable-length integer. If
* it succeeds, and if the string length is valid, the begin of the string is
* saved in <*str>, its length is saved in <*len> and the total numbre of bytes
prepare_spoe_hahello_frame(struct appctx *appctx, char *frame, size_t size)
{
struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
- int idx = 0;
- size_t max = (7 /* TYPE + METADATA */
- + 1 + SLEN(SUPPORTED_VERSIONS_KEY) + 1 + 1 + SLEN(SUPPORTED_VERSIONS_VAL)
- + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 4
- + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + SLEN(CAPABILITIES_VAL)
- + 1 + SLEN(ENGINE_ID_KEY) + 1 + 1 + 36);
+ unsigned int flags = SPOE_FRM_FL_FIN;
+ int idx = 0;
+ size_t max = (7 /* TYPE + METADATA */
+ + 1 + SLEN(SUPPORTED_VERSIONS_KEY) + 1 + 1 + SLEN(SUPPORTED_VERSIONS_VAL)
+ + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 4
+ + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + SLEN(CAPABILITIES_VAL)
+ + 1 + SLEN(ENGINE_ID_KEY) + 1 + 1 + 36);
if (size < max) {
spoe_status_code = SPOE_FRM_ERR_TOO_BIG;
/* Frame type */
frame[idx++] = SPOE_FRM_T_HAPROXY_HELLO;
- /* No flags for now */
- memset(frame+idx, 0, 4);
+ /* Set flags */
+ //flags = htonl(flags);
+ memcpy(frame+idx, (char *)&flags, 4);
idx += 4;
/* No stream-id and frame-id for HELLO frames */
prepare_spoe_hadiscon_frame(struct appctx *appctx, char *frame, size_t size)
{
const char *reason;
- int rlen, idx = 0;
- size_t max = (7 /* TYPE + METADATA */
- + 1 + SLEN(STATUS_CODE_KEY) + 1 + 2
- + 1 + SLEN(MSG_KEY) + 1 + 2 + 255);
+ unsigned int flags = SPOE_FRM_FL_FIN;
+ int rlen, idx = 0;
+ size_t max = (7 /* TYPE + METADATA */
+ + 1 + SLEN(STATUS_CODE_KEY) + 1 + 2
+ + 1 + SLEN(MSG_KEY) + 1 + 2 + 255);
if (size < max)
return -1;
/* Frame type */
frame[idx++] = SPOE_FRM_T_HAPROXY_DISCON;
- /* No flags for now */
- memset(frame+idx, 0, 4);
+ /* Set flags */
+ memcpy(frame+idx, (char *)&flags, 4);
idx += 4;
/* No stream-id and frame-id for DISCONNECT frames */
prepare_spoe_hanotify_frame(struct appctx *appctx, struct spoe_context *ctx,
char *frame, size_t size)
{
- int idx = 0;
+ int idx = 0;
+ unsigned int stream_id, frame_id, flags = SPOE_FRM_FL_FIN;
frame[idx++] = SPOE_FRM_T_HAPROXY_NOTIFY;
- /* No flags for now */
- memset(frame+idx, 0, 4);
+ if (ctx == NULL) {
+ flags |= SPOE_FRM_FL_ABRT;
+ stream_id = SPOE_APPCTX(appctx)->frag_ctx.cursid;
+ frame_id = SPOE_APPCTX(appctx)->frag_ctx.curfid;
+ }
+ else {
+ stream_id = ctx->stream_id;
+ frame_id = ctx->frame_id;
+
+ if (ctx->flags & SPOE_CTX_FL_FRAGMENTED) {
+ if (!(SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_FRAGMENTATION)) {
+ spoe_status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
+ return 0;
+ }
+ flags = ctx->frag_ctx.flags;
+ }
+ }
+
+ /* Set flags */
+ memcpy(frame+idx, (char *)&flags, 4);
idx += 4;
/* Set stream-id and frame-id */
- idx += encode_spoe_varint(ctx->stream_id, frame+idx);
- idx += encode_spoe_varint(ctx->frame_id, frame+idx);
+ idx += encode_spoe_varint(stream_id, frame+idx);
+ idx += encode_spoe_varint(frame_id, frame+idx);
/* check the buffer size */
if (idx + SPOE_APPCTX(appctx)->buffer->i > size) {
/* Copy encoded messages */
memcpy(frame+idx, SPOE_APPCTX(appctx)->buffer->p, SPOE_APPCTX(appctx)->buffer->i);
idx += SPOE_APPCTX(appctx)->buffer->i;
-
return idx;
}
static int
handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size)
{
- int vsn, max_frame_size, flags;
- int i, idx = 0;
- size_t min_size = (7 /* TYPE + METADATA */
- + 1 + SLEN(VERSION_KEY) + 1 + 1 + 3
- + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 1
- + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + 0);
+ int vsn, max_frame_size, i, idx = 0;
+ unsigned int flags;
+ size_t min_size = (7 /* TYPE + METADATA */
+ + 1 + SLEN(VERSION_KEY) + 1 + 1 + 3
+ + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 1
+ + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + 0);
/* Check frame type */
if (frame[idx++] != SPOE_FRM_T_AGENT_HELLO)
return -1;
}
- /* Skip flags: fragmentation is not supported for now */
+ /* Retrieve flags */
+ memcpy((char *)&flags, frame+idx, 4);
idx += 4;
+ /* Fragmentation is not supported for HELLO frame */
+ if (!(flags & SPOE_FRM_FL_FIN)) {
+ spoe_status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
+ return -1;
+ }
+
/* stream-id and frame-id must be cleared */
if (frame[idx] != 0 || frame[idx+1] != 0) {
spoe_status_code = SPOE_FRM_ERR_INVALID;
if (sz == i || isspace(str[i]) || str[i] == ',')
flags |= SPOE_APPCTX_FL_ASYNC;
}
+ else if (sz - i >= 13 && !strncmp(str + i, "fragmentation", 13)) {
+ i += 13;
+ if (sz == i || isspace(str[i]) || str[i] == ',')
+ flags |= SPOE_APPCTX_FL_FRAGMENTATION;
+ }
if (sz == i || (delim = memchr(str + i, ',', sz-i)) == NULL)
break;
static int
handle_spoe_agentdiscon_frame(struct appctx *appctx, char *frame, size_t size)
{
- int i, idx = 0;
- size_t min_size = (7 /* TYPE + METADATA */
- + 1 + SLEN(STATUS_CODE_KEY) + 1 + 1
- + 1 + SLEN(MSG_KEY) + 1 + 1);
+ int i, idx = 0;
+ unsigned int flags;
+ size_t min_size = (7 /* TYPE + METADATA */
+ + 1 + SLEN(STATUS_CODE_KEY) + 1 + 1
+ + 1 + SLEN(MSG_KEY) + 1 + 1);
/* Check frame type */
if (frame[idx++] != SPOE_FRM_T_AGENT_DISCON)
return -1;
}
- /* Skip flags: fragmentation is not supported for now */
+ /* Retrieve flags */
+ memcpy((char *)&flags, frame+idx, 4);
idx += 4;
+ /* Fragmentation is not supported for DISCONNECT frame */
+ if (!(flags & SPOE_FRM_FL_FIN)) {
+ spoe_status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
+ return -1;
+ }
+
/* stream-id and frame-id must be cleared */
if (frame[idx] != 0 || frame[idx+1] != 0) {
spoe_status_code = SPOE_FRM_ERR_INVALID;
/* Decode ACK frame sent by an agent. It returns the number of read bytes on
* success, 0 if the frame can be ignored and -1 if an error occurred. */
static int
-handle_spoe_agentack_frame(struct appctx *appctx, char *frame, size_t size)
+handle_spoe_agentack_frame(struct appctx *appctx, struct spoe_context **ctx,
+ char *frame, size_t size)
{
- struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
- struct spoe_context *ctx, *back;
- uint64_t stream_id, frame_id;
- int i, idx = 0;
- size_t min_size = (7 /* TYPE + METADATA */);
+ struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
+ uint64_t stream_id, frame_id;
+ int i, idx = 0;
+ unsigned int flags;
+ size_t min_size = (7 /* TYPE + METADATA */);
/* Check frame type */
if (frame[idx++] != SPOE_FRM_T_AGENT_ACK)
return -1;
}
- /* Skip flags: fragmentation is not supported for now */
+ /* Retrieve flags */
+ memcpy((char *)&flags, frame+idx, 4);
idx += 4;
+ /* Fragmentation is not supported for now */
+ if (!(flags & SPOE_FRM_FL_FIN)) {
+ spoe_status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
+ return -1;
+ }
+
/* Get the stream-id and the frame-id */
if ((i = decode_spoe_varint(frame+idx, frame+size, &stream_id)) == -1)
return 0;
idx += i;
if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC) {
- list_for_each_entry_safe(ctx, back, &agent->waiting_queue, list) {
- if (ctx->stream_id == (unsigned int)stream_id &&
- ctx->frame_id == (unsigned int)frame_id)
+ list_for_each_entry((*ctx), &agent->waiting_queue, list) {
+ if ((*ctx)->stream_id == (unsigned int)stream_id &&
+ (*ctx)->frame_id == (unsigned int)frame_id)
goto found;
}
}
else {
- list_for_each_entry_safe(ctx, back, &SPOE_APPCTX(appctx)->waiting_queue, list) {
- if (ctx->stream_id == (unsigned int)stream_id &&
- ctx->frame_id == (unsigned int)frame_id)
+ list_for_each_entry((*ctx), &SPOE_APPCTX(appctx)->waiting_queue, list) {
+ if ((*ctx)->stream_id == (unsigned int)stream_id &&
+ (*ctx)->frame_id == (unsigned int)frame_id)
goto found;
}
}
+ /* FIXME: check if ABRT bit is set for a unfinished fragmented frame */
+
/* No Stream found, ignore the frame */
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p - Ignore ACK frame"
+ " - stream-id=%u - frame-id=%u\n",
+ (int)now.tv_sec, (int)now.tv_usec, agent->id,
+ __FUNCTION__, appctx,
+ (unsigned int)stream_id, (unsigned int)frame_id);
+
+ *ctx = NULL;
return 0;
found:
- if (!acquire_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait))
+ if (!acquire_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait)) {
+ *ctx = NULL;
return 1; /* Retry later */
-
- /* Transfer the buffer ownership to the SPOE context */
- ctx->buffer = SPOE_APPCTX(appctx)->buffer;
- SPOE_APPCTX(appctx)->buffer = &buf_empty;
+ }
/* Copy encoded actions */
- memcpy(ctx->buffer->p, frame+idx, size-idx);
- ctx->buffer->i = size-idx;
+ memcpy(SPOE_APPCTX(appctx)->buffer->p, frame+idx, size-idx);
+ SPOE_APPCTX(appctx)->buffer->i = size-idx;
- /* Notify the stream */
- LIST_DEL(&ctx->list);
- LIST_INIT(&ctx->list);
- ctx->state = SPOE_CTX_ST_DONE;
- task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
+ /* Transfer the buffer ownership to the SPOE context */
+ (*ctx)->buffer = SPOE_APPCTX(appctx)->buffer;
+ SPOE_APPCTX(appctx)->buffer = &buf_empty;
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
+ " - ACK frame received - ctx=%p - stream-id=%u - frame-id=%u\n",
+ (int)now.tv_sec, (int)now.tv_usec, agent->id,
+ __FUNCTION__, appctx,
+ *ctx, (*ctx)->stream_id, (*ctx)->frame_id);
return idx;
}
task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
}
+ if (SPOE_APPCTX(appctx)->frag_ctx.ctx) {
+ ctx = SPOE_APPCTX(appctx)->frag_ctx.ctx;
+ ctx->frag_ctx.spoe_appctx = NULL;
+ ctx->state = SPOE_CTX_ST_ERROR;
+ ctx->status_code = (SPOE_APPCTX(appctx)->status_code + 0x100);
+ task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
+ }
+
release_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait);
pool_free2(pool2_spoe_appctx, SPOE_APPCTX(appctx));
{
struct stream_interface *si = appctx->owner;
struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
- struct spoe_context *ctx;
+ struct spoe_context *ctx = NULL;
char *frame = trash.str;
unsigned int fpa = 0;
int ret, framesz = 0, skip_sending = 0, skip_receiving = 0;
}
process:
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
+ " - process: fpa=%u/%u - skip_sending=%d - skip_receiving=%d"
+ " - appctx-state=%s\n",
+ (int)now.tv_sec, (int)now.tv_usec, agent->id,
+ __FUNCTION__, appctx, fpa, agent->max_fpa,
+ skip_sending, skip_receiving, spoe_appctx_state_str[appctx->st0]);
+
if (fpa > agent->max_fpa || (skip_sending && skip_receiving))
goto stop;
-
- /* Frames must be handled synchronously and a the applet is waiting for
- * a ACK frame */
- if (!(SPOE_APPCTX(appctx)->flags & (SPOE_APPCTX_FL_ASYNC|SPOE_APPCTX_FL_PIPELINING)) &&
- !LIST_ISEMPTY(&SPOE_APPCTX(appctx)->waiting_queue)) {
+ else if (appctx->st0 == SPOE_APPCTX_ST_WAITING_SYNC_ACK) {
if (skip_receiving)
goto stop;
goto recv_frame;
}
-
- if (LIST_ISEMPTY(&agent->sending_queue) || skip_sending) {
+ else if (skip_sending)
+ goto recv_frame;
+ else if (appctx->st0 == SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY) {
+ ctx = SPOE_APPCTX(appctx)->frag_ctx.ctx;
+ goto send_frame;
+ }
+ else if (LIST_ISEMPTY(&agent->sending_queue)) {
skip_sending = 1;
goto recv_frame;
}
-
ctx = LIST_NEXT(&agent->sending_queue, typeof(ctx), list);
+ send_frame:
/* Transfer the buffer ownership to the SPOE appctx */
- SPOE_APPCTX(appctx)->buffer = ctx->buffer;
- ctx->buffer = &buf_empty;
+ if (ctx) {
+ SPOE_APPCTX(appctx)->buffer = ctx->buffer;
+ ctx->buffer = &buf_empty;
+ }
ret = prepare_spoe_hanotify_frame(appctx, ctx, frame+4, SPOE_APPCTX(appctx)->max_frame_size);
if (ret > 1)
goto next;
case 0: /* ignore */
+ release_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait);
agent->sending_rate++;
+ fpa++;
+
+ LIST_DEL(&ctx->list);
+ LIST_INIT(&ctx->list);
ctx->state = SPOE_CTX_ST_ERROR;
ctx->status_code = (spoe_status_code + 0x100);
- release_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait);
task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
- LIST_DEL(&ctx->list);
- LIST_INIT(&ctx->list);
- fpa++;
break;
case 1: /* retry */
break;
default:
- agent->sending_rate++;
- ctx->state = SPOE_CTX_ST_WAITING_ACK;
release_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait);
+ agent->sending_rate++;
+ fpa++;
+
+ if (ctx == NULL) {
+ appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
+ SPOE_APPCTX(appctx)->frag_ctx.ctx = NULL;
+ SPOE_APPCTX(appctx)->frag_ctx.cursid = 0;
+ SPOE_APPCTX(appctx)->frag_ctx.curfid = 0;
+ break;
+ }
LIST_DEL(&ctx->list);
LIST_INIT(&ctx->list);
- if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC)
- LIST_ADDQ(&agent->waiting_queue, &ctx->list);
- else
- LIST_ADDQ(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list);
- fpa++;
+
+ if (ctx->flags & SPOE_CTX_FL_FRAGMENTED) {
+ if (ctx->frag_ctx.flags & SPOE_FRM_FL_FIN) {
+ if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC) {
+ appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
+ LIST_ADDQ(&agent->waiting_queue, &ctx->list);
+ }
+ else if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_PIPELINING) {
+ appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
+ LIST_ADDQ(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list);
+ }
+ else {
+ appctx->st0 = SPOE_APPCTX_ST_WAITING_SYNC_ACK;
+ LIST_ADDQ(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list);
+ }
+ SPOE_APPCTX(appctx)->frag_ctx.ctx = NULL;
+ SPOE_APPCTX(appctx)->frag_ctx.cursid = 0;
+ SPOE_APPCTX(appctx)->frag_ctx.curfid = 0;
+
+ ctx->frag_ctx.spoe_appctx = NULL;
+ ctx->state = SPOE_CTX_ST_WAITING_ACK;
+ }
+ else {
+ appctx->st0 = SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY;
+ SPOE_APPCTX(appctx)->frag_ctx.ctx = ctx;
+ SPOE_APPCTX(appctx)->frag_ctx.cursid = ctx->stream_id;
+ SPOE_APPCTX(appctx)->frag_ctx.curfid = ctx->frame_id;
+
+ ctx->frag_ctx.spoe_appctx = SPOE_APPCTX(appctx);
+ ctx->state = SPOE_CTX_ST_ENCODING_MSGS;
+ task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
+ skip_sending = 1;
+ }
+ }
+ else {
+ if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC) {
+ appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
+ LIST_ADDQ(&agent->waiting_queue, &ctx->list);
+ }
+ else if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_PIPELINING) {
+ appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
+ LIST_ADDQ(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list);
+ }
+ else {
+ appctx->st0 = SPOE_APPCTX_ST_WAITING_SYNC_ACK;
+ LIST_ADDQ(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list);
+ }
+
+ ctx->state = SPOE_CTX_ST_WAITING_ACK;
+ }
}
if (fpa > agent->max_fpa)
goto next;
}
framesz = ret;
- ret = handle_spoe_agentack_frame(appctx, frame, framesz);
+ ret = handle_spoe_agentack_frame(appctx, &ctx, frame, framesz);
}
-
switch (ret) {
case -1: /* error */
if (framesz)
if (framesz)
bo_skip(si_oc(si), framesz+4);
fpa++;
+
+ LIST_DEL(&ctx->list);
+ LIST_INIT(&ctx->list);
+
+ if (appctx->st0 == SPOE_APPCTX_ST_WAITING_SYNC_ACK)
+ appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
+
+ ctx->state = SPOE_CTX_ST_DONE;
+ task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
}
goto process;
SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
return 0;
stop:
- if ((SPOE_APPCTX(appctx)->flags & (SPOE_APPCTX_FL_ASYNC|SPOE_APPCTX_FL_PIPELINING)) ||
- LIST_ISEMPTY(&SPOE_APPCTX(appctx)->waiting_queue)) {
- agent->applets_idle++;
+ if (appctx->st0 == SPOE_APPCTX_ST_PROCESSING) {
appctx->st0 = SPOE_APPCTX_ST_IDLE;
+ agent->applets_idle++;
}
if (fpa || (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_PERSIST)) {
LIST_DEL(&SPOE_APPCTX(appctx)->list);
/* fall through */
case SPOE_APPCTX_ST_PROCESSING:
+ case SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY:
+ case SPOE_APPCTX_ST_WAITING_SYNC_ACK:
if (handle_processing_spoe_applet(appctx))
goto out;
goto switchstate;
appctx->ctx.spoe.ptr = pool_alloc_dirty(pool2_spoe_appctx);
if (SPOE_APPCTX(appctx) == NULL)
goto out_free_appctx;
+ memset(appctx->ctx.spoe.ptr, 0, pool2_spoe_appctx->size);
appctx->st0 = SPOE_APPCTX_ST_CONNECT;
if ((SPOE_APPCTX(appctx)->task = task_new()) == NULL)
end:
/* The only reason to return an error is when there is no applet */
- if (LIST_ISEMPTY(&agent->applets))
- return 0;
+ if (LIST_ISEMPTY(&agent->applets)) {
+ ctx->status_code = SPOE_CTX_ERR_RES;
+ return -1;
+ }
/* Add the SPOE context in the sending queue and update all running
* info */
}
/***************************************************************************
- * Functions that process SPOE messages and actions
+ * Functions that encode SPOE messages
**************************************************************************/
-/* Process SPOE messages for a specific event. During the processing, it returns
+static inline int
+encode_spoe_arg_string(struct spoe_context *ctx, struct sample *smp,
+ char *p, size_t max_size)
+{
+ struct chunk *chk = &smp->data.u.str;
+ int idx = 0;
+
+ /* Here, we need to know if the sample has already been partially
+ * encoded. If yes, we only need to encode the remaining, <curoff>
+ * reprensenting the number of bytes already encoded in previous
+ * frames. Else, <curoff> == 0 */
+
+ if (!ctx->frag_ctx.curoff) {
+ /* First evaluation of the sample : encode the type (string or
+ * binary) and check its size against <max_size> */
+
+ /* the string/binary length must not exceed 4 Gb. So 5 bytes is
+ * reserved to encode its size. */
+ if (max_size < 6)
+ return 0;
+
+ p[idx++] = (smp->data.type == SMP_T_STR) ? SPOE_DATA_T_STR : SPOE_DATA_T_BIN;
+ max_size -= (idx + 5);
+
+ if (chk->len > max_size) {
+ /* The sample is too big, we will fragment it. <curoff>
+ * will be updated accordingly. */
+ idx += encode_frag_spoe_string(chk->str, chk->len, max_size, p+idx);
+ ctx->frag_ctx.curoff = max_size;
+ }
+ else {
+ /* No fragmentation needed, all the sample is encoded
+ * and <curoff> remains 0 */
+ idx += encode_spoe_string(chk->str, chk->len, p+idx);
+ }
+ }
+ else {
+ /* Continue the sample fragmentation, the type was already set
+ * in a previous frame. So just do a copy of data. */
+
+ idx = chk->len - ctx->frag_ctx.curoff; /* Remaining data */
+ if (idx > max_size) {
+ /* The sample still needs to be fragmented. <curoff>
+ * will be incremented accordingly. */
+ memcpy(p, chk->str + ctx->frag_ctx.curoff, max_size);
+ idx = max_size;
+ ctx->frag_ctx.curoff += max_size;
+ }
+ else {
+ /* Finish the fragmentation. <curoff> will be reset. */
+ memcpy(p, chk->str + ctx->frag_ctx.curoff, idx);
+ ctx->frag_ctx.curoff = 0;
+ }
+ }
+ return idx;
+}
+
+static inline int
+encode_spoe_arg_method(struct spoe_context *ctx, struct sample *smp,
+ char *p, size_t max_size)
+{
+ int idx = 0;
+
+ /* method length must not exceed 2288 bytes. So 3 bytes is reserved to
+ * encode its size. */
+
+ if (smp->data.u.meth.meth != HTTP_METH_OTHER) {
+ const struct http_method_name *meth =
+ &http_known_methods[smp->data.u.meth.meth];
+
+ if (meth->len + 3 > max_size)
+ return 0;
+ p[idx++] = SPOE_DATA_T_STR;
+ idx += encode_spoe_string(meth->name, meth->len, p+idx);
+ }
+ else {
+ struct chunk *meth = &smp->data.u.meth.str;
+
+ if (meth->len + 3 > max_size)
+ return 0;
+ p[idx++] = SPOE_DATA_T_STR;
+ idx += encode_spoe_string(meth->str, meth->len, p+idx);
+ }
+ return idx;
+}
+
+static inline int
+encode_spoe_arg_ipv6(struct spoe_context *ctx, struct sample *smp,
+ char *p, size_t max_size)
+{
+ int idx = 0;
+
+ if (max_size < 17)
+ return 0;
+ p[idx++] = SPOE_DATA_T_IPV6;
+ memcpy(p+idx, &smp->data.u.ipv6, 16);
+ idx += 16;
+ return idx;
+}
+
+
+static inline int
+encode_spoe_arg_ipv4(struct spoe_context *ctx, struct sample *smp,
+ char *p, size_t max_size)
+{
+ int idx = 0;
+
+ if (max_size < 5)
+ return 0;
+ p[idx++] = SPOE_DATA_T_IPV4;
+ memcpy(p+idx, &smp->data.u.ipv6, 4);
+ idx += 4;
+ return idx;
+}
+
+static inline int
+encode_spoe_arg_sint(struct spoe_context *ctx, struct sample *smp,
+ char *p, size_t max_size)
+{
+ int idx = 0;
+
+ if (max_size < 9)
+ return 0;
+ p[idx++] = SPOE_DATA_T_INT64;
+ idx += encode_spoe_varint(smp->data.u.sint, p+idx);
+
+ return idx;
+}
+
+static inline int
+encode_spoe_arg_bool(struct spoe_context *ctx, struct sample *smp,
+ char *p, size_t max_size)
+{
+ int flag, idx = 0;
+
+ if (max_size < 1)
+ return 0;
+ flag = ((!smp->data.u.sint) ? SPOE_DATA_FL_FALSE : SPOE_DATA_FL_TRUE);
+ p[idx++] = (SPOE_DATA_T_BOOL | flag);
+
+ return idx;
+}
+
+/* Encode SPOE messages for a specific event.
+ *
+ *
+ * It returns 0 if During the processing, it returns
* 0 and it returns 1 when the processing is finished. If an error occurred, -1
* is returned. */
static int
-process_spoe_messages(struct stream *s, struct spoe_context *ctx,
- struct list *messages, int dir)
+encode_spoe_messages(struct stream *s, struct spoe_context *ctx,
+ struct list *messages, int dir)
{
struct spoe_config *conf = FLT_CONF(ctx->filter);
struct spoe_agent *agent = conf->agent;
struct spoe_arg *arg;
char *p;
size_t max_size;
- int off, flag, idx = 0;
+ int r, idx = 0;
max_size = agent->frame_size - FRAME_HDR_SIZE;
p = ctx->buffer->p;
+ /* Resume encoding of a SPOE message */
+ if (ctx->frag_ctx.curmsg != NULL) {
+ msg = ctx->frag_ctx.curmsg;
+ goto encode_message;
+ }
+
/* Loop on messages */
list_for_each_entry(msg, messages, list) {
- if (idx + msg->id_len + 1 > max_size)
- goto skip;
+ ctx->frag_ctx.curmsg = msg;
+ ctx->frag_ctx.curarg = NULL;
+ ctx->frag_ctx.curoff = UINT_MAX;
+
+ encode_message:
+ /* Resume encoding of a SPOE argument */
+ if (ctx->frag_ctx.curarg != NULL) {
+ arg = ctx->frag_ctx.curarg;
+ goto encode_argument;
+ }
+
+ if (ctx->frag_ctx.curoff != UINT_MAX)
+ goto encode_msg_payload;
+
+ /* <idx> + <string> + <nb-args>.
+ * Implies <id_len> is encoded on 2 bytes, at most (< 2288). */
+ if (idx + 3 + msg->id_len + 1 > max_size)
+ goto too_big;
/* Set the message name */
idx += encode_spoe_string(msg->id, msg->id_len, p+idx);
- /* Save offset where to store the number of arguments for this
- * message */
- off = idx++;
- p[off] = 0;
+ /* Store the number of arguments for this message */
+ p[idx++] = msg->nargs;
+
+ ctx->frag_ctx.curoff = 0;
+ encode_msg_payload:
/* Loop on arguments */
list_for_each_entry(arg, &msg->args, list) {
- p[off]++; /* Increment the number of arguments */
+ ctx->frag_ctx.curarg = arg;
+ ctx->frag_ctx.curoff = UINT_MAX;
- if (idx + arg->name_len + 1 > max_size)
- goto skip;
+ encode_argument:
+ if (ctx->frag_ctx.curoff != UINT_MAX)
+ goto encode_arg_value;
+
+ /* <idx> + <string>.
+ * Implies <name_len> is encoded on 2 bytes, at most (< 2288). */
+ if (idx + 3 + arg->name_len > max_size)
+ goto too_big;
/* Encode the arguement name as a string. It can by NULL */
idx += encode_spoe_string(arg->name, arg->name_len, p+idx);
+ ctx->frag_ctx.curoff = 0;
+ encode_arg_value:
+
+ if (idx + 1 > max_size)
+ goto too_big;
+
/* Fetch the arguement value */
smp = sample_process(s->be, s->sess, s, dir|SMP_OPT_FINAL, arg->expr, NULL);
if (!smp) {
/* Else, encode the arguement value */
switch (smp->data.type) {
case SMP_T_BOOL:
- flag = ((!smp->data.u.sint) ? SPOE_DATA_FL_FALSE : SPOE_DATA_FL_TRUE);
- p[idx++] = (SPOE_DATA_T_BOOL | flag);
+ if (!(r = encode_spoe_arg_bool(ctx, smp, p+idx, max_size-idx)))
+ goto too_big;
+ idx += r;
break;
+
case SMP_T_SINT:
- p[idx++] = SPOE_DATA_T_INT64;
- if (idx + 8 > max_size)
- goto skip;
- idx += encode_spoe_varint(smp->data.u.sint, p+idx);
+ if (!(r = encode_spoe_arg_sint(ctx, smp, p+idx, max_size-idx)))
+ goto too_big;
+ idx += r;
break;
+
case SMP_T_IPV4:
- p[idx++] = SPOE_DATA_T_IPV4;
- if (idx + 4 > max_size)
- goto skip;
- memcpy(p+idx, &smp->data.u.ipv4, 4);
- idx += 4;
+ if (!(r = encode_spoe_arg_ipv4(ctx, smp, p+idx, max_size-idx)))
+ goto too_big;
+ idx += r;
break;
+
case SMP_T_IPV6:
- p[idx++] = SPOE_DATA_T_IPV6;
- if (idx + 16 > max_size)
- goto skip;
- memcpy(p+idx, &smp->data.u.ipv6, 16);
- idx += 16;
+ if (!(r = encode_spoe_arg_ipv6(ctx, smp, p+idx, max_size-idx)))
+ goto too_big;
+ idx += r;
break;
+
case SMP_T_STR:
- p[idx++] = SPOE_DATA_T_STR;
- if (idx + smp->data.u.str.len > max_size)
- goto skip;
- idx += encode_spoe_string(smp->data.u.str.str,
- smp->data.u.str.len,
- p+idx);
- break;
case SMP_T_BIN:
- p[idx++] = SPOE_DATA_T_BIN;
- if (idx + smp->data.u.str.len > max_size)
- goto skip;
- idx += encode_spoe_string(smp->data.u.str.str,
- smp->data.u.str.len,
- p+idx);
+ idx += encode_spoe_arg_string(ctx, smp, p+idx, max_size-idx);
+ if (ctx->frag_ctx.curoff)
+ goto too_big;
break;
+
case SMP_T_METH:
- if (smp->data.u.meth.meth == HTTP_METH_OTHER) {
- p[idx++] = SPOE_DATA_T_STR;
- if (idx + http_known_methods[smp->data.u.meth.meth].len > max_size)
- goto skip;
- idx += encode_spoe_string(http_known_methods[smp->data.u.meth.meth].name,
- http_known_methods[smp->data.u.meth.meth].len,
- p+idx);
- }
- else {
- p[idx++] = SPOE_DATA_T_STR;
- if (idx + smp->data.u.str.len > max_size)
- goto skip;
- idx += encode_spoe_string(smp->data.u.meth.str.str,
- smp->data.u.meth.str.len,
- p+idx);
- }
+ if (!(r = encode_spoe_arg_method(ctx, smp, p+idx, max_size-idx)))
+ goto too_big;
+ idx += r;
break;
+
default:
p[idx++] = SPOE_DATA_T_NULL;
}
}
}
+
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
+ " - encode %s messages - spoe_appctx=%p - max_size=%lu - idx=%u\n",
+ (int)now.tv_sec, (int)now.tv_usec,
+ agent->id, __FUNCTION__, s,
+ ((ctx->flags & SPOE_CTX_FL_FRAGMENTED) ? "last fragment of" : "unfragmented"),
+ ctx->frag_ctx.spoe_appctx, max_size, idx);
+
ctx->buffer->i = idx;
+ ctx->frag_ctx.curmsg = NULL;
+ ctx->frag_ctx.curarg = NULL;
+ ctx->frag_ctx.curoff = 0;
+ ctx->frag_ctx.flags = SPOE_FRM_FL_FIN;
return 1;
- skip:
- return 0;
+ too_big:
+ // FIXME: if fragmentation not supported =>
+ // ctx->status_code = SPOE_CTX_ERR_TOO_BIG;
+ // return -1;
+
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
+ " - encode fragmented messages - spoe_appctx=%p - curmsg=%p - curarg=%p - curoff=%u"
+ " - max_size=%lu - idx=%u\n",
+ (int)now.tv_sec, (int)now.tv_usec,
+ agent->id, __FUNCTION__, s, ctx->frag_ctx.spoe_appctx,
+ ctx->frag_ctx.curmsg, ctx->frag_ctx.curarg, ctx->frag_ctx.curoff,
+ max_size, idx);
+
+ ctx->buffer->i = idx;
+ ctx->flags |= SPOE_CTX_FL_FRAGMENTED;
+ ctx->frag_ctx.flags &= ~SPOE_FRM_FL_FIN;
+ return 1;
}
+
+/***************************************************************************
+ * Functions that handle SPOE actions
+ **************************************************************************/
/* Helper function to set a variable */
static void
set_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len,
return 0;
}
-static int
+/***************************************************************************
+ * Functions that process SPOE events
+ **************************************************************************/
+static inline int
start_event_processing(struct spoe_context *ctx, int dir)
{
/* If a process is already started for this SPOE context, retry
* later. */
if (ctx->flags & SPOE_CTX_FL_PROCESS)
- goto wait;
-
- if (!acquire_spoe_buffer(&ctx->buffer, &ctx->buffer_wait))
- goto wait;
+ return 0;
/* Set the right flag to prevent request and response processing
* in same time. */
ctx->flags |= ((dir == SMP_OPT_DIR_REQ)
? SPOE_CTX_FL_REQ_PROCESS
: SPOE_CTX_FL_RSP_PROCESS);
-
return 1;
-
- wait:
- return 0;
}
-static void
+static inline void
stop_event_processing(struct spoe_context *ctx)
{
+ struct spoe_appctx *sa = ctx->frag_ctx.spoe_appctx;
+
+ if (sa) {
+ sa->frag_ctx.ctx = NULL;
+ wakeup_spoe_appctx(sa->owner);
+ }
+
/* Reset the flag to allow next processing */
- ctx->flags &= ~SPOE_CTX_FL_PROCESS;
+ ctx->flags &= ~(SPOE_CTX_FL_PROCESS|SPOE_CTX_FL_FRAGMENTED);
ctx->status_code = 0;
release_spoe_buffer(&ctx->buffer, &ctx->buffer_wait);
+ ctx->frag_ctx.spoe_appctx = NULL;
+ ctx->frag_ctx.curmsg = NULL;
+ ctx->frag_ctx.curarg = NULL;
+ ctx->frag_ctx.curoff = 0;
+ ctx->frag_ctx.flags = 0;
+
if (!LIST_ISEMPTY(&ctx->list)) {
LIST_DEL(&ctx->list);
LIST_INIT(&ctx->list);
agent->id, __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
spoe_event_str[ev]);
-
dir = ((ev < SPOE_EV_ON_SERVER_SESS) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES);
if (LIST_ISEMPTY(&(ctx->messages[ev])))
ret = start_event_processing(ctx, dir);
if (!ret)
goto out;
- ret = process_spoe_messages(s, ctx, &(ctx->messages[ev]), dir);
- if (!ret)
- goto skip;
- if (!queue_spoe_context(ctx)) {
- ctx->status_code = SPOE_CTX_ERR_RES;
+ if (queue_spoe_context(ctx) < 0)
goto error;
- }
- ctx->state = SPOE_CTX_ST_SENDING_MSGS;
+ ctx->state = SPOE_CTX_ST_ENCODING_MSGS;
/* fall through */
}
- if (ctx->state == SPOE_CTX_ST_SENDING_MSGS ||
- ctx->state == SPOE_CTX_ST_WAITING_ACK) {
+ if (ctx->state == SPOE_CTX_ST_ENCODING_MSGS) {
+ if (!acquire_spoe_buffer(&ctx->buffer, &ctx->buffer_wait))
+ goto out;
+ ret = encode_spoe_messages(s, ctx, &(ctx->messages[ev]), dir);
+ if (ret < 0)
+ goto error;
+ ctx->state = SPOE_CTX_ST_SENDING_MSGS;
+ }
+
+ if (ctx->state == SPOE_CTX_ST_SENDING_MSGS) {
+ if (ctx->frag_ctx.spoe_appctx)
+ wakeup_spoe_appctx(ctx->frag_ctx.spoe_appctx->owner);
+ ret = 0;
+ goto out;
+ }
+
+ if (ctx->state == SPOE_CTX_ST_WAITING_ACK) {
ret = 0;
goto out;
}
if (agent->var_on_error) {
struct sample smp;
- // FIXME: Get the error code here
memset(&smp, 0, sizeof(smp));
smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
smp.data.u.sint = ctx->status_code;
set_spoe_var(ctx, "txn", agent->var_on_error,
strlen(agent->var_on_error), &smp);
}
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
+ " - failed to create process event '%s': code=%u\n",
+ (int)now.tv_sec, (int)now.tv_usec, agent->id,
+ __FUNCTION__, ctx->strm, spoe_event_str[ev],
+ ctx->status_code);
send_log(ctx->strm->be, LOG_WARNING,
"SPOE: [%s] failed to process event '%s': code=%u\n",
agent->id, spoe_event_str[ev], ctx->status_code);
curmsg->event = SPOE_EV_NONE;
curmsg->conf.file = strdup(file);
curmsg->conf.line = linenum;
+ curmsg->nargs = 0;
LIST_INIT(&curmsg->args);
LIST_ADDQ(&curmsgs, &curmsg->list);
}
free(arg);
goto out;
}
+ curmsg->nargs++;
LIST_ADDQ(&curmsg->args, &arg->list);
cur_arg++;
}