]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: spoe: Add support for fragmentation capability in the SPOA example
authorChristopher Faulet <cfaulet@haproxy.com>
Thu, 2 Feb 2017 09:14:36 +0000 (10:14 +0100)
committerWilly Tarreau <w@1wt.eu>
Thu, 9 Mar 2017 14:32:55 +0000 (15:32 +0100)
This is just an example. So be careful to not send really huge payload because
it would eat all your memory.

contrib/spoa_example/spoa.c

index 5c3a4538e265d828f9600aedc3218efee7de65f3..8e234b58fd723ccc8cdb3ac3163d3068ee442658 100644 (file)
@@ -98,6 +98,9 @@ enum spoe_frame_error {
        SPOE_FRM_ERR_NO_CAP,
        SPOE_FRM_ERR_BAD_VSN,
        SPOE_FRM_ERR_BAD_FRAME_SIZE,
+       SPOE_FRM_ERR_FRAG_NOT_SUPPORTED,
+       SPOE_FRM_ERR_INTERLACED_FRAMES,
+       SPOE_FRM_ERR_RES,
        SPOE_FRM_ERR_UNKNOWN = 99,
        SPOE_FRM_ERRS,
 };
@@ -131,6 +134,10 @@ enum spoa_frame_type {
        SPOA_FRM_T_AGENT,
 };
 
+/* Flags set on the SPOE frame */
+#define SPOE_FRM_FL_FIN         0x00000001
+#define SPOE_FRM_FL_ABRT        0x00000002
+
 /* Masks to get data type or flags value */
 #define SPOE_DATA_T_MASK  0x0F
 #define SPOE_DATA_FL_MASK 0xF0
@@ -157,8 +164,10 @@ struct spoe_frame {
 
        unsigned int         stream_id;
        unsigned int         frame_id;
-       bool                 hcheck;    /* true is the CONNECT frame is a healthcheck */
-       int                  ip_score;  /* -1 if unset, else between 0 and 100 */
+       unsigned int         flags;
+       bool                 hcheck;     /* true is the CONNECT frame is a healthcheck */
+       bool                 fragmented; /* true if the frame is fragmented */
+       int                  ip_score;   /* -1 if unset, else between 0 and 100 */
 
        struct event         process_frame_event;
        struct worker       *worker;
@@ -166,6 +175,9 @@ struct spoe_frame {
        struct client       *client;
        struct list          list;
 
+       char                *frag_buf; /* used to accumulate payload of a fragmented frame */
+       unsigned int         frag_len;
+
        char                 data[0];
 };
 
@@ -190,6 +202,7 @@ struct client {
        struct spoe_engine *engine;
        bool                pipelining;
        bool                async;
+       bool                fragmentation;
 
        struct worker      *worker;
        struct list         by_worker;
@@ -244,20 +257,24 @@ struct timeval        processing_delay = {0, 0};
 static bool           debug            = false;
 static bool           pipelining       = false;
 static bool           async            = false;
+static bool           fragmentation    = false;
 
 
 static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
-       [SPOE_FRM_ERR_NONE]           = "normal",
-       [SPOE_FRM_ERR_IO]             = "I/O error",
-       [SPOE_FRM_ERR_TOUT]           = "a timeout occurred",
-       [SPOE_FRM_ERR_TOO_BIG]        = "frame is too big",
-       [SPOE_FRM_ERR_INVALID]        = "invalid frame received",
-       [SPOE_FRM_ERR_NO_VSN]         = "version value not found",
-       [SPOE_FRM_ERR_NO_FRAME_SIZE]  = "max-frame-size value not found",
-       [SPOE_FRM_ERR_NO_CAP]         = "capabilities value not found",
-       [SPOE_FRM_ERR_BAD_VSN]        = "unsupported version",
-       [SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small",
-       [SPOE_FRM_ERR_UNKNOWN]        = "an unknown error occurred",
+       [SPOE_FRM_ERR_NONE]               = "normal",
+       [SPOE_FRM_ERR_IO]                 = "I/O error",
+       [SPOE_FRM_ERR_TOUT]               = "a timeout occurred",
+       [SPOE_FRM_ERR_TOO_BIG]            = "frame is too big",
+       [SPOE_FRM_ERR_INVALID]            = "invalid frame received",
+       [SPOE_FRM_ERR_NO_VSN]             = "version value not found",
+       [SPOE_FRM_ERR_NO_FRAME_SIZE]      = "max-frame-size value not found",
+       [SPOE_FRM_ERR_NO_CAP]             = "capabilities value not found",
+       [SPOE_FRM_ERR_BAD_VSN]            = "unsupported version",
+       [SPOE_FRM_ERR_BAD_FRAME_SIZE]     = "max-frame-size too big or too small",
+       [SPOE_FRM_ERR_FRAG_NOT_SUPPORTED] = "fragmentation not supported",
+       [SPOE_FRM_ERR_INTERLACED_FRAMES]  = "invalid interlaced frames",
+       [SPOE_FRM_ERR_RES]                = "resource allocation error",
+       [SPOE_FRM_ERR_UNKNOWN]            = "an unknown error occurred",
 };
 
 static void signal_cb(evutil_socket_t, short, void *);
@@ -640,6 +657,15 @@ check_capabilities(struct spoe_frame *frame, int idx)
                                client->async = true;
                        }
                }
