]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: dynbuf: make the buffer_wq an array of list heads
authorWilly Tarreau <w@1wt.eu>
Mon, 22 Apr 2024 16:39:06 +0000 (18:39 +0200)
committerWilly Tarreau <w@1wt.eu>
Fri, 10 May 2024 15:18:13 +0000 (17:18 +0200)
Let's turn the buffer_wq into an array of 4 list heads. These are chosen
by criticality. The DB_CRIT_TO_QUEUE() macro maps each criticality level
into one of these 4 queues. The goal here clearly is to make it possible
to wake up the most critical queues in priority in order to let some tasks
finish their job and release buffers that others can use.

In order to avoid having to look up all queues, a bit map indicates which
queues are in use, which also allows to avoid looping in the most common
case where queues are empty..

include/haproxy/dynbuf-t.h
include/haproxy/dynbuf.h
include/haproxy/tinfo-t.h
src/dynbuf.c

index b64cda144f41621340217a2fb6e152857c5be0e5..b93c6e0a4be55bfa10b00712690329608c1eeddd 100644 (file)
@@ -48,6 +48,9 @@
  *     snd_buf() handler to encode the outgoing channel's data.
  *   - buffer permanently allocated at boot (e.g. temporary compression
  *     buffers). If these fail, we can't boot.
