]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
BUG/MAJOR: Fix how the list of entities waiting for a buffer is handled
authorChristopher Faulet <cfaulet@haproxy.com>
Fri, 9 Dec 2016 16:30:18 +0000 (17:30 +0100)
committerWilly Tarreau <w@1wt.eu>
Mon, 12 Dec 2016 18:11:04 +0000 (19:11 +0100)
When an entity tries to get a buffer, if it cannot be allocted, for example
because the number of buffers which may be allocated per process is limited,
this entity is added in a list (called <buffer_wq>) and wait for an available
buffer.

Historically, the <buffer_wq> list was logically attached to streams because it
were the only entities likely to be added in it. Now, applets can also be
waiting for a free buffer. And with filters, we could imagine to have more other
entities waiting for a buffer. So it make sense to have a generic list.

Anyway, with the current design there is a bug. When an applet failed to get a
buffer, it will wait. But we add the stream attached to the applet in
<buffer_wq>, instead of the applet itself. So when a buffer is available, we
wake up the stream and not the waiting applet. So, it is possible to have
waiting applets and never awakened.

So, now, <buffer_wq> is independant from streams. And we really add the waiting
entity in <buffer_wq>. To be generic, the entity is responsible to define the
callback used to awaken it.

In addition, applets will still request an input buffer when they become
active. But they will not be sleeped anymore if no buffer are available. So this
is the responsibility to the applet I/O handler to check if this buffer is
allocated or not. This way, an applet can decide if this buffer is required or
not and can do additional processing if not.

[wt: backport to 1.7 and 1.6]

15 files changed:
include/common/buffer.h
include/proto/applet.h
include/proto/channel.h
include/proto/stream.h
include/types/applet.h
include/types/stream.h
src/applet.c
src/buffer.c
src/cli.c
src/flt_spoe.c
src/hlua.c
src/peers.c
src/stats.c
src/stream.c
src/stream_interface.c

index ca90fbe4be9cd6f0f4a2fdd761b40ac5131373c0..ce3eb40a962a118221918edde8d7f06b4786e21a 100644 (file)
@@ -39,9 +39,18 @@ struct buffer {
        char data[0];                   /* <size> bytes */
 };
 
+/* an element of the <buffer_wq> list. It represents an object that need to
+ * acquire a buffer to continue its process. */
+struct buffer_wait {
+       void *target;              /* The waiting object that should be woken up */
+       int (*wakeup_cb)(void *);  /* The function used to wake up the <target>, passed as argument */
+       struct list list;          /* Next element in the <buffer_wq> list */
+};
+
 extern struct pool_head *pool2_buffer;
 extern struct buffer buf_empty;
 extern struct buffer buf_wanted;
+extern struct list buffer_wq;
 
 int init_buffer();
 int buffer_replace2(struct buffer *b, char *pos, char *end, const char *str, int len);
@@ -522,6 +531,16 @@ static inline struct buffer *b_alloc_margin(struct buffer **buf, int margin)
        return next;
 }
 
