#include <common/config.h>
#include <common/h2.h>
#include <common/hpack-tbl.h>
+#include <proto/applet.h>
#include <proto/connection.h>
#include <proto/stream.h>
#include <eb32tree.h>
struct eb_root streams_by_id; /* all active streams by their ID */
struct list send_list; /* list of blocked streams requesting to send */
struct list fctl_list; /* list of streams blocked by connection's fctl */
+ struct buffer_wait dbuf_wait; /* wait list for demux buffer allocation */
};
/* H2 stream state, in h2s->st */
static int h2_settings_max_concurrent_streams = 100;
+/*****************************************************/
+/* functions below are for dynamic buffer management */
+/*****************************************************/
+
+/* re-enables receiving on mux <target> after a buffer was allocated. It returns
+ * 1 if the allocation succeeds, in which case the connection is woken up, or 0
+ * if it's impossible to wake up and we prefer to be woken up later.
+ */
+static int h2_dbuf_available(void *target)
+{
+ struct h2c *h2c = target;
+
+ /* take the buffer now as we'll get scheduled waiting for ->wake() */
+ if (b_alloc_margin(&h2c->dbuf, 0)) {
+ conn_xprt_want_recv(h2c->conn);
+ return 1;
+ }
+ return 0;
+}
+
+static inline struct buffer *h2_get_dbuf(struct h2c *h2c)
+{
+ struct buffer *buf = NULL;
+
+ if (likely(LIST_ISEMPTY(&h2c->dbuf_wait.list)) &&
+ unlikely((buf = b_alloc_margin(&h2c->dbuf, 0)) == NULL)) {
+ h2c->dbuf_wait.target = h2c->conn;
+ h2c->dbuf_wait.wakeup_cb = h2_dbuf_available;
+ SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
+ LIST_ADDQ(&buffer_wq, &h2c->dbuf_wait.list);
+ SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
+ __conn_xprt_stop_recv(h2c->conn);
+ }
+ return buf;
+}
+
+static inline void h2_release_dbuf(struct h2c *h2c)
+{
+ if (h2c->dbuf->size) {
+ b_free(&h2c->dbuf);
+ offer_buffers(h2c->dbuf_wait.target,
+ tasks_run_queue + applets_active_queue);
+ }
+}
+
+
/*****************************************************************/
/* functions below are dedicated to the mux setup and management */
/*****************************************************************/
h2c->streams_by_id = EB_ROOT_UNIQUE;
LIST_INIT(&h2c->send_list);
LIST_INIT(&h2c->fctl_list);
+ LIST_INIT(&h2c->dbuf_wait.list);
conn->mux_ctx = h2c;
conn_xprt_want_recv(conn);
if (h2c) {
hpack_dht_free(h2c->ddht);
+ h2_release_dbuf(h2c);
+ SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
+ LIST_DEL(&h2c->dbuf_wait.list);
+ SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
pool_free2(pool2_h2c, h2c);
}
static void h2_recv(struct connection *conn)
{
struct h2c *h2c = conn->mux_ctx;
- struct buffer *buf = h2c->dbuf;
+ struct buffer *buf;
int max;
if (conn->flags & CO_FL_ERROR)
goto error;
+ buf = h2_get_dbuf(h2c);
+ if (!buf)
+ return;
+
/* note: buf->o == 0 */
max = buf->size - buf->i;
if (!max) {
if (conn->flags & CO_FL_ERROR)
goto error;
+ if (!buf->i)
+ h2_release_dbuf(h2c);
+
if (buf->i == buf->size) {
/* buffer now full */
__conn_xprt_stop_recv(conn);