/* Config info */
char *engine_id; /* engine-id string */
char *var_pfx; /* Prefix used for vars set by the agent */
- char *var_on_error; /* Variable to set when an error occured, in the TXN scope */
+ char *var_on_error; /* Variable to set when an error occurred, in the TXN scope */
unsigned int flags; /* SPOE_FL_* */
unsigned int cps_max; /* Maximum # of connections per second */
unsigned int eps_max; /* Maximum # of errors per second */
unsigned int flags; /* SPOE_APPCTX_FL_* */
unsigned int status_code; /* SPOE_FRM_ERR_* */
+#if defined(DEBUG_SPOE) || defined(DEBUG_FULL)
+ char *reason; /* Error message, used for debugging only */
+ int rlen; /* reason length */
+#endif
+
struct buffer *buffer; /* Buffer used to store a encoded messages */
struct buffer_wait buffer_wait; /* position in the list of ressources waiting for a buffer */
struct list waiting_queue; /* list of streams waiting for a ACK frame, in sync and pipelining mode */
static struct pool_head *pool2_spoe_ctx = NULL;
static struct pool_head *pool2_spoe_appctx = NULL;
-/* Temporary variables used to ease error processing */
-int spoe_status_code = SPOE_FRM_ERR_NONE;
-char spoe_reason[256];
-
struct flt_ops spoe_ops;
-static int queue_spoe_context(struct spoe_context *ctx);
-static int acquire_spoe_buffer(struct buffer **buf, struct buffer_wait *buffer_wait);
-static void release_spoe_buffer(struct buffer **buf, struct buffer_wait *buffer_wait);
+static int spoe_queue_context(struct spoe_context *ctx);
+static int spoe_acquire_buffer(struct buffer **buf, struct buffer_wait *buffer_wait);
+static void spoe_release_buffer(struct buffer **buf, struct buffer_wait *buffer_wait);
/********************************************************************
* helper functions/globals
********************************************************************/
static void
-release_spoe_msg_placeholder(struct spoe_msg_placeholder *mp)
+spoe_release_msg_placeholder(struct spoe_msg_placeholder *mp)
{
if (!mp)
return;
static void
-release_spoe_message(struct spoe_message *msg)
+spoe_release_message(struct spoe_message *msg)
{
struct spoe_arg *arg, *back;
}
static void
-release_spoe_agent(struct spoe_agent *agent)
+spoe_release_agent(struct spoe_agent *agent)
{
struct spoe_message *msg, *back;
int i;
for (i = 0; i < SPOE_EV_EVENTS; ++i) {
list_for_each_entry_safe(msg, back, &agent->messages[i], list) {
LIST_DEL(&msg->list);
- release_spoe_message(msg);
+ spoe_release_message(msg);
}
}
free(agent);
#endif
+/* Used to generates a unique id for an engine. On success, it returns a
+ * allocated string. So it is the caller's reponsibility to release it. If the
+ * allocation failed, it returns NULL. */
static char *
generate_pseudo_uuid()
{
return uuid;
}
+/* Returns the minimum number of appets alive at a time. This function is used
+ * to know if more applets should be created for an engine. */
static inline unsigned int
min_applets_act(struct spoe_agent *agent)
{
unsigned int nbsrv;
+ /* TODO: Add a config parameter to customize this value. Always 0 for
+ * now */
if (agent->min_applets)
return agent->min_applets;
- nbsrv = (agent->b.be->srv_act ? agent->b.be->srv_act : agent->b.be->srv_bck);
+ /* Get the number of active servers for the backend */
+ nbsrv = (agent->b.be->srv_act
+ ? agent->b.be->srv_act
+ : agent->b.be->srv_bck);
return 2*nbsrv;
}
#define SUPPORTED_VERSIONS_VAL "1.0"
/* Comma-separated list of supported capabilities (none for now) */
-//#define CAPABILITIES_VAL ""
#define CAPABILITIES_VAL "pipelining,async"
+/* Convert a string to a SPOE version value. The string must follow the format
+ * "MAJOR.MINOR". It will be concerted into the integer (1000 * MAJOR + MINOR).
+ * If an error occurred, -1 is returned. */
static int
-decode_spoe_version(const char *str, size_t len)
+spoe_str_to_vsn(const char *str, size_t len)
{
- char tmp[len+1], *start, *end;
- double d;
- int vsn = -1;
+ const char *p, *end;
+ int maj, min, vsn;
- memcpy(tmp, str, len);
- tmp[len] = 0;
+ p = str;
+ end = str+len;
+ maj = min = 0;
+ vsn = -1;
- start = tmp;
- while (isspace(*start))
- start++;
+ /* skip leading spaces */
+ while (p < end && isspace(*p))
+ p++;
- d = strtod(start, &end);
- if (d == 0 || start == end)
+ /* parse Major number, until the '.' */
+ while (*p != '.') {
+ if (p >= end || *p < '0' || *p > '9')
+ goto out;
+ maj *= 10;
+ maj += (*p - '0');
+ p++;
+ }
+
+ /* check Major version */
+ if (!maj)
goto out;
- if (*end) {
- while (isspace(*end))
- end++;
- if (*end)
- goto out;
+ p++; /* skip the '.' */
+ if (p >= end || *p < '0' || *p > '9') /* Minor number is missing */
+ goto out;
+
+ /* Parse Minor number */
+ while (p < end) {
+ if (*p < '0' || *p > '9')
+ break;
+ min *= 10;
+ min += (*p - '0');
+ p++;
}
- vsn = (int)(d * 1000);
+
+ /* check Minor number */
+ if (min > 999)
+ goto out;
+
+ /* skip trailing spaces */
+ while (p < end && isspace(*p))
+ p++;
+ if (p != end)
+ goto out;
+
+ vsn = maj * 1000 + min;
out:
return vsn;
}
-/* Encode a variable-length integer. This function never fails and returns the
- * number of written bytes. */
+/* Encode the integer <i> into a varint (variable-length integer). The encoded
+ * value is copied in <*buf>. Here is the encoding format:
+ *
+ * 0 <= X < 240 : 1 byte (7.875 bits) [ XXXX XXXX ]
+ * 240 <= X < 2288 : 2 bytes (11 bits) [ 1111 XXXX ] [ 0XXX XXXX ]
+ * 2288 <= X < 264432 : 3 bytes (18 bits) [ 1111 XXXX ] [ 1XXX XXXX ] [ 0XXX XXXX ]
+ * 264432 <= X < 33818864 : 4 bytes (25 bits) [ 1111 XXXX ] [ 1XXX XXXX ]*2 [ 0XXX XXXX ]
+ * 33818864 <= X < 4328786160 : 5 bytes (32 bits) [ 1111 XXXX ] [ 1XXX XXXX ]*3 [ 0XXX XXXX ]
+ * ...
+ *
+ * On success, it returns the number of written bytes and <*buf> is moved after
+ * the encoded value. Otherwise, it returns -1. */
static int
-encode_spoe_varint(uint64_t i, char *buf)
+spoe_encode_varint(uint64_t i, char **buf, char *end)
{
- int idx;
+ unsigned char *p = (unsigned char *)*buf;
+ int r;
+
+ if (p >= (unsigned char *)end)
+ return -1;
if (i < 240) {
- buf[0] = (unsigned char)i;
+ *p++ = i;
+ *buf = (char *)p;
return 1;
}
- buf[0] = (unsigned char)i | 240;
+ *p++ = (unsigned char)i | 240;
i = (i - 240) >> 4;
- for (idx = 1; i >= 128; ++idx) {
- buf[idx] = (unsigned char)i | 128;
+ while (i >= 128) {
+ if (p >= (unsigned char *)end)
+ return -1;
+ *p++ = (unsigned char)i | 128;
i = (i - 128) >> 7;
}
- buf[idx++] = (unsigned char)i;
- return idx;
+
+ if (p >= (unsigned char *)end)
+ return -1;
+ *p++ = (unsigned char)i;
+
+ r = ((char *)p - *buf);
+ *buf = (char *)p;
+ return r;
}
-/* Decode a varable-length integer. If the decoding fails, -1 is returned. This
- * happens when the buffer's end in reached. On success, the number of read
- * bytes is returned. */
+/* Decode a varint from <*buf> and save the decoded value in <*i>. See
+ * 'spoe_encode_varint' for details about varint.
+ * On success, it returns the number of read bytes and <*buf> is moved after the
+ * varint. Otherwise, it returns -1. */
static int
-decode_spoe_varint(const char *buf, const char *end, uint64_t *i)
+spoe_decode_varint(char **buf, char *end, uint64_t *i)
{
- unsigned char *msg = (unsigned char *)buf;
- int idx = 0;
+ unsigned char *p = (unsigned char *)*buf;
+ int r;
- if (msg >= (unsigned char *)end)
+ if (p >= (unsigned char *)end)
return -1;
- if (msg[0] < 240) {
- *i = msg[0];
+ *i = *p++;
+ if (*i < 240) {
+ *buf = (char *)p;
return 1;
}
- *i = msg[0];
+
+ r = 4;
do {
- ++idx;
- if (msg+idx >= (unsigned char *)end)
+ if (p >= (unsigned char *)end)
return -1;
- *i += (uint64_t)msg[idx] << (4 + 7 * (idx-1));
- } while (msg[idx] >= 128);
- return (idx + 1);
+ *i += (uint64_t)*p << r;
+ r += 7;
+ } while (*p++ >= 128);
+
+ r = ((char *)p - *buf);
+ *buf = (char *)p;
+ return r;
}
-/* Encode a string. The string will be prefix by its length, encoded as a
- * variable-length integer. This function never fails and returns the number of
- * written bytes. */
+/* Encode a buffer. Its length <len> is encoded as a varint, followed by a copy
+ * of <str>. It must have enough space in <*buf> to encode the buffer, else an
+ * error is triggered.
+ * On success, it returns <len> and <*buf> is moved after the encoded value. If
+ * an error occurred, it returns -1. */
static int
-encode_spoe_string(const char *str, size_t len, char *dst)
+spoe_encode_buffer(const char *str, size_t len, char **buf, char *end)
{
- int idx = 0;
+ char *p = *buf;
+ int ret;
+
+ if (p >= end)
+ return -1;
if (!len) {
- dst[0] = 0;
- return 1;
+ *p++ = 0;
+ *buf = p;
+ return 0;
}
- idx += encode_spoe_varint(len, dst);
- memcpy(dst+idx, str, len);
- return (idx + len);
+ ret = spoe_encode_varint(len, &p, end);
+ if (ret == -1 || p + len > end)
+ return -1;
+
+ memcpy(p, str, len);
+ *buf = p + len;
+ return len;
}
-/* Encode first part of a fragmented string. The string will be prefix by its
- * length, encoded as a variable-length integer. This function never fails and
- * returns the number of written bytes. */
+/* Encode a buffer, possibly partially. It does the same thing than
+ * 'spoe_encode_buffer', but if there is not enough space, it does not fail.
+ * On success, it returns the number of copied bytes and <*buf> is moved after
+ * the encoded value. If an error occured, it returns -1. */
static int
-encode_frag_spoe_string(const char *str, size_t sz, size_t len, char *dst)
+spoe_encode_frag_buffer(const char *str, size_t len, char **buf, char *end)
{
- int idx = 0;
+ char *p = *buf;
+ int ret;
- if (!sz) {
- dst[0] = 0;
- return 1;
+ if (p >= end)
+ return -1;
+
+ if (!len) {
+ *p++ = 0;
+ *buf = p;
+ return 0;
}
- idx += encode_spoe_varint(sz, dst);
- memcpy(dst+idx, str, len);
- return (idx + len);
+ ret = spoe_encode_varint(len, &p, end);
+ if (ret == -1 || p >= end)
+ return -1;
+
+ ret = (p+len < end) ? len : (end - p);
+ memcpy(p, str, ret);
+ *buf = p + ret;
+ return ret;
}
-/* Decode a string. Its length is decoded first as a variable-length integer. If
- * it succeeds, and if the string length is valid, the begin of the string is
- * saved in <*str>, its length is saved in <*len> and the total numbre of bytes
- * read is returned. If an error occurred, -1 is returned and <*str> remains
- * NULL. */
+/* Decode a buffer. The buffer length is decoded and saved in <*len>. <*str>
+ * points on the first byte of the buffer.
+ * On success, it returns the buffer length and <*buf> is moved after the
+ * encoded buffer. Otherwise, it returns -1. */
static int
-decode_spoe_string(char *buf, char *end, char **str, uint64_t *len)
+spoe_decode_buffer(char **buf, char *end, char **str, size_t *len)
{
- int i, idx = 0;
+ char *p = *buf;
+ uint64_t sz;
+ int ret;
*str = NULL;
*len = 0;
- if ((i = decode_spoe_varint(buf, end, len)) == -1)
- goto error;
- idx += i;
- if (buf + idx + *len > end)
- goto error;
+ ret = spoe_decode_varint(&p, end, &sz);
+ if (ret == -1 || p + sz > end)
+ return -1;
- *str = buf+idx;
- return (idx + *len);
+ *str = p;
+ *len = sz;
+ *buf = p + sz;
+ return sz;
+}
- error:
- return -1;
+/* Encode a typed data using value in <smp>. On success, it returns the number
+ * of copied bytes and <*buf> is moved after the encoded value. If an error
+ * occured, it returns -1.
+ *
+ * If the value is too big to be encoded, depending on its type, then encoding
+ * failed or the value is partially encoded. Only strings and binaries can be
+ * partially encoded. In this case, the offset <*off> is updated to known how
+ * many bytes has been encoded. If <*off> is zero at the end, it means that all
+ * data has been encoded. */
+static int
+spoe_encode_data(struct sample *smp, unsigned int *off, char **buf, char *end)
+{
+ char *p = *buf;
+ int ret;
+
+ if (p >= end)
+ return -1;
+
+ if (smp == NULL) {
+ *p++ = SPOE_DATA_T_NULL;
+ goto end;
+ }
+
+ switch (smp->data.type) {
+ case SMP_T_BOOL:
+ *p = SPOE_DATA_T_BOOL;
+ *p++ |= ((!smp->data.u.sint) ? SPOE_DATA_FL_FALSE : SPOE_DATA_FL_TRUE);
+ break;
+
+ case SMP_T_SINT:
+ *p++ = SPOE_DATA_T_INT64;
+ if (spoe_encode_varint(smp->data.u.sint, &p, end) == -1)
+ return -1;
+ break;
+
+ case SMP_T_IPV4:
+ if (p + 5 > end)
+ return -1;
+ *p++ = SPOE_DATA_T_IPV4;
+ memcpy(p, &smp->data.u.ipv4, 4);
+ p += 4;
+ break;
+
+ case SMP_T_IPV6:
+ if (p + 17 > end)
+ return -1;
+ *p++ = SPOE_DATA_T_IPV6;
+ memcpy(p, &smp->data.u.ipv6, 16);
+ p += 16;
+ break;
+
+ case SMP_T_STR:
+ case SMP_T_BIN: {
+ struct chunk *chk = &smp->data.u.str;
+
+ /* Here, we need to know if the sample has already been
+ * partially encoded. If yes, we only need to encode the
+ * remaining, <*off> reprensenting the number of bytes
+ * already encoded. */
+ if (!*off) {
+ /* First evaluation of the sample : encode the
+ * type (string or binary), the buffer length
+ * (as a varint) and at least 1 byte of the
+ * buffer. */
+ struct chunk *chk = &smp->data.u.str;
+
+ *p++ = (smp->data.type == SMP_T_STR)
+ ? SPOE_DATA_T_STR
+ : SPOE_DATA_T_BIN;
+ ret = spoe_encode_frag_buffer(chk->str, chk->len, &p, end);
+ if (ret == -1)
+ return -1;
+ }
+ else {
+ /* The sample has been fragmented, encode remaining data */
+ ret = MIN(chk->len - *off, end - p);
+ memcpy(p, chk->str + *off, ret);
+ p += ret;
+ }
+ /* Now update <*off> */
+ if (ret + *off != chk->len)
+ *off += ret;
+ else
+ *off = 0;
+ break;
+ }
+
+ case SMP_T_METH:
+ *p++ = SPOE_DATA_T_STR;
+ if (smp->data.u.meth.meth != HTTP_METH_OTHER) {
+ const struct http_method_name *meth =
+ &http_known_methods[smp->data.u.meth.meth];
+
+ if (spoe_encode_buffer(meth->name, meth->len, &p, end) == -1)
+ return -1;
+ }
+ else {
+ struct chunk *meth = &smp->data.u.meth.str;
+
+ if (spoe_encode_buffer(meth->str, meth->len, &p, end) == -1)
+ return -1;
+ }
+ break;
+
+ default:
+ *p++ = SPOE_DATA_T_NULL;
+ break;
+ }
+
+ end:
+ ret = (p - *buf);
+ *buf = p;
+ return ret;
}
/* Skip a typed data. If an error occurred, -1 is returned, otherwise the number
- * of bytes read is returned. A types data is composed of a type (1 byte) and
- * corresponding data:
+ * of skipped bytes is returned and the <*buf> is moved after skipped data.
+ *
+ * A types data is composed of a type (1 byte) and corresponding data:
* - boolean: non additional data (0 bytes)
- * - integers: a variable-length integer (see decode_spoe_varint)
+ * - integers: a variable-length integer (see spoe_decode_varint)
* - ipv4: 4 bytes
* - ipv6: 16 bytes
* - binary and string: a buffer prefixed by its size, a variable-length
- * integer (see decode_spoe_string) */
+ * integer (see spoe_decode_buffer) */
static int
-skip_spoe_data(char *frame, char *end)
+spoe_skip_data(char **buf, char *end)
{
- uint64_t sz = 0;
- int i, idx = 0;
+ char *str, *p = *buf;
+ int type, ret;
+ size_t sz;
+ uint64_t v;
- if (frame > end)
+ if (p >= end)
return -1;
- switch (frame[idx++] & SPOE_DATA_T_MASK) {
+ type = *p++;
+ switch (type & SPOE_DATA_T_MASK) {
case SPOE_DATA_T_BOOL:
break;
case SPOE_DATA_T_INT32:
case SPOE_DATA_T_INT64:
case SPOE_DATA_T_UINT32:
case SPOE_DATA_T_UINT64:
- if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
+ if (spoe_decode_varint(&p, end, &v) == -1)
return -1;
- idx += i;
break;
case SPOE_DATA_T_IPV4:
- idx += 4;
+ if (p+4 > end)
+ return -1;
+ p += 4;
break;
case SPOE_DATA_T_IPV6:
- idx += 16;
+ if (p+16 > end)
+ return -1;
+ p += 16;
break;
case SPOE_DATA_T_STR:
case SPOE_DATA_T_BIN:
- if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
+ /* All the buffer must be skipped */
+ if (spoe_decode_buffer(&p, end, &str, &sz) == -1)
return -1;
- idx += i + sz;
break;
}
- if (frame+idx > end)
- return -1;
- return idx;
+ ret = (p - *buf);
+ *buf = p;
+ return ret;
}
-/* Decode a typed data. If an error occurred, -1 is returned, otherwise the
- * number of read bytes is returned. See skip_spoe_data for details. */
+/* Decode a typed data and fill <smp>. If an error occurred, -1 is returned,
+ * otherwise the number of read bytes is returned and <*buf> is moved after the
+ * decoded data. See spoe_skip_data for details. */
static int
-decode_spoe_data(char *frame, char *end, struct sample *smp)
+spoe_decode_data(char **buf, char *end, struct sample *smp)
{
- uint64_t sz = 0;
- int type, i, idx = 0;
+ char *str, *p = *buf;
+ int type, r = 0;
+ size_t sz;
- if (frame > end)
+ if (p >= end)
return -1;
- type = frame[idx++];
+ type = *p++;
switch (type & SPOE_DATA_T_MASK) {
case SPOE_DATA_T_BOOL:
- smp->data.u.sint = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
+ smp->data.u.sint = ((type & SPOE_DATA_FL_MASK) == SPOE_DATA_FL_TRUE);
smp->data.type = SMP_T_BOOL;
break;
case SPOE_DATA_T_INT32:
case SPOE_DATA_T_INT64:
case SPOE_DATA_T_UINT32:
case SPOE_DATA_T_UINT64:
- if ((i = decode_spoe_varint(frame+idx, end, (uint64_t *)&smp->data.u.sint)) == -1)
+ if (spoe_decode_varint(&p, end, (uint64_t *)&smp->data.u.sint) == -1)
return -1;
- idx += i;
smp->data.type = SMP_T_SINT;
break;
case SPOE_DATA_T_IPV4:
- if (frame+idx+4 > end)
+ if (p+4 > end)
return -1;
- memcpy(&smp->data.u.ipv4, frame+idx, 4);
smp->data.type = SMP_T_IPV4;
- idx += 4;
+ memcpy(&smp->data.u.ipv4, p, 4);
+ p += 4;
break;
case SPOE_DATA_T_IPV6:
- if (frame+idx+16 > end)
+ if (p+16 > end)
return -1;
- memcpy(&smp->data.u.ipv6, frame+idx, 16);
+ memcpy(&smp->data.u.ipv6, p, 16);
smp->data.type = SMP_T_IPV6;
- idx += 16;
+ p += 16;
break;
case SPOE_DATA_T_STR:
- if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
- return -1;
- idx += i;
- if (frame+idx+sz > end)
- return -1;
- smp->data.u.str.str = frame+idx;
- smp->data.u.str.len = sz;
- smp->data.type = SMP_T_STR;
- idx += sz;
- break;
case SPOE_DATA_T_BIN:
- if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
- return -1;
- idx += i;
- if (frame+idx+sz > end)
+ /* All the buffer must be decoded */
+ if (spoe_decode_buffer(&p, end, &str, &sz) == -1)
return -1;
- smp->data.u.str.str = frame+idx;
+ smp->data.u.str.str = str;
smp->data.u.str.len = sz;
- smp->data.type = SMP_T_BIN;
- idx += sz;
+ smp->data.type = (type == SPOE_DATA_T_STR) ? SMP_T_STR : SMP_T_BIN;
break;
}
- if (frame+idx > end)
- return -1;
- return idx;
-}
-
-/* Skip an action in a frame received from an agent. If an error occurred, -1 is
- * returned, otherwise the number of read bytes is returned. An action is
- * composed of the action type followed by a typed data. */
-static int
-skip_spoe_action(char *frame, char *end)
-{
- int n, i, idx = 0;
-
- if (frame+2 > end)
- return -1;
-
- idx++; /* Skip the action type */
- n = frame[idx++];
- while (n-- > 0) {
- if ((i = skip_spoe_data(frame+idx, end)) == -1)
- return -1;
- idx += i;
- }
-
- if (frame+idx > end)
- return -1;
- return idx;
+ r = (p - *buf);
+ *buf = p;
+ return r;
}
-/* Encode HELLO frame sent by HAProxy to an agent. It returns the frame size on
- * success, 0 if the frame can be ignored and -1 if an error occurred. */
+/* Encode the HELLO frame sent by HAProxy to an agent. It returns the number of
+ * encoded bytes in the frame on success, 0 if an encoding error occured and -1
+ * if a fatal error occurred. */
static int
-prepare_spoe_hahello_frame(struct appctx *appctx, char *frame, size_t size)
+spoe_prepare_hahello_frame(struct appctx *appctx, char *frame, size_t size)
{
struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
- unsigned int flags = SPOE_FRM_FL_FIN;
- int idx = 0;
- size_t max = (7 /* TYPE + METADATA */
- + 1 + SLEN(SUPPORTED_VERSIONS_KEY) + 1 + 1 + SLEN(SUPPORTED_VERSIONS_VAL)
- + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 4
- + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + SLEN(CAPABILITIES_VAL)
- + 1 + SLEN(ENGINE_ID_KEY) + 1 + 1 + 36);
-
- if (size < max) {
- spoe_status_code = SPOE_FRM_ERR_TOO_BIG;
- return -1;
- }
+ char *p, *end;
+ unsigned int flags = SPOE_FRM_FL_FIN;
+ size_t sz;
- /* Frame type */
- frame[idx++] = SPOE_FRM_T_HAPROXY_HELLO;
+ p = frame;
+ end = frame+size;
+
+ /* Set Frame type */
+ *p++ = SPOE_FRM_T_HAPROXY_HELLO;
/* Set flags */
- //flags = htonl(flags);
- memcpy(frame+idx, (char *)&flags, 4);
- idx += 4;
+ memcpy(p, (char *)&flags, 4);
+ p += 4;
/* No stream-id and frame-id for HELLO frames */
- frame[idx++] = 0;
- frame[idx++] = 0;
+ *p++ = 0; *p++ = 0;
/* There are 3 mandatory items: "supported-versions", "max-frame-size"
* and "capabilities" */
/* "supported-versions" K/V item */
- idx += encode_spoe_string(SUPPORTED_VERSIONS_KEY, SLEN(SUPPORTED_VERSIONS_KEY), frame+idx);
- frame[idx++] = SPOE_DATA_T_STR;
- idx += encode_spoe_string(SUPPORTED_VERSIONS_VAL, SLEN(SUPPORTED_VERSIONS_VAL), frame+idx);
+ sz = SLEN(SUPPORTED_VERSIONS_KEY);
+ if (spoe_encode_buffer(SUPPORTED_VERSIONS_KEY, sz, &p, end) == -1)
+ goto too_big;
+
+ *p++ = SPOE_DATA_T_STR;
+ sz = SLEN(SUPPORTED_VERSIONS_VAL);
+ if (spoe_encode_buffer(SUPPORTED_VERSIONS_VAL, sz, &p, end) == -1)
+ goto too_big;
/* "max-fram-size" K/V item */
- idx += encode_spoe_string(MAX_FRAME_SIZE_KEY, SLEN(MAX_FRAME_SIZE_KEY), frame+idx);
- frame[idx++] = SPOE_DATA_T_UINT32;
- idx += encode_spoe_varint(SPOE_APPCTX(appctx)->max_frame_size, frame+idx);
+ sz = SLEN(MAX_FRAME_SIZE_KEY);
+ if (spoe_encode_buffer(MAX_FRAME_SIZE_KEY, sz, &p, end) == -1)
+ goto too_big;
+
+ *p++ = SPOE_DATA_T_UINT32;
+ if (spoe_encode_varint(SPOE_APPCTX(appctx)->max_frame_size, &p, end) == -1)
+ goto too_big;
/* "capabilities" K/V item */
- idx += encode_spoe_string(CAPABILITIES_KEY, SLEN(CAPABILITIES_KEY), frame+idx);
- frame[idx++] = SPOE_DATA_T_STR;
- idx += encode_spoe_string(CAPABILITIES_VAL, SLEN(CAPABILITIES_VAL), frame+idx);
+ sz = SLEN(CAPABILITIES_KEY);
+ if (spoe_encode_buffer(CAPABILITIES_KEY, sz, &p, end) == -1)
+ goto too_big;
+
+ *p++ = SPOE_DATA_T_STR;
+ sz = SLEN(CAPABILITIES_VAL);
+ if (spoe_encode_buffer(CAPABILITIES_VAL, sz, &p, end) == -1)
+ goto too_big;
- /* "engine-id" K/V item */
+ /* (optionnal) "engine-id" K/V item, if present */
if (agent != NULL && agent->engine_id != NULL) {
- idx += encode_spoe_string(ENGINE_ID_KEY, SLEN(ENGINE_ID_KEY), frame+idx);
- frame[idx++] = SPOE_DATA_T_STR;
- idx += encode_spoe_string(agent->engine_id, strlen(agent->engine_id), frame+idx);
+ sz = SLEN(ENGINE_ID_KEY);
+ if (spoe_encode_buffer(ENGINE_ID_KEY, sz, &p, end) == -1)
+ goto too_big;
+
+ *p++ = SPOE_DATA_T_STR;
+ sz = strlen(agent->engine_id);
+ if (spoe_encode_buffer(agent->engine_id, sz, &p, end) == -1)
+ goto too_big;
}
- return idx;
+ return (p - frame);
+
+ too_big:
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_TOO_BIG;
+ return 0;
}
-/* Encode DISCONNECT frame sent by HAProxy to an agent. It returns the frame
- * size on success, 0 if the frame can be ignored and -1 if an error
- * occurred. */
+/* Encode DISCONNECT frame sent by HAProxy to an agent. It returns the number of
+ * encoded bytes in the frame on success, 0 if an encoding error occurred and -1
+ * if a fatal error occurred. */
static int
-prepare_spoe_hadiscon_frame(struct appctx *appctx, char *frame, size_t size)
+spoe_prepare_hadiscon_frame(struct appctx *appctx, char *frame, size_t size)
{
- const char *reason;
+ const char *reason;
+ char *p, *end;
unsigned int flags = SPOE_FRM_FL_FIN;
- int rlen, idx = 0;
- size_t max = (7 /* TYPE + METADATA */
- + 1 + SLEN(STATUS_CODE_KEY) + 1 + 2
- + 1 + SLEN(MSG_KEY) + 1 + 2 + 255);
-
- if (size < max)
- return -1;
+ size_t sz;
- /* Get the message corresponding to the status code */
- if (spoe_status_code >= SPOE_FRM_ERRS)
- spoe_status_code = SPOE_FRM_ERR_UNKNOWN;
- reason = spoe_frm_err_reasons[spoe_status_code];
- rlen = strlen(reason);
+ p = frame;
+ end = frame+size;
- /* Frame type */
- frame[idx++] = SPOE_FRM_T_HAPROXY_DISCON;
+ /* Set Frame type */
+ *p++ = SPOE_FRM_T_HAPROXY_DISCON;
/* Set flags */
- memcpy(frame+idx, (char *)&flags, 4);
- idx += 4;
+ memcpy(p, (char *)&flags, 4);
+ p += 4;
/* No stream-id and frame-id for DISCONNECT frames */
- frame[idx++] = 0;
- frame[idx++] = 0;
+ *p++ = 0; *p++ = 0;
+
+ if (SPOE_APPCTX(appctx)->status_code >= SPOE_FRM_ERRS)
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_UNKNOWN;
/* There are 2 mandatory items: "status-code" and "message" */
/* "status-code" K/V item */
- idx += encode_spoe_string(STATUS_CODE_KEY, SLEN(STATUS_CODE_KEY), frame+idx);
- frame[idx++] = SPOE_DATA_T_UINT32;
- idx += encode_spoe_varint(spoe_status_code, frame+idx);
+ sz = SLEN(STATUS_CODE_KEY);
+ if (spoe_encode_buffer(STATUS_CODE_KEY, sz, &p, end) == -1)
+ goto too_big;
+
+ *p++ = SPOE_DATA_T_UINT32;
+ if (spoe_encode_varint(SPOE_APPCTX(appctx)->status_code, &p, end) == -1)
+ goto too_big;
/* "message" K/V item */
- idx += encode_spoe_string(MSG_KEY, SLEN(MSG_KEY), frame+idx);
- frame[idx++] = SPOE_DATA_T_STR;
- idx += encode_spoe_string(reason, rlen, frame+idx);
+ sz = SLEN(MSG_KEY);
+ if (spoe_encode_buffer(MSG_KEY, sz, &p, end) == -1)
+ goto too_big;
+
+ /*Get the message corresponding to the status code */
+ reason = spoe_frm_err_reasons[SPOE_APPCTX(appctx)->status_code];
- return idx;
+ *p++ = SPOE_DATA_T_STR;
+ sz = strlen(reason);
+ if (spoe_encode_buffer(reason, sz, &p, end) == -1)
+ goto too_big;
+
+ return (p - frame);
+
+ too_big:
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_TOO_BIG;
+ return 0;
}
-/* Encode NOTIFY frame sent by HAProxy to an agent. It returns the frame size on
- * success, 0 if the frame can be ignored and -1 if an error occurred. */
+/* Encode the NOTIFY frame sent by HAProxy to an agent. It returns the number of
+ * encoded bytes in the frame on success, 0 if an encoding error occurred and -1
+ * if a fatal error occurred. */
static int
-prepare_spoe_hanotify_frame(struct appctx *appctx, struct spoe_context *ctx,
+spoe_prepare_hanotify_frame(struct appctx *appctx, struct spoe_context *ctx,
char *frame, size_t size)
{
- int idx = 0;
- unsigned int stream_id, frame_id, flags = SPOE_FRM_FL_FIN;
+ char *p, *end;
+ unsigned int stream_id, frame_id;
+ unsigned int flags = SPOE_FRM_FL_FIN;
+ size_t sz;
- frame[idx++] = SPOE_FRM_T_HAPROXY_NOTIFY;
+ p = frame;
+ end = frame+size;
+ /* <ctx> is null when the stream has aborted the processing of a
+ * fragmented frame. In this case, we must notify the corresponding
+ * agent using ids stored in <frag_ctx>. */
if (ctx == NULL) {
flags |= SPOE_FRM_FL_ABRT;
stream_id = SPOE_APPCTX(appctx)->frag_ctx.cursid;
frame_id = ctx->frame_id;
if (ctx->flags & SPOE_CTX_FL_FRAGMENTED) {
+ /* The fragmentation is not supported by the applet */
if (!(SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_FRAGMENTATION)) {
- spoe_status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
- return 0;
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
+ return -1;
}
flags = ctx->frag_ctx.flags;
}
}
+ /* Set Frame type */
+ *p++ = SPOE_FRM_T_HAPROXY_NOTIFY;
+
/* Set flags */
- memcpy(frame+idx, (char *)&flags, 4);
- idx += 4;
+ memcpy(p, (char *)&flags, 4);
+ p += 4;
/* Set stream-id and frame-id */
- idx += encode_spoe_varint(stream_id, frame+idx);
- idx += encode_spoe_varint(frame_id, frame+idx);
+ if (spoe_encode_varint(stream_id, &p, end) == -1)
+ goto too_big;
+ if (spoe_encode_varint(frame_id, &p, end) == -1)
+ goto too_big;
- /* check the buffer size */
- if (idx + SPOE_APPCTX(appctx)->buffer->i > size) {
- spoe_status_code = SPOE_FRM_ERR_TOO_BIG;
- return 0;
- }
+ /* Copy encoded messages, if possible */
+ sz = SPOE_APPCTX(appctx)->buffer->i;
+ if (p + sz >= end)
+ goto too_big;
+ memcpy(p, SPOE_APPCTX(appctx)->buffer->p, sz);
+ p += sz;
+
+ return (p - frame);
- /* Copy encoded messages */
- memcpy(frame+idx, SPOE_APPCTX(appctx)->buffer->p, SPOE_APPCTX(appctx)->buffer->i);
- idx += SPOE_APPCTX(appctx)->buffer->i;
- return idx;
+ too_big:
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_TOO_BIG;
+ return 0;
}
-/* Decode HELLO frame sent by an agent. It returns the number of by read bytes
- * on success, 0 if the frame can be ignored and -1 if an error occurred. */
+/* Decode and process the HELLO frame sent by an agent. It returns the number of
+ * read bytes on success, 0 if a decoding error occurred, and -1 if a fatal
+ * error occurred. */
static int
-handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size)
+spoe_handle_agenthello_frame(struct appctx *appctx, char *frame, size_t size)
{
- int vsn, max_frame_size, i, idx = 0;
+ char *p, *end;
+ int vsn, max_frame_size;
unsigned int flags;
- size_t min_size = (7 /* TYPE + METADATA */
- + 1 + SLEN(VERSION_KEY) + 1 + 1 + 3
- + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 1
- + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + 0);
+
+ p = frame;
+ end = frame + size;
/* Check frame type */
- if (frame[idx++] != SPOE_FRM_T_AGENT_HELLO)
+ if (*p++ != SPOE_FRM_T_AGENT_HELLO) {
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
return 0;
+ }
- if (size < min_size) {
- spoe_status_code = SPOE_FRM_ERR_INVALID;
- return -1;
+ if (size < 7 /* TYPE + METADATA */) {
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
+ return 0;
}
/* Retrieve flags */
- memcpy((char *)&flags, frame+idx, 4);
- idx += 4;
+ memcpy((char *)&flags, p, 4);
+ p += 4;
/* Fragmentation is not supported for HELLO frame */
if (!(flags & SPOE_FRM_FL_FIN)) {
- spoe_status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
return -1;
}
/* stream-id and frame-id must be cleared */
- if (frame[idx] != 0 || frame[idx+1] != 0) {
- spoe_status_code = SPOE_FRM_ERR_INVALID;
- return -1;
+ if (*p != 0 || *(p+1) != 0) {
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
+ return 0;
}
- idx += 2;
+ p += 2;
/* There are 3 mandatory items: "version", "max-frame-size" and
* "capabilities" */
/* Loop on K/V items */
vsn = max_frame_size = flags = 0;
- while (idx < size) {
- char *str;
- uint64_t sz;
+ while (p < end) {
+ char *str;
+ size_t sz;
+ int ret;
/* Decode the item key */
- idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
- if (str == NULL) {
- spoe_status_code = SPOE_FRM_ERR_INVALID;
- return -1;
+ ret = spoe_decode_buffer(&p, end, &str, &sz);
+ if (ret == -1 || !sz) {
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
+ return 0;
}
+
/* Check "version" K/V item */
if (!memcmp(str, VERSION_KEY, sz)) {
+ int i, type = *p++;
+
/* The value must be a string */
- if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
- spoe_status_code = SPOE_FRM_ERR_INVALID;
- return -1;
+ if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
+ return 0;
}
- idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
- if (str == NULL) {
- spoe_status_code = SPOE_FRM_ERR_INVALID;
- return -1;
+ if (spoe_decode_buffer(&p, end, &str, &sz) == -1) {
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
+ return 0;
}
- vsn = decode_spoe_version(str, sz);
+ vsn = spoe_str_to_vsn(str, sz);
if (vsn == -1) {
- spoe_status_code = SPOE_FRM_ERR_BAD_VSN;
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_BAD_VSN;
return -1;
}
for (i = 0; supported_versions[i].str != NULL; ++i) {
break;
}
if (supported_versions[i].str == NULL) {
- spoe_status_code = SPOE_FRM_ERR_BAD_VSN;
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_BAD_VSN;
return -1;
}
}
/* Check "max-frame-size" K/V item */
else if (!memcmp(str, MAX_FRAME_SIZE_KEY, sz)) {
- int type;
+ int type = *p++;
/* The value must be integer */
- type = frame[idx++];
if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
(type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
(type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
(type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
- spoe_status_code = SPOE_FRM_ERR_INVALID;
- return -1;
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
+ return 0;
}
- if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) {
- spoe_status_code = SPOE_FRM_ERR_INVALID;
- return -1;
+ if (spoe_decode_varint(&p, end, &sz) == -1) {
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
+ return 0;
}
- idx += i;
- if (sz < MIN_FRAME_SIZE || sz > SPOE_APPCTX(appctx)->max_frame_size) {
- spoe_status_code = SPOE_FRM_ERR_BAD_FRAME_SIZE;
+ if (sz < MIN_FRAME_SIZE ||
+ sz > SPOE_APPCTX(appctx)->max_frame_size) {
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_BAD_FRAME_SIZE;
return -1;
}
max_frame_size = sz;
}
/* Check "capabilities" K/V item */
else if (!memcmp(str, CAPABILITIES_KEY, sz)) {
- int i;
+ int type = *p++;
/* The value must be a string */
- if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
- spoe_status_code = SPOE_FRM_ERR_INVALID;
- return -1;
+ if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
+ return 0;
+ }
+ if (spoe_decode_buffer(&p, end, &str, &sz) == -1) {
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
+ return 0;
}
- idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
- if (str == NULL)
- continue;
- i = 0;
- while (i < sz) {
+ while (sz) {
char *delim;
/* Skip leading spaces */
- for (; isspace(str[i]) && i < sz; i++);
+ for (; isspace(*str) && sz; str++, sz--);
- if (sz - i >= 10 && !strncmp(str + i, "pipelining", 10)) {
- i += 10;
- if (sz == i || isspace(str[i]) || str[i] == ',')
+ if (sz >= 10 && !strncmp(str, "pipelining", 10)) {
+ str += 10; sz -= 10;
+ if (!sz || isspace(*str) || *str == ',')
flags |= SPOE_APPCTX_FL_PIPELINING;
}
- else if (sz - i >= 5 && !strncmp(str + i, "async", 5)) {
- i += 5;
- if (sz == i || isspace(str[i]) || str[i] == ',')
+ else if (sz >= 5 && !strncmp(str, "async", 5)) {
+ str += 5; sz -= 5;
+ if (!sz || isspace(*str) || *str == ',')
flags |= SPOE_APPCTX_FL_ASYNC;
}
- else if (sz - i >= 13 && !strncmp(str + i, "fragmentation", 13)) {
- i += 13;
- if (sz == i || isspace(str[i]) || str[i] == ',')
+ else if (sz >= 13 && !strncmp(str, "fragmentation", 13)) {
+ str += 13; sz -= 13;
+ if (!sz || isspace(*str) || *str == ',')
flags |= SPOE_APPCTX_FL_FRAGMENTATION;
}
- if (sz == i || (delim = memchr(str + i, ',', sz-i)) == NULL)
+ /* Get the next comma or break */
+ if (!sz || (delim = memchr(str, ',', sz)) == NULL)
break;
- i = (delim - str) + 1;
+ delim++;
+ sz -= (delim - str);
+ str = delim;
}
}
else {
/* Silently ignore unknown item */
- if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) {
- spoe_status_code = SPOE_FRM_ERR_INVALID;
- return -1;
+ if (spoe_skip_data(&p, end) == -1) {
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
+ return 0;
}
- idx += i;
}
}
/* Final checks */
if (!vsn) {
- spoe_status_code = SPOE_FRM_ERR_NO_VSN;
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_NO_VSN;
return -1;
}
if (!max_frame_size) {
- spoe_status_code = SPOE_FRM_ERR_NO_FRAME_SIZE;
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_NO_FRAME_SIZE;
return -1;
}
SPOE_APPCTX(appctx)->version = (unsigned int)vsn;
SPOE_APPCTX(appctx)->max_frame_size = (unsigned int)max_frame_size;
SPOE_APPCTX(appctx)->flags |= flags;
- return idx;
+
+ return (p - frame);
}
/* Decode DISCONNECT frame sent by an agent. It returns the number of by read
* bytes on success, 0 if the frame can be ignored and -1 if an error
* occurred. */
static int
-handle_spoe_agentdiscon_frame(struct appctx *appctx, char *frame, size_t size)
+spoe_handle_agentdiscon_frame(struct appctx *appctx, char *frame, size_t size)
{
- int i, idx = 0;
+ char *p, *end;
unsigned int flags;
- size_t min_size = (7 /* TYPE + METADATA */
- + 1 + SLEN(STATUS_CODE_KEY) + 1 + 1
- + 1 + SLEN(MSG_KEY) + 1 + 1);
+
+ p = frame;
+ end = frame + size;
/* Check frame type */
- if (frame[idx++] != SPOE_FRM_T_AGENT_DISCON)
+ if (*p++ != SPOE_FRM_T_AGENT_DISCON) {
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
return 0;
+ }
- if (size < min_size) {
- spoe_status_code = SPOE_FRM_ERR_INVALID;
- return -1;
+ if (size < 7 /* TYPE + METADATA */) {
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
+ return 0;
}
/* Retrieve flags */
- memcpy((char *)&flags, frame+idx, 4);
- idx += 4;
+ memcpy((char *)&flags, p, 4);
+ p += 4;
/* Fragmentation is not supported for DISCONNECT frame */
if (!(flags & SPOE_FRM_FL_FIN)) {
- spoe_status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
return -1;
}
/* stream-id and frame-id must be cleared */
- if (frame[idx] != 0 || frame[idx+1] != 0) {
- spoe_status_code = SPOE_FRM_ERR_INVALID;
- return -1;
+ if (*p != 0 || *(p+1) != 0) {
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
+ return 0;
}
- idx += 2;
+ p += 2;
/* There are 2 mandatory items: "status-code" and "message" */
/* Loop on K/V items */
- while (idx < size) {
- char *str;
- uint64_t sz;
+ while (p < end) {
+ char *str;
+ size_t sz;
+ int ret;
/* Decode the item key */
- idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
- if (str == NULL) {
- spoe_status_code = SPOE_FRM_ERR_INVALID;
- return -1;
+ ret = spoe_decode_buffer(&p, end, &str, &sz);
+ if (ret == -1 || !sz) {
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
+ return 0;
}
/* Check "status-code" K/V item */
if (!memcmp(str, STATUS_CODE_KEY, sz)) {
- int type;
+ int type = *p++;
/* The value must be an integer */
- type = frame[idx++];
if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
(type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
(type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
(type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
- spoe_status_code = SPOE_FRM_ERR_INVALID;
- return -1;
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
+ return 0;
}
- if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) {
- spoe_status_code = SPOE_FRM_ERR_INVALID;
- return -1;
+ if (spoe_decode_varint(&p, end, &sz) == -1) {
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
+ return 0;
}
- idx += i;
- spoe_status_code = sz;
+ SPOE_APPCTX(appctx)->status_code = sz;
}
/* Check "message" K/V item */
- else if (sz && !memcmp(str, MSG_KEY, sz)) {
+ else if (!memcmp(str, MSG_KEY, sz)) {
+ int type = *p++;
+
/* The value must be a string */
- if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
- spoe_status_code = SPOE_FRM_ERR_INVALID;
- return -1;
+ if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
+ return 0;
}
- idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
- if (str == NULL || sz > 255) {
- spoe_status_code = SPOE_FRM_ERR_INVALID;
- return -1;
+ ret = spoe_decode_buffer(&p, end, &str, &sz);
+ if (ret == -1 || sz > 255) {
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
+ return 0;
}
- memcpy(spoe_reason, str, sz);
- spoe_reason[sz] = 0;
+#if defined(DEBUG_SPOE) || defined(DEBUG_FULL)
+ SPOE_APPCTX(appctx)->reason = str;
+ SPOE_APPCTX(appctx)->rlen = sz;
+#endif
}
else {
/* Silently ignore unknown item */
- if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) {
- spoe_status_code = SPOE_FRM_ERR_INVALID;
- return -1;
+ if (spoe_skip_data(&p, end) == -1) {
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
+ return 0;
}
- idx += i;
}
}
- return idx;
+ return (p - frame);
}
/* Decode ACK frame sent by an agent. It returns the number of read bytes on
* success, 0 if the frame can be ignored and -1 if an error occurred. */
static int
-handle_spoe_agentack_frame(struct appctx *appctx, struct spoe_context **ctx,
+spoe_handle_agentack_frame(struct appctx *appctx, struct spoe_context **ctx,
char *frame, size_t size)
{
struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
+ char *p, *end;
uint64_t stream_id, frame_id;
- int i, idx = 0;
+ int len;
unsigned int flags;
- size_t min_size = (7 /* TYPE + METADATA */);
+
+ p = frame;
+ end = frame + size;
+ *ctx = NULL;
/* Check frame type */
- if (frame[idx++] != SPOE_FRM_T_AGENT_ACK)
+ if (*p++ != SPOE_FRM_T_AGENT_ACK) {
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
return 0;
+ }
- if (size < min_size) {
- spoe_status_code = SPOE_FRM_ERR_INVALID;
- return -1;
+ if (size < 7 /* TYPE + METADATA */) {
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
+ return 0;
}
/* Retrieve flags */
- memcpy((char *)&flags, frame+idx, 4);
- idx += 4;
+ memcpy((char *)&flags, p, 4);
+ p += 4;
/* Fragmentation is not supported for now */
if (!(flags & SPOE_FRM_FL_FIN)) {
- spoe_status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
return -1;
}
/* Get the stream-id and the frame-id */
- if ((i = decode_spoe_varint(frame+idx, frame+size, &stream_id)) == -1)
+ if (spoe_decode_varint(&p, end, &stream_id) == -1) {
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
return 0;
- idx += i;
- if ((i= decode_spoe_varint(frame+idx, frame+size, &frame_id)) == -1)
+ }
+ if (spoe_decode_varint(&p, end, &frame_id) == -1) {
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
return 0;
- idx += i;
+ }
+ /* Try to find the corresponding SPOE context */
if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC) {
list_for_each_entry((*ctx), &agent->waiting_queue, list) {
if ((*ctx)->stream_id == (unsigned int)stream_id &&
else {
list_for_each_entry((*ctx), &SPOE_APPCTX(appctx)->waiting_queue, list) {
if ((*ctx)->stream_id == (unsigned int)stream_id &&
- (*ctx)->frame_id == (unsigned int)frame_id)
+ (*ctx)->frame_id == (unsigned int)frame_id)
goto found;
}
}
- /* FIXME: check if ABRT bit is set for a unfinished fragmented frame */
/* No Stream found, ignore the frame */
- SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p - Ignore ACK frame"
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
+ " - Ignore ACK frame"
" - stream-id=%u - frame-id=%u\n",
(int)now.tv_sec, (int)now.tv_usec, agent->id,
__FUNCTION__, appctx,
(unsigned int)stream_id, (unsigned int)frame_id);
- *ctx = NULL;
+ /* FIXME: Define a proper error for this case (SPOE_FRM_ERR_FRAMEID_NOTFOUND ?) */
return 0;
found:
- if (!acquire_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait)) {
+ if (!spoe_acquire_buffer(&SPOE_APPCTX(appctx)->buffer,
+ &SPOE_APPCTX(appctx)->buffer_wait)) {
*ctx = NULL;
return 1; /* Retry later */
}
/* Copy encoded actions */
- memcpy(SPOE_APPCTX(appctx)->buffer->p, frame+idx, size-idx);
- SPOE_APPCTX(appctx)->buffer->i = size-idx;
+ len = (end - p);
+ memcpy(SPOE_APPCTX(appctx)->buffer->p, p, len);
+ SPOE_APPCTX(appctx)->buffer->i = len;
+ p += len;
/* Transfer the buffer ownership to the SPOE context */
(*ctx)->buffer = SPOE_APPCTX(appctx)->buffer;
SPOE_APPCTX(appctx)->buffer = &buf_empty;
+ (*ctx)->state = SPOE_CTX_ST_DONE;
+
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
- " - ACK frame received - ctx=%p - stream-id=%u - frame-id=%u\n",
+ " - ACK frame received"
+ " - ctx=%p - stream-id=%u - frame-id=%u - flags=0x%08x\n",
(int)now.tv_sec, (int)now.tv_usec, agent->id,
- __FUNCTION__, appctx,
- *ctx, (*ctx)->stream_id, (*ctx)->frame_id);
- return idx;
+ __FUNCTION__, appctx, *ctx, (*ctx)->stream_id,
+ (*ctx)->frame_id, flags);
+ return (p - frame);
}
/* This function is used in cfgparse.c and declared in proto/checks.h. It
* prepare the request to send to agents during a healthcheck. It returns 0 on
* success and -1 if an error occurred. */
int
-prepare_spoe_healthcheck_request(char **req, int *len)
+spoe_prepare_healthcheck_request(char **req, int *len)
{
- struct appctx appctx;
- struct spoe_appctx spoe_appctx;
- char *frame, buf[MAX_FRAME_SIZE+4];
- unsigned int framesz;
- int idx;
+ struct appctx appctx;
+ struct spoe_appctx spoe_appctx;
+ char *frame, *end, buf[MAX_FRAME_SIZE+4];
+ size_t sz;
+ int ret;
memset(&appctx, 0, sizeof(appctx));
memset(&spoe_appctx, 0, sizeof(spoe_appctx));
appctx.ctx.spoe.ptr = &spoe_appctx;
SPOE_APPCTX(&appctx)->max_frame_size = MAX_FRAME_SIZE;
- frame = buf+4;
- idx = prepare_spoe_hahello_frame(&appctx, frame, MAX_FRAME_SIZE);
- if (idx <= 0)
- return -1;
- if (idx + SLEN(HEALTHCHECK_KEY) + 1 > MAX_FRAME_SIZE)
+ frame = buf+4; /* Reserved the 4 first bytes for the frame size */
+ end = frame + MAX_FRAME_SIZE;
+
+ ret = spoe_prepare_hahello_frame(&appctx, frame, MAX_FRAME_SIZE);
+ if (ret <= 0)
return -1;
+ frame += ret;
- /* "healthcheck" K/V item */
- idx += encode_spoe_string(HEALTHCHECK_KEY, SLEN(HEALTHCHECK_KEY), frame+idx);
- frame[idx++] = (SPOE_DATA_T_BOOL | SPOE_DATA_FL_TRUE);
+ /* Add "healthcheck" K/V item */
+ sz = SLEN(HEALTHCHECK_KEY);
+ if (spoe_encode_buffer(HEALTHCHECK_KEY, sz, &frame, end) == -1)
+ return -1;
+ *frame++ = (SPOE_DATA_T_BOOL | SPOE_DATA_FL_TRUE);
- framesz = htonl(idx);
- memcpy(buf, (char *)&framesz, 4);
+ *len = frame - buf;
+ sz = htonl(*len - 4);
+ memcpy(buf, (char *)&sz, 4);
- if ((*req = malloc(idx+4)) == NULL)
+ if ((*req = malloc(*len)) == NULL)
return -1;
- memcpy(*req, buf, idx+4);
- *len = idx+4;
+ memcpy(*req, buf, *len);
return 0;
}
* the response received from an agent during a healthcheck. It returns 0 on
* success and -1 if an error occurred. */
int
-handle_spoe_healthcheck_response(char *frame, size_t size, char *err, int errlen)
+spoe_handle_healthcheck_response(char *frame, size_t size, char *err, int errlen)
{
struct appctx appctx;
struct spoe_appctx spoe_appctx;
- int r;
memset(&appctx, 0, sizeof(appctx));
memset(&spoe_appctx, 0, sizeof(spoe_appctx));
appctx.ctx.spoe.ptr = &spoe_appctx;
SPOE_APPCTX(&appctx)->max_frame_size = MAX_FRAME_SIZE;
- if (handle_spoe_agentdiscon_frame(&appctx, frame, size) != 0)
- goto error;
- if ((r = handle_spoe_agenthello_frame(&appctx, frame, size)) <= 0) {
- if (r == 0)
- spoe_status_code = SPOE_FRM_ERR_INVALID;
+ if (*frame == SPOE_FRM_T_AGENT_DISCON) {
+ spoe_handle_agentdiscon_frame(&appctx, frame, size);
goto error;
}
+ if (spoe_handle_agenthello_frame(&appctx, frame, size) <= 0)
+ goto error;
return 0;
error:
- if (spoe_status_code >= SPOE_FRM_ERRS)
- spoe_status_code = SPOE_FRM_ERR_UNKNOWN;
- strncpy(err, spoe_frm_err_reasons[spoe_status_code], errlen);
+ if (SPOE_APPCTX(&appctx)->status_code >= SPOE_FRM_ERRS)
+ SPOE_APPCTX(&appctx)->status_code = SPOE_FRM_ERR_UNKNOWN;
+ strncpy(err, spoe_frm_err_reasons[SPOE_APPCTX(&appctx)->status_code], errlen);
return -1;
}
* the frame can be ignored, 1 to retry later, and the frame legnth on
* success. */
static int
-send_spoe_frame(struct appctx *appctx, char *buf, size_t framesz)
+spoe_send_frame(struct appctx *appctx, char *buf, size_t framesz)
{
struct stream_interface *si = appctx->owner;
- int ret;
- uint32_t netint;
+ int ret;
+ uint32_t netint;
if (si_ic(si)->buf == &buf_empty)
- return 1;
+ goto retry;
+ /* 4 bytes are reserved at the beginning of <buf> to store the frame
+ * length. */
netint = htonl(framesz);
memcpy(buf, (char *)&netint, 4);
ret = bi_putblk(si_ic(si), buf, framesz+4);
if (ret <= 0) {
- if (ret == -1)
+ if (ret == -1) {
+ retry:
+ si_applet_cant_put(si);
return 1; /* retry */
- spoe_status_code = SPOE_FRM_ERR_IO;
+ }
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_IO;
return -1; /* error */
}
return framesz;
* when the frame can be ignored, 1 to retry later and the frame length on
* success. */
static int
-recv_spoe_frame(struct appctx *appctx, char *buf, size_t framesz)
+spoe_recv_frame(struct appctx *appctx, char *buf, size_t framesz)
{
struct stream_interface *si = appctx->owner;
- int ret;
- uint32_t netint;
+ int ret;
+ uint32_t netint;
if (si_oc(si)->buf == &buf_empty)
- return 1;
+ goto retry;
ret = bo_getblk(si_oc(si), (char *)&netint, 4, 0);
if (ret > 0) {
framesz = ntohl(netint);
if (framesz > SPOE_APPCTX(appctx)->max_frame_size) {
- spoe_status_code = SPOE_FRM_ERR_TOO_BIG;
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_TOO_BIG;
return -1;
}
- ret = bo_getblk(si_oc(si), trash.str, framesz, 4);
+ ret = bo_getblk(si_oc(si), buf, framesz, 4);
}
if (ret <= 0) {
- if (ret == 0)
+ if (ret == 0) {
+ retry:
return 1; /* retry */
- spoe_status_code = SPOE_FRM_ERR_IO;
+ }
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_IO;
return -1; /* error */
}
return framesz;
* Functions that manage the SPOE applet
********************************************************************/
static int
-wakeup_spoe_appctx(struct appctx *appctx)
+spoe_wakeup_appctx(struct appctx *appctx)
{
si_applet_want_get(appctx->owner);
si_applet_want_put(appctx->owner);
/* Callback function that catches applet timeouts. If a timeout occurred, we set
* <appctx->st1> flag and the SPOE applet is woken up. */
static struct task *
-process_spoe_applet(struct task * task)
+spoe_process_appctx(struct task * task)
{
struct appctx *appctx = task->context;
appctx->st1 = SPOE_APPCTX_ERR_NONE;
if (tick_is_expired(task->expire, now_ms)) {
task->expire = TICK_ETERNITY;
- appctx->st1 = SPOE_APPCTX_ERR_TOUT;
+ appctx->st1 = SPOE_APPCTX_ERR_TOUT;
}
- wakeup_spoe_appctx(appctx);
+ spoe_wakeup_appctx(appctx);
return task;
}
/* Callback function that releases a SPOE applet. This happens when the
* connection with the agent is closed. */
static void
-release_spoe_applet(struct appctx *appctx)
+spoe_release_appctx(struct appctx *appctx)
{
- struct stream_interface *si = appctx->owner;
- struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
+ struct stream_interface *si = appctx->owner;
+ struct spoe_appctx *spoe_appctx = SPOE_APPCTX(appctx);
+ struct spoe_agent *agent;
struct spoe_context *ctx, *back;
- struct spoe_appctx *spoe_appctx;
+
+ if (spoe_appctx == NULL)
+ return;
+
+ appctx->ctx.spoe.ptr = NULL;
+ agent = spoe_appctx->agent;
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p\n",
(int)now.tv_sec, (int)now.tv_usec, agent->id,
__FUNCTION__, appctx);
+ /* Remove applet from the list of running applets */
agent->applets_act--;
- if (!LIST_ISEMPTY(&SPOE_APPCTX(appctx)->list)) {
- LIST_DEL(&SPOE_APPCTX(appctx)->list);
- LIST_INIT(&SPOE_APPCTX(appctx)->list);
+ if (!LIST_ISEMPTY(&spoe_appctx->list)) {
+ LIST_DEL(&spoe_appctx->list);
+ LIST_INIT(&spoe_appctx->list);
}
+ /* Shutdown the server connection, if needed */
if (appctx->st0 != SPOE_APPCTX_ST_END) {
if (appctx->st0 == SPOE_APPCTX_ST_IDLE)
agent->applets_idle--;
si_shutr(si);
si_ic(si)->flags |= CF_READ_NULL;
appctx->st0 = SPOE_APPCTX_ST_END;
- if (SPOE_APPCTX(appctx)->status_code == SPOE_FRM_ERR_NONE)
- SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_IO;
+ if (spoe_appctx->status_code == SPOE_FRM_ERR_NONE)
+ spoe_appctx->status_code = SPOE_FRM_ERR_IO;
}
- if (SPOE_APPCTX(appctx)->task) {
- task_delete(SPOE_APPCTX(appctx)->task);
- task_free(SPOE_APPCTX(appctx)->task);
+ /* Destroy the task attached to this applet */
+ if (spoe_appctx->task) {
+ task_delete(spoe_appctx->task);
+ task_free(spoe_appctx->task);
}
- list_for_each_entry_safe(ctx, back, &SPOE_APPCTX(appctx)->waiting_queue, list) {
+ /* Notify all waiting streams */
+ list_for_each_entry_safe(ctx, back, &spoe_appctx->waiting_queue, list) {
LIST_DEL(&ctx->list);
LIST_INIT(&ctx->list);
ctx->state = SPOE_CTX_ST_ERROR;
- ctx->status_code = (SPOE_APPCTX(appctx)->status_code + 0x100);
+ ctx->status_code = (spoe_appctx->status_code + 0x100);
task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
}
- if (SPOE_APPCTX(appctx)->frag_ctx.ctx) {
- ctx = SPOE_APPCTX(appctx)->frag_ctx.ctx;
+ /* If the applet was processing a fragmented frame, notify the
+ * corresponding stream. */
+ if (spoe_appctx->frag_ctx.ctx) {
+ ctx = spoe_appctx->frag_ctx.ctx;
ctx->frag_ctx.spoe_appctx = NULL;
ctx->state = SPOE_CTX_ST_ERROR;
- ctx->status_code = (SPOE_APPCTX(appctx)->status_code + 0x100);
+ ctx->status_code = (spoe_appctx->status_code + 0x100);
task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
}
- release_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait);
- pool_free2(pool2_spoe_appctx, SPOE_APPCTX(appctx));
+ /* Release allocated memory */
+ spoe_release_buffer(&spoe_appctx->buffer,
+ &spoe_appctx->buffer_wait);
+ pool_free2(pool2_spoe_appctx, spoe_appctx);
if (!LIST_ISEMPTY(&agent->applets))
goto end;
+ /* If this was the last running applet, notify all waiting streams */
list_for_each_entry_safe(ctx, back, &agent->sending_queue, list) {
LIST_DEL(&ctx->list);
LIST_INIT(&ctx->list);
ctx->state = SPOE_CTX_ST_ERROR;
- ctx->status_code = (SPOE_APPCTX(appctx)->status_code + 0x100);
+ ctx->status_code = (spoe_appctx->status_code + 0x100);
task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
}
-
list_for_each_entry_safe(ctx, back, &agent->waiting_queue, list) {
LIST_DEL(&ctx->list);
LIST_INIT(&ctx->list);
ctx->state = SPOE_CTX_ST_ERROR;
- ctx->status_code = (SPOE_APPCTX(appctx)->status_code + 0x100);
+ ctx->status_code = (spoe_appctx->status_code + 0x100);
task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
}
/* Update runtinme agent info */
agent->frame_size = agent->max_frame_size;
list_for_each_entry(spoe_appctx, &agent->applets, list)
- agent->frame_size = MIN(spoe_appctx->max_frame_size, agent->frame_size);
+ agent->frame_size = MIN(spoe_appctx->max_frame_size,
+ agent->frame_size);
}
static int
-handle_connect_spoe_applet(struct appctx *appctx)
+spoe_handle_connect_appctx(struct appctx *appctx)
{
struct stream_interface *si = appctx->owner;
struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
- char *frame = trash.str;
+ char *frame, *buf;
int ret;
if (si->state <= SI_ST_CON) {
goto stop;
}
if (si->state != SI_ST_EST) {
- spoe_status_code = SPOE_FRM_ERR_IO;
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_IO;
goto exit;
}
if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
- SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p - Connection timed out\n",
- (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, appctx);
- spoe_status_code = SPOE_FRM_ERR_TOUT;
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
+ " - Connection timed out\n",
+ (int)now.tv_sec, (int)now.tv_usec, agent->id,
+ __FUNCTION__, appctx);
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_TOUT;
goto exit;
}
if (SPOE_APPCTX(appctx)->task->expire == TICK_ETERNITY)
- SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.hello);
-
- ret = prepare_spoe_hahello_frame(appctx, frame+4, SPOE_APPCTX(appctx)->max_frame_size);
+ SPOE_APPCTX(appctx)->task->expire =
+ tick_add_ifset(now_ms, agent->timeout.hello);
+
+ /* 4 bytes are reserved at the beginning of <buf> to store the frame
+ * length. */
+ buf = trash.str; frame = buf+4;
+ ret = spoe_prepare_hahello_frame(appctx, frame,
+ SPOE_APPCTX(appctx)->max_frame_size);
if (ret > 1)
- ret = send_spoe_frame(appctx, frame, ret);
+ ret = spoe_send_frame(appctx, buf, ret);
switch (ret) {
case -1: /* error */
- goto exit;
-
case 0: /* ignore => an error, cannot be ignored */
goto exit;
case 1: /* retry later */
- si_applet_cant_put(si);
goto stop;
- default: /* CONNECT frame successfully sent */
+ default:
+ /* HELLO frame successfully sent, now wait for the
+ * reply. */
appctx->st0 = SPOE_APPCTX_ST_CONNECTING;
goto next;
}
stop:
return 1;
exit:
- SPOE_APPCTX(appctx)->status_code = spoe_status_code;
appctx->st0 = SPOE_APPCTX_ST_EXIT;
return 0;
}
static int
-handle_connecting_spoe_applet(struct appctx *appctx)
+spoe_handle_connecting_appctx(struct appctx *appctx)
{
struct stream_interface *si = appctx->owner;
struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
- char *frame = trash.str;
- int ret, framesz = 0;
+ char *frame;
+ int ret;
if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
- spoe_status_code = SPOE_FRM_ERR_IO;
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_IO;
goto exit;
}
if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
- SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p - Connection timed out\n",
- (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, appctx);
- spoe_status_code = SPOE_FRM_ERR_TOUT;
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
+ " - Connection timed out\n",
+ (int)now.tv_sec, (int)now.tv_usec, agent->id,
+ __FUNCTION__, appctx);
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_TOUT;
goto exit;
}
- ret = recv_spoe_frame(appctx, frame, SPOE_APPCTX(appctx)->max_frame_size);
+ frame = trash.str; trash.len = 0;
+ ret = spoe_recv_frame(appctx, frame,
+ SPOE_APPCTX(appctx)->max_frame_size);
if (ret > 1) {
if (*frame == SPOE_FRM_T_AGENT_DISCON) {
appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
goto next;
}
- framesz = ret;
- ret = handle_spoe_agenthello_frame(appctx, frame, framesz);
+ trash.len = ret + 4;
+ ret = spoe_handle_agenthello_frame(appctx, frame, ret);
}
switch (ret) {
case -1: /* error */
- if (framesz)
- bo_skip(si_oc(si), framesz+4);
- appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
- goto next;
-
- case 0: /* ignore */
- if (framesz)
- bo_skip(si_oc(si), framesz+4);
+ case 0: /* ignore => an error, cannot be ignored */
appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
goto next;
goto stop;
default:
- /* hello handshake is finished, set the idle timeout,
- * Add the appctx in the agent cache, decrease the
- * number of new applets and wake up waiting streams. */
- if (framesz)
- bo_skip(si_oc(si), framesz+4);
+ /* HELLO handshake is finished, set the idle timeout and
+ * add the applet in the list of running applets. */
agent->applets_idle++;
appctx->st0 = SPOE_APPCTX_ST_IDLE;
LIST_DEL(&SPOE_APPCTX(appctx)->list);
LIST_ADD(&agent->applets, &SPOE_APPCTX(appctx)->list);
/* Update runtinme agent info */
- agent->frame_size = MIN(SPOE_APPCTX(appctx)->max_frame_size, agent->frame_size);
+ agent->frame_size = MIN(SPOE_APPCTX(appctx)->max_frame_size,
+ agent->frame_size);
goto next;
}
next:
- SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
+ /* Do not forget to remove processed frame from the output buffer */
+ if (trash.len)
+ bo_skip(si_oc(si), trash.len);
+
+ SPOE_APPCTX(appctx)->task->expire =
+ tick_add_ifset(now_ms, agent->timeout.idle);
return 0;
stop:
return 1;
exit:
- SPOE_APPCTX(appctx)->status_code = spoe_status_code;
appctx->st0 = SPOE_APPCTX_ST_EXIT;
return 0;
}
static int
-handle_processing_spoe_applet(struct appctx *appctx)
+spoe_handle_sending_frame_appctx(struct appctx *appctx, struct spoe_context *ctx,
+ int *skip)
{
- struct stream_interface *si = appctx->owner;
- struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
- struct spoe_context *ctx = NULL;
- char *frame = trash.str;
- unsigned int fpa = 0;
- int ret, framesz = 0, skip_sending = 0, skip_receiving = 0;
+ struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
+ char *frame, *buf;
+ int ret;
- if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
- spoe_status_code = SPOE_FRM_ERR_IO;
- goto exit;
- }
+ /* 4 bytes are reserved at the beginning of <buf> to store the frame
+ * length. */
+ buf = trash.str; frame = buf+4;
+ ret = spoe_prepare_hanotify_frame(appctx, ctx, frame,
+ SPOE_APPCTX(appctx)->max_frame_size);
+ if (ret > 1)
+ ret = spoe_send_frame(appctx, buf, ret);
- if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
- spoe_status_code = SPOE_FRM_ERR_TOUT;
- appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
- appctx->st1 = SPOE_APPCTX_ERR_NONE;
- goto next;
- }
+ switch (ret) {
+ case -1: /* error */
+ appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
+ goto end;
- process:
+ case 0: /* ignore */
+ if (ctx == NULL)
+ goto abort_frag_frame;
+
+ LIST_DEL(&ctx->list);
+ LIST_INIT(&ctx->list);
+ ctx->state = SPOE_CTX_ST_ERROR;
+ ctx->status_code = (SPOE_APPCTX(appctx)->status_code + 0x100);
+ task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
+ break;
+
+ case 1: /* retry */
+ *skip = 1;
+ break;
+
+ default:
+ if (ctx == NULL)
+ goto abort_frag_frame;
+
+ LIST_DEL(&ctx->list);
+ LIST_INIT(&ctx->list);
+ if (!(ctx->flags & SPOE_CTX_FL_FRAGMENTED) ||
+ (ctx->frag_ctx.flags & SPOE_FRM_FL_FIN))
+ goto no_frag_frame_sent;
+ else {
+ *skip = 1;
+ goto frag_frame_sent;
+ }
+ }
+ goto end;
+
+ frag_frame_sent:
+ appctx->st0 = SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY;
+ SPOE_APPCTX(appctx)->frag_ctx.ctx = ctx;
+ SPOE_APPCTX(appctx)->frag_ctx.cursid = ctx->stream_id;
+ SPOE_APPCTX(appctx)->frag_ctx.curfid = ctx->frame_id;
+
+ ctx->frag_ctx.spoe_appctx = SPOE_APPCTX(appctx);
+ ctx->state = SPOE_CTX_ST_ENCODING_MSGS;
+ task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
+ goto end;
+
+ no_frag_frame_sent:
+ if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC) {
+ appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
+ LIST_ADDQ(&agent->waiting_queue, &ctx->list);
+ }
+ else if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_PIPELINING) {
+ appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
+ LIST_ADDQ(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list);
+ }
+ else {
+ appctx->st0 = SPOE_APPCTX_ST_WAITING_SYNC_ACK;
+ LIST_ADDQ(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list);
+ }
+ SPOE_APPCTX(appctx)->frag_ctx.ctx = NULL;
+ SPOE_APPCTX(appctx)->frag_ctx.cursid = 0;
+ SPOE_APPCTX(appctx)->frag_ctx.curfid = 0;
+
+ ctx->frag_ctx.spoe_appctx = NULL;
+ ctx->state = SPOE_CTX_ST_WAITING_ACK;
+ goto end;
+
+ abort_frag_frame:
+ appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
+ SPOE_APPCTX(appctx)->frag_ctx.ctx = NULL;
+ SPOE_APPCTX(appctx)->frag_ctx.cursid = 0;
+ SPOE_APPCTX(appctx)->frag_ctx.curfid = 0;
+ goto end;
+
+ end:
+ return ret;
+}
+
+static int
+spoe_handle_receiving_frame_appctx(struct appctx *appctx, int *skip)
+{
+ struct spoe_context *ctx = NULL;
+ char *frame;
+ int ret;
+
+ frame = trash.str; trash.len = 0;
+ ret = spoe_recv_frame(appctx, frame,
+ SPOE_APPCTX(appctx)->max_frame_size);
+ if (ret > 1) {
+ if (*frame == SPOE_FRM_T_AGENT_DISCON) {
+ appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
+ goto end;
+ }
+ trash.len = ret + 4;
+ ret = spoe_handle_agentack_frame(appctx, &ctx, frame, ret);
+ }
+ switch (ret) {
+ case -1: /* error */
+ appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
+ break;
+
+ case 0: /* ignore */
+ break;
+
+ case 1: /* retry */
+ *skip = 1;
+ break;
+
+ default:
+ LIST_DEL(&ctx->list);
+ LIST_INIT(&ctx->list);
+ appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
+ task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
+ break;
+ }
+
+ /* Do not forget to remove processed frame from the output buffer */
+ if (trash.len)
+ bo_skip(si_oc(appctx->owner), trash.len);
+ end:
+ return ret;
+}
+
+static int
+spoe_handle_processing_appctx(struct appctx *appctx)
+{
+ struct stream_interface *si = appctx->owner;
+ struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
+ struct spoe_context *ctx = NULL;
+ unsigned int fpa = 0;
+ int ret, skip_sending = 0, skip_receiving = 0;
+
+ if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_IO;
+ goto exit;
+ }
+
+ if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_TOUT;
+ appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
+ appctx->st1 = SPOE_APPCTX_ERR_NONE;
+ goto next;
+ }
+
+ process:
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
" - process: fpa=%u/%u - skip_sending=%d - skip_receiving=%d"
" - appctx-state=%s\n",
(int)now.tv_sec, (int)now.tv_usec, agent->id,
__FUNCTION__, appctx, fpa, agent->max_fpa,
- skip_sending, skip_receiving, spoe_appctx_state_str[appctx->st0]);
+ skip_sending, skip_receiving,
+ spoe_appctx_state_str[appctx->st0]);
if (fpa > agent->max_fpa || (skip_sending && skip_receiving))
goto stop;
SPOE_APPCTX(appctx)->buffer = ctx->buffer;
ctx->buffer = &buf_empty;
}
-
- ret = prepare_spoe_hanotify_frame(appctx, ctx, frame+4, SPOE_APPCTX(appctx)->max_frame_size);
- if (ret > 1)
- ret = send_spoe_frame(appctx, frame, ret);
-
+ ret = spoe_handle_sending_frame_appctx(appctx, ctx, &skip_sending);
switch (ret) {
case -1: /* error */
- appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
goto next;
case 0: /* ignore */
- release_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait);
+ spoe_release_buffer(&SPOE_APPCTX(appctx)->buffer,
+ &SPOE_APPCTX(appctx)->buffer_wait);
agent->sending_rate++;
fpa++;
-
- LIST_DEL(&ctx->list);
- LIST_INIT(&ctx->list);
- ctx->state = SPOE_CTX_ST_ERROR;
- ctx->status_code = (spoe_status_code + 0x100);
- task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
break;
case 1: /* retry */
- si_applet_cant_put(si);
- skip_sending = 1;
break;
default:
- release_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait);
+ spoe_release_buffer(&SPOE_APPCTX(appctx)->buffer,
+ &SPOE_APPCTX(appctx)->buffer_wait);
agent->sending_rate++;
fpa++;
-
- if (ctx == NULL) {
- appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
- SPOE_APPCTX(appctx)->frag_ctx.ctx = NULL;
- SPOE_APPCTX(appctx)->frag_ctx.cursid = 0;
- SPOE_APPCTX(appctx)->frag_ctx.curfid = 0;
- break;
- }
- LIST_DEL(&ctx->list);
- LIST_INIT(&ctx->list);
-
- if (ctx->flags & SPOE_CTX_FL_FRAGMENTED) {
- if (ctx->frag_ctx.flags & SPOE_FRM_FL_FIN) {
- if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC) {
- appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
- LIST_ADDQ(&agent->waiting_queue, &ctx->list);
- }
- else if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_PIPELINING) {
- appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
- LIST_ADDQ(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list);
- }
- else {
- appctx->st0 = SPOE_APPCTX_ST_WAITING_SYNC_ACK;
- LIST_ADDQ(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list);
- }
- SPOE_APPCTX(appctx)->frag_ctx.ctx = NULL;
- SPOE_APPCTX(appctx)->frag_ctx.cursid = 0;
- SPOE_APPCTX(appctx)->frag_ctx.curfid = 0;
-
- ctx->frag_ctx.spoe_appctx = NULL;
- ctx->state = SPOE_CTX_ST_WAITING_ACK;
- }
- else {
- appctx->st0 = SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY;
- SPOE_APPCTX(appctx)->frag_ctx.ctx = ctx;
- SPOE_APPCTX(appctx)->frag_ctx.cursid = ctx->stream_id;
- SPOE_APPCTX(appctx)->frag_ctx.curfid = ctx->frame_id;
-
- ctx->frag_ctx.spoe_appctx = SPOE_APPCTX(appctx);
- ctx->state = SPOE_CTX_ST_ENCODING_MSGS;
- task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
- skip_sending = 1;
- }
- }
- else {
- if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC) {
- appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
- LIST_ADDQ(&agent->waiting_queue, &ctx->list);
- }
- else if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_PIPELINING) {
- appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
- LIST_ADDQ(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list);
- }
- else {
- appctx->st0 = SPOE_APPCTX_ST_WAITING_SYNC_ACK;
- LIST_ADDQ(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list);
- }
-
- ctx->state = SPOE_CTX_ST_WAITING_ACK;
- }
+ break;
}
-
if (fpa > agent->max_fpa)
goto stop;
recv_frame:
if (skip_receiving)
goto process;
-
- framesz = 0;
- ret = recv_spoe_frame(appctx, frame, SPOE_APPCTX(appctx)->max_frame_size);
- if (ret > 1) {
- if (*frame == SPOE_FRM_T_AGENT_DISCON) {
- appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
- goto next;
- }
- framesz = ret;
- ret = handle_spoe_agentack_frame(appctx, &ctx, frame, framesz);
- }
+ ret = spoe_handle_receiving_frame_appctx(appctx, &skip_receiving);
switch (ret) {
case -1: /* error */
- if (framesz)
- bo_skip(si_oc(si), framesz+4);
- appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
goto next;
case 0: /* ignore */
- if (framesz)
- bo_skip(si_oc(si), framesz+4);
fpa++;
break;
case 1: /* retry */
- skip_receiving = 1;
break;
default:
- if (framesz)
- bo_skip(si_oc(si), framesz+4);
fpa++;
-
- LIST_DEL(&ctx->list);
- LIST_INIT(&ctx->list);
-
- if (appctx->st0 == SPOE_APPCTX_ST_WAITING_SYNC_ACK)
- appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
-
- ctx->state = SPOE_CTX_ST_DONE;
- task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
+ break;
}
goto process;
next:
- SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
+ SPOE_APPCTX(appctx)->task->expire =
+ tick_add_ifset(now_ms, agent->timeout.idle);
return 0;
stop:
if (appctx->st0 == SPOE_APPCTX_ST_PROCESSING) {
LIST_DEL(&SPOE_APPCTX(appctx)->list);
LIST_ADD(&agent->applets, &SPOE_APPCTX(appctx)->list);
if (fpa)
- SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
+ SPOE_APPCTX(appctx)->task->expire =
+ tick_add_ifset(now_ms, agent->timeout.idle);
}
return 1;
exit:
- SPOE_APPCTX(appctx)->status_code = spoe_status_code;
appctx->st0 = SPOE_APPCTX_ST_EXIT;
return 0;
}
static int
-handle_disconnect_spoe_applet(struct appctx *appctx)
+spoe_handle_disconnect_appctx(struct appctx *appctx)
{
struct stream_interface *si = appctx->owner;
struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
- char *frame = trash.str;
- int ret;
-
- SPOE_APPCTX(appctx)->status_code = spoe_status_code;
+ char *frame, *buf;
+ int ret;
if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO)
goto exit;
if (appctx->st1 == SPOE_APPCTX_ERR_TOUT)
goto exit;
- ret = prepare_spoe_hadiscon_frame(appctx, frame+4, SPOE_APPCTX(appctx)->max_frame_size);
+ /* 4 bytes are reserved at the beginning of <buf> to store the frame
+ * length. */
+ buf = trash.str; frame = buf+4;
+ ret = spoe_prepare_hadiscon_frame(appctx, frame,
+ SPOE_APPCTX(appctx)->max_frame_size);
if (ret > 1)
- ret = send_spoe_frame(appctx, frame, ret);
+ ret = spoe_send_frame(appctx, buf, ret);
switch (ret) {
case -1: /* error */
- goto exit;
-
- case 0: /* ignore */
+ case 0: /* ignore => an error, cannot be ignored */
goto exit;
case 1: /* retry */
- si_applet_cant_put(si);
goto stop;
default:
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
" - disconnected by HAProxy (%d): %s\n",
(int)now.tv_sec, (int)now.tv_usec, agent->id,
- __FUNCTION__, appctx, spoe_status_code,
- spoe_frm_err_reasons[spoe_status_code]);
+ __FUNCTION__, appctx,
+ SPOE_APPCTX(appctx)->status_code,
+ spoe_frm_err_reasons[SPOE_APPCTX(appctx)->status_code]);
appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
goto next;
}
next:
- SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
+ SPOE_APPCTX(appctx)->task->expire =
+ tick_add_ifset(now_ms, agent->timeout.idle);
return 0;
stop:
return 1;
}
static int
-handle_disconnecting_spoe_applet(struct appctx *appctx)
+spoe_handle_disconnecting_appctx(struct appctx *appctx)
{
struct stream_interface *si = appctx->owner;
- char *frame = trash.str;
- int ret, framesz = 0;
+ char *frame;
+ int ret;
if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
- spoe_status_code = SPOE_FRM_ERR_IO;
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_IO;
goto exit;
}
if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
- spoe_status_code = SPOE_FRM_ERR_TOUT;
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_TOUT;
goto exit;
}
- framesz = 0;
- ret = recv_spoe_frame(appctx, frame, SPOE_APPCTX(appctx)->max_frame_size);
+ frame = trash.str; trash.len = 0;
+ ret = spoe_recv_frame(appctx, frame,
+ SPOE_APPCTX(appctx)->max_frame_size);
if (ret > 1) {
- framesz = ret;
- ret = handle_spoe_agentdiscon_frame(appctx, frame, framesz);
+ trash.len = ret + 4;
+ ret = spoe_handle_agentdiscon_frame(appctx, frame, ret);
}
switch (ret) {
case -1: /* error */
- if (framesz)
- bo_skip(si_oc(si), framesz+4);
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
" - error on frame (%s)\n",
(int)now.tv_sec, (int)now.tv_usec,
((struct spoe_agent *)SPOE_APPCTX(appctx)->agent)->id,
__FUNCTION__, appctx,
- spoe_frm_err_reasons[spoe_status_code]);
+ spoe_frm_err_reasons[SPOE_APPCTX(appctx)->status_code]);
goto exit;
case 0: /* ignore */
- if (framesz)
- bo_skip(si_oc(si), framesz+4);
goto next;
case 1: /* retry */
goto stop;
default:
- if (framesz)
- bo_skip(si_oc(si), framesz+4);
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
- " - disconnected by peer (%d): %s\n",
+ " - disconnected by peer (%d): %.*s\n",
(int)now.tv_sec, (int)now.tv_usec,
((struct spoe_agent *)SPOE_APPCTX(appctx)->agent)->id,
- __FUNCTION__, appctx, spoe_status_code,
- spoe_reason);
+ __FUNCTION__, appctx, SPOE_APPCTX(appctx)->status_code,
+ SPOE_APPCTX(appctx)->rlen, SPOE_APPCTX(appctx)->reason);
goto exit;
}
next:
+ /* Do not forget to remove processed frame from the output buffer */
+ if (trash.len)
+ bo_skip(si_oc(appctx->owner), trash.len);
+
return 0;
stop:
return 1;
exit:
- if (SPOE_APPCTX(appctx)->status_code == SPOE_FRM_ERR_NONE)
- SPOE_APPCTX(appctx)->status_code = spoe_status_code;
appctx->st0 = SPOE_APPCTX_ST_EXIT;
return 0;
}
/* I/O Handler processing messages exchanged with the agent */
static void
-handle_spoe_applet(struct appctx *appctx)
+spoe_handle_appctx(struct appctx *appctx)
{
- struct stream_interface *si = appctx->owner;
- struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
+ struct stream_interface *si = appctx->owner;
+ struct spoe_agent *agent;
+
+ if (SPOE_APPCTX(appctx) == NULL)
+ return;
- spoe_status_code = SPOE_FRM_ERR_NONE;
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_NONE;
+ agent = SPOE_APPCTX(appctx)->agent;
switchstate:
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
switch (appctx->st0) {
case SPOE_APPCTX_ST_CONNECT:
- if (handle_connect_spoe_applet(appctx))
+ if (spoe_handle_connect_appctx(appctx))
goto out;
goto switchstate;
case SPOE_APPCTX_ST_CONNECTING:
- if (handle_connecting_spoe_applet(appctx))
+ if (spoe_handle_connecting_appctx(appctx))
goto out;
goto switchstate;
if (stopping &&
LIST_ISEMPTY(&agent->sending_queue) &&
LIST_ISEMPTY(&SPOE_APPCTX(appctx)->waiting_queue)) {
- SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
+ SPOE_APPCTX(appctx)->task->expire =
+ tick_add_ifset(now_ms, agent->timeout.idle);
appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
goto switchstate;
}
case SPOE_APPCTX_ST_PROCESSING:
case SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY:
case SPOE_APPCTX_ST_WAITING_SYNC_ACK:
- if (handle_processing_spoe_applet(appctx))
+ if (spoe_handle_processing_appctx(appctx))
goto out;
goto switchstate;
case SPOE_APPCTX_ST_DISCONNECT:
- if (handle_disconnect_spoe_applet(appctx))
+ if (spoe_handle_disconnect_appctx(appctx))
goto out;
goto switchstate;
case SPOE_APPCTX_ST_DISCONNECTING:
- if (handle_disconnecting_spoe_applet(appctx))
+ if (spoe_handle_disconnecting_appctx(appctx))
goto out;
goto switchstate;
case SPOE_APPCTX_ST_EXIT:
+ appctx->st0 = SPOE_APPCTX_ST_END;
+ SPOE_APPCTX(appctx)->task->expire = TICK_ETERNITY;
+
si_shutw(si);
si_shutr(si);
si_ic(si)->flags |= CF_READ_NULL;
- appctx->st0 = SPOE_APPCTX_ST_END;
- SPOE_APPCTX(appctx)->task->expire = TICK_ETERNITY;
/* fall through */
case SPOE_APPCTX_ST_END:
struct applet spoe_applet = {
.obj_type = OBJ_TYPE_APPLET,
.name = "<SPOE>", /* used for logging */
- .fct = handle_spoe_applet,
- .release = release_spoe_applet,
+ .fct = spoe_handle_appctx,
+ .release = spoe_release_appctx,
};
/* Create a SPOE applet. On success, the created applet is returned, else
* NULL. */
static struct appctx *
-create_spoe_appctx(struct spoe_config *conf)
+spoe_create_appctx(struct spoe_config *conf)
{
struct appctx *appctx;
struct session *sess;
goto out_free_spoe_appctx;
SPOE_APPCTX(appctx)->owner = appctx;
- SPOE_APPCTX(appctx)->task->process = process_spoe_applet;
+ SPOE_APPCTX(appctx)->task->process = spoe_process_appctx;
SPOE_APPCTX(appctx)->task->expire = TICK_ETERNITY;
SPOE_APPCTX(appctx)->task->context = appctx;
SPOE_APPCTX(appctx)->agent = conf->agent;
LIST_INIT(&SPOE_APPCTX(appctx)->buffer_wait.list);
SPOE_APPCTX(appctx)->buffer_wait.target = appctx;
- SPOE_APPCTX(appctx)->buffer_wait.wakeup_cb = (int (*)(void *))wakeup_spoe_appctx;
+ SPOE_APPCTX(appctx)->buffer_wait.wakeup_cb = (int (*)(void *))spoe_wakeup_appctx;
LIST_INIT(&SPOE_APPCTX(appctx)->list);
LIST_INIT(&SPOE_APPCTX(appctx)->waiting_queue);
}
static int
-queue_spoe_context(struct spoe_context *ctx)
+spoe_queue_context(struct spoe_context *ctx)
{
struct spoe_config *conf = FLT_CONF(ctx->filter);
struct spoe_agent *agent = conf->agent;
min_applets = min_applets_act(agent);
/* Check if we need to create a new SPOE applet or not. */
- if (agent->applets_act >= min_applets && agent->applets_idle && agent->sending_rate)
+ if (agent->applets_act >= min_applets &&
+ agent->applets_idle &&
+ agent->sending_rate)
goto end;
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
}
}
- appctx = create_spoe_appctx(conf);
+ appctx = spoe_create_appctx(conf);
if (appctx == NULL) {
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
" - failed to create SPOE appctx\n",
agent->sending_rate--;
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
- " - Add stream in sending queue - applets_act=%u - applets_idle=%u"
- " - sending_rate=%u\n",
+ " - Add stream in sending queue"
+ " - applets_act=%u - applets_idle=%u - sending_rate=%u\n",
(int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__,
- ctx->strm, agent->applets_act, agent->applets_idle, agent->sending_rate);
+ ctx->strm, agent->applets_act, agent->applets_idle,
+ agent->sending_rate);
/* Finally try to wakeup the first IDLE applet found and move it at the
* end of the list. */
list_for_each_entry(spoe_appctx, &agent->applets, list) {
appctx = spoe_appctx->owner;
if (appctx->st0 == SPOE_APPCTX_ST_IDLE) {
- wakeup_spoe_appctx(appctx);
+ spoe_wakeup_appctx(appctx);
LIST_DEL(&spoe_appctx->list);
LIST_ADDQ(&agent->applets, &spoe_appctx->list);
break;
/***************************************************************************
* Functions that encode SPOE messages
**************************************************************************/
-static inline int
-encode_spoe_arg_string(struct spoe_context *ctx, struct sample *smp,
- char *p, size_t max_size)
-{
- struct chunk *chk = &smp->data.u.str;
- int idx = 0;
-
- /* Here, we need to know if the sample has already been partially
- * encoded. If yes, we only need to encode the remaining, <curoff>
- * reprensenting the number of bytes already encoded in previous
- * frames. Else, <curoff> == 0 */
-
- if (!ctx->frag_ctx.curoff) {
- /* First evaluation of the sample : encode the type (string or
- * binary) and check its size against <max_size> */
-
- /* the string/binary length must not exceed 4 Gb. So 5 bytes is
- * reserved to encode its size. */
- if (max_size < 6)
- return 0;
-
- p[idx++] = (smp->data.type == SMP_T_STR) ? SPOE_DATA_T_STR : SPOE_DATA_T_BIN;
- max_size -= (idx + 5);
-
- if (chk->len > max_size) {
- /* The sample is too big, we will fragment it. <curoff>
- * will be updated accordingly. */
- idx += encode_frag_spoe_string(chk->str, chk->len, max_size, p+idx);
- ctx->frag_ctx.curoff = max_size;
- }
- else {
- /* No fragmentation needed, all the sample is encoded
- * and <curoff> remains 0 */
- idx += encode_spoe_string(chk->str, chk->len, p+idx);
- }
- }
- else {
- /* Continue the sample fragmentation, the type was already set
- * in a previous frame. So just do a copy of data. */
-
- idx = chk->len - ctx->frag_ctx.curoff; /* Remaining data */
- if (idx > max_size) {
- /* The sample still needs to be fragmented. <curoff>
- * will be incremented accordingly. */
- memcpy(p, chk->str + ctx->frag_ctx.curoff, max_size);
- idx = max_size;
- ctx->frag_ctx.curoff += max_size;
- }
- else {
- /* Finish the fragmentation. <curoff> will be reset. */
- memcpy(p, chk->str + ctx->frag_ctx.curoff, idx);
- ctx->frag_ctx.curoff = 0;
- }
- }
- return idx;
-}
-
-static inline int
-encode_spoe_arg_method(struct spoe_context *ctx, struct sample *smp,
- char *p, size_t max_size)
-{
- int idx = 0;
-
- /* method length must not exceed 2288 bytes. So 3 bytes is reserved to
- * encode its size. */
-
- if (smp->data.u.meth.meth != HTTP_METH_OTHER) {
- const struct http_method_name *meth =
- &http_known_methods[smp->data.u.meth.meth];
-
- if (meth->len + 3 > max_size)
- return 0;
- p[idx++] = SPOE_DATA_T_STR;
- idx += encode_spoe_string(meth->name, meth->len, p+idx);
- }
- else {
- struct chunk *meth = &smp->data.u.meth.str;
-
- if (meth->len + 3 > max_size)
- return 0;
- p[idx++] = SPOE_DATA_T_STR;
- idx += encode_spoe_string(meth->str, meth->len, p+idx);
- }
- return idx;
-}
-
-static inline int
-encode_spoe_arg_ipv6(struct spoe_context *ctx, struct sample *smp,
- char *p, size_t max_size)
-{
- int idx = 0;
-
- if (max_size < 17)
- return 0;
- p[idx++] = SPOE_DATA_T_IPV6;
- memcpy(p+idx, &smp->data.u.ipv6, 16);
- idx += 16;
- return idx;
-}
-
-
-static inline int
-encode_spoe_arg_ipv4(struct spoe_context *ctx, struct sample *smp,
- char *p, size_t max_size)
-{
- int idx = 0;
-
- if (max_size < 5)
- return 0;
- p[idx++] = SPOE_DATA_T_IPV4;
- memcpy(p+idx, &smp->data.u.ipv6, 4);
- idx += 4;
- return idx;
-}
-
-static inline int
-encode_spoe_arg_sint(struct spoe_context *ctx, struct sample *smp,
- char *p, size_t max_size)
-{
- int idx = 0;
-
- if (max_size < 9)
- return 0;
- p[idx++] = SPOE_DATA_T_INT64;
- idx += encode_spoe_varint(smp->data.u.sint, p+idx);
-
- return idx;
-}
-
-static inline int
-encode_spoe_arg_bool(struct spoe_context *ctx, struct sample *smp,
- char *p, size_t max_size)
-{
- int flag, idx = 0;
-
- if (max_size < 1)
- return 0;
- flag = ((!smp->data.u.sint) ? SPOE_DATA_FL_FALSE : SPOE_DATA_FL_TRUE);
- p[idx++] = (SPOE_DATA_T_BOOL | flag);
-
- return idx;
-}
-
-/* Encode SPOE messages for a specific event.
- *
- *
- * It returns 0 if During the processing, it returns
- * 0 and it returns 1 when the processing is finished. If an error occurred, -1
- * is returned. */
+/* Encode SPOE messages for a specific event. Info in <ctx->frag_ctx>, if any,
+ * are used to handle fragmented content. On success it returns 1. If an error
+ * occurred, -1 is returned. */
static int
-encode_spoe_messages(struct stream *s, struct spoe_context *ctx,
+spoe_encode_messages(struct stream *s, struct spoe_context *ctx,
struct list *messages, int dir)
{
struct spoe_config *conf = FLT_CONF(ctx->filter);
struct spoe_message *msg;
struct sample *smp;
struct spoe_arg *arg;
- char *p;
- size_t max_size;
- int r, idx = 0;
+ char *p, *end;
+ int ret;
- max_size = agent->frame_size - FRAME_HDR_SIZE;
-
- p = ctx->buffer->p;
+ p = ctx->buffer->p;
+ end = p + agent->frame_size - FRAME_HDR_SIZE;
/* Resume encoding of a SPOE message */
if (ctx->frag_ctx.curmsg != NULL) {
if (ctx->frag_ctx.curoff != UINT_MAX)
goto encode_msg_payload;
- /* <idx> + <string> + <nb-args>.
- * Implies <id_len> is encoded on 2 bytes, at most (< 2288). */
- if (idx + 3 + msg->id_len + 1 > max_size)
+ /* Check if there is enough space for the message name and the
+ * number of arguments. It implies <msg->id_len> is encoded on 2
+ * bytes, at most (< 2288). */
+ if (p + 2 + msg->id_len + 1 > end)
goto too_big;
- /* Set the message name */
- idx += encode_spoe_string(msg->id, msg->id_len, p+idx);
+ /* Encode the message name */
+ if (spoe_encode_buffer(msg->id, msg->id_len, &p, end) == -1)
+ goto too_big;
- /* Store the number of arguments for this message */
- p[idx++] = msg->nargs;
+ /* Set the number of arguments for this message */
+ *p++ = msg->nargs;
ctx->frag_ctx.curoff = 0;
encode_msg_payload:
if (ctx->frag_ctx.curoff != UINT_MAX)
goto encode_arg_value;
- /* <idx> + <string>.
- * Implies <name_len> is encoded on 2 bytes, at most (< 2288). */
- if (idx + 3 + arg->name_len > max_size)
- goto too_big;
-
/* Encode the arguement name as a string. It can by NULL */
- idx += encode_spoe_string(arg->name, arg->name_len, p+idx);
+ if (spoe_encode_buffer(arg->name, arg->name_len, &p, end) == -1)
+ goto too_big;
ctx->frag_ctx.curoff = 0;
encode_arg_value:
- if (idx + 1 > max_size)
- goto too_big;
-
/* Fetch the arguement value */
- smp = sample_process(s->be, s->sess, s, dir|SMP_OPT_FINAL, arg->expr, NULL);
- if (!smp) {
- /* If no value is available, set it to NULL */
- p[idx++] = SPOE_DATA_T_NULL;
- continue;
- }
-
- /* Else, encode the arguement value */
- switch (smp->data.type) {
- case SMP_T_BOOL:
- if (!(r = encode_spoe_arg_bool(ctx, smp, p+idx, max_size-idx)))
- goto too_big;
- idx += r;
- break;
-
- case SMP_T_SINT:
- if (!(r = encode_spoe_arg_sint(ctx, smp, p+idx, max_size-idx)))
- goto too_big;
- idx += r;
- break;
-
- case SMP_T_IPV4:
- if (!(r = encode_spoe_arg_ipv4(ctx, smp, p+idx, max_size-idx)))
- goto too_big;
- idx += r;
- break;
-
- case SMP_T_IPV6:
- if (!(r = encode_spoe_arg_ipv6(ctx, smp, p+idx, max_size-idx)))
- goto too_big;
- idx += r;
- break;
-
- case SMP_T_STR:
- case SMP_T_BIN:
- idx += encode_spoe_arg_string(ctx, smp, p+idx, max_size-idx);
- if (ctx->frag_ctx.curoff)
- goto too_big;
- break;
-
- case SMP_T_METH:
- if (!(r = encode_spoe_arg_method(ctx, smp, p+idx, max_size-idx)))
- goto too_big;
- idx += r;
- break;
-
- default:
- p[idx++] = SPOE_DATA_T_NULL;
- }
+ smp = sample_process(s->be, s->sess, s,
+ dir|SMP_OPT_FINAL, arg->expr, NULL);
+ ret = spoe_encode_data(smp, &ctx->frag_ctx.curoff, &p, end);
+ if (ret == -1 || ctx->frag_ctx.curoff)
+ goto too_big;
}
}
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
- " - encode %s messages - spoe_appctx=%p - max_size=%lu - idx=%u\n",
+ " - encode %s messages - spoe_appctx=%p"
+ "- max_size=%u - encoded=%ld\n",
(int)now.tv_sec, (int)now.tv_usec,
agent->id, __FUNCTION__, s,
((ctx->flags & SPOE_CTX_FL_FRAGMENTED) ? "last fragment of" : "unfragmented"),
- ctx->frag_ctx.spoe_appctx, max_size, idx);
+ ctx->frag_ctx.spoe_appctx, (agent->frame_size - FRAME_HDR_SIZE),
+ p - ctx->buffer->p);
- ctx->buffer->i = idx;
+ ctx->buffer->i = p - ctx->buffer->p;
ctx->frag_ctx.curmsg = NULL;
ctx->frag_ctx.curarg = NULL;
ctx->frag_ctx.curoff = 0;
// return -1;
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
- " - encode fragmented messages - spoe_appctx=%p - curmsg=%p - curarg=%p - curoff=%u"
- " - max_size=%lu - idx=%u\n",
+ " - encode fragmented messages - spoe_appctx=%p"
+ " - curmsg=%p - curarg=%p - curoff=%u"
+ " - max_size=%u - encoded=%ld\n",
(int)now.tv_sec, (int)now.tv_usec,
agent->id, __FUNCTION__, s, ctx->frag_ctx.spoe_appctx,
ctx->frag_ctx.curmsg, ctx->frag_ctx.curarg, ctx->frag_ctx.curoff,
- max_size, idx);
+ (agent->frame_size - FRAME_HDR_SIZE), p - ctx->buffer->p);
- ctx->buffer->i = idx;
+ ctx->buffer->i = p - ctx->buffer->p;
ctx->flags |= SPOE_CTX_FL_FRAGMENTED;
ctx->frag_ctx.flags &= ~SPOE_FRM_FL_FIN;
return 1;
**************************************************************************/
/* Helper function to set a variable */
static void
-set_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len,
+spoe_set_var(struct spoe_context *ctx, char *scope, char *name, int len,
struct sample *smp)
{
struct spoe_config *conf = FLT_CONF(ctx->filter);
/* Helper function to unset a variable */
static void
-unset_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len,
+spoe_unset_var(struct spoe_context *ctx, char *scope, char *name, int len,
struct sample *smp)
{
struct spoe_config *conf = FLT_CONF(ctx->filter);
}
-/* Process SPOE actions for a specific event. During the processing, it returns
- * 0 and it returns 1 when the processing is finished. If an error occurred, -1
- * is returned. */
-static int
-process_spoe_actions(struct stream *s, struct spoe_context *ctx,
- enum spoe_event ev, int dir)
+static inline int
+spoe_decode_action_set_var(struct stream *s, struct spoe_context *ctx,
+ char **buf, char *end, int dir)
{
- char *p;
- size_t size;
- int off, i, idx = 0;
+ char *str, *scope, *p = *buf;
+ struct sample smp;
+ uint64_t sz;
+ int ret;
- p = ctx->buffer->p;
- size = ctx->buffer->i;
+ if (p + 2 >= end)
+ goto skip;
- while (idx < size) {
- char *str;
- uint64_t sz;
- struct sample smp;
- enum spoe_action_type type;
+ /* SET-VAR requires 3 arguments */
+ if (*p++ != 3)
+ goto skip;
- off = idx;
- if (idx+2 > size)
- goto skip;
+ switch (*p++) {
+ case SPOE_SCOPE_PROC: scope = "proc"; break;
+ case SPOE_SCOPE_SESS: scope = "sess"; break;
+ case SPOE_SCOPE_TXN : scope = "txn"; break;
+ case SPOE_SCOPE_REQ : scope = "req"; break;
+ case SPOE_SCOPE_RES : scope = "res"; break;
+ default: goto skip;
+ }
- type = p[idx++];
- switch (type) {
- case SPOE_ACT_T_SET_VAR: {
- char *scope;
-
- if (p[idx++] != 3)
- goto skip_action;
-
- switch (p[idx++]) {
- case SPOE_SCOPE_PROC: scope = "proc"; break;
- case SPOE_SCOPE_SESS: scope = "sess"; break;
- case SPOE_SCOPE_TXN : scope = "txn"; break;
- case SPOE_SCOPE_REQ : scope = "req"; break;
- case SPOE_SCOPE_RES : scope = "res"; break;
- default: goto skip;
- }
+ if (spoe_decode_buffer(&p, end, &str, &sz) == -1)
+ goto skip;
+ memset(&smp, 0, sizeof(smp));
+ smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
- idx += decode_spoe_string(p+idx, p+size, &str, &sz);
- if (str == NULL)
- goto skip;
- memset(&smp, 0, sizeof(smp));
- smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
+ if (spoe_decode_data(&p, end, &smp) == -1)
+ goto skip;
- if ((i = decode_spoe_data(p+idx, p+size, &smp)) == -1)
- goto skip;
- idx += i;
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
+ " - set-var '%s.%s.%.*s'\n",
+ (int)now.tv_sec, (int)now.tv_usec,
+ ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
+ __FUNCTION__, s, scope,
+ ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
+ (int)sz, str);
- SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
- " - set-var '%s.%s.%.*s'\n",
- (int)now.tv_sec, (int)now.tv_usec,
- ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
- __FUNCTION__, s, scope,
- ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
- (int)sz, str);
+ spoe_set_var(ctx, scope, str, sz, &smp);
- set_spoe_var(ctx, scope, str, sz, &smp);
- break;
- }
+ ret = (p - *buf);
+ *buf = p;
+ return ret;
+ skip:
+ return 0;
+}
- case SPOE_ACT_T_UNSET_VAR: {
- char *scope;
+static inline int
+spoe_decode_action_unset_var(struct stream *s, struct spoe_context *ctx,
+ char **buf, char *end, int dir)
+{
+ char *str, *scope, *p = *buf;
+ struct sample smp;
+ uint64_t sz;
+ int ret;
- if (p[idx++] != 2)
- goto skip_action;
+ if (p + 2 >= end)
+ goto skip;
- switch (p[idx++]) {
- case SPOE_SCOPE_PROC: scope = "proc"; break;
- case SPOE_SCOPE_SESS: scope = "sess"; break;
- case SPOE_SCOPE_TXN : scope = "txn"; break;
- case SPOE_SCOPE_REQ : scope = "req"; break;
- case SPOE_SCOPE_RES : scope = "res"; break;
- default: goto skip;
- }
+ /* UNSET-VAR requires 2 arguments */
+ if (*p++ != 2)
+ goto skip;
- idx += decode_spoe_string(p+idx, p+size, &str, &sz);
- if (str == NULL)
- goto skip;
- memset(&smp, 0, sizeof(smp));
- smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
+ switch (*p++) {
+ case SPOE_SCOPE_PROC: scope = "proc"; break;
+ case SPOE_SCOPE_SESS: scope = "sess"; break;
+ case SPOE_SCOPE_TXN : scope = "txn"; break;
+ case SPOE_SCOPE_REQ : scope = "req"; break;
+ case SPOE_SCOPE_RES : scope = "res"; break;
+ default: goto skip;
+ }
- SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
- " - unset-var '%s.%s.%.*s'\n",
- (int)now.tv_sec, (int)now.tv_usec,
- ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
- __FUNCTION__, s, scope,
- ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
- (int)sz, str);
+ if (spoe_decode_buffer(&p, end, &str, &sz) == -1)
+ goto skip;
+ memset(&smp, 0, sizeof(smp));
+ smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
+
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
+ " - unset-var '%s.%s.%.*s'\n",
+ (int)now.tv_sec, (int)now.tv_usec,
+ ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
+ __FUNCTION__, s, scope,
+ ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
+ (int)sz, str);
+
+ spoe_unset_var(ctx, scope, str, sz, &smp);
+
+ ret = (p - *buf);
+ *buf = p;
+ return ret;
+ skip:
+ return 0;
+}
- unset_spoe_var(ctx, scope, str, sz, &smp);
+/* Process SPOE actions for a specific event. It returns 1 on success. If an
+ * error occurred, 0 is returned. */
+static int
+spoe_process_actions(struct stream *s, struct spoe_context *ctx,
+ enum spoe_event ev, int dir)
+{
+ char *p, *end;
+ int ret;
+
+ p = ctx->buffer->p;
+ end = p + ctx->buffer->i;
+
+ while (p < end) {
+ enum spoe_action_type type;
+
+ type = *p++;
+ switch (type) {
+ case SPOE_ACT_T_SET_VAR:
+ ret = spoe_decode_action_set_var(s, ctx, &p, end, dir);
+ if (!ret)
+ goto skip;
break;
- }
- default:
- skip_action:
- if ((i = skip_spoe_action(p+off, p+size)) == -1)
+ case SPOE_ACT_T_UNSET_VAR:
+ ret = spoe_decode_action_unset_var(s, ctx, &p, end, dir);
+ if (!ret)
goto skip;
- idx += i;
+ break;
+
+ default:
+ goto skip;
}
}
* Functions that process SPOE events
**************************************************************************/
static inline int
-start_event_processing(struct spoe_context *ctx, int dir)
+spoe_start_event_processing(struct spoe_context *ctx, int dir)
{
/* If a process is already started for this SPOE context, retry
* later. */
}
static inline void
-stop_event_processing(struct spoe_context *ctx)
+spoe_stop_event_processing(struct spoe_context *ctx)
{
struct spoe_appctx *sa = ctx->frag_ctx.spoe_appctx;
if (sa) {
sa->frag_ctx.ctx = NULL;
- wakeup_spoe_appctx(sa->owner);
+ spoe_wakeup_appctx(sa->owner);
}
/* Reset the flag to allow next processing */
/* Reset processing timer */
ctx->process_exp = TICK_ETERNITY;
- release_spoe_buffer(&ctx->buffer, &ctx->buffer_wait);
+ spoe_release_buffer(&ctx->buffer, &ctx->buffer_wait);
ctx->frag_ctx.spoe_appctx = NULL;
ctx->frag_ctx.curmsg = NULL;
* returns 0 and it returns 1 when the processing is finished. If an error
* occurred, -1 is returned. */
static int
-process_spoe_event(struct stream *s, struct spoe_context *ctx,
+spoe_process_event(struct stream *s, struct spoe_context *ctx,
enum spoe_event ev)
{
struct spoe_config *conf = FLT_CONF(ctx->filter);
s->task->expire = tick_first((tick_is_expired(s->task->expire, now_ms) ? 0 : s->task->expire),
ctx->process_exp);
}
- ret = start_event_processing(ctx, dir);
+ ret = spoe_start_event_processing(ctx, dir);
if (!ret)
goto out;
- if (queue_spoe_context(ctx) < 0)
+ if (spoe_queue_context(ctx) < 0)
goto error;
ctx->state = SPOE_CTX_ST_ENCODING_MSGS;
}
if (ctx->state == SPOE_CTX_ST_ENCODING_MSGS) {
- if (!acquire_spoe_buffer(&ctx->buffer, &ctx->buffer_wait))
+ if (!spoe_acquire_buffer(&ctx->buffer, &ctx->buffer_wait))
goto out;
- ret = encode_spoe_messages(s, ctx, &(ctx->messages[ev]), dir);
+ ret = spoe_encode_messages(s, ctx, &(ctx->messages[ev]), dir);
if (ret < 0)
goto error;
ctx->state = SPOE_CTX_ST_SENDING_MSGS;
if (ctx->state == SPOE_CTX_ST_SENDING_MSGS) {
if (ctx->frag_ctx.spoe_appctx)
- wakeup_spoe_appctx(ctx->frag_ctx.spoe_appctx->owner);
+ spoe_wakeup_appctx(ctx->frag_ctx.spoe_appctx->owner);
ret = 0;
goto out;
}
}
if (ctx->state == SPOE_CTX_ST_DONE) {
- ret = process_spoe_actions(s, ctx, ev, dir);
- if (!ret)
- goto skip;
+ spoe_process_actions(s, ctx, ev, dir);
+ ret = 1;
ctx->frame_id++;
ctx->state = SPOE_CTX_ST_READY;
goto end;
smp.data.u.sint = ctx->status_code;
smp.data.type = SMP_T_BOOL;
- set_spoe_var(ctx, "txn", agent->var_on_error,
+ spoe_set_var(ctx, "txn", agent->var_on_error,
strlen(agent->var_on_error), &smp);
}
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
ret = 1;
end:
- stop_event_processing(ctx);
+ spoe_stop_event_processing(ctx);
return ret;
}
* Functions that create/destroy SPOE contexts
**************************************************************************/
static int
-acquire_spoe_buffer(struct buffer **buf, struct buffer_wait *buffer_wait)
+spoe_acquire_buffer(struct buffer **buf, struct buffer_wait *buffer_wait)
{
if (*buf != &buf_empty)
return 1;
}
static void
-release_spoe_buffer(struct buffer **buf, struct buffer_wait *buffer_wait)
+spoe_release_buffer(struct buffer **buf, struct buffer_wait *buffer_wait)
{
if (!LIST_ISEMPTY(&buffer_wait->list)) {
LIST_DEL(&buffer_wait->list);
}
static int
-wakeup_spoe_context(struct spoe_context *ctx)
+spoe_wakeup_context(struct spoe_context *ctx)
{
task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
return 1;
}
static struct spoe_context *
-create_spoe_context(struct filter *filter)
+spoe_create_context(struct filter *filter)
{
struct spoe_config *conf = FLT_CONF(filter);
struct spoe_context *ctx;
ctx->buffer = &buf_empty;
LIST_INIT(&ctx->buffer_wait.list);
ctx->buffer_wait.target = ctx;
- ctx->buffer_wait.wakeup_cb = (int (*)(void *))wakeup_spoe_context;
+ ctx->buffer_wait.wakeup_cb = (int (*)(void *))spoe_wakeup_context;
LIST_INIT(&ctx->list);
ctx->stream_id = 0;
}
static void
-destroy_spoe_context(struct spoe_context *ctx)
+spoe_destroy_context(struct spoe_context *ctx)
{
if (!ctx)
return;
- if (!LIST_ISEMPTY(&ctx->buffer_wait.list))
- LIST_DEL(&ctx->buffer_wait.list);
- if (!LIST_ISEMPTY(&ctx->list))
- LIST_DEL(&ctx->list);
+ spoe_stop_event_processing(ctx);
pool_free2(pool2_spoe_ctx, ctx);
}
static void
-reset_spoe_context(struct spoe_context *ctx)
+spoe_reset_context(struct spoe_context *ctx)
{
ctx->state = SPOE_CTX_ST_READY;
ctx->flags &= ~SPOE_CTX_FL_PROCESS;
**************************************************************************/
/* Signal handler: Do a soft stop, wakeup SPOE applet */
static void
-sig_stop_spoe(struct sig_handler *sh)
+spoe_sig_stop(struct sig_handler *sh)
{
struct proxy *p;
agent = conf->agent;
list_for_each_entry(spoe_appctx, &agent->applets, list) {
- wakeup_spoe_appctx(spoe_appctx->owner);
+ spoe_wakeup_appctx(spoe_appctx->owner);
}
}
p = p->next;
conf->agent_fe.fe_req_ana = AN_REQ_SWITCHING_RULES;
if (!sighandler_registered) {
- signal_register_fct(0, sig_stop_spoe, 0);
+ signal_register_fct(0, spoe_sig_stop, 0);
sighandler_registered = 1;
}
if (conf) {
struct spoe_agent *agent = conf->agent;
- struct listener *l = LIST_NEXT(&conf->agent_fe.conf.listeners,
- struct listener *, by_fe);
- free(l);
- release_spoe_agent(agent);
+ spoe_release_agent(agent);
free(conf);
}
fconf->conf = NULL;
(int)now.tv_sec, (int)now.tv_usec, agent->id,
__FUNCTION__, s);
- ctx = create_spoe_context(filter);
+ ctx = spoe_create_context(filter);
if (ctx == NULL) {
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
" - failed to create SPOE context\n",
(int)now.tv_sec, (int)now.tv_usec,
((struct spoe_config *)FLT_CONF(filter))->agent->id,
__FUNCTION__, s);
- destroy_spoe_context(filter->ctx);
+ spoe_destroy_context(filter->ctx);
}
if (tick_is_expired(ctx->process_exp, now_ms)) {
s->pending_events |= TASK_WOKEN_MSG;
- release_spoe_buffer(&ctx->buffer, &ctx->buffer_wait);
+ spoe_release_buffer(&ctx->buffer, &ctx->buffer_wait);
}
}
goto out;
ctx->stream_id = s->uniq_id;
- ret = process_spoe_event(s, ctx, SPOE_EV_ON_CLIENT_SESS);
+ ret = spoe_process_event(s, ctx, SPOE_EV_ON_CLIENT_SESS);
if (!ret)
goto out;
ctx->flags |= SPOE_CTX_FL_CLI_CONNECTED;
if (ctx->flags & SPOE_CTX_FL_SRV_CONNECTED)
goto out;
- ret = process_spoe_event(s, ctx, SPOE_EV_ON_SERVER_SESS);
+ ret = spoe_process_event(s, ctx, SPOE_EV_ON_SERVER_SESS);
if (!ret) {
channel_dont_read(chn);
channel_dont_close(chn);
switch (an_bit) {
case AN_REQ_INSPECT_FE:
- ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_FE);
+ ret = spoe_process_event(s, ctx, SPOE_EV_ON_TCP_REQ_FE);
break;
case AN_REQ_INSPECT_BE:
- ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_BE);
+ ret = spoe_process_event(s, ctx, SPOE_EV_ON_TCP_REQ_BE);
break;
case AN_RES_INSPECT:
- ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_RSP);
+ ret = spoe_process_event(s, ctx, SPOE_EV_ON_TCP_RSP);
break;
case AN_REQ_HTTP_PROCESS_FE:
- ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_FE);
+ ret = spoe_process_event(s, ctx, SPOE_EV_ON_HTTP_REQ_FE);
break;
case AN_REQ_HTTP_PROCESS_BE:
- ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_BE);
+ ret = spoe_process_event(s, ctx, SPOE_EV_ON_HTTP_REQ_BE);
break;
case AN_RES_HTTP_PROCESS_FE:
- ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_RSP);
+ ret = spoe_process_event(s, ctx, SPOE_EV_ON_HTTP_RSP);
break;
}
__FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags);
if (!(ctx->flags & SPOE_CTX_FL_PROCESS)) {
- reset_spoe_context(ctx);
+ spoe_reset_context(ctx);
}
return 1;
unsigned timeout;
if (!*args[1]) {
- Alert("parsing [%s:%d] : 'timeout' expects 'connect', 'idle' and 'ack'.\n",
+ Alert("parsing [%s:%d] : 'timeout' expects 'hello', 'idle' and 'processing'.\n",
file, linenum);
err_code |= ERR_ALERT | ERR_FATAL;
goto out;
else if (!strcmp(args[1], "processing"))
tv = &curagent->timeout.processing;
else {
- Alert("parsing [%s:%d] : 'timeout' supports 'connect', 'idle' or 'processing' (got %s).\n",
+ Alert("parsing [%s:%d] : 'timeout' supports 'hello', 'idle' or 'processing' (got %s).\n",
file, linenum, args[1]);
err_code |= ERR_ALERT | ERR_FATAL;
goto out;
conf->agent = curagent;
list_for_each_entry_safe(mp, mpback, &curmps, list) {
LIST_DEL(&mp->list);
- release_spoe_msg_placeholder(mp);
+ spoe_release_msg_placeholder(mp);
}
list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
Warning("Proxy '%s': Ignore unused SPOE messages '%s' declared at %s:%d.\n",
px->id, msg->id, msg->conf.file, msg->conf.line);
LIST_DEL(&msg->list);
- release_spoe_message(msg);
+ spoe_release_message(msg);
}
*cur_arg = pos;
return 0;
error:
- release_spoe_agent(curagent);
+ spoe_release_agent(curagent);
list_for_each_entry_safe(mp, mpback, &curmps, list) {
LIST_DEL(&mp->list);
- release_spoe_msg_placeholder(mp);
+ spoe_release_msg_placeholder(mp);
}
list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
LIST_DEL(&msg->list);
- release_spoe_message(msg);
+ spoe_release_message(msg);
}
free(conf);
return -1;