]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: spoe: Improve implementation of the payload fragmentation
authorChristopher Faulet <cfaulet@haproxy.com>
Fri, 17 Feb 2017 14:18:35 +0000 (15:18 +0100)
committerWilly Tarreau <w@1wt.eu>
Thu, 9 Mar 2017 14:32:55 +0000 (15:32 +0100)
Now, when a payload is fragmented, the first frame must define the frame type
and the followings must use the special type SPOE_FRM_T_UNSET. This way, it is
easy to know if a fragment is the first one or not. Of course, all frames must
still share the same stream-id and frame-id.

Update SPOA example accordingly.

contrib/spoa_example/spoa.c
include/types/spoe.h
src/flt_spoe.c

index e2459844adadbf0b4caea665b3011a0ced60e29d..7c6d50424cba72758b17bb162ce5a5b84d2808b1 100644 (file)
@@ -401,6 +401,42 @@ check_engine_id(struct spoe_frame *frame, char **buf, char *end)
        return ret;
 }
 
+static int
+acc_payload(struct spoe_frame *frame)
+{
+       struct client *client = frame->client;
+       char          *buf;
+       size_t         len = frame->len - frame->offset;
+       int            ret = frame->offset;
+
+       /* No need to accumulation payload */
+       if (frame->fragmented == false)
+               return ret;
+
+       buf = realloc(frame->frag_buf, frame->frag_len + len);
+       if (buf == NULL) {
+               client->status_code = SPOE_FRM_ERR_RES;
+               return -1;
+       }
+       memcpy(buf + frame->frag_len, frame->buf + frame->offset, len);
+       frame->frag_buf  = buf;
+       frame->frag_len += len;
+
+       if (!(frame->flags & SPOE_FRM_FL_FIN)) {
+               /* Wait for next parts */
+               frame->buf    = (char *)(frame->data);
+               frame->offset = 0;
+               frame->len    = 0;
+               frame->flags  = 0;
+               return 1;
+       }
+
+       frame->buf    = frame->frag_buf;
+       frame->len    = frame->frag_len;
+       frame->offset = 0;
+       return ret;
+}
+
 /* Check disconnect status code. It returns -1 if an error occurred, the number
  * of read bytes otherwise. */
 static int
@@ -454,6 +490,8 @@ check_discon_message(struct spoe_frame *frame, char **buf, char *end)
        return ret;
 }
 
+
+
 /* Decode a HELLO frame received from HAProxy. It returns -1 if an error
  * occurred, otherwise the number of read bytes. HELLO frame cannot be
  * ignored and having another frame than a HELLO frame is an error. */
@@ -664,7 +702,7 @@ handle_hanotify(struct spoe_frame *frame)
        memcpy((char *)&(frame->flags), p, 4);
        p += 4;
 