+               else if (sz - i >= 13 && !strncmp(str + i, "fragmentation", 13)) {
+                       i += 5;
+                       if (sz == i || isspace(str[i]) || str[i] == ',') {
+                               DEBUG(frame->worker,
+                                     "<%lu> HAProxy supports fragmented frame",
+                                     client->id);
+                               client->fragmentation = true;
+                       }
+               }
 
                if (sz == i || (delim = memchr(str + i, ',', sz-i)) == NULL)
                        break;
@@ -740,9 +766,16 @@ handle_hahello(struct spoe_frame *frame)
 
        DEBUG(frame->worker, "<%lu> Decode HAProxy HELLO frame", client->id);
 
-       /* Skip flags */
+       /* Retrieve flags */
+       memcpy((char *)&(frame->flags), buf+idx, 4);
        idx += 4;
 
+       /* Fragmentation is not supported for HELLO frame */
+       if (!(frame->flags & SPOE_FRM_FL_FIN)) {
+               client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
+               goto error;
+       }
+
        /* stream-id and frame-id must be cleared */
        if (buf[idx] != 0 || buf[idx+1] != 0) {
                client->status_code = SPOE_FRM_ERR_INVALID;
@@ -846,9 +879,16 @@ handle_hadiscon(struct spoe_frame *frame)
 
        DEBUG(frame->worker, "<%lu> Decode HAProxy DISCONNECT frame", client->id);
 
-       /* Skip flags */
+       /* Retrieve flags */
+       memcpy((char *)&(frame->flags), buf+idx, 4);
        idx += 4;
 
+       /* Fragmentation is not supported for DISCONNECT frame */
+       if (!(frame->flags & SPOE_FRM_FL_FIN)) {
+               client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
+               goto error;
+       }
+
        /* stream-id and frame-id must be cleared */
        if (buf[idx] != 0 || buf[idx+1] != 0) {
                client->status_code = SPOE_FRM_ERR_INVALID;
@@ -906,8 +946,8 @@ handle_hadiscon(struct spoe_frame *frame)
 }
 
 /* Decode a NOTIFY frame received from HAProxy. It returns -1 if an error
- * occurred or if the frame must be ignored, 0 if the frame must be ack without
- * any processing, otherwise the number of read bytes (always > 0). */
+ * occurred, 0 if it must be must be ignored, otherwise the number of read
+ * bytes. */
 static int
 handle_hanotify(struct spoe_frame *frame)
 {
@@ -923,9 +963,16 @@ handle_hanotify(struct spoe_frame *frame)
 
        DEBUG(frame->worker, "<%lu> Decode HAProxy NOTIFY frame", client->id);
 
-       /* Skip flags */
+       /* Retrieve flags */
+       memcpy((char *)&(frame->flags), buf+idx, 4);
        idx += 4;
 
+       /* Fragmentation is not supported for DISCONNECT frame */
+       if (!(frame->flags & SPOE_FRM_FL_FIN) && fragmentation == false) {
+               client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
+               goto error;
+       }
+
        /* Read the stream-id */
        if ((i = decode_spoe_varint(buf+idx, end, &stream_id)) == -1)
                goto ignore;
@@ -936,20 +983,49 @@ handle_hanotify(struct spoe_frame *frame)
                goto ignore;
        idx += i;
 
-       frame->stream_id = (unsigned int)stream_id;
-       frame->frame_id  = (unsigned int)frame_id;
+       if (frame->fragmented == true) {
+               if (frame->stream_id != (unsigned int)stream_id ||
+                   frame->frame_id  != (unsigned int)frame_id) {
+                       client->status_code = SPOE_FRM_ERR_INTERLACED_FRAMES;
+                       goto error;
+               }
+
+               if (frame->flags & SPOE_FRM_FL_ABRT) {
+                       DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
+                             " - Abort processing of a fragmented frame"
+                             " - frag_len=%u - len=%u - offset=%u",
+                             client->id, frame->stream_id, frame->frame_id,
+                             frame->frag_len, frame->len, idx);
+                       goto ignore;
+               }
+               DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
+                     " - %s fragment of a fragmented frame received"
+                     " - frag_len=%u - len=%u - offset=%u",
+                     client->id, frame->stream_id, frame->frame_id,
+                     (frame->flags & SPOE_FRM_FL_FIN) ? "last" : "next",
+                     frame->frag_len, frame->len, idx);
+       }
+       else {
+               frame->stream_id = (unsigned int)stream_id;
+               frame->frame_id  = (unsigned int)frame_id;
 
-       DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u",
-             client->id, frame->stream_id, frame->frame_id);
+               DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
+                     " - %s frame received"
+                     " - frag_len=%u - len=%u - offset=%u",
+                     client->id, frame->stream_id, frame->frame_id,
+                     (frame->flags & SPOE_FRM_FL_FIN) ? "unfragmented" : "fragmented",
+                     frame->frag_len, frame->len, idx);
 