+
+void __offer_buffer(void *from, unsigned int threshold);
+
+static inline void offer_buffers(void *from, unsigned int threshold)
+{
+       if (LIST_ISEMPTY(&buffer_wq))
+               return;
+       __offer_buffer(from, threshold);
+}
+
 #endif /* _COMMON_BUFFER_H */
 
 /*
index 5a503b43c4c419d3feac245c1a5e61ea312c860a..653be31e486899aff069c2025a4aa2c20d1336e3 100644 (file)
@@ -36,6 +36,10 @@ extern struct list applet_active_queue;
 
 void applet_run_active();
 
+
+static int inline appctx_res_wakeup(struct appctx *appctx);
+
+
 /* Initializes all required fields for a new appctx. Note that it does the
  * minimum acceptable initialization for an appctx. This means only the
  * 3 integer states st0, st1, st2 are zeroed.
@@ -61,6 +65,9 @@ static inline struct appctx *appctx_new(struct applet *applet)
                appctx->applet = applet;
                appctx_init(appctx);
                LIST_INIT(&appctx->runq);
+               LIST_INIT(&appctx->buffer_wait.list);
+               appctx->buffer_wait.target = appctx;
+               appctx->buffer_wait.wakeup_cb = (int (*)(void *))appctx_res_wakeup;
                nb_applets++;
        }
        return appctx;
@@ -75,6 +82,10 @@ static inline void appctx_free(struct appctx *appctx)
                LIST_DEL(&appctx->runq);
                applets_active_queue--;
        }
+       if (!LIST_ISEMPTY(&appctx->buffer_wait.list)) {
+               LIST_DEL(&appctx->buffer_wait.list);
+               LIST_INIT(&appctx->buffer_wait.list);
+       }
        pool_free2(pool2_connection, appctx);
        nb_applets--;
 }
@@ -98,6 +109,19 @@ static inline void appctx_pause(struct appctx *appctx)
        }
 }
 
+/* Callback used to wake up an applet when a buffer is available. The applet
+ * <appctx> is woken up is if it is not already in the list of "active"
+ * applets. This functions returns 1 is the stream is woken up, otherwise it
+ * returns 0. */
+static inline int appctx_res_wakeup(struct appctx *appctx)
+{
+       if (!LIST_ISEMPTY(&appctx->runq))
+               return 0;
+       appctx_wakeup(appctx);
+       return 1;
+}
+
+
 #endif /* _PROTO_APPLET_H */
 
 /*
index 3d435c47ce94a8b9002a8abf734fa246fee69f36..304a9357a0ef3ec34b3aa15fc4da9713e9203d8f 100644 (file)
@@ -36,6 +36,9 @@
 #include <types/stream.h>
 #include <types/stream_interface.h>
 
+#include <proto/applet.h>
+#include <proto/task.h>
+
 /* perform minimal intializations, report 0 in case of error, 1 if OK. */
 int init_channel();
 
@@ -439,6 +442,41 @@ static inline int channel_recv_max(const struct channel *chn)
        return ret;
 }
 
+/* Allocates a buffer for channel <chn>, but only if it's guaranteed that it's
+ * not the last available buffer or it's the response buffer. Unless the buffer
+ * is the response buffer, an extra control is made so that we always keep
+ * <tune.buffers.reserved> buffers available after this allocation. Returns 0 in
+ * case of failure, non-zero otherwise.
+ *
+ * If no buffer are available, the requester, represented by <wait> pointer,
+ * will be added in the list of objects waiting for an available buffer.
+ */
+static inline int channel_alloc_buffer(struct channel *chn, struct buffer_wait *wait)
+{
+       int margin = 0;
+
+       if (!(chn->flags & CF_ISRESP))
+               margin = global.tune.reserved_bufs;
+
+       if (b_alloc_margin(&chn->buf, margin) != NULL)
+               return 1;
+
+       if (LIST_ISEMPTY(&wait->list))
+               LIST_ADDQ(&buffer_wq, &wait->list);
+       return 0;
+}
+
+/* Releases a possibly allocated buffer for channel <chn>. If it was not
+ * allocated, this function does nothing. Else the buffer is released and we try
+ * to wake up as many streams/applets as possible. */
+static inline void channel_release_buffer(struct channel *chn, struct buffer_wait *wait)
+{
+       if (chn->buf->size && buffer_empty(chn->buf)) {
+               b_free(&chn->buf);
+               offer_buffers(wait->target, tasks_run_queue + applets_active_queue);
+       }
+}
+
 /* Truncate any unread data in the channel's buffer, and disable forwarding.
  * Outgoing data are left intact. This is mainly to be used to send error
  * messages after existing data.
index b439344f2050fb39172d9cb4f853efc28481cc87..85c234edc38ceef67efcf4cf6aaa6509d4073c1c 100644 (file)
@@ -32,7 +32,6 @@
 
 extern struct pool_head *pool2_stream;
 extern struct list streams;
-extern struct list buffer_wq;
 
 extern struct data_cb sess_conn_cb;
 
@@ -55,11 +54,7 @@ int parse_track_counters(char **args, int *arg,
 
 /* Update the stream's backend and server time stats */
 void stream_update_time_stats(struct stream *s);
