]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: spoe: Be sure to wakeup the good entity waiting for a buffer
authorChristopher Faulet <cfaulet@haproxy.com>
Wed, 11 Jan 2017 13:05:19 +0000 (14:05 +0100)
committerWilly Tarreau <w@1wt.eu>
Thu, 9 Mar 2017 14:32:55 +0000 (15:32 +0100)
This happens when buffer allocation failed. In the SPOE context, buffers are
allocated by streams and SPOE applets at different time. First, by streams, when
messages need to be encoded before sending them in a NOTIFY frame. Then, by SPOE
applets, when a ACK frame is received.

The first case works as expected, we wake up the stream. But for the second one,
we must wake up the waiting SPOE applet.

src/flt_spoe.c

index 1de125f17d8e6d22289755b3ae869a7b207a7727..f4da4ddd7caa82b183dd2a3a11f6f96c0916fe7c 100644 (file)
@@ -246,8 +246,8 @@ struct spoe_context {
        struct stream      *strm;         /* The stream that should be offloaded */
 
        struct list        *messages;     /* List of messages that will be sent during the stream processing */
-       struct buffer      *buffer;       /* Buffer used to store a NOTIFY or ACK frame */
-       struct buffer_wait  buffer_wait;  /* position in the list of streams waiting for a buffer */
+       struct buffer      *buffer;       /* Buffer used to store a encoded messages */
+       struct buffer_wait  buffer_wait;  /* position in the list of ressources waiting for a buffer */
        struct list         list;
 
        enum spoe_ctx_state state;        /* SPOE_CTX_ST_* */
@@ -270,6 +270,8 @@ struct spoe_appctx {
        unsigned int        flags;          /* SPOE_APPCTX_FL_* */
 
        unsigned int        status_code;    /* SPOE_FRM_ERR_* */
+       struct buffer      *buffer;         /* Buffer used to store a encoded messages */
+       struct buffer_wait  buffer_wait;    /* position in the list of ressources waiting for a buffer */
        struct list         waiting_queue;  /* list of streams waiting for a ACK frame, in sync and pipelining mode */
        struct list         list;           /* next spoe appctx for the same agent */
 };
@@ -309,8 +311,8 @@ char spoe_reason[256];
 struct flt_ops spoe_ops;
 
 static int  queue_spoe_context(struct spoe_context *ctx);
-static int  acquire_spoe_buffer(struct spoe_context *ctx);
-static void release_spoe_buffer(struct spoe_context *ctx);
+static int  acquire_spoe_buffer(struct buffer **buf, struct buffer_wait *buffer_wait);
+static void release_spoe_buffer(struct buffer **buf, struct buffer_wait *buffer_wait);
 
 /********************************************************************
  * helper functions/globals
@@ -913,15 +915,15 @@ prepare_spoe_hanotify_frame(struct appctx *appctx, struct spoe_context *ctx,
        idx += encode_spoe_varint(ctx->stream_id, frame+idx);
        idx += encode_spoe_varint(ctx->frame_id, frame+idx);
 
-       /* Copy encoded messages */
-       if (idx + ctx->buffer->i > size) {
+       /* check the buffer size */
+       if (idx + SPOE_APPCTX(appctx)->buffer->i > size) {
                spoe_status_code = SPOE_FRM_ERR_TOO_BIG;
                return 0;
        }
 
        /* Copy encoded messages */
-       memcpy(frame+idx, ctx->buffer->p, ctx->buffer->i);
-       idx += ctx->buffer->i;
+       memcpy(frame+idx, SPOE_APPCTX(appctx)->buffer->p, SPOE_APPCTX(appctx)->buffer->i);
+       idx += SPOE_APPCTX(appctx)->buffer->i;
 
        return idx;
 }
@@ -1230,9 +1232,13 @@ handle_spoe_agentack_frame(struct appctx *appctx, char *frame, size_t size)
        return 0;
 
   found:
-       if (!acquire_spoe_buffer(ctx))
+       if (!acquire_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait))
                return 1; /* Retry later */
 
+       /* Transfer the buffer ownership to the SPOE context */
+       ctx->buffer = SPOE_APPCTX(appctx)->buffer;
+       SPOE_APPCTX(appctx)->buffer = &buf_empty;
+
        /* Copy encoded actions */
        memcpy(ctx->buffer->p, frame+idx, size-idx);
        ctx->buffer->i = size-idx;
