]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: h2: dynamically allocate the demux buffer on Rx
authorWilly Tarreau <w@1wt.eu>
Fri, 22 Sep 2017 07:13:49 +0000 (09:13 +0200)
committerWilly Tarreau <w@1wt.eu>
Tue, 31 Oct 2017 17:12:14 +0000 (18:12 +0100)
This patch implements a very basic Rx buffer management. The mux needs
an rx buffer to decode the connection's stream. If this buffer it
available upon Rx events, we fill it with whatever input data are
available. Otherwise we try to allocate it and subscribe to the buffer
wait queue in case of failure. In such a situation, a function
"h2_dbuf_available()" will be called once a buffer may be allocated.
The buffer is released if it's still empty after recv().

src/mux_h2.c

index aeea4ecf6eb0ca1c3af899a93a459f995abdb5e3..4517b2fe455ad391ddd75b280d42688be2342e88 100644 (file)
@@ -14,6 +14,7 @@
 #include <common/config.h>
 #include <common/h2.h>
 #include <common/hpack-tbl.h>
+#include <proto/applet.h>
 #include <proto/connection.h>
 #include <proto/stream.h>
 #include <eb32tree.h>
@@ -77,6 +78,7 @@ struct h2c {
        struct eb_root streams_by_id; /* all active streams by their ID */
        struct list send_list; /* list of blocked streams requesting to send */
        struct list fctl_list; /* list of streams blocked by connection's fctl */
+       struct buffer_wait dbuf_wait; /* wait list for demux buffer allocation */
 };
 
 /* H2 stream state, in h2s->st */
@@ -131,6 +133,52 @@ static int h2_settings_initial_window_size    = 65535; /* initial value */
 static int h2_settings_max_concurrent_streams =   100;
 
 
+/*****************************************************/
+/* functions below are for dynamic buffer management */
+/*****************************************************/
+
+/* re-enables receiving on mux <target> after a buffer was allocated. It returns
+ * 1 if the allocation succeeds, in which case the connection is woken up, or 0
+ * if it's impossible to wake up and we prefer to be woken up later.
+ */
+static int h2_dbuf_available(void *target)
+{
+       struct h2c *h2c = target;
+
+       /* take the buffer now as we'll get scheduled waiting for ->wake() */
+       if (b_alloc_margin(&h2c->dbuf, 0)) {
+               conn_xprt_want_recv(h2c->conn);
+               return 1;
+       }
+       return 0;
+}
+
+static inline struct buffer *h2_get_dbuf(struct h2c *h2c)
+{
+       struct buffer *buf = NULL;
+
+       if (likely(LIST_ISEMPTY(&h2c->dbuf_wait.list)) &&
+           unlikely((buf = b_alloc_margin(&h2c->dbuf, 0)) == NULL)) {
+               h2c->dbuf_wait.target = h2c->conn;
+               h2c->dbuf_wait.wakeup_cb = h2_dbuf_available;
+               SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
+               LIST_ADDQ(&buffer_wq, &h2c->dbuf_wait.list);
+               SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
+               __conn_xprt_stop_recv(h2c->conn);
+       }
+       return buf;
+}
+
+static inline void h2_release_dbuf(struct h2c *h2c)
+{
+       if (h2c->dbuf->size) {
+               b_free(&h2c->dbuf);
+               offer_buffers(h2c->dbuf_wait.target,
+                             tasks_run_queue + applets_active_queue);
+       }
+}
+
+
 /*****************************************************************/
 /* functions below are dedicated to the mux setup and management */
 /*****************************************************************/
@@ -169,6 +217,7 @@ static int h2c_frt_init(struct connection *conn)
        h2c->streams_by_id = EB_ROOT_UNIQUE;
        LIST_INIT(&h2c->send_list);
        LIST_INIT(&h2c->fctl_list);
+       LIST_INIT(&h2c->dbuf_wait.list);
        conn->mux_ctx = h2c;
 
        conn_xprt_want_recv(conn);
@@ -205,6 +254,10 @@ static void h2_release(struct connection *conn)
 
        if (h2c) {
                hpack_dht_free(h2c->ddht);
+               h2_release_dbuf(h2c);
+               SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
+               LIST_DEL(&h2c->dbuf_wait.list);
+               SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
                pool_free2(pool2_h2c, h2c);
        }
 
@@ -227,12 +280,16 @@ static void h2_release(struct connection *conn)
 static void h2_recv(struct connection *conn)
 {
        struct h2c *h2c = conn->mux_ctx;
-       struct buffer *buf = h2c->dbuf;
+       struct buffer *buf;
        int max;
 
        if (conn->flags & CO_FL_ERROR)
                goto error;
 
+       buf = h2_get_dbuf(h2c);
+       if (!buf)
+               return;
+
        /* note: buf->o == 0 */
        max = buf->size - buf->i;
        if (!max) {
@@ -245,6 +302,9 @@ static void h2_recv(struct connection *conn)
        if (conn->flags & CO_FL_ERROR)
                goto error;
 
+       if (!buf->i)
+               h2_release_dbuf(h2c);
+
        if (buf->i == buf->size) {
                /* buffer now full */
                __conn_xprt_stop_recv(conn);