-       if (buf + idx == end) {
-               return 0;
+               frame->fragmented = !(frame->flags & SPOE_FRM_FL_FIN);
        }
 
        frame->offset = idx;
        return idx;
 
   ignore:
+       return 0;
+
+  error:
        return -1;
 }
 
@@ -960,7 +1036,9 @@ prepare_agenthello(struct spoe_frame *frame)
 {
        struct client *client = frame->client;
        char          *buf    = frame->buf;
-       int            idx    = 0;
+       char           capabilities[64];
+       int            n, idx = 0;
+       unsigned int   flags  = SPOE_FRM_FL_FIN;
 
        DEBUG(frame->worker, "<%lu> Encode Agent HELLO frame", client->id);
        frame->type = SPOA_FRM_T_AGENT;
@@ -968,8 +1046,8 @@ prepare_agenthello(struct spoe_frame *frame)
        /* Frame Type */
        buf[idx++] = SPOE_FRM_T_AGENT_HELLO;
 
-       /* No flags for now */
-       memset(buf+idx, 0, 4); /* No flags */
+       /* Set flags */
+       memcpy(buf+idx, (char *)&flags, 4);
        idx += 4;
 
        /* No stream-id and frame-id for HELLO frames */
@@ -994,18 +1072,41 @@ prepare_agenthello(struct spoe_frame *frame)
        /* "capabilities" K/V item */
        idx += encode_spoe_string("capabilities", 12, buf+idx);
        buf[idx++] = SPOE_DATA_T_STR;
-       if (client->pipelining == true && client->async == true)
-               idx += encode_spoe_string("pipelining,async", 16, buf+idx);
-       else if (client->pipelining == true)
-               idx += encode_spoe_string("pipelining", 10, buf+idx);
-       else if (client->async == true)
-               idx += encode_spoe_string("async", 5, buf+idx);
+
+       memset(capabilities, 0, sizeof(capabilities));
+       n = 0;
+
+       /*     1. Fragmentation capability ? */
+       if (fragmentation == true) {
+               memcpy(capabilities, "fragmentation", 13);
+               n += 13;
+       }
+       /*     2. Pipelining capability ? */
+       if (client->pipelining == true && n != 0) {
+               memcpy(capabilities + n, ", pipelining", 12);
+               n += 12;
+       }
+       else if (client->pipelining == true) {
+               memcpy(capabilities, "pipelining", 10);
+               n += 10;
+       }
+       /*     3. Async capability ? */
+       if (client->async == true && n != 0) {
+               memcpy(capabilities + n, ", async", 7);
+               n += 7;
+       }
+       else if (client->async == true) {
+               memcpy(capabilities, "async", 5);
+               n += 5;
+       }
+       /*     4. Encode capabilities string */
+       if (n != 0)
+               idx += encode_spoe_string(capabilities, n, buf+idx);
        else
                idx += encode_spoe_string(NULL, 0, buf+idx);
 
-       DEBUG(frame->worker, "<%lu> Agent capabilities : %s %s",
-             client->id, (client->pipelining?"pipelining":""),
-             (client->async?"async":""));
+       DEBUG(frame->worker, "<%lu> Agent capabilities : %.*s",
+             client->id, n, capabilities);
 
        frame->len = idx;
        return idx;
@@ -1020,6 +1121,7 @@ prepare_agentdicon(struct spoe_frame *frame)
        char           *buf   = frame->buf;
        const char     *reason;
        int             rlen, idx = 0;
+       unsigned int    flags  = SPOE_FRM_FL_FIN;
 
        DEBUG(frame->worker, "<%lu> Encode Agent DISCONNECT frame", client->id);
        frame->type = SPOA_FRM_T_AGENT;
@@ -1032,8 +1134,8 @@ prepare_agentdicon(struct spoe_frame *frame)
        /* Frame type */
        buf[idx++] = SPOE_FRM_T_AGENT_DISCON;
 
-       /* No flags for now */
-       memset(buf+idx, 0, 4);
+       /* Set flags */
+       memcpy(buf+idx, (char *)&flags, 4);
        idx += 4;
 
        /* No stream-id and frame-id for DISCONNECT frames */
@@ -1065,8 +1167,9 @@ prepare_agentdicon(struct spoe_frame *frame)
 static int
 prepare_agentack(struct spoe_frame *frame)
 {
-       char *buf = frame->buf;
-       int   idx = 0;
+       char        *buf = frame->buf;
+       int          idx = 0;
+       unsigned int flags  = SPOE_FRM_FL_FIN;
 
        /* Be careful here, in async mode, frame->client can be NULL */
 
@@ -1076,8 +1179,8 @@ prepare_agentack(struct spoe_frame *frame)
        /* Frame type */
        buf[idx++] = SPOE_FRM_T_AGENT_ACK;
 
-       /* No flags for now */
-       memset(buf+idx, 0, 4); /* No flags */
+       /* Set flags */
+       memcpy(buf+idx, (char *)&flags, 4);
        idx += 4;
 
        /* Set stream-id and frame-id for ACK frames */
@@ -1140,6 +1243,8 @@ release_frame(struct spoe_frame *frame)
 
        worker = frame->worker;
        LIST_DEL(&frame->list);
+       if (frame->frag_buf)
+               free(frame->frag_buf);
        memset(frame, 0, sizeof(*frame)+max_frame_size+4);
        LIST_ADDQ(&worker->frames, &frame->list);
 }
@@ -1186,14 +1291,21 @@ reset_frame(struct spoe_frame *frame)
        if (frame == NULL)
                return;
 
-       frame->type      = SPOA_FRM_T_UNKNOWN;
-       frame->buf       = (char *)(frame->data);
-       frame->offset    = 0;
-       frame->len       = 0;
-       frame->stream_id = 0;
-       frame->frame_id  = 0;
-       frame->hcheck    = false;
-       frame->ip_score  = -1;
+       if (frame->frag_buf)
+               free(frame->frag_buf);
+
+       frame->type       = SPOA_FRM_T_UNKNOWN;
+       frame->buf        = (char *)(frame->data);
+       frame->offset     = 0;
+       frame->len        = 0;
+       frame->stream_id  = 0;
+       frame->frame_id   = 0;
+       frame->flags      = 0;
+       frame->hcheck     = false;
+       frame->fragmented = false;
+       frame->ip_score   = -1;
+       frame->frag_buf   = NULL;
+       frame->frag_len   = 0;
        LIST_INIT(&frame->list);
 }
 