-       /* Fragmentation is not supported for DISCONNECT frame */
+       /* Fragmentation is not supported */
        if (!(frame->flags & SPOE_FRM_FL_FIN) && fragmentation == false) {
                client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
                goto error;
@@ -676,44 +714,87 @@ handle_hanotify(struct spoe_frame *frame)
        if (spoe_decode_varint(&p, end, &frame_id) == -1)
                goto ignore;
 
-       if (frame->fragmented == true) {
-               if (frame->stream_id != (unsigned int)stream_id ||
-                   frame->frame_id  != (unsigned int)frame_id) {
-                       client->status_code = SPOE_FRM_ERR_INTERLACED_FRAMES;
-                       goto error;
-               }
+       frame->stream_id = (unsigned int)stream_id;
+       frame->frame_id  = (unsigned int)frame_id;
 
-               if (frame->flags & SPOE_FRM_FL_ABRT) {
-                       DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
-                             " - Abort processing of a fragmented frame"
-                             " - frag_len=%u - len=%u - offset=%ld",
-                             client->id, frame->stream_id, frame->frame_id,
-                             frame->frag_len, frame->len, p - frame->buf);
-                       goto ignore;
-               }
-               DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
-                     " - %s fragment of a fragmented frame received"
-                     " - frag_len=%u - len=%u - offset=%ld",
-                     client->id, frame->stream_id, frame->frame_id,
-                     (frame->flags & SPOE_FRM_FL_FIN) ? "last" : "next",
-                     frame->frag_len, frame->len, p - frame->buf);
+       DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
+             " - %s frame received"
+             " - frag_len=%u - len=%u - offset=%ld",
+             client->id, frame->stream_id, frame->frame_id,
+             (frame->flags & SPOE_FRM_FL_FIN) ? "unfragmented" : "fragmented",
+             frame->frag_len, frame->len, p - frame->buf);
+
+       frame->fragmented = !(frame->flags & SPOE_FRM_FL_FIN);
+       frame->offset = (p - frame->buf);
+       return acc_payload(frame);
+
+  ignore:
+       return 0;
+
+  error:
+       return -1;
+}
+
+/* Decode next part of a fragmented frame received from HAProxy. It returns -1
+ * if an error occurred, 0 if it must be must be ignored, otherwise the number
+ * of read bytes. */
+static int
+handle_hafrag(struct spoe_frame *frame)
+{
+       struct client *client = frame->client;
+       char          *p, *end;
+       uint64_t       stream_id, frame_id;
+
+       p = frame->buf;
+       end = frame->buf + frame->len;
+
+       /* Check frame type */
+       if (*p++ != SPOE_FRM_T_UNSET)
+               goto ignore;
+
+       DEBUG(frame->worker, "<%lu> Decode Next part of a fragmented frame", client->id);
+
+       /* Fragmentation is not supported */
+       if (fragmentation == false) {
+               client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
+               goto error;
        }
-       else {
-               frame->stream_id = (unsigned int)stream_id;
-               frame->frame_id  = (unsigned int)frame_id;
 
+       /* Retrieve flags */
+       memcpy((char *)&(frame->flags), p, 4);
+       p+= 4;
+
+       /* Read the stream-id and frame-id */
+       if (spoe_decode_varint(&p, end, &stream_id) == -1)
+               goto ignore;
+       if (spoe_decode_varint(&p, end, &frame_id) == -1)
+               goto ignore;
+
+       if (frame->fragmented == false                  ||
+           frame->stream_id != (unsigned int)stream_id ||
+           frame->frame_id  != (unsigned int)frame_id) {
+               client->status_code = SPOE_FRM_ERR_INTERLACED_FRAMES;
+               goto error;
+       }
+
+       if (frame->flags & SPOE_FRM_FL_ABRT) {
                DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
-                     " - %s frame received"
+                     " - Abort processing of a fragmented frame"
                      " - frag_len=%u - len=%u - offset=%ld",
                      client->id, frame->stream_id, frame->frame_id,
-                     (frame->flags & SPOE_FRM_FL_FIN) ? "unfragmented" : "fragmented",
                      frame->frag_len, frame->len, p - frame->buf);
-
-               frame->fragmented = !(frame->flags & SPOE_FRM_FL_FIN);
+               goto ignore;
        }
 
+       DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
+             " - %s fragment of a fragmented frame received"
+             " - frag_len=%u - len=%u - offset=%ld",
+             client->id, frame->stream_id, frame->frame_id,
+             (frame->flags & SPOE_FRM_FL_FIN) ? "last" : "next",
+             frame->frag_len, frame->len, p - frame->buf);
+
        frame->offset = (p - frame->buf);
-       return frame->offset;
+       return acc_payload(frame);
 
   ignore:
        return 0;
@@ -1356,7 +1437,11 @@ read_frame_cb(evutil_socket_t fd, short events, void *arg)
                                client->state = SPOA_ST_DISCONNECTING;
                                goto disconnecting;
                        }
-                       n = handle_hanotify(frame);
+                       if (frame->buf[0] == SPOE_FRM_T_UNSET)
+                               n = handle_hafrag(frame);
+                       else
+                               n = handle_hanotify(frame);
+
                        if (n < 0) {
                                LOG(client->worker, "Failed to decode frame: %s",
                                    spoe_frm_err_reasons[client->status_code]);
@@ -1366,6 +1451,8 @@ read_frame_cb(evutil_socket_t fd, short events, void *arg)
                                LOG(client->worker, "Ignore invalid/unknown/aborted frame");
                                goto ignore_frame;
                        }
