SPOE_FRM_ERR_NO_CAP,
SPOE_FRM_ERR_BAD_VSN,
SPOE_FRM_ERR_BAD_FRAME_SIZE,
+ SPOE_FRM_ERR_FRAG_NOT_SUPPORTED,
+ SPOE_FRM_ERR_INTERLACED_FRAMES,
+ SPOE_FRM_ERR_RES,
SPOE_FRM_ERR_UNKNOWN = 99,
SPOE_FRM_ERRS,
};
SPOA_FRM_T_AGENT,
};
+/* Flags set on the SPOE frame */
+#define SPOE_FRM_FL_FIN 0x00000001
+#define SPOE_FRM_FL_ABRT 0x00000002
+
/* Masks to get data type or flags value */
#define SPOE_DATA_T_MASK 0x0F
#define SPOE_DATA_FL_MASK 0xF0
unsigned int stream_id;
unsigned int frame_id;
- bool hcheck; /* true is the CONNECT frame is a healthcheck */
- int ip_score; /* -1 if unset, else between 0 and 100 */
+ unsigned int flags;
+ bool hcheck; /* true is the CONNECT frame is a healthcheck */
+ bool fragmented; /* true if the frame is fragmented */
+ int ip_score; /* -1 if unset, else between 0 and 100 */
struct event process_frame_event;
struct worker *worker;
struct client *client;
struct list list;
+ char *frag_buf; /* used to accumulate payload of a fragmented frame */
+ unsigned int frag_len;
+
char data[0];
};
struct spoe_engine *engine;
bool pipelining;
bool async;
+ bool fragmentation;
struct worker *worker;
struct list by_worker;
static bool debug = false;
static bool pipelining = false;
static bool async = false;
+static bool fragmentation = false;
static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
- [SPOE_FRM_ERR_NONE] = "normal",
- [SPOE_FRM_ERR_IO] = "I/O error",
- [SPOE_FRM_ERR_TOUT] = "a timeout occurred",
- [SPOE_FRM_ERR_TOO_BIG] = "frame is too big",
- [SPOE_FRM_ERR_INVALID] = "invalid frame received",
- [SPOE_FRM_ERR_NO_VSN] = "version value not found",
- [SPOE_FRM_ERR_NO_FRAME_SIZE] = "max-frame-size value not found",
- [SPOE_FRM_ERR_NO_CAP] = "capabilities value not found",
- [SPOE_FRM_ERR_BAD_VSN] = "unsupported version",
- [SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small",
- [SPOE_FRM_ERR_UNKNOWN] = "an unknown error occurred",
+ [SPOE_FRM_ERR_NONE] = "normal",
+ [SPOE_FRM_ERR_IO] = "I/O error",
+ [SPOE_FRM_ERR_TOUT] = "a timeout occurred",
+ [SPOE_FRM_ERR_TOO_BIG] = "frame is too big",
+ [SPOE_FRM_ERR_INVALID] = "invalid frame received",
+ [SPOE_FRM_ERR_NO_VSN] = "version value not found",
+ [SPOE_FRM_ERR_NO_FRAME_SIZE] = "max-frame-size value not found",
+ [SPOE_FRM_ERR_NO_CAP] = "capabilities value not found",
+ [SPOE_FRM_ERR_BAD_VSN] = "unsupported version",
+ [SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small",
+ [SPOE_FRM_ERR_FRAG_NOT_SUPPORTED] = "fragmentation not supported",
+ [SPOE_FRM_ERR_INTERLACED_FRAMES] = "invalid interlaced frames",
+ [SPOE_FRM_ERR_RES] = "resource allocation error",
+ [SPOE_FRM_ERR_UNKNOWN] = "an unknown error occurred",
};
static void signal_cb(evutil_socket_t, short, void *);
client->async = true;
}
}
+ else if (sz - i >= 13 && !strncmp(str + i, "fragmentation", 13)) {
+ i += 5;
+ if (sz == i || isspace(str[i]) || str[i] == ',') {
+ DEBUG(frame->worker,
+ "<%lu> HAProxy supports fragmented frame",
+ client->id);
+ client->fragmentation = true;
+ }
+ }
if (sz == i || (delim = memchr(str + i, ',', sz-i)) == NULL)
break;
DEBUG(frame->worker, "<%lu> Decode HAProxy HELLO frame", client->id);
- /* Skip flags */
+ /* Retrieve flags */
+ memcpy((char *)&(frame->flags), buf+idx, 4);
idx += 4;
+ /* Fragmentation is not supported for HELLO frame */
+ if (!(frame->flags & SPOE_FRM_FL_FIN)) {
+ client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
+ goto error;
+ }
+
/* stream-id and frame-id must be cleared */
if (buf[idx] != 0 || buf[idx+1] != 0) {
client->status_code = SPOE_FRM_ERR_INVALID;
DEBUG(frame->worker, "<%lu> Decode HAProxy DISCONNECT frame", client->id);
- /* Skip flags */
+ /* Retrieve flags */
+ memcpy((char *)&(frame->flags), buf+idx, 4);
idx += 4;
+ /* Fragmentation is not supported for DISCONNECT frame */
+ if (!(frame->flags & SPOE_FRM_FL_FIN)) {
+ client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
+ goto error;
+ }
+
/* stream-id and frame-id must be cleared */
if (buf[idx] != 0 || buf[idx+1] != 0) {
client->status_code = SPOE_FRM_ERR_INVALID;
}
/* Decode a NOTIFY frame received from HAProxy. It returns -1 if an error
- * occurred or if the frame must be ignored, 0 if the frame must be ack without
- * any processing, otherwise the number of read bytes (always > 0). */
+ * occurred, 0 if it must be must be ignored, otherwise the number of read
+ * bytes. */
static int
handle_hanotify(struct spoe_frame *frame)
{
DEBUG(frame->worker, "<%lu> Decode HAProxy NOTIFY frame", client->id);
- /* Skip flags */
+ /* Retrieve flags */
+ memcpy((char *)&(frame->flags), buf+idx, 4);
idx += 4;
+ /* Fragmentation is not supported for DISCONNECT frame */
+ if (!(frame->flags & SPOE_FRM_FL_FIN) && fragmentation == false) {
+ client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
+ goto error;
+ }
+
/* Read the stream-id */
if ((i = decode_spoe_varint(buf+idx, end, &stream_id)) == -1)
goto ignore;
goto ignore;
idx += i;
- frame->stream_id = (unsigned int)stream_id;
- frame->frame_id = (unsigned int)frame_id;
+ if (frame->fragmented == true) {
+ if (frame->stream_id != (unsigned int)stream_id ||
+ frame->frame_id != (unsigned int)frame_id) {
+ client->status_code = SPOE_FRM_ERR_INTERLACED_FRAMES;
+ goto error;
+ }
+
+ if (frame->flags & SPOE_FRM_FL_ABRT) {
+ DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
+ " - Abort processing of a fragmented frame"
+ " - frag_len=%u - len=%u - offset=%u",
+ client->id, frame->stream_id, frame->frame_id,
+ frame->frag_len, frame->len, idx);
+ goto ignore;
+ }
+ DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
+ " - %s fragment of a fragmented frame received"
+ " - frag_len=%u - len=%u - offset=%u",
+ client->id, frame->stream_id, frame->frame_id,
+ (frame->flags & SPOE_FRM_FL_FIN) ? "last" : "next",
+ frame->frag_len, frame->len, idx);
+ }
+ else {
+ frame->stream_id = (unsigned int)stream_id;
+ frame->frame_id = (unsigned int)frame_id;
- DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u",
- client->id, frame->stream_id, frame->frame_id);
+ DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
+ " - %s frame received"
+ " - frag_len=%u - len=%u - offset=%u",
+ client->id, frame->stream_id, frame->frame_id,
+ (frame->flags & SPOE_FRM_FL_FIN) ? "unfragmented" : "fragmented",
+ frame->frag_len, frame->len, idx);
- if (buf + idx == end) {
- return 0;
+ frame->fragmented = !(frame->flags & SPOE_FRM_FL_FIN);
}
frame->offset = idx;
return idx;
ignore:
+ return 0;
+
+ error:
return -1;
}
{
struct client *client = frame->client;
char *buf = frame->buf;
- int idx = 0;
+ char capabilities[64];
+ int n, idx = 0;
+ unsigned int flags = SPOE_FRM_FL_FIN;
DEBUG(frame->worker, "<%lu> Encode Agent HELLO frame", client->id);
frame->type = SPOA_FRM_T_AGENT;
/* Frame Type */
buf[idx++] = SPOE_FRM_T_AGENT_HELLO;
- /* No flags for now */
- memset(buf+idx, 0, 4); /* No flags */
+ /* Set flags */
+ memcpy(buf+idx, (char *)&flags, 4);
idx += 4;
/* No stream-id and frame-id for HELLO frames */
/* "capabilities" K/V item */
idx += encode_spoe_string("capabilities", 12, buf+idx);
buf[idx++] = SPOE_DATA_T_STR;
- if (client->pipelining == true && client->async == true)
- idx += encode_spoe_string("pipelining,async", 16, buf+idx);
- else if (client->pipelining == true)
- idx += encode_spoe_string("pipelining", 10, buf+idx);
- else if (client->async == true)
- idx += encode_spoe_string("async", 5, buf+idx);
+
+ memset(capabilities, 0, sizeof(capabilities));
+ n = 0;
+
+ /* 1. Fragmentation capability ? */
+ if (fragmentation == true) {
+ memcpy(capabilities, "fragmentation", 13);
+ n += 13;
+ }
+ /* 2. Pipelining capability ? */
+ if (client->pipelining == true && n != 0) {
+ memcpy(capabilities + n, ", pipelining", 12);
+ n += 12;
+ }
+ else if (client->pipelining == true) {
+ memcpy(capabilities, "pipelining", 10);
+ n += 10;
+ }
+ /* 3. Async capability ? */
+ if (client->async == true && n != 0) {
+ memcpy(capabilities + n, ", async", 7);
+ n += 7;
+ }
+ else if (client->async == true) {
+ memcpy(capabilities, "async", 5);
+ n += 5;
+ }
+ /* 4. Encode capabilities string */
+ if (n != 0)
+ idx += encode_spoe_string(capabilities, n, buf+idx);
else
idx += encode_spoe_string(NULL, 0, buf+idx);
- DEBUG(frame->worker, "<%lu> Agent capabilities : %s %s",
- client->id, (client->pipelining?"pipelining":""),
- (client->async?"async":""));
+ DEBUG(frame->worker, "<%lu> Agent capabilities : %.*s",
+ client->id, n, capabilities);
frame->len = idx;
return idx;
char *buf = frame->buf;
const char *reason;
int rlen, idx = 0;
+ unsigned int flags = SPOE_FRM_FL_FIN;
DEBUG(frame->worker, "<%lu> Encode Agent DISCONNECT frame", client->id);
frame->type = SPOA_FRM_T_AGENT;
/* Frame type */
buf[idx++] = SPOE_FRM_T_AGENT_DISCON;
- /* No flags for now */
- memset(buf+idx, 0, 4);
+ /* Set flags */
+ memcpy(buf+idx, (char *)&flags, 4);
idx += 4;
/* No stream-id and frame-id for DISCONNECT frames */
static int
prepare_agentack(struct spoe_frame *frame)
{
- char *buf = frame->buf;
- int idx = 0;
+ char *buf = frame->buf;
+ int idx = 0;
+ unsigned int flags = SPOE_FRM_FL_FIN;
/* Be careful here, in async mode, frame->client can be NULL */
/* Frame type */
buf[idx++] = SPOE_FRM_T_AGENT_ACK;
- /* No flags for now */
- memset(buf+idx, 0, 4); /* No flags */
+ /* Set flags */
+ memcpy(buf+idx, (char *)&flags, 4);
idx += 4;
/* Set stream-id and frame-id for ACK frames */
worker = frame->worker;
LIST_DEL(&frame->list);
+ if (frame->frag_buf)
+ free(frame->frag_buf);
memset(frame, 0, sizeof(*frame)+max_frame_size+4);
LIST_ADDQ(&worker->frames, &frame->list);
}
if (frame == NULL)
return;
- frame->type = SPOA_FRM_T_UNKNOWN;
- frame->buf = (char *)(frame->data);
- frame->offset = 0;
- frame->len = 0;
- frame->stream_id = 0;
- frame->frame_id = 0;
- frame->hcheck = false;
- frame->ip_score = -1;
+ if (frame->frag_buf)
+ free(frame->frag_buf);
+
+ frame->type = SPOA_FRM_T_UNKNOWN;
+ frame->buf = (char *)(frame->data);
+ frame->offset = 0;
+ frame->len = 0;
+ frame->stream_id = 0;
+ frame->frame_id = 0;
+ frame->flags = 0;
+ frame->hcheck = false;
+ frame->fragmented = false;
+ frame->ip_score = -1;
+ frame->frag_buf = NULL;
+ frame->frag_len = 0;
LIST_INIT(&frame->list);
}
int idx = frame->offset;
DEBUG(frame->worker,
- "Process frame messages : STREAM-ID=%u - FRAME-ID=%u",
- frame->stream_id, frame->frame_id);
+ "Process frame messages : STREAM-ID=%u - FRAME-ID=%u - length=%u bytes",
+ frame->stream_id, frame->frame_id, frame->len - frame->offset);
/* Loop on messages */
while (buf+idx < end) {
stop_processing:
/* Prepare agent ACK frame */
+ frame->buf = (char *)(frame->data) + 4;
frame->offset = 0;
+ frame->len = 0;
+ frame->flags = 0;
idx = prepare_agentack(frame);
if (frame->ip_score != -1) {
goto write_frame;
case SPOA_ST_PROCESSING:
- n = handle_hanotify(frame);
- if (n < 0 && frame->buf[0] == SPOE_FRM_T_HAPROXY_DISCON) {
+ if (frame->buf[0] == SPOE_FRM_T_HAPROXY_DISCON) {
client->state = SPOA_ST_DISCONNECTING;
goto disconnecting;
}
+ n = handle_hanotify(frame);
if (n < 0) {
- LOG(client->worker, "Ignore invalid or unknown frame");
- goto ignore_frame;
+ LOG(client->worker, "Failed to decode frame: %s",
+ spoe_frm_err_reasons[client->status_code]);
+ goto disconnect;
}
if (n == 0) {
- DEBUG(client->worker, "<%lu> No message found, ack it now",
- client->id);
- prepare_agentack(frame);
- goto write_frame;
+ LOG(client->worker, "Ignore invalid/unknown/aborted frame");
+ goto ignore_frame;
}
else
goto process_frame;
return;
process_frame:
+ if (frame->fragmented == true) {
+ char *buf;
+ size_t len = frame->len - frame->offset;
+
+ buf = realloc(frame->frag_buf, frame->frag_len + len);
+ if (buf == NULL) {
+ client->status_code = SPOE_FRM_ERR_RES;
+ goto disconnect;
+ }
+ memcpy(buf + frame->frag_len, frame->buf + frame->offset, len);
+ frame->frag_buf = buf;
+ frame->frag_len += len;
+
+ if (!(frame->flags & SPOE_FRM_FL_FIN)) {
+ /* Wait for next fragments */
+ frame->buf = (char *)(frame->data);
+ frame->offset = 0;
+ frame->len = 0;
+ frame->flags = 0;
+ return;
+ }
+
+ frame->buf = frame->frag_buf;
+ frame->len = frame->frag_len;
+ frame->offset = 0;
+ /* fall through */
+ }
+
process_incoming_frame(frame);
client->incoming_frame = NULL;
return;
" but can be in any other unit if the number is suffixed\n"
" by a unit (us, ms, s)\n"
"\n"
- " Supported capabilities: pipelining, async\n",
+ " Supported capabilities: fragmentation, pipelining, async\n",
prog, MAX_FRAME_SIZE, DEFAULT_PORT, NUM_WORKERS);
}
pipelining = true;
else if (!strcmp(optarg, "async"))
async = true;
+ else if (!strcmp(optarg, "fragmentation"))
+ fragmentation = true;
else
fprintf(stderr, "WARNING: unsupported capability '%s'\n", optarg);
break;
DEBUG(&null_worker,
"Server is ready"
- " [pipelining=%s - async=%s - debug=%s - max-frame-size=%u]",
- (pipelining?"true":"false"), (async?"true":"false"),
+ " [fragmentation=%s - pipelining=%s - async=%s - debug=%s - max-frame-size=%u]",
+ (fragmentation?"true":"false"), (pipelining?"true":"false"), (async?"true":"false"),
(debug?"true":"false"), max_frame_size);
event_base_dispatch(base);