* snd_buf() handler to encode the outgoing channel's data.
* - buffer permanently allocated at boot (e.g. temporary compression
* buffers). If these fail, we can't boot.
+ *
+ * Please DO NOT CHANGE THESE LEVELS without first getting a full understanding
+ * of how all this works and touching the DB_CRIT_TO_QUEUE() macro below!
*/
enum dynbuf_crit {
DB_GROW_RING = 0, // used to grow an existing buffer ring
DB_PERMANENT, // buffers permanently allocated.
};
+/* We'll deal with 4 queues, with indexes numbered from 0 to 3 based on the
+ * criticality of the allocation. All criticality levels are mapped to a 2-bit
+ * queue index. While some levels never use the queue (the first two), some of
+ * the others will share a same queue, and all levels will define a ratio of
+ * allocated emergency buffers below which we refrain from trying to allocate.
+ * In practice, for now the thresholds will just be the queue number times 33%
+ * so that queue 0 is allowed to deplete emergency buffers and queue 3 not at
+ * all. This gives us: queue idx=3 for DB_MUX_RX and below, 2 for DB_SE_RX,
+ * 1 for DB_CHANNEL, 0 for DB_MUX_TX and above. This must match the DYNBUF_NBQ
+ * in tinfo-t.h.
+ */
+
+#define DB_CRIT_TO_QUEUE(crit) ((0x000001BF >> ((crit) * 2)) & 3)
+
+#define DB_GROW_RING_Q DB_CRIT_TO_QUEUE(DB_GROW_RING)
+#define DB_UNLIKELY_Q DB_CRIT_TO_QUEUE(DB_UNLIKELY)
+#define DB_MUX_RX_Q DB_CRIT_TO_QUEUE(DB_MUX_RX)
+#define DB_SE_RX_Q DB_CRIT_TO_QUEUE(DB_SE_RX)
+#define DB_CHANNEL_Q DB_CRIT_TO_QUEUE(DB_CHANNEL)
+#define DB_MUX_TX_Q DB_CRIT_TO_QUEUE(DB_MUX_TX)
+#define DB_PERMANENT_Q DB_CRIT_TO_QUEUE(DB_PERMANENT)
+
+
/* an element of the <buffer_wq> list. It represents an object that need to
* acquire a buffer to continue its process. */
struct buffer_wait {
static inline void offer_buffers(void *from, unsigned int count)
{
- if (!LIST_ISEMPTY(&th_ctx->buffer_wq))
+ int q;
+
+ if (likely(!th_ctx->bufq_map))
+ return;
+
+ for (q = 0; q < DYNBUF_NBQ; q++) {
+ if (!(th_ctx->bufq_map & (1 << q)))
+ continue;
+
+ BUG_ON_HOT(LIST_ISEMPTY(&th_ctx->buffer_wq[q]));
__offer_buffers(from, count);
+ break;
+ }
}
/* Queues a buffer request for the current thread via <bw>, and returns
*/
static inline int b_requeue(enum dynbuf_crit crit, struct buffer_wait *bw)
{
+ int q = DB_CRIT_TO_QUEUE(crit);
+
if (LIST_INLIST(&bw->list))
return 1;
if (crit < DB_MUX_RX)
return 0;
- LIST_APPEND(&th_ctx->buffer_wq, &bw->list);
+ th_ctx->bufq_map |= 1 << q;
+ LIST_APPEND(&th_ctx->buffer_wq[q], &bw->list);
return 1;
}
#define TH_FL_STARTED 0x00000010 /* set once the thread starts */
#define TH_FL_IN_LOOP 0x00000020 /* set only inside the polling loop */
+/* we have 4 buffer-wait queues, in highest to lowest emergency order */
+#define DYNBUF_NBQ 4
/* Thread group information. This defines a base and a count of global thread
* IDs which belong to it, and which can be looked up into thread_info/ctx. It
int current_queue; /* points to current tasklet list being run, -1 if none */
unsigned int nb_tasks; /* number of tasks allocated on this thread */
uint8_t tl_class_mask; /* bit mask of non-empty tasklets classes */
+ uint8_t bufq_map; /* one bit per non-empty buffer_wq */
- // 7 bytes hole here
+ // 6 bytes hole here
struct list pool_lru_head; /* oldest objects in thread-local pool caches */
- struct list buffer_wq; /* buffer waiters */
struct list streams; /* list of streams attached to this thread */
struct list quic_conns; /* list of active quic-conns attached to this thread */
struct list quic_conns_clo; /* list of closing quic-conns attached to this thread */
struct list queued_checks; /* checks waiting for a connection slot */
+ struct list buffer_wq[DYNBUF_NBQ]; /* buffer waiters, 4 criticality-based queues */
unsigned int nb_rhttp_conns; /* count of current conns used for active reverse HTTP */
ALWAYS_ALIGN(2*sizeof(void*));
void *buffer;
int thr;
int done;
+ int i;
pool_head_buffer = create_pool("buffer", global.tune.bufsize, MEM_F_SHARED|MEM_F_EXACT);
if (!pool_head_buffer)
return 0;
- for (thr = 0; thr < MAX_THREADS; thr++)
- LIST_INIT(&ha_thread_ctx[thr].buffer_wq);
+ /* make sure any change to the queues assignment isn't overlooked */
+ BUG_ON(DB_PERMANENT - DB_UNLIKELY - 1 != DYNBUF_NBQ);
+ BUG_ON(DB_MUX_RX_Q < DB_SE_RX_Q || DB_MUX_RX_Q >= DYNBUF_NBQ);
+ BUG_ON(DB_SE_RX_Q < DB_CHANNEL_Q || DB_SE_RX_Q >= DYNBUF_NBQ);
+ BUG_ON(DB_CHANNEL_Q < DB_MUX_TX_Q || DB_CHANNEL_Q >= DYNBUF_NBQ);
+ BUG_ON(DB_MUX_TX_Q >= DYNBUF_NBQ);
+
+ for (thr = 0; thr < MAX_THREADS; thr++) {
+ for (i = 0; i < DYNBUF_NBQ; i++)
+ LIST_INIT(&ha_thread_ctx[thr].buffer_wq[i]);
+ ha_thread_ctx[thr].bufq_map = 0;
+ }
/* The reserved buffer is what we leave behind us. Thus we always need
void __offer_buffers(void *from, unsigned int count)
{
struct buffer_wait *wait, *wait_back;
+ int q;
/* For now, we consider that all objects need 1 buffer, so we can stop
* waking up them once we have enough of them to eat all the available
* other tasks, but that's a rough estimate. Similarly, for each cached
* event we'll need 1 buffer.
*/
- list_for_each_entry_safe(wait, wait_back, &th_ctx->buffer_wq, list) {
- if (!count)
- break;
-
- if (wait->target == from || !wait->wakeup_cb(wait->target))
+ for (q = 0; q < DYNBUF_NBQ; q++) {
+ if (!(th_ctx->bufq_map & (1 << q)))
continue;
+ BUG_ON_HOT(LIST_ISEMPTY(&th_ctx->buffer_wq[q]));
+
+ list_for_each_entry_safe(wait, wait_back, &th_ctx->buffer_wq[q], list) {
+ if (!count)
+ break;
- LIST_DEL_INIT(&wait->list);
- count--;
+ if (wait->target == from || !wait->wakeup_cb(wait->target))
+ continue;
+
+ LIST_DEL_INIT(&wait->list);
+ count--;
+ }
+ if (LIST_ISEMPTY(&th_ctx->buffer_wq[q]))
+ th_ctx->bufq_map &= ~(1 << q);
}
}