-void __stream_offer_buffers(int rqlimit);
-static inline void stream_offer_buffers();
-int stream_alloc_work_buffer(struct stream *s);
 void stream_release_buffers(struct stream *s);
-int stream_alloc_recv_buffer(struct channel *chn);
 
 /* returns the session this stream belongs to */
 static inline struct session *strm_sess(const struct stream *strm)
@@ -285,25 +280,16 @@ static void inline stream_init_srv_conn(struct stream *sess)
        LIST_INIT(&sess->by_srv);
 }
 
-static inline void stream_offer_buffers()
+/* Callback used to wake up a stream when a buffer is available. The stream <s>
+ * is woken up is if it is not already running and if it is not already in the
+ * task run queue. This functions returns 1 is the stream is woken up, otherwise
+ * it returns 0. */
+static int inline stream_res_wakeup(struct stream *s)
 {
-       int avail;
-
-       if (LIST_ISEMPTY(&buffer_wq))
-               return;
-
-       /* all streams will need 1 buffer, so we can stop waking up streams
-        * once we have enough of them to eat all the buffers. Note that we
-        * don't really know if they are streams or just other tasks, but
-        * that's a rough estimate. Similarly, for each cached event we'll need
-        * 1 buffer. If no buffer is currently used, always wake up the number
-        * of tasks we can offer a buffer based on what is allocated, and in
-        * any case at least one task per two reserved buffers.
-        */
-       avail = pool2_buffer->allocated - pool2_buffer->used - global.tune.reserved_bufs / 2;
-
-       if (avail > (int)tasks_run_queue)
-               __stream_offer_buffers(avail);
+       if (s->task->state & TASK_RUNNING || task_in_rq(s->task))
+               return 0;
+       task_wakeup(s->task, TASK_WOKEN_RES);
+       return 1;
 }
 
 void service_keywords_register(struct action_kw_list *kw_list);
index da9f787a26dbb3bab25db603524136d354a3d508..89602aac5633240bb94ff7bb598dfa16297e9748 100644 (file)
@@ -26,6 +26,7 @@
 #include <types/obj_type.h>
 #include <types/proxy.h>
 #include <types/stream.h>
+#include <common/buffer.h>
 #include <common/chunk.h>
 #include <common/config.h>
 