@@ -1379,6 +1385,15 @@ recv_spoe_frame(struct appctx *appctx, char *buf, size_t framesz)
 /********************************************************************
  * Functions that manage the SPOE applet
  ********************************************************************/
+static int
+wakeup_spoe_appctx(struct appctx *appctx)
+{
+       si_applet_want_get(appctx->owner);
+       si_applet_want_put(appctx->owner);
+       appctx_wakeup(appctx);
+       return 1;
+}
+
 /* Callback function that catches applet timeouts. If a timeout occurred, we set
  * <appctx->st1> flag and the SPOE applet is woken up. */
 static struct task *
@@ -1391,9 +1406,7 @@ process_spoe_applet(struct task * task)
                task->expire = TICK_ETERNITY;
                appctx->st1 = SPOE_APPCTX_ERR_TOUT;
        }
-       si_applet_want_get(appctx->owner);
-       si_applet_want_put(appctx->owner);
-       appctx_wakeup(appctx);
+       wakeup_spoe_appctx(appctx);
        return task;
 }
 
@@ -1441,6 +1454,7 @@ release_spoe_applet(struct appctx *appctx)
                task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
        }
 
+       release_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait);
        pool_free2(pool2_spoe_appctx, SPOE_APPCTX(appctx));
 
        if (!LIST_ISEMPTY(&agent->applets))
@@ -1633,6 +1647,11 @@ handle_processing_spoe_applet(struct appctx *appctx)
        }
 
        ctx = LIST_NEXT(&agent->sending_queue, typeof(ctx), list);
+
+       /* Transfer the buffer ownership to the SPOE appctx */
+       SPOE_APPCTX(appctx)->buffer = ctx->buffer;
+       ctx->buffer = &buf_empty;
+
        ret = prepare_spoe_hanotify_frame(appctx, ctx, frame+4, SPOE_APPCTX(appctx)->max_frame_size);
        if (ret > 1)
                ret = send_spoe_frame(appctx, frame, ret);
@@ -1646,7 +1665,7 @@ handle_processing_spoe_applet(struct appctx *appctx)
                        agent->sending_rate++;
                        ctx->state = SPOE_CTX_ST_ERROR;
                        ctx->status_code = (spoe_status_code + 0x100);
-                       release_spoe_buffer(ctx);
+                       release_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait);
                        task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
                        LIST_DEL(&ctx->list);
                        LIST_INIT(&ctx->list);
@@ -1661,7 +1680,7 @@ handle_processing_spoe_applet(struct appctx *appctx)
                default:
                        agent->sending_rate++;
                        ctx->state = SPOE_CTX_ST_WAITING_ACK;
-                       release_spoe_buffer(ctx);
+                       release_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait);
                        LIST_DEL(&ctx->list);
                        LIST_INIT(&ctx->list);
                        if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC)
@@ -1963,6 +1982,11 @@ create_spoe_appctx(struct spoe_config *conf)
        SPOE_APPCTX(appctx)->max_frame_size  = conf->agent->max_frame_size;
        SPOE_APPCTX(appctx)->flags           = 0;
        SPOE_APPCTX(appctx)->status_code     = SPOE_FRM_ERR_NONE;
+       SPOE_APPCTX(appctx)->buffer          = &buf_empty;
+
+       LIST_INIT(&SPOE_APPCTX(appctx)->buffer_wait.list);
+       SPOE_APPCTX(appctx)->buffer_wait.target = appctx;
+       SPOE_APPCTX(appctx)->buffer_wait.wakeup_cb = (int (*)(void *))wakeup_spoe_appctx;
 
        LIST_INIT(&SPOE_APPCTX(appctx)->list);
        LIST_INIT(&SPOE_APPCTX(appctx)->waiting_queue);