@@ -1414,8 +1526,8 @@ process_frame_cb(evutil_socket_t fd, short events, void *arg)
        int                idx    = frame->offset;
 
        DEBUG(frame->worker,
-             "Process frame messages : STREAM-ID=%u - FRAME-ID=%u",
-             frame->stream_id, frame->frame_id);
+             "Process frame messages : STREAM-ID=%u - FRAME-ID=%u - length=%u bytes",
+             frame->stream_id, frame->frame_id, frame->len - frame->offset);
 
        /* Loop on messages */
        while (buf+idx < end) {
@@ -1471,7 +1583,10 @@ process_frame_cb(evutil_socket_t fd, short events, void *arg)
 
   stop_processing:
        /* Prepare agent ACK frame */
+       frame->buf    = (char *)(frame->data) + 4;
        frame->offset = 0;
+       frame->len    = 0;
+       frame->flags  = 0;
        idx = prepare_agentack(frame);
 
        if (frame->ip_score != -1) {
@@ -1547,20 +1662,19 @@ read_frame_cb(evutil_socket_t fd, short events, void *arg)
                        goto write_frame;
 
                case SPOA_ST_PROCESSING:
-                       n = handle_hanotify(frame);
-                       if (n < 0 && frame->buf[0] == SPOE_FRM_T_HAPROXY_DISCON) {
+                       if (frame->buf[0] == SPOE_FRM_T_HAPROXY_DISCON) {
                                client->state = SPOA_ST_DISCONNECTING;
                                goto disconnecting;
                        }
+                       n = handle_hanotify(frame);
                        if (n < 0) {
-                               LOG(client->worker, "Ignore invalid or unknown frame");
-                               goto ignore_frame;
+                               LOG(client->worker, "Failed to decode frame: %s",
+                                   spoe_frm_err_reasons[client->status_code]);
+                               goto disconnect;
                        }
                        if (n == 0) {
-                               DEBUG(client->worker, "<%lu> No message found, ack it now",
-                                     client->id);
-                               prepare_agentack(frame);
-                               goto write_frame;
+                               LOG(client->worker, "Ignore invalid/unknown/aborted frame");
+                               goto ignore_frame;
                        }
                        else
                                goto process_frame;
@@ -1583,6 +1697,34 @@ read_frame_cb(evutil_socket_t fd, short events, void *arg)
        return;
 
   process_frame:
+       if (frame->fragmented == true) {
+               char  *buf;
+               size_t len = frame->len - frame->offset;
+
+               buf = realloc(frame->frag_buf, frame->frag_len + len);
+               if (buf == NULL) {
+                       client->status_code = SPOE_FRM_ERR_RES;
+                       goto disconnect;
+               }
+               memcpy(buf + frame->frag_len, frame->buf + frame->offset, len);
+               frame->frag_buf  = buf;
+               frame->frag_len += len;
+
+               if (!(frame->flags & SPOE_FRM_FL_FIN)) {
+                       /* Wait for next fragments */
+                       frame->buf    = (char *)(frame->data);
+                       frame->offset = 0;
+                       frame->len    = 0;
+                       frame->flags  = 0;
+                       return;
+               }
+
+               frame->buf    = frame->frag_buf;
+               frame->len    = frame->frag_len;
+               frame->offset = 0;
+               /* fall through */
+       }
+
        process_incoming_frame(frame);
        client->incoming_frame = NULL;
        return;
@@ -1829,7 +1971,7 @@ usage(char *prog)
                "                           but can be in any other unit if the number is suffixed\n"
                "                           by a unit (us, ms, s)\n"
                "\n"
-               "    Supported capabilities: pipelining, async\n",
+               "    Supported capabilities: fragmentation, pipelining, async\n",
                prog, MAX_FRAME_SIZE, DEFAULT_PORT, NUM_WORKERS);
 }
 
@@ -1863,6 +2005,8 @@ main(int argc, char **argv)
                                        pipelining = true;
                                else if (!strcmp(optarg, "async"))
                                        async = true;
+                               else if (!strcmp(optarg, "fragmentation"))
+                                       fragmentation = true;
                                else
                                        fprintf(stderr, "WARNING: unsupported capability '%s'\n", optarg);
                                break;
@@ -1964,8 +2108,8 @@ main(int argc, char **argv)
 
        DEBUG(&null_worker,
              "Server is ready"
-             " [pipelining=%s - async=%s - debug=%s - max-frame-size=%u]",
-             (pipelining?"true":"false"), (async?"true":"false"),
+             " [fragmentation=%s - pipelining=%s - async=%s - debug=%s - max-frame-size=%u]",
+             (fragmentation?"true":"false"), (pipelining?"true":"false"), (async?"true":"false"),
              (debug?"true":"false"), max_frame_size);
        event_base_dispatch(base);