@@ -58,6 +59,7 @@ struct appctx {
        void (*io_release)(struct appctx *appctx);  /* used within the cli_io_handler when st0 = CLI_ST_CALLBACK,
                                                       if the command is terminated or the session released */
        void *private;
+       struct buffer_wait buffer_wait; /* position in the list of objects waiting for a buffer */
 
        union {
                struct {
index 2cc903cfafafe60937034d808c03f9c4da430a1b..26e8dd590511ba38f3c0885ec37cc289587539d3 100644 (file)
@@ -135,7 +135,7 @@ struct stream {
        struct list list;               /* position in global streams list */
        struct list by_srv;             /* position in server stream list */
        struct list back_refs;          /* list of users tracking this stream */
-       struct list buffer_wait;        /* position in the list of streams waiting for a buffer */
+       struct buffer_wait buffer_wait; /* position in the list of objects waiting for a buffer */
 
        struct {
                struct stksess *ts;
index ad40e1f9f97310e23cc4742457c819b0b08896e6..f5bc79d8003bf7c2d48166bfd8113c588eae02f4 100644 (file)
@@ -16,6 +16,7 @@
 #include <common/config.h>
 #include <common/mini-clist.h>
 #include <proto/applet.h>
+#include <proto/channel.h>
 #include <proto/stream.h>
 #include <proto/stream_interface.h>
 
@@ -48,13 +49,12 @@ void applet_run_active()
                curr = LIST_ELEM(applet_cur_queue.n, typeof(curr), runq);
                si = curr->owner;
 
-               /* now we'll need a buffer */
-               if (!stream_alloc_recv_buffer(si_ic(si))) {
-                       si->flags |= SI_FL_WAIT_ROOM;
-                       LIST_DEL(&curr->runq);
-                       LIST_INIT(&curr->runq);
-                       continue;
-               }
+               /* Now we'll try to allocate the input buffer. We wake up the
+                * applet in all cases. So this is the applet responsibility to
+                * check if this buffer was allocated or not. This let a chance
+                * for applets to do some other processing if needed. */
+               if (!channel_alloc_buffer(si_ic(si), &curr->buffer_wait))
+                       si_applet_cant_put(si);
 
                /* We always pretend the applet can't get and doesn't want to
                 * put, it's up to it to change this if needed. This ensures
@@ -65,6 +65,7 @@ void applet_run_active()
 
                curr->applet->fct(curr);
                si_applet_wake_cb(si);
+               channel_release_buffer(si_ic(si), &curr->buffer_wait);
 
                if (applet_cur_queue.n == &curr->runq) {
                        /* curr was left in the list, move it back to the active list */
index f47fbddbdbd02c1a2a526730ecb65fd81d892ca7..4f8f6474c168d29ff7b63513169d228e71202773 100644 (file)
@@ -31,6 +31,9 @@ struct pool_head *pool2_buffer;
 struct buffer buf_empty  = { .p = buf_empty.data };
 struct buffer buf_wanted = { .p = buf_wanted.data };
 
+/* list of objects waiting for at least one buffer */
+struct list buffer_wq = LIST_HEAD_INIT(buffer_wq);
+
 /* perform minimal intializations, report 0 in case of error, 1 if OK. */
 int init_buffer()
 {
@@ -278,6 +281,35 @@ void buffer_dump(FILE *o, struct buffer *b, int from, int to)
        fflush(o);
 }
 
+void __offer_buffer(void *from, unsigned int threshold)
+{
+       struct buffer_wait *wait, *bak;
+       int avail;
+
+       /* For now, we consider that all objects need 1 buffer, so we can stop
+        * waking up them once we have enough of them to eat all the available
+        * buffers. Note that we don't really know if they are streams or just
+        * other tasks, but that's a rough estimate. Similarly, for each cached
+        * event we'll need 1 buffer. If no buffer is currently used, always
+        * wake up the number of tasks we can offer a buffer based on what is
+        * allocated, and in any case at least one task per two reserved
+        * buffers.
+        */
+       avail = pool2_buffer->allocated - pool2_buffer->used - global.tune.reserved_bufs / 2;
+
+       list_for_each_entry_safe(wait, bak, &buffer_wq, list) {
+               if (avail <= threshold)
+                       break;
+
+               if (wait->target == from || !wait->wakeup_cb(wait->target))
+                       continue;
+
+               LIST_DEL(&wait->list);
+               LIST_INIT(&wait->list);
+
+               avail--;
+       }
+}
 
 /*
  * Local variables:
index a1923bc4e71719c2bc0a2220f41fb31a5136dd4b..3d537ba544daa8488d37dbb6e84c53d03be86a7d 100644 (file)
--- a/src/cli.c
+++ b/src/cli.c
@@ -488,6 +488,12 @@ static void cli_io_handler(struct appctx *appctx)
        if (unlikely(si->state == SI_ST_DIS || si->state == SI_ST_CLO))
                goto out;
 
+       /* Check if the input buffer is avalaible. */
+       if (res->buf->size == 0) {
+               si_applet_cant_put(si);
+               goto out;
+       }
+
        while (1) {
                if (appctx->st0 == CLI_ST_INIT) {
                        /* Stats output not initialized yet */
index 776848e487be86afd2148de0df27b59239a90f08..aa6414abfdf43370e9a2f3df4628752d97126d90 100644 (file)
@@ -224,7 +224,7 @@ struct spoe_context {
        struct appctx      *appctx;       /* The SPOE appctx */
        struct list        *messages;     /* List of messages that will be sent during the stream processing */
        struct buffer      *buffer;       /* Buffer used to store a NOTIFY or ACK frame */
-       struct list         buffer_wait;  /* position in the list of streams waiting for a buffer */
+       struct buffer_wait  buffer_wait;  /* position in the list of streams waiting for a buffer */
        struct list         applet_wait;  /* position in the list of streams waiting for a SPOE applet */
 
        enum spoe_ctx_state state;        /* SPOE_CTX_ST_* */
@@ -1232,6 +1232,9 @@ send_spoe_frame(struct appctx *appctx,
        int                      framesz, ret;
        uint32_t                 netint;
 
+       if (si_ic(si)->buf->size == 0)
+               return -1;
+
        ret = prepare(appctx, trash.str, APPCTX_SPOE(appctx).max_frame_size);
        if (ret <= 0)
                goto skip_or_error;
@@ -1524,7 +1527,7 @@ handle_spoe_applet(struct appctx *appctx)
                        /* fall through */
 
                case SPOE_APPCTX_ST_END:
-                       break;
+                       return;
        }
 
  out:
@@ -1693,13 +1696,13 @@ acquire_spoe_appctx(struct spoe_context *ctx, int dir)
        /* If needed, initialize the buffer that will be used to encode messages
         * and decode actions. */
        if (ctx->buffer == &buf_empty) {
-               if (!LIST_ISEMPTY(&ctx->buffer_wait)) {
-                       LIST_DEL(&ctx->buffer_wait);
-                       LIST_INIT(&ctx->buffer_wait);
+               if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) {
+                       LIST_DEL(&ctx->buffer_wait.list);
+                       LIST_INIT(&ctx->buffer_wait.list);
                }
 
-               if (!b_alloc_margin(&ctx->buffer, 0)) {
-                       LIST_ADDQ(&buffer_wq, &ctx->buffer_wait);
+               if (!b_alloc_margin(&ctx->buffer, global.tune.reserved_bufs)) {
+                       LIST_ADDQ(&buffer_wq, &ctx->buffer_wait.list);
                        goto wait;
                }
        }
@@ -1794,8 +1797,7 @@ release_spoe_appctx(struct spoe_context *ctx)
        /* Release the buffer if needed */
        if (ctx->buffer != &buf_empty) {
                b_free(&ctx->buffer);
-               if (!LIST_ISEMPTY(&buffer_wq))
-                       stream_offer_buffers();
+               offer_buffers(ctx, tasks_run_queue + applets_active_queue);
        }
 
        /* If there is no SPOE applet, all is done */
@@ -2213,6 +2215,12 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx,
 /***************************************************************************
  * Functions that create/destroy SPOE contexts
  **************************************************************************/
+static int wakeup_spoe_context(struct spoe_context *ctx)
+{
+       task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
+       return 1;
+}
+
 static struct spoe_context *
 create_spoe_context(struct filter *filter)
 {
@@ -2229,7 +2237,9 @@ create_spoe_context(struct filter *filter)
        ctx->flags    = 0;
        ctx->messages = conf->agent->messages;
        ctx->buffer   = &buf_empty;
-       LIST_INIT(&ctx->buffer_wait);
+       LIST_INIT(&ctx->buffer_wait.list);
+       ctx->buffer_wait.target = ctx;
+       ctx->buffer_wait.wakeup_cb = (int (*)(void *))wakeup_spoe_context;
        LIST_INIT(&ctx->applet_wait);
 
        ctx->stream_id   = 0;
@@ -2247,8 +2257,8 @@ destroy_spoe_context(struct spoe_context *ctx)
 
        if (ctx->appctx)
                APPCTX_SPOE(ctx->appctx).ctx = NULL;
-       if (!LIST_ISEMPTY(&ctx->buffer_wait))
-               LIST_DEL(&ctx->buffer_wait);
+       if (!LIST_ISEMPTY(&ctx->buffer_wait.list))
+               LIST_DEL(&ctx->buffer_wait.list);
        if (!LIST_ISEMPTY(&ctx->applet_wait))
                LIST_DEL(&ctx->applet_wait);
        pool_free2(pool2_spoe_ctx, ctx);
@@ -2459,8 +2469,13 @@ spoe_check_timeouts(struct stream *s, struct filter *filter)
 {
        struct spoe_context *ctx = filter->ctx;
 
-       if (tick_is_expired(ctx->process_exp, now_ms))
-               s->task->state |= TASK_WOKEN_MSG;
+       if (tick_is_expired(ctx->process_exp, now_ms)) {
+               s->pending_events |= TASK_WOKEN_MSG;
+               if (ctx->buffer != &buf_empty) {
+                       b_free(&ctx->buffer);
+                       offer_buffers(ctx, tasks_run_queue + applets_active_queue);
+               }
+       }
 }
 
 /* Called when we are ready to filter data on a channel */
index 0ca3ec21ee214eaac9347029f8ca799ecc4c3381..10ed8ee475250b3030fdb8e9f48412bcf0e371dd 100644 (file)
@@ -1884,10 +1884,8 @@ static int hlua_socket_write_yield(struct lua_State *L,int status, lua_KContext
         * the request buffer if its not required.
         */
        if (socket->s->req.buf->size == 0) {
-               if (!stream_alloc_recv_buffer(&socket->s->req)) {
-                       socket->s->si[0].flags |= SI_FL_WAIT_ROOM;
-                       goto hlua_socket_write_yield_return;
-               }
+               si_applet_cant_put(&socket->s->si[0]);
+               goto hlua_socket_write_yield_return;
        }
 
        /* Check for avalaible space. */
@@ -2610,6 +2608,14 @@ __LJMP static int hlua_channel_append_yield(lua_State *L, int status, lua_KConte
        int ret;
        int max;
 
+       /* Check if the buffer is avalaible because HAProxy doesn't allocate
+        * the request buffer if its not required.
+        */
+       if (chn->buf->size == 0) {
+               si_applet_cant_put(chn_prod(chn));
+               WILL_LJMP(hlua_yieldk(L, 0, 0, hlua_channel_append_yield, TICK_ETERNITY, 0));
+       }
+
        max = channel_recv_limit(chn) - buffer_len(chn->buf);
        if (max > len - l)
                max = len - l;
@@ -2700,10 +2706,8 @@ __LJMP static int hlua_channel_send_yield(lua_State *L, int status, lua_KContext
         * the request buffer if its not required.
         */
        if (chn->buf->size == 0) {
-               if (!stream_alloc_recv_buffer(chn)) {
-                       chn_prod(chn)->flags |= SI_FL_WAIT_ROOM;
-                       WILL_LJMP(hlua_yieldk(L, 0, 0, hlua_channel_send_yield, TICK_ETERNITY, 0));
-               }
+               si_applet_cant_put(chn_prod(chn));
+               WILL_LJMP(hlua_yieldk(L, 0, 0, hlua_channel_send_yield, TICK_ETERNITY, 0));
        }
 
        /* the writed data will be immediatly sent, so we can check
index 1a80ab34a4fae54b63654eea01f181141db022a7..1a280a570854b2252df1a09295232aa552c4cb5f 100644 (file)
@@ -547,6 +547,10 @@ static void peer_io_handler(struct appctx *appctx)
        size_t proto_len = strlen(PEER_SESSION_PROTO_NAME);
        unsigned int maj_ver, min_ver;
 
+       /* Check if the input buffer is avalaible. */
+       if (si_ic(si)->buf->size == 0)
+               goto full;
+
        while (1) {
 switchstate:
                maj_ver = min_ver = (unsigned int)-1;
index 1a842e8d9aa53ba392c697e9f2d8dc1f7c9a7f7e..8ad983df6c17a12a9e49bf63124f1c6364db452b 100644 (file)
@@ -2766,6 +2766,12 @@ static void http_stats_io_handler(struct appctx *appctx)
        if (unlikely(si->state == SI_ST_DIS || si->state == SI_ST_CLO))
                goto out;
 
+       /* Check if the input buffer is avalaible. */
+       if (res->buf->size == 0) {
+               si_applet_cant_put(si);
+               goto out;
+       }
+
        /* check that the output is not closed */
        if (res->flags & (CF_SHUTW|CF_SHUTW_NOW))
                appctx->st0 = STAT_HTTP_DONE;
index db8702dd4028a62ecd79b004e04d8e10425d3550..298830d2f116878dccba6715eff408fd75bf7050 100644 (file)
@@ -62,9 +62,6 @@
 struct pool_head *pool2_stream;
 struct list streams;
 
-/* list of streams waiting for at least one buffer */
-struct list buffer_wq = LIST_HEAD_INIT(buffer_wq);
-
 /* List of all use-service keywords. */
 static struct list service_keywords = LIST_HEAD_INIT(service_keywords);
 
@@ -139,7 +136,10 @@ struct stream *stream_new(struct session *sess, struct task *t, enum obj_type *o
        /* OK, we're keeping the stream, so let's properly initialize the stream */
        LIST_ADDQ(&streams, &s->list);
        LIST_INIT(&s->back_refs);
-       LIST_INIT(&s->buffer_wait);
+
+       LIST_INIT(&s->buffer_wait.list);
+       s->buffer_wait.target = s;
+       s->buffer_wait.wakeup_cb = (int (*)(void *))stream_res_wakeup;
 
        s->flags |= SF_INITIALIZED;
        s->unique_id = NULL;
@@ -289,15 +289,15 @@ static void stream_free(struct stream *s)
                put_pipe(s->res.pipe);
 
        /* We may still be present in the buffer wait queue */
-       if (!LIST_ISEMPTY(&s->buffer_wait)) {
-               LIST_DEL(&s->buffer_wait);
-               LIST_INIT(&s->buffer_wait);
+       if (!LIST_ISEMPTY(&s->buffer_wait.list)) {
+               LIST_DEL(&s->buffer_wait.list);
+               LIST_INIT(&s->buffer_wait.list);
+       }
+       if (s->req.buf->size || s->res.buf->size) {
+               b_drop(&s->req.buf);
+               b_drop(&s->res.buf);
+               offer_buffers(NULL, tasks_run_queue + applets_active_queue);
        }
-
-       b_drop(&s->req.buf);
-       b_drop(&s->res.buf);
-       if (!LIST_ISEMPTY(&buffer_wq))
-               stream_offer_buffers();
 
        hlua_ctx_destroy(&s->hlua);
        if (s->txn)
@@ -370,33 +370,6 @@ static void stream_free(struct stream *s)
        }
 }
 
-/* Allocates a receive buffer for channel <chn>, but only if it's guaranteed
- * that it's not the last available buffer or it's the response buffer. Unless
- * the buffer is the response buffer, an extra control is made so that we always
- * keep <tune.buffers.reserved> buffers available after this allocation. To be
- * called at the beginning of recv() callbacks to ensure that the required
- * buffers are properly allocated. Returns 0 in case of failure, non-zero
- * otherwise.
- */
-int stream_alloc_recv_buffer(struct channel *chn)
-{
-       struct stream *s;
-       struct buffer *b;
-       int margin = 0;
-
-       if (!(chn->flags & CF_ISRESP))
-               margin = global.tune.reserved_bufs;
-
-       s = chn_strm(chn);
-
-       b = b_alloc_margin(&chn->buf, margin);
-       if (b)
-               return 1;
-
-       if (LIST_ISEMPTY(&s->buffer_wait))
-               LIST_ADDQ(&buffer_wq, &s->buffer_wait);
-       return 0;
-}
 
 /* Allocates a work buffer for stream <s>. It is meant to be called inside
  * process_stream(). It will only allocate the side needed for the function
@@ -406,60 +379,44 @@ int stream_alloc_recv_buffer(struct channel *chn)
  * server from releasing a connection. Returns 0 in case of failure, non-zero
  * otherwise.
  */
-int stream_alloc_work_buffer(struct stream *s)
+static int stream_alloc_work_buffer(struct stream *s)
 {
-       if (!LIST_ISEMPTY(&s->buffer_wait)) {
-               LIST_DEL(&s->buffer_wait);
-               LIST_INIT(&s->buffer_wait);
+       if (!LIST_ISEMPTY(&s->buffer_wait.list)) {
+               LIST_DEL(&s->buffer_wait.list);
+               LIST_INIT(&s->buffer_wait.list);
        }
 
        if (b_alloc_margin(&s->res.buf, 0))
                return 1;
 
-       LIST_ADDQ(&buffer_wq, &s->buffer_wait);
+       LIST_ADDQ(&buffer_wq, &s->buffer_wait.list);
        return 0;
 }
 
 /* releases unused buffers after processing. Typically used at the end of the
- * update() functions. It will try to wake up as many tasks as the number of
- * buffers that it releases. In practice, most often streams are blocked on
- * a single buffer, so it makes sense to try to wake two up when two buffers
- * are released at once.
+ * update() functions. It will try to wake up as many tasks/applets as the
+ * number of buffers that it releases. In practice, most often streams are
+ * blocked on a single buffer, so it makes sense to try to wake two up when two
+ * buffers are released at once.
  */
 void stream_release_buffers(struct stream *s)
 {
-       if (s->req.buf->size && buffer_empty(s->req.buf))
-               b_free(&s->req.buf);
+       int offer = 0;
 
-       if (s->res.buf->size && buffer_empty(s->res.buf))
+       if (s->req.buf->size && buffer_empty(s->req.buf)) {
+               offer = 1;
+               b_free(&s->req.buf);
+       }
+       if (s->res.buf->size && buffer_empty(s->res.buf)) {
+               offer = 1;
                b_free(&s->res.buf);
+       }
 
        /* if we're certain to have at least 1 buffer available, and there is
         * someone waiting, we can wake up a waiter and offer them.
         */
-       if (!LIST_ISEMPTY(&buffer_wq))
-               stream_offer_buffers();
-}
-
-/* Runs across the list of pending streams waiting for a buffer and wakes one
- * up if buffers are available. Will stop when the run queue reaches <rqlimit>.
- * Should not be called directly, use stream_offer_buffers() instead.
- */
-void __stream_offer_buffers(int rqlimit)
-{
-       struct stream *sess, *bak;
-
-       list_for_each_entry_safe(sess, bak, &buffer_wq, buffer_wait) {
-               if (rqlimit <= tasks_run_queue)
-                       break;
-
-               if (sess->task->state & TASK_RUNNING)
-                       continue;
-
-               LIST_DEL(&sess->buffer_wait);
-               LIST_INIT(&sess->buffer_wait);
-               task_wakeup(sess->task, TASK_WOKEN_RES);
-       }
+       if (offer)
+               offer_buffers(s, tasks_run_queue + applets_active_queue);
 }
 
 /* perform minimal intializations, report 0 in case of error, 1 if OK. */
@@ -2817,7 +2774,7 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st
                        chunk_appendf(&trash,
                             "  txn=%p flags=0x%x meth=%d status=%d req.st=%s rsp.st=%s waiting=%d\n",
                              strm->txn, strm->txn->flags, strm->txn->meth, strm->txn->status,
-                             http_msg_state_str(strm->txn->req.msg_state), http_msg_state_str(strm->txn->rsp.msg_state), !LIST_ISEMPTY(&strm->buffer_wait));
+                             http_msg_state_str(strm->txn->req.msg_state), http_msg_state_str(strm->txn->rsp.msg_state), !LIST_ISEMPTY(&strm->buffer_wait.list));
 
                chunk_appendf(&trash,
                             "  si[0]=%p (state=%s flags=0x%02x endp0=%s:%p exp=%s, et=0x%03x)\n",
index e3e6cc66b1b05f14d506e2ae4a1f62f1630a65f8..d5f2c87ea1cec944b3e405bf38ea3666ff4f29e4 100644 (file)
@@ -538,8 +538,6 @@ void stream_int_notify(struct stream_interface *si)
        }
        if (ic->flags & CF_READ_ACTIVITY)
                ic->flags &= ~CF_READ_DONTWAIT;
-
-       stream_release_buffers(si_strm(si));
 }
 
 
@@ -571,6 +569,7 @@ static int si_conn_wake_cb(struct connection *conn)
         * stream-int status.
         */
        stream_int_notify(si);
+       channel_release_buffer(ic, &(si_strm(si)->buffer_wait));
 
        /* Third step : update the connection's polling status based on what
         * was done above (eg: maybe some buffers got emptied).
@@ -1128,8 +1127,8 @@ static void si_conn_recv_cb(struct connection *conn)
                ic->pipe = NULL;
        }
 
-       /* now we'll need a buffer */
-       if (!stream_alloc_recv_buffer(ic)) {
+       /* now we'll need a input buffer for the stream */
+       if (!channel_alloc_buffer(ic, &(si_strm(si)->buffer_wait))) {
                si->flags |= SI_FL_WAIT_ROOM;
                goto end_recv;
        }