+                       else if (n == 1)
+                               goto noop;
                        else
                                goto process_frame;
 
@@ -1382,39 +1469,14 @@ read_frame_cb(evutil_socket_t fd, short events, void *arg)
                        goto disconnect;
        }
 
+  noop:
+       return;
+
   ignore_frame:
        reset_frame(frame);
        return;
 
   process_frame:
-       if (frame->fragmented == true) {
-               char  *buf;
-               size_t len = frame->len - frame->offset;
-
-               buf = realloc(frame->frag_buf, frame->frag_len + len);
-               if (buf == NULL) {
-                       client->status_code = SPOE_FRM_ERR_RES;
-                       goto disconnect;
-               }
-               memcpy(buf + frame->frag_len, frame->buf + frame->offset, len);
-               frame->frag_buf  = buf;
-               frame->frag_len += len;
-
-               if (!(frame->flags & SPOE_FRM_FL_FIN)) {
-                       /* Wait for next fragments */
-                       frame->buf    = (char *)(frame->data);
-                       frame->offset = 0;
-                       frame->len    = 0;
-                       frame->flags  = 0;
-                       return;
-               }
-
-               frame->buf    = frame->frag_buf;
-               frame->len    = frame->frag_len;
-               frame->offset = 0;
-               /* fall through */
-       }
-
        process_incoming_frame(frame);
        client->incoming_frame = NULL;
        return;