+ *
+ * Please DO NOT CHANGE THESE LEVELS without first getting a full understanding
+ * of how all this works and touching the DB_CRIT_TO_QUEUE() macro below!
  */
 enum dynbuf_crit {
        DB_GROW_RING = 0, // used to grow an existing buffer ring
@@ -61,6 +64,29 @@ enum dynbuf_crit {
        DB_PERMANENT,     // buffers permanently allocated.
 };
 
+/* We'll deal with 4 queues, with indexes numbered from 0 to 3 based on the
+ * criticality of the allocation. All criticality levels are mapped to a 2-bit
+ * queue index. While some levels never use the queue (the first two), some of
+ * the others will share a same queue, and all levels will define a ratio of
+ * allocated emergency buffers below which we refrain from trying to allocate.
+ * In practice, for now the thresholds will just be the queue number times 33%
+ * so that queue 0 is allowed to deplete emergency buffers and queue 3 not at
+ * all. This gives us: queue idx=3 for DB_MUX_RX and below, 2 for DB_SE_RX,
+ * 1 for DB_CHANNEL, 0 for DB_MUX_TX and above. This must match the DYNBUF_NBQ
+ * in tinfo-t.h.
+ */
+
+#define DB_CRIT_TO_QUEUE(crit) ((0x000001BF >> ((crit) * 2)) & 3)
+
+#define DB_GROW_RING_Q      DB_CRIT_TO_QUEUE(DB_GROW_RING)
+#define DB_UNLIKELY_Q       DB_CRIT_TO_QUEUE(DB_UNLIKELY)
+#define DB_MUX_RX_Q         DB_CRIT_TO_QUEUE(DB_MUX_RX)
+#define DB_SE_RX_Q          DB_CRIT_TO_QUEUE(DB_SE_RX)
+#define DB_CHANNEL_Q        DB_CRIT_TO_QUEUE(DB_CHANNEL)
+#define DB_MUX_TX_Q         DB_CRIT_TO_QUEUE(DB_MUX_TX)
+#define DB_PERMANENT_Q      DB_CRIT_TO_QUEUE(DB_PERMANENT)
+
+
 /* an element of the <buffer_wq> list. It represents an object that need to
  * acquire a buffer to continue its process. */
 struct buffer_wait {
index 4541281ee9f09e02af5371217f9fdbffbeaeb86f..d8a7552fc57eef9f9d3bcadf8f5fe9d39249288d 100644 (file)
@@ -117,8 +117,19 @@ void __offer_buffers(void *from, unsigned int count);
 
 static inline void offer_buffers(void *from, unsigned int count)
 {
-       if (!LIST_ISEMPTY(&th_ctx->buffer_wq))
+       int q;
+
+       if (likely(!th_ctx->bufq_map))
+               return;
+
+       for (q = 0; q < DYNBUF_NBQ; q++) {
+               if (!(th_ctx->bufq_map & (1 << q)))
+                       continue;
+
+               BUG_ON_HOT(LIST_ISEMPTY(&th_ctx->buffer_wq[q]));
                __offer_buffers(from, count);
+               break;
+       }
 }
 
 /* Queues a buffer request for the current thread via <bw>, and returns
@@ -130,6 +141,8 @@ static inline void offer_buffers(void *from, unsigned int count)
  */
 static inline int b_requeue(enum dynbuf_crit crit, struct buffer_wait *bw)
 {
+       int q = DB_CRIT_TO_QUEUE(crit);
+
        if (LIST_INLIST(&bw->list))
                return 1;
 
@@ -137,7 +150,8 @@ static inline int b_requeue(enum dynbuf_crit crit, struct buffer_wait *bw)
        if (crit < DB_MUX_RX)
                return 0;
 
-       LIST_APPEND(&th_ctx->buffer_wq, &bw->list);
+       th_ctx->bufq_map |= 1 << q;
+       LIST_APPEND(&th_ctx->buffer_wq[q], &bw->list);
        return 1;
 }
 
index 8e7638e2b96a6d0868eaed7f38b9fa1b14f6a314..4ed78756afc6c2a9b67c09752c29557d65257a64 100644 (file)
@@ -65,6 +65,8 @@ enum {
 #define TH_FL_STARTED           0x00000010  /* set once the thread starts */
 #define TH_FL_IN_LOOP           0x00000020  /* set only inside the polling loop */
 
+/* we have 4 buffer-wait queues, in highest to lowest emergency order */
+#define DYNBUF_NBQ              4
 
 /* Thread group information. This defines a base and a count of global thread
  * IDs which belong to it, and which can be looked up into thread_info/ctx. It
@@ -133,14 +135,15 @@ struct thread_ctx {
        int current_queue;                  /* points to current tasklet list being run, -1 if none */
        unsigned int nb_tasks;              /* number of tasks allocated on this thread */
        uint8_t tl_class_mask;              /* bit mask of non-empty tasklets classes */
+       uint8_t bufq_map;                   /* one bit per non-empty buffer_wq */
 
-       // 7 bytes hole here
+       // 6 bytes hole here
        struct list pool_lru_head;          /* oldest objects in thread-local pool caches */
-       struct list buffer_wq;              /* buffer waiters */
        struct list streams;                /* list of streams attached to this thread */
        struct list quic_conns;             /* list of active quic-conns attached to this thread */
        struct list quic_conns_clo;         /* list of closing quic-conns attached to this thread */
        struct list queued_checks;          /* checks waiting for a connection slot */
+       struct list buffer_wq[DYNBUF_NBQ];  /* buffer waiters, 4 criticality-based queues */
        unsigned int nb_rhttp_conns;        /* count of current conns used for active reverse HTTP */
 
        ALWAYS_ALIGN(2*sizeof(void*));
index 7a9deb886414d857e9d3ff50c3fe989d24af691d..a849aeeb93b92250486e922a911d750fcbdb0fba 100644 (file)
@@ -30,13 +30,24 @@ int init_buffer()
        void *buffer;
        int thr;
        int done;
+       int i;
 
        pool_head_buffer = create_pool("buffer", global.tune.bufsize, MEM_F_SHARED|MEM_F_EXACT);
        if (!pool_head_buffer)
                return 0;
 
-       for (thr = 0; thr < MAX_THREADS; thr++)
-               LIST_INIT(&ha_thread_ctx[thr].buffer_wq);
+       /* make sure any change to the queues assignment isn't overlooked */
+       BUG_ON(DB_PERMANENT - DB_UNLIKELY - 1 != DYNBUF_NBQ);
+       BUG_ON(DB_MUX_RX_Q  < DB_SE_RX_Q   || DB_MUX_RX_Q  >= DYNBUF_NBQ);
+       BUG_ON(DB_SE_RX_Q   < DB_CHANNEL_Q || DB_SE_RX_Q   >= DYNBUF_NBQ);
+       BUG_ON(DB_CHANNEL_Q < DB_MUX_TX_Q  || DB_CHANNEL_Q >= DYNBUF_NBQ);
+       BUG_ON(DB_MUX_TX_Q >= DYNBUF_NBQ);
+
+       for (thr = 0; thr < MAX_THREADS; thr++) {
+               for (i = 0; i < DYNBUF_NBQ; i++)
+                       LIST_INIT(&ha_thread_ctx[thr].buffer_wq[i]);
+               ha_thread_ctx[thr].bufq_map = 0;
+       }
 
 
        /* The reserved buffer is what we leave behind us. Thus we always need
@@ -104,6 +115,7 @@ void buffer_dump(FILE *o, struct buffer *b, int from, int to)
 void __offer_buffers(void *from, unsigned int count)
 {
        struct buffer_wait *wait, *wait_back;
+       int q;
 
        /* For now, we consider that all objects need 1 buffer, so we can stop
         * waking up them once we have enough of them to eat all the available
@@ -111,15 +123,23 @@ void __offer_buffers(void *from, unsigned int count)
         * other tasks, but that's a rough estimate. Similarly, for each cached
         * event we'll need 1 buffer.
         */
-       list_for_each_entry_safe(wait, wait_back, &th_ctx->buffer_wq, list) {
-               if (!count)
-                       break;
-
-               if (wait->target == from || !wait->wakeup_cb(wait->target))
+       for (q = 0; q < DYNBUF_NBQ; q++) {
+               if (!(th_ctx->bufq_map & (1 << q)))
                        continue;
+               BUG_ON_HOT(LIST_ISEMPTY(&th_ctx->buffer_wq[q]));
+
+               list_for_each_entry_safe(wait, wait_back, &th_ctx->buffer_wq[q], list) {
+                       if (!count)
+                               break;
 
-               LIST_DEL_INIT(&wait->list);
-               count--;
+                       if (wait->target == from || !wait->wakeup_cb(wait->target))
+                               continue;
+
+                       LIST_DEL_INIT(&wait->list);
+                       count--;
+               }
+               if (LIST_ISEMPTY(&th_ctx->buffer_wq[q]))
+                       th_ctx->bufq_map &= ~(1 << q);
        }
 }