@@ -2093,9 +2117,7 @@ queue_spoe_context(struct spoe_context *ctx)
        list_for_each_entry(spoe_appctx, &agent->applets, list) {
                appctx = spoe_appctx->owner;
                if (appctx->st0 == SPOE_APPCTX_ST_IDLE) {
-                       si_applet_want_get(appctx->owner);
-                       si_applet_want_put(appctx->owner);
-                       appctx_wakeup(appctx);
+                       wakeup_spoe_appctx(appctx);
                        LIST_DEL(&spoe_appctx->list);
                        LIST_ADDQ(&agent->applets, &spoe_appctx->list);
                        break;
@@ -2379,7 +2401,7 @@ start_event_processing(struct spoe_context *ctx, int dir)
        if (ctx->flags & SPOE_CTX_FL_PROCESS)
                goto wait;
 
-       if (!acquire_spoe_buffer(ctx))
+       if (!acquire_spoe_buffer(&ctx->buffer, &ctx->buffer_wait))
                goto wait;
 
        /* Set the right flag to prevent request and response processing
@@ -2405,7 +2427,7 @@ stop_event_processing(struct spoe_context *ctx)
        /* Reset processing timer */
        ctx->process_exp = TICK_ETERNITY;
 
-       release_spoe_buffer(ctx);
+       release_spoe_buffer(&ctx->buffer, &ctx->buffer_wait);
 
        if (!LIST_ISEMPTY(&ctx->list)) {
                LIST_DEL(&ctx->list);
@@ -2539,39 +2561,41 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx,
  * Functions that create/destroy SPOE contexts
  **************************************************************************/
 static int
-acquire_spoe_buffer(struct spoe_context *ctx)
+acquire_spoe_buffer(struct buffer **buf, struct buffer_wait *buffer_wait)
 {
-       if (ctx->buffer != &buf_empty)
+       if (*buf != &buf_empty)
                return 1;
 
-       if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) {
-               LIST_DEL(&ctx->buffer_wait.list);
-               LIST_INIT(&ctx->buffer_wait.list);
+       if (!LIST_ISEMPTY(&buffer_wait->list)) {
+               LIST_DEL(&buffer_wait->list);
+               LIST_INIT(&buffer_wait->list);
        }
 
-       if (b_alloc_margin(&ctx->buffer, global.tune.reserved_bufs))
+       if (b_alloc_margin(buf, global.tune.reserved_bufs))
                return 1;
 
-       LIST_ADDQ(&buffer_wq, &ctx->buffer_wait.list);
+       LIST_ADDQ(&buffer_wq, &buffer_wait->list);
        return 0;
 }
 
 static void
-release_spoe_buffer(struct spoe_context *ctx)
+release_spoe_buffer(struct buffer **buf, struct buffer_wait *buffer_wait)
 {
-       if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) {
-               LIST_DEL(&ctx->buffer_wait.list);
-               LIST_INIT(&ctx->buffer_wait.list);
+       if (!LIST_ISEMPTY(&buffer_wait->list)) {
+               LIST_DEL(&buffer_wait->list);
+               LIST_INIT(&buffer_wait->list);
        }
 
        /* Release the buffer if needed */
-       if (ctx->buffer != &buf_empty) {
-               b_free(&ctx->buffer);
-               offer_buffers(ctx, tasks_run_queue + applets_active_queue);
+       if (*buf != &buf_empty) {
+               b_free(buf);
+               offer_buffers(buffer_wait->target,
+                             tasks_run_queue + applets_active_queue);
        }
 }
 
-static int wakeup_spoe_context(struct spoe_context *ctx)
+static int
+wakeup_spoe_context(struct spoe_context *ctx)
 {
        task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
        return 1;
@@ -2643,7 +2667,6 @@ sig_stop_spoe(struct sig_handler *sh)
                list_for_each_entry(fconf, &p->filter_configs, list) {
                        struct spoe_config *conf;
                        struct spoe_agent  *agent;
-                       struct appctx      *appctx;
                        struct spoe_appctx *spoe_appctx;
 
                        if (fconf->id != spoe_filter_id)
@@ -2653,10 +2676,7 @@ sig_stop_spoe(struct sig_handler *sh)
                        agent = conf->agent;
 
                        list_for_each_entry(spoe_appctx, &agent->applets, list) {
-                               appctx = spoe_appctx->owner;
-                               si_applet_want_get(appctx->owner);
-                               si_applet_want_put(appctx->owner);
-                               appctx_wakeup(appctx);
+                               wakeup_spoe_appctx(spoe_appctx->owner);
                        }
                }
                p = p->next;
@@ -2818,7 +2838,7 @@ spoe_check_timeouts(struct stream *s, struct filter *filter)
 
        if (tick_is_expired(ctx->process_exp, now_ms)) {
                s->pending_events |= TASK_WOKEN_MSG;
-               release_spoe_buffer(ctx);
+               release_spoe_buffer(&ctx->buffer, &ctx->buffer_wait);
        }
 }