]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MAJOR: threads/buffer: Make buffer wait queue thread safe
authorEmeric Brun <ebrun@haproxy.com>
Wed, 21 Jun 2017 13:42:52 +0000 (15:42 +0200)
committerWilly Tarreau <w@1wt.eu>
Tue, 31 Oct 2017 12:58:31 +0000 (13:58 +0100)
Adds a global lock to protect the buffer wait queue.

include/common/buffer.h
include/common/hathreads.h
include/proto/applet.h
include/proto/channel.h
src/buffer.c
src/flt_spoe.c
src/stream.c

index 17931cf20931f163aef4bd7772e5f5caab2a2b0f..f11d6a9621bfa9695d40c76522eaa148b09d7cd6 100644 (file)
@@ -52,6 +52,9 @@ extern struct pool_head *pool2_buffer;
 extern struct buffer buf_empty;
 extern struct buffer buf_wanted;
 extern struct list buffer_wq;
+#ifdef USE_THREAD
+extern HA_SPINLOCK_T buffer_wq_lock;
+#endif
 
 int init_buffer();
 void deinit_buffer();
@@ -748,9 +751,13 @@ void __offer_buffer(void *from, unsigned int threshold);
 
 static inline void offer_buffers(void *from, unsigned int threshold)
 {
-       if (LIST_ISEMPTY(&buffer_wq))
+       SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
+       if (LIST_ISEMPTY(&buffer_wq)) {
+               SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
                return;
+       }
        __offer_buffer(from, threshold);
+       SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
 }
 
 /*************************************************************************/
index 3a77bd175738f8a80118a9080bdc6dd56448c7d0..1717cc9b78aa16f8a3862834c7f982b95ef8d1c2 100644 (file)
@@ -156,6 +156,7 @@ enum lock_label {
        STK_SESS_LOCK,
        APPLETS_LOCK,
        PEER_LOCK,
+       BUF_WQ_LOCK,
        LOCK_LABELS
 };
 struct lock_stat {
@@ -242,7 +243,7 @@ static inline void show_lock_stats()
                                           "TASK_RQ", "TASK_WQ", "POOL",
                                           "LISTENER", "LISTENER_QUEUE", "PROXY", "SERVER",
                                           "UPDATED_SERVERS", "LBPRM", "SIGNALS", "STK_TABLE", "STK_SESS",
-                                          "APPLETS", "PEER" };
+                                          "APPLETS", "PEER", "BUF_WQ" };
        int lbl;
 
        for (lbl = 0; lbl < LOCK_LABELS; lbl++) {
index d9f0ce2dff8d9f5fbbbe5784baa59868c9801c4f..766fc92311c01fc3bf2fe21b1cbd060ad566850a 100644 (file)
@@ -88,8 +88,10 @@ static inline void __appctx_free(struct appctx *appctx)
        }
 
        if (!LIST_ISEMPTY(&appctx->buffer_wait.list)) {
+               SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
                LIST_DEL(&appctx->buffer_wait.list);
                LIST_INIT(&appctx->buffer_wait.list);
+               SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
        }
 
        pool_free2(pool2_connection, appctx);
index 9e12b5efc41d1d7ca78155315dedacf7da497ca1..83ad0aab067b2e41006817ee3583e22c584b3e3d 100644 (file)
@@ -440,8 +440,12 @@ static inline int channel_alloc_buffer(struct channel *chn, struct buffer_wait *
        if (b_alloc_margin(&chn->buf, margin) != NULL)
                return 1;
 
-       if (LIST_ISEMPTY(&wait->list))
+       if (LIST_ISEMPTY(&wait->list)) {
+               SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
                LIST_ADDQ(&buffer_wq, &wait->list);
+               SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
+       }
+
        return 0;
 }
 
index 83e4e9e36f5468fc1f137c13577dd104f5e2948d..e892d1e4d4bf87295e719a64eaaa1e873f2fbe36 100644 (file)
@@ -33,6 +33,9 @@ struct buffer buf_wanted = { .p = buf_wanted.data };
 
 /* list of objects waiting for at least one buffer */
 struct list buffer_wq = LIST_HEAD_INIT(buffer_wq);
+#ifdef USE_THREAD
+HA_SPINLOCK_T buffer_wq_lock;
+#endif
 
 /* this buffer is always the same size as standard buffers and is used for
  * swapping data inside a buffer.
@@ -72,6 +75,8 @@ int init_buffer()
        if (global.tune.buf_limit)
                pool2_buffer->limit = global.tune.buf_limit;
 
+       SPIN_INIT(&buffer_wq_lock);
+
        buffer = pool_refill_alloc(pool2_buffer, pool2_buffer->minavail - 1);
        if (!buffer)
                return 0;
index aa3f37a103a30de4c9dfcf967a5b5157d3532468..7fc4ed87f3370eeea757e53f237ce0c04d1dc1b4 100644 (file)
@@ -18,6 +18,7 @@
 #include <common/debug.h>
 #include <common/memory.h>
 #include <common/time.h>
+#include <common/hathreads.h>
 
 #include <types/arg.h>
 #include <types/global.h>
@@ -2685,14 +2686,18 @@ spoe_acquire_buffer(struct buffer **buf, struct buffer_wait *buffer_wait)
                return 1;
 
        if (!LIST_ISEMPTY(&buffer_wait->list)) {
+               SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
                LIST_DEL(&buffer_wait->list);
                LIST_INIT(&buffer_wait->list);
+               SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
        }
 
        if (b_alloc_margin(buf, global.tune.reserved_bufs))
                return 1;
 
+       SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
        LIST_ADDQ(&buffer_wq, &buffer_wait->list);
+       SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
        return 0;
 }
 
@@ -2700,8 +2705,10 @@ static void
 spoe_release_buffer(struct buffer **buf, struct buffer_wait *buffer_wait)
 {
        if (!LIST_ISEMPTY(&buffer_wait->list)) {
+               SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
                LIST_DEL(&buffer_wait->list);
                LIST_INIT(&buffer_wait->list);
+               SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
        }
 
        /* Release the buffer if needed */
index 8975638ac56b9e1b8c8f751ffbafc39633d1ff59..51d235454ff506b5659230cfde9f87dfe6dea5c4 100644 (file)
@@ -320,8 +320,10 @@ static void stream_free(struct stream *s)
 
        /* We may still be present in the buffer wait queue */
        if (!LIST_ISEMPTY(&s->buffer_wait.list)) {
+               SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
                LIST_DEL(&s->buffer_wait.list);
                LIST_INIT(&s->buffer_wait.list);
+               SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
        }
        if (s->req.buf->size || s->res.buf->size) {
                b_drop(&s->req.buf);
@@ -415,14 +417,18 @@ static void stream_free(struct stream *s)
 static int stream_alloc_work_buffer(struct stream *s)
 {
        if (!LIST_ISEMPTY(&s->buffer_wait.list)) {
+               SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
                LIST_DEL(&s->buffer_wait.list);
                LIST_INIT(&s->buffer_wait.list);
+               SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
        }
 
        if (b_alloc_margin(&s->res.buf, 0))
                return 1;
 
+       SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
        LIST_ADDQ(&buffer_wq, &s->buffer_wait.list);
+       SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
        return 0;
 }