index a6b0ffc27c41d6d2f3f0777a55e86ca86f6d004f..b65bc7a917291355a538afeead9330bf07648e72 100644 (file)
@@ -294,6 +294,8 @@ struct spoe_appctx {
 
 /* Frame Types sent by HAProxy and by agents */
 enum spoe_frame_type {
+       SPOE_FRM_T_UNSET = 0,
+
        /* Frames sent by HAProxy */
        SPOE_FRM_T_HAPROXY_HELLO = 1,
        SPOE_FRM_T_HAPROXY_DISCON,
index 8156287c66661aa9a7721531ba14a84dcd17da55..ddfb67ba2bffe34147ec13b7f8bbe3fef3ea9c24 100644 (file)
@@ -495,30 +495,76 @@ spoe_prepare_hanotify_frame(struct appctx *appctx, struct spoe_context *ctx,
        p   = frame;
        end = frame+size;
 
+       stream_id = ctx->stream_id;
+       frame_id  = ctx->frame_id;
+
+       if (ctx->flags & SPOE_CTX_FL_FRAGMENTED) {
+               /* The fragmentation is not supported by the applet */
+               if (!(SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_FRAGMENTATION)) {
+                       SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
+                       return -1;
+               }
+               flags = ctx->frag_ctx.flags;
+       }
+
+       /* Set Frame type */
+       *p++ = SPOE_FRM_T_HAPROXY_NOTIFY;
+
+       /* Set flags */
+       memcpy(p, (char *)&flags, 4);
+       p += 4;
+
+       /* Set stream-id and frame-id */
+       if (spoe_encode_varint(stream_id, &p, end) == -1)
+               goto too_big;
+       if (spoe_encode_varint(frame_id, &p, end) == -1)
+               goto too_big;
+
+       /* Copy encoded messages, if possible */
+       sz = ctx->buffer->i;
+       if (p + sz >= end)
+               goto too_big;
+       memcpy(p, ctx->buffer->p, sz);
+       p += sz;
+
+       return (p - frame);
+
+  too_big:
+       SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_TOO_BIG;
+       return 0;
+}
+
+/* Encode next part of a fragmented frame sent by HAProxy to an agent. It
+ * returns the number of encoded bytes in the frame on success, 0 if an encoding
+ * error occurred and -1 if a fatal error occurred. */
+static int
+spoe_prepare_hafrag_frame(struct appctx *appctx, struct spoe_context *ctx,
+                         char *frame, size_t size)
+{
+       char        *p, *end;
+       unsigned int stream_id, frame_id;
+       unsigned int flags;
+       size_t       sz;
+
+       p   = frame;
+       end = frame+size;
+
        /* <ctx> is null when the stream has aborted the processing of a
         * fragmented frame. In this case, we must notify the corresponding
         * agent using ids stored in <frag_ctx>. */
        if (ctx == NULL) {
-               flags    |= SPOE_FRM_FL_ABRT;
+               flags     = (SPOE_FRM_FL_FIN|SPOE_FRM_FL_ABRT);
                stream_id = SPOE_APPCTX(appctx)->frag_ctx.cursid;
                frame_id  = SPOE_APPCTX(appctx)->frag_ctx.curfid;
        }
        else {
+               flags     = ctx->frag_ctx.flags;
                stream_id = ctx->stream_id;
                frame_id  = ctx->frame_id;
-
-               if (ctx->flags & SPOE_CTX_FL_FRAGMENTED) {
-                       /* The fragmentation is not supported by the applet */
-                       if (!(SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_FRAGMENTATION)) {
-                               SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
-                               return -1;
-                       }
-                       flags = ctx->frag_ctx.flags;
-               }
        }
 
        /* Set Frame type */
-       *p++ = SPOE_FRM_T_HAPROXY_NOTIFY;
+       *p++ = SPOE_FRM_T_UNSET;
 
        /* Set flags */
        memcpy(p, (char *)&flags, 4);
@@ -530,13 +576,17 @@ spoe_prepare_hanotify_frame(struct appctx *appctx, struct spoe_context *ctx,
        if (spoe_encode_varint(frame_id, &p, end) == -1)
                goto too_big;
 
+       if (ctx == NULL)
+               goto end;
+
        /* Copy encoded messages, if possible */
-       sz = SPOE_APPCTX(appctx)->buffer->i;
+       sz = ctx->buffer->i;
        if (p + sz >= end)
                goto too_big;
-       memcpy(p, SPOE_APPCTX(appctx)->buffer->p, sz);
+       memcpy(p, ctx->buffer->p, sz);
        p += sz;
 
+  end:
        return (p - frame);
 
   too_big:
@@ -1150,12 +1200,13 @@ spoe_release_appctx(struct appctx *appctx)
                if (appctx->st0 == SPOE_APPCTX_ST_IDLE)
                        agent->applets_idle--;
 
-               si_shutw(si);
-               si_shutr(si);
-               si_ic(si)->flags |= CF_READ_NULL;
                appctx->st0 = SPOE_APPCTX_ST_END;
                if (spoe_appctx->status_code == SPOE_FRM_ERR_NONE)
                        spoe_appctx->status_code = SPOE_FRM_ERR_IO;
+
+               si_shutw(si);
+               si_shutr(si);
+               si_ic(si)->flags |= CF_READ_NULL;
        }
 
        /* Destroy the task attached to this applet */
@@ -1351,19 +1402,36 @@ spoe_handle_connecting_appctx(struct appctx *appctx)
        return 0;
 }
 
+
 static int
-spoe_handle_sending_frame_appctx(struct appctx *appctx, struct spoe_context *ctx,
-                                int *skip)
+spoe_handle_sending_frame_appctx(struct appctx *appctx, int *skip)
 {
-       struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
+       struct spoe_agent   *agent = SPOE_APPCTX(appctx)->agent;
+       struct spoe_context *ctx = NULL;
        char *frame, *buf;
        int   ret;
 
        /* 4 bytes are reserved at the beginning of <buf> to store the frame
         * length. */
        buf = trash.str; frame = buf+4;
-       ret = spoe_prepare_hanotify_frame(appctx, ctx, frame,
-                                         SPOE_APPCTX(appctx)->max_frame_size);
+
+       if (appctx->st0 == SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY) {
+               ctx = SPOE_APPCTX(appctx)->frag_ctx.ctx;
+               ret = spoe_prepare_hafrag_frame(appctx, ctx, frame,
+                                               SPOE_APPCTX(appctx)->max_frame_size);
+       }
+       else if (LIST_ISEMPTY(&agent->sending_queue)) {
+               *skip = 1;
+               ret   = 1;
+               goto end;
+       }
+       else {
+               ctx = LIST_NEXT(&agent->sending_queue, typeof(ctx), list);
+               ret = spoe_prepare_hanotify_frame(appctx, ctx, frame,
+                                                 SPOE_APPCTX(appctx)->max_frame_size);
+
+       }
+
        if (ret > 1)
                ret = spoe_send_frame(appctx, buf, ret);
 
@@ -1376,6 +1444,7 @@ spoe_handle_sending_frame_appctx(struct appctx *appctx, struct spoe_context *ctx
                        if (ctx == NULL)
                                goto abort_frag_frame;
 
+                       spoe_release_buffer(&ctx->buffer, &ctx->buffer_wait);
                        LIST_DEL(&ctx->list);
                        LIST_INIT(&ctx->list);
                        ctx->state = SPOE_CTX_ST_ERROR;
@@ -1391,6 +1460,7 @@ spoe_handle_sending_frame_appctx(struct appctx *appctx, struct spoe_context *ctx
                        if (ctx == NULL)
                                goto abort_frag_frame;
 
+                       spoe_release_buffer(&ctx->buffer, &ctx->buffer_wait);
                        LIST_DEL(&ctx->list);
                        LIST_INIT(&ctx->list);
                        if (!(ctx->flags & SPOE_CTX_FL_FRAGMENTED) ||
@@ -1506,7 +1576,6 @@ spoe_handle_processing_appctx(struct appctx *appctx)
 {
        struct stream_interface *si    = appctx->owner;
        struct spoe_agent       *agent = SPOE_APPCTX(appctx)->agent;
-       struct spoe_context     *ctx = NULL;
        unsigned int  fpa = 0;
        int           ret, skip_sending = 0, skip_receiving = 0;
 
@@ -1531,39 +1600,21 @@ spoe_handle_processing_appctx(struct appctx *appctx)
                    skip_sending, skip_receiving,
                    spoe_appctx_state_str[appctx->st0]);
 
-       if (fpa > agent->max_fpa || (skip_sending && skip_receiving))
+       if (fpa > agent->max_fpa)
                goto stop;
-       else if (appctx->st0 == SPOE_APPCTX_ST_WAITING_SYNC_ACK) {
+       else if (skip_sending || appctx->st0 == SPOE_APPCTX_ST_WAITING_SYNC_ACK) {
                if (skip_receiving)
                        goto stop;
                goto recv_frame;
        }
-       else if (skip_sending)
-               goto recv_frame;
-       else if (appctx->st0 == SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY) {
-               ctx = SPOE_APPCTX(appctx)->frag_ctx.ctx;
-               goto send_frame;
-       }
-       else if (LIST_ISEMPTY(&agent->sending_queue)) {
-               skip_sending = 1;
-               goto recv_frame;
-       }
-       ctx = LIST_NEXT(&agent->sending_queue, typeof(ctx), list);
 
-  send_frame:
-       /* Transfer the buffer ownership to the SPOE appctx */
-       if (ctx) {
-               SPOE_APPCTX(appctx)->buffer = ctx->buffer;
-               ctx->buffer = &buf_empty;
-       }
-       ret = spoe_handle_sending_frame_appctx(appctx, ctx, &skip_sending);
+       /* send_frame */
+       ret = spoe_handle_sending_frame_appctx(appctx, &skip_sending);
        switch (ret) {
                case -1: /* error */
                        goto next;
 
                case 0: /* ignore */
-                       spoe_release_buffer(&SPOE_APPCTX(appctx)->buffer,
-                                           &SPOE_APPCTX(appctx)->buffer_wait);
                        agent->sending_rate++;
                        fpa++;
                        break;
@@ -1572,8 +1623,6 @@ spoe_handle_processing_appctx(struct appctx *appctx)
                        break;
 
                default:
-                       spoe_release_buffer(&SPOE_APPCTX(appctx)->buffer,
-                                           &SPOE_APPCTX(appctx)->buffer_wait);
                        agent->sending_rate++;
                        fpa++;
                        break;
@@ -2571,7 +2620,7 @@ static void
 spoe_reset_context(struct spoe_context *ctx)
 {
        ctx->state  = SPOE_CTX_ST_READY;
-       ctx->flags &= ~SPOE_CTX_FL_PROCESS;
+       ctx->flags &= ~(SPOE_CTX_FL_PROCESS|SPOE_CTX_FL_FRAGMENTED);
 }