Adds a global spinlock, buffer_wq_lock, to protect the buffer wait queue (buffer_wq): every code path that enqueues a waiter, detaches one, or walks the queue through __offer_buffer() now does so under this lock.
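For illustration, here is a minimal standalone sketch of the pattern this patch applies: one global spinlock serializing every access to a shared intrusive wait queue. This is not HAProxy code; it substitutes pthread spinlocks and a hand-rolled list for the SPIN_LOCK()/LIST_* macros, and names such as wait_for_buffer() and offer_buffer() are illustrative only.

/* Standalone sketch (not HAProxy code): a global pthread spinlock
 * serializing all access to an intrusive wait queue, mirroring the
 * buffer_wq/buffer_wq_lock pattern in the patch below.
 */
#include <pthread.h>
#include <stdio.h>

struct wait_node {
	struct wait_node *next, *prev;	/* intrusive doubly-linked list */
};

static struct wait_node wait_queue = { &wait_queue, &wait_queue };
static pthread_spinlock_t wait_queue_lock;

static void enqueue(struct wait_node *n)
{
	n->prev = wait_queue.prev;
	n->next = &wait_queue;
	wait_queue.prev->next = n;
	wait_queue.prev = n;
}

static void dequeue(struct wait_node *n)
{
	n->prev->next = n->next;
	n->next->prev = n->prev;
	n->next = n->prev = n;		/* detached: points to itself */
}

/* Caller failed to get a buffer: park it on the queue (once). */
static void wait_for_buffer(struct wait_node *self)
{
	pthread_spin_lock(&wait_queue_lock);
	if (self->next == self)		/* equivalent of LIST_ISEMPTY */
		enqueue(self);
	pthread_spin_unlock(&wait_queue_lock);
}

/* A buffer was freed: detach the first waiter, still under the lock. */
static void offer_buffer(void)
{
	pthread_spin_lock(&wait_queue_lock);
	if (wait_queue.next != &wait_queue)
		dequeue(wait_queue.next);
	pthread_spin_unlock(&wait_queue_lock);
}

int main(void)
{
	struct wait_node a = { &a, &a };

	pthread_spin_init(&wait_queue_lock, PTHREAD_PROCESS_PRIVATE);
	wait_for_buffer(&a);		/* a is queued */
	offer_buffer();			/* a is detached again */
	printf("a detached: %s\n", a.next == &a ? "yes" : "no");
	pthread_spin_destroy(&wait_queue_lock);
	return 0;
}

Build with cc -pthread. The point is that both the waiter side and the offer side take the same lock before touching any list pointer.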
extern struct buffer buf_empty;
extern struct buffer buf_wanted;
extern struct list buffer_wq;
+#ifdef USE_THREAD
+extern HA_SPINLOCK_T buffer_wq_lock;
+#endif
int init_buffer();
void deinit_buffer();
static inline void offer_buffers(void *from, unsigned int threshold)
{
- if (LIST_ISEMPTY(&buffer_wq))
+ SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
+ if (LIST_ISEMPTY(&buffer_wq)) {
+ SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
return;
+ }
__offer_buffer(from, threshold);
+ SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
}
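The early return is the one path that is easy to get wrong here: once the lock is taken, both the empty-queue exit and the normal exit after __offer_buffer() must release it. If the double unlock ever proves error-prone, an equivalent shape with a single unlock site would be (sketch only):

static inline void offer_buffers(void *from, unsigned int threshold)
{
	SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
	if (!LIST_ISEMPTY(&buffer_wq))
		__offer_buffer(from, threshold);
	SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
}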
/*************************************************************************/
STK_SESS_LOCK,
APPLETS_LOCK,
PEER_LOCK,
+ BUF_WQ_LOCK,
LOCK_LABELS
};
struct lock_stat {
"TASK_RQ", "TASK_WQ", "POOL",
"LISTENER", "LISTENER_QUEUE", "PROXY", "SERVER",
"UPDATED_SERVERS", "LBPRM", "SIGNALS", "STK_TABLE", "STK_SESS",
- "APPLETS", "PEER" };
+ "APPLETS", "PEER", "BUF_WQ" };
int lbl;
for (lbl = 0; lbl < LOCK_LABELS; lbl++) {
}
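Note that "BUF_WQ" must sit at the same position in this string array as BUF_WQ_LOCK does in the enum above: the stats loop indexes the array by lock label, so any mismatch would attribute contention to the wrong lock.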
if (!LIST_ISEMPTY(&appctx->buffer_wait.list)) {
+ SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
LIST_DEL(&appctx->buffer_wait.list);
LIST_INIT(&appctx->buffer_wait.list);
+ SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
}
pool_free2(pool2_connection, appctx);
if (b_alloc_margin(&chn->buf, margin) != NULL)
return 1;
- if (LIST_ISEMPTY(&wait->list))
+ if (LIST_ISEMPTY(&wait->list)) {
+ SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
LIST_ADDQ(&buffer_wq, &wait->list);
+ SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
+ }
+
return 0;
}
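One thing worth flagging in this hunk: LIST_ISEMPTY(&wait->list) is still evaluated before the lock is taken. Because only the owner ever enqueues its own wait element, the unlocked test cannot cause a double add; the only interleaving it permits is __offer_buffer() detaching the element between the test and the lock, which is benign here since a detach means a buffer was just offered. Moving the test inside the critical section would make the invariant more obvious (sketch):

	SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
	if (LIST_ISEMPTY(&wait->list))
		LIST_ADDQ(&buffer_wq, &wait->list);
	SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);

The same test-outside/modify-inside shape recurs in the applet, SPOE and stream release paths below.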
/* list of objects waiting for at least one buffer */
struct list buffer_wq = LIST_HEAD_INIT(buffer_wq);
+#ifdef USE_THREAD
+HA_SPINLOCK_T buffer_wq_lock;
+#endif
/* this buffer is always the same size as standard buffers and is used for
* swapping data inside a buffer.
if (global.tune.buf_limit)
pool2_buffer->limit = global.tune.buf_limit;
+ SPIN_INIT(&buffer_wq_lock);
+
buffer = pool_refill_alloc(pool2_buffer, pool2_buffer->minavail - 1);
if (!buffer)
return 0;
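SPIN_INIT(&buffer_wq_lock) sits outside any #ifdef USE_THREAD guard even though buffer_wq_lock is only defined when USE_THREAD is set. This can only compile if the non-threaded fallbacks discard their argument, presumably along these lines:

#ifndef USE_THREAD
#define SPIN_INIT(l)		/* no-op: argument is never evaluated */
#define SPIN_LOCK(lbl, l)
#define SPIN_UNLOCK(lbl, l)
#endif

which is consistent with SPIN_LOCK()/SPIN_UNLOCK() being used unguarded throughout the rest of the patch.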
#include <common/debug.h>
#include <common/memory.h>
#include <common/time.h>
+#include <common/hathreads.h>
#include <types/arg.h>
#include <types/global.h>
return 1;
if (!LIST_ISEMPTY(&buffer_wait->list)) {
+ SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
LIST_DEL(&buffer_wait->list);
LIST_INIT(&buffer_wait->list);
+ SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
}
if (b_alloc_margin(buf, global.tune.reserved_bufs))
return 1;
+ SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
LIST_ADDQ(&buffer_wq, &buffer_wait->list);
+ SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
return 0;
}
spoe_release_buffer(struct buffer **buf, struct buffer_wait *buffer_wait)
{
if (!LIST_ISEMPTY(&buffer_wait->list)) {
+ SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
LIST_DEL(&buffer_wait->list);
LIST_INIT(&buffer_wait->list);
+ SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
}
/* Release the buffer if needed */
/* We may still be present in the buffer wait queue */
if (!LIST_ISEMPTY(&s->buffer_wait.list)) {
+ SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
LIST_DEL(&s->buffer_wait.list);
LIST_INIT(&s->buffer_wait.list);
+ SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
}
if (s->req.buf->size || s->res.buf->size) {
b_drop(&s->req.buf);
static int stream_alloc_work_buffer(struct stream *s)
{
if (!LIST_ISEMPTY(&s->buffer_wait.list)) {
+ SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
LIST_DEL(&s->buffer_wait.list);
LIST_INIT(&s->buffer_wait.list);
+ SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
}
if (b_alloc_margin(&s->res.buf, 0))
return 1;
+ SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
LIST_ADDQ(&buffer_wq, &s->buffer_wait.list);
+ SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
return 0;
}