From: Willy Tarreau
Date: Mon, 22 Apr 2024 16:39:06 +0000 (+0200)
Subject: MEDIUM: dynbuf: make the buffer_wq an array of list heads
X-Git-Tag: v3.0-dev11~21
X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=a5d6a7998669bffdc5c4138f5dfbc4a25e119ca3;p=thirdparty%2Fhaproxy.git

MEDIUM: dynbuf: make the buffer_wq an array of list heads

Let's turn the buffer_wq into an array of 4 list heads. These are
chosen by criticality. The DB_CRIT_TO_QUEUE() macro maps each
criticality level to one of these 4 queues. The goal here is clearly
to make it possible to wake up the most critical queues in priority
in order to let some tasks finish their job and release buffers that
others can use.

In order to avoid having to look up all queues, a bit map indicates
which queues are in use, which also allows us to avoid looping in the
most common case where all queues are empty.
---

diff --git a/include/haproxy/dynbuf-t.h b/include/haproxy/dynbuf-t.h
index b64cda144f..b93c6e0a4b 100644
--- a/include/haproxy/dynbuf-t.h
+++ b/include/haproxy/dynbuf-t.h
@@ -48,6 +48,9 @@
  *   snd_buf() handler to encode the outgoing channel's data.
  * - buffer permanently allocated at boot (e.g. temporary compression
  *   buffers). If these fail, we can't boot.
+ *
+ * Please DO NOT CHANGE THESE LEVELS without first getting a full understanding
+ * of how all this works and touching the DB_CRIT_TO_QUEUE() macro below!
  */
 enum dynbuf_crit {
         DB_GROW_RING = 0, // used to grow an existing buffer ring
@@ -61,6 +64,29 @@ enum dynbuf_crit {
         DB_PERMANENT,     // buffers permanently allocated.
 };
 
+/* We'll deal with 4 queues, with indexes numbered from 0 to 3 based on the
+ * criticality of the allocation. All criticality levels are mapped to a 2-bit
+ * queue index. While some levels never use the queue (the first two), some of
+ * the others will share a same queue, and all levels will define a ratio of
+ * allocated emergency buffers below which we refrain from trying to allocate.
+ * In practice, for now the thresholds will just be the queue number times 33%
+ * so that queue 0 is allowed to deplete emergency buffers and queue 3 not at
+ * all. This gives us: queue idx=3 for DB_MUX_RX and below, 2 for DB_SE_RX,
+ * 1 for DB_CHANNEL, 0 for DB_MUX_TX and above. This must match the DYNBUF_NBQ
+ * in tinfo-t.h.
+ */
+
+#define DB_CRIT_TO_QUEUE(crit)   ((0x000001BF >> ((crit) * 2)) & 3)
+
+#define DB_GROW_RING_Q   DB_CRIT_TO_QUEUE(DB_GROW_RING)
+#define DB_UNLIKELY_Q    DB_CRIT_TO_QUEUE(DB_UNLIKELY)
+#define DB_MUX_RX_Q      DB_CRIT_TO_QUEUE(DB_MUX_RX)
+#define DB_SE_RX_Q       DB_CRIT_TO_QUEUE(DB_SE_RX)
+#define DB_CHANNEL_Q     DB_CRIT_TO_QUEUE(DB_CHANNEL)
+#define DB_MUX_TX_Q      DB_CRIT_TO_QUEUE(DB_MUX_TX)
+#define DB_PERMANENT_Q   DB_CRIT_TO_QUEUE(DB_PERMANENT)
+
+
 /* an element of the <buffer_wq> list. It represents an object that needs to
  * acquire a buffer to continue its process. */
 struct buffer_wait {
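Illustration (not part of the patch): the 0x000001BF constant above is a
packed lookup table holding one 2-bit queue index per criticality level,
least critical level in the lowest bits. The standalone sketch below
copies the enum and the macro from dynbuf-t.h and prints the resulting
mapping; the name table and main() exist only for the demo.

#include <stdio.h>

enum dynbuf_crit {
        DB_GROW_RING = 0, DB_UNLIKELY, DB_MUX_RX, DB_SE_RX,
        DB_CHANNEL, DB_MUX_TX, DB_PERMANENT,
};

#define DB_CRIT_TO_QUEUE(crit)   ((0x000001BF >> ((crit) * 2)) & 3)

int main(void)
{
        const char *names[] = {
                "DB_GROW_RING", "DB_UNLIKELY", "DB_MUX_RX", "DB_SE_RX",
                "DB_CHANNEL", "DB_MUX_TX", "DB_PERMANENT",
        };
        int crit;

        /* prints 3 3 3 2 1 0 0: the least critical levels land in
         * queue 3, the most critical (DB_MUX_TX and above) in queue 0 */
        for (crit = DB_GROW_RING; crit <= DB_PERMANENT; crit++)
                printf("%-13s -> queue %d\n", names[crit], DB_CRIT_TO_QUEUE(crit));
        return 0;
}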
diff --git a/include/haproxy/dynbuf.h b/include/haproxy/dynbuf.h
index 4541281ee9..d8a7552fc5 100644
--- a/include/haproxy/dynbuf.h
+++ b/include/haproxy/dynbuf.h
@@ -117,8 +117,19 @@ void __offer_buffers(void *from, unsigned int count);
 
 static inline void offer_buffers(void *from, unsigned int count)
 {
-        if (!LIST_ISEMPTY(&th_ctx->buffer_wq))
+        int q;
+
+        if (likely(!th_ctx->bufq_map))
+                return;
+
+        for (q = 0; q < DYNBUF_NBQ; q++) {
+                if (!(th_ctx->bufq_map & (1 << q)))
+                        continue;
+
+                BUG_ON_HOT(LIST_ISEMPTY(&th_ctx->buffer_wq[q]));
                 __offer_buffers(from, count);
+                break;
+        }
 }
 
 /* Queues a buffer request for the current thread via <bw>, and returns
@@ -130,6 +141,8 @@ static inline void offer_buffers(void *from, unsigned int count)
  */
 static inline int b_requeue(enum dynbuf_crit crit, struct buffer_wait *bw)
 {
+        int q = DB_CRIT_TO_QUEUE(crit);
+
         if (LIST_INLIST(&bw->list))
                 return 1;
 
@@ -137,7 +150,8 @@ static inline int b_requeue(enum dynbuf_crit crit, struct buffer_wait *bw)
         if (crit < DB_MUX_RX)
                 return 0;
 
-        LIST_APPEND(&th_ctx->buffer_wq, &bw->list);
+        th_ctx->bufq_map |= 1 << q;
+        LIST_APPEND(&th_ctx->buffer_wq[q], &bw->list);
         return 1;
 }

diff --git a/include/haproxy/tinfo-t.h b/include/haproxy/tinfo-t.h
index 8e7638e2b9..4ed78756af 100644
--- a/include/haproxy/tinfo-t.h
+++ b/include/haproxy/tinfo-t.h
@@ -65,6 +65,8 @@ enum {
 #define TH_FL_STARTED       0x00000010  /* set once the thread starts */
 #define TH_FL_IN_LOOP       0x00000020  /* set only inside the polling loop */
 
+/* we have 4 buffer-wait queues, in highest to lowest emergency order */
+#define DYNBUF_NBQ 4
 
 /* Thread group information. This defines a base and a count of global thread
  * IDs which belong to it, and which can be looked up into thread_info/ctx. It
@@ -133,14 +135,15 @@ struct thread_ctx {
         int current_queue;                 /* points to current tasklet list being run, -1 if none */
         unsigned int nb_tasks;             /* number of tasks allocated on this thread */
         uint8_t tl_class_mask;             /* bit mask of non-empty tasklets classes */
+        uint8_t bufq_map;                  /* one bit per non-empty buffer_wq */
 
-        // 7 bytes hole here
+        // 6 bytes hole here
         struct list pool_lru_head;         /* oldest objects in thread-local pool caches */
-        struct list buffer_wq;             /* buffer waiters */
         struct list streams;               /* list of streams attached to this thread */
         struct list quic_conns;            /* list of active quic-conns attached to this thread */
         struct list quic_conns_clo;        /* list of closing quic-conns attached to this thread */
         struct list queued_checks;         /* checks waiting for a connection slot */
+        struct list buffer_wq[DYNBUF_NBQ]; /* buffer waiters, 4 criticality-based queues */
         unsigned int nb_rhttp_conns;       /* count of current conns used for active reverse HTTP */
 
         ALWAYS_ALIGN(2*sizeof(void*));
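Illustration (not part of the patch): a minimal sketch of the bufq_map
bookkeeping that b_requeue() and offer_buffers() rely on, with the thread
context reduced to the bitmap alone (no lists, no wakeup callbacks).
enqueue() stands for the bit-setting done in b_requeue() and drain() for
the scan shared by offer_buffers() and __offer_buffers(); both names are
invented for the demo.

#include <stdio.h>

#define DYNBUF_NBQ 4

/* bit q set <=> queue q has at least one waiter */
static unsigned char bufq_map;

static void enqueue(int q)
{
        bufq_map |= 1 << q;               /* mark queue q as non-empty */
}

static void drain(void)
{
        int q;

        if (!bufq_map)                    /* fast path: no waiter anywhere */
                return;

        for (q = 0; q < DYNBUF_NBQ; q++) {
                if (!(bufq_map & (1 << q)))
                        continue;         /* empty queue skipped without touching a list */
                printf("serving queue %d\n", q);
                bufq_map &= ~(1 << q);    /* all waiters served: clear the bit */
        }
}

int main(void)
{
        enqueue(2);
        enqueue(0);
        drain();                          /* serves queue 0 first: highest emergency */
        return 0;
}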
diff --git a/src/dynbuf.c b/src/dynbuf.c
index 7a9deb8864..a849aeeb93 100644
--- a/src/dynbuf.c
+++ b/src/dynbuf.c
@@ -30,13 +30,24 @@ int init_buffer()
         void *buffer;
         int thr;
         int done;
+        int i;
 
         pool_head_buffer = create_pool("buffer", global.tune.bufsize, MEM_F_SHARED|MEM_F_EXACT);
         if (!pool_head_buffer)
                 return 0;
 
-        for (thr = 0; thr < MAX_THREADS; thr++)
-                LIST_INIT(&ha_thread_ctx[thr].buffer_wq);
+        /* make sure any change to the queues assignment isn't overlooked */
+        BUG_ON(DB_PERMANENT - DB_UNLIKELY - 1 != DYNBUF_NBQ);
+        BUG_ON(DB_MUX_RX_Q < DB_SE_RX_Q || DB_MUX_RX_Q >= DYNBUF_NBQ);
+        BUG_ON(DB_SE_RX_Q < DB_CHANNEL_Q || DB_SE_RX_Q >= DYNBUF_NBQ);
+        BUG_ON(DB_CHANNEL_Q < DB_MUX_TX_Q || DB_CHANNEL_Q >= DYNBUF_NBQ);
+        BUG_ON(DB_MUX_TX_Q >= DYNBUF_NBQ);
+
+        for (thr = 0; thr < MAX_THREADS; thr++) {
+                for (i = 0; i < DYNBUF_NBQ; i++)
+                        LIST_INIT(&ha_thread_ctx[thr].buffer_wq[i]);
+                ha_thread_ctx[thr].bufq_map = 0;
+        }
 
         /* The reserved buffer is what we leave behind us. Thus we always need
@@ -104,6 +115,7 @@ void buffer_dump(FILE *o, struct buffer *b, int from, int to)
 void __offer_buffers(void *from, unsigned int count)
 {
         struct buffer_wait *wait, *wait_back;
+        int q;
 
         /* For now, we consider that all objects need 1 buffer, so we can stop
          * waking up them once we have enough of them to eat all the available
@@ -111,15 +123,23 @@ void __offer_buffers(void *from, unsigned int count)
          * other tasks, but that's a rough estimate. Similarly, for each cached
          * event we'll need 1 buffer.
          */
-        list_for_each_entry_safe(wait, wait_back, &th_ctx->buffer_wq, list) {
-                if (!count)
-                        break;
-
-                if (wait->target == from || !wait->wakeup_cb(wait->target))
+        for (q = 0; q < DYNBUF_NBQ; q++) {
+                if (!(th_ctx->bufq_map & (1 << q)))
                         continue;
+                BUG_ON_HOT(LIST_ISEMPTY(&th_ctx->buffer_wq[q]));
+
+                list_for_each_entry_safe(wait, wait_back, &th_ctx->buffer_wq[q], list) {
+                        if (!count)
+                                break;
 
-                LIST_DEL_INIT(&wait->list);
-                count--;
+                        if (wait->target == from || !wait->wakeup_cb(wait->target))
+                                continue;
+
+                        LIST_DEL_INIT(&wait->list);
+                        count--;
+                }
+                if (LIST_ISEMPTY(&th_ctx->buffer_wq[q]))
+                        th_ctx->bufq_map &= ~(1 << q);
         }
 }
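Illustration (not part of the patch): a self-contained miniature of the
complete waiter cycle after this change. HAProxy's intrusive lists are
replaced by small per-queue arrays so the program runs on its own;
requeue() and offer() loosely mirror b_requeue() and __offer_buffers(),
and the waiter names are invented.

#include <stdio.h>
#include <string.h>

#define DYNBUF_NBQ 4
#define DB_CRIT_TO_QUEUE(crit)   ((0x000001BF >> ((crit) * 2)) & 3)

enum dynbuf_crit {
        DB_GROW_RING = 0, DB_UNLIKELY, DB_MUX_RX, DB_SE_RX,
        DB_CHANNEL, DB_MUX_TX, DB_PERMANENT,
};

static const char *waiters[DYNBUF_NBQ][8];  /* per-queue FIFO of waiter names */
static int n_waiters[DYNBUF_NBQ];
static unsigned char bufq_map;

static void requeue(const char *name, enum dynbuf_crit crit)
{
        int q = DB_CRIT_TO_QUEUE(crit);

        waiters[q][n_waiters[q]++] = name;
        bufq_map |= 1 << q;
}

static void offer(unsigned int count)
{
        int q, i;

        for (q = 0; q < DYNBUF_NBQ && count; q++) {
                if (!(bufq_map & (1 << q)))
                        continue;
                for (i = 0; i < n_waiters[q] && count; i++, count--)
                        printf("waking %s (queue %d)\n", waiters[q][i]);
                /* drop the served entries; clear the bit once the queue is empty */
                memmove(waiters[q], waiters[q] + i,
                        (n_waiters[q] - i) * sizeof(*waiters[q]));
                if (!(n_waiters[q] -= i))
                        bufq_map &= ~(1 << q);
        }
}

int main(void)
{
        requeue("h2 demux", DB_MUX_RX);   /* queue 3: lowest emergency */
        requeue("mux tx", DB_MUX_TX);     /* queue 0: highest emergency */
        requeue("channel", DB_CHANNEL);   /* queue 1 */
        offer(2);                         /* wakes "mux tx" then "channel";
                                           * "h2 demux" keeps waiting */
        return 0;
}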