From 2104659cd5a7d7547e171a814cd85c2e402b8d86 Mon Sep 17 00:00:00 2001 From: Willy Tarreau <w@1wt.eu> Date: Wed, 26 Feb 2020 10:39:36 +0100 Subject: [PATCH] MEDIUM: buffer: remove the buffer_wq lock This lock was only needed to protect the buffer_wq list, but now we have the mt_list for this. This patch simply turns the buffer_wq list to an mt_list and gets rid of the lock. It's worth noting that the whole buffer_wait thing still looks totally wrong especially in a threaded context: the wakeup_cb() callback is called synchronously from any thread and may end up calling some connection code that was not expected to run on a given thread. The whole thing should probably be reworked to use tasklets instead and be a bit more centralized. --- include/common/buffer.h | 11 +++-------- include/common/hathreads.h | 2 -- include/proto/applet.h | 10 +++------- include/proto/channel.h | 7 ++----- src/buffer.c | 11 +++++------ src/flt_spoe.c | 24 +++++++----------------- src/mux_fcgi.c | 15 +++++---------- src/mux_h1.c | 16 +++++----------- src/mux_h2.c | 15 +++++---------- src/stream.c | 25 ++++++++----------------- 10 files changed, 43 insertions(+), 93 deletions(-) diff --git a/include/common/buffer.h b/include/common/buffer.h index 0d44ec8d5d..779186f605 100644 --- a/include/common/buffer.h +++ b/include/common/buffer.h @@ -40,11 +40,11 @@ struct buffer_wait { void *target; /* The waiting object that should be woken up */ int (*wakeup_cb)(void *); /* The function used to wake up the <target>, passed as argument */ - struct list list; /* Next element in the list */ + struct mt_list list; /* Next element in the list */ }; extern struct pool_head *pool_head_buffer; -extern struct list buffer_wq; +extern struct mt_list buffer_wq; __decl_hathreads(extern HA_SPINLOCK_T buffer_wq_lock); int init_buffer(); @@ -203,13 +203,8 @@ void __offer_buffer(void *from, unsigned int threshold); static inline void offer_buffers(void *from, unsigned int threshold) { - if (LIST_ISEMPTY(&buffer_wq)) - return; - - 
HA_SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock); - if (!LIST_ISEMPTY(&buffer_wq)) + if (!MT_LIST_ISEMPTY(&buffer_wq)) __offer_buffer(from, threshold); - HA_SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock); } diff --git a/include/common/hathreads.h b/include/common/hathreads.h index daef3a440d..afd52d0339 100644 --- a/include/common/hathreads.h +++ b/include/common/hathreads.h @@ -542,7 +542,6 @@ enum lock_label { STK_SESS_LOCK, APPLETS_LOCK, PEER_LOCK, - BUF_WQ_LOCK, STRMS_LOCK, SSL_LOCK, SSL_GEN_CERTS_LOCK, @@ -661,7 +660,6 @@ static inline const char *lock_label(enum lock_label label) case STK_SESS_LOCK: return "STK_SESS"; case APPLETS_LOCK: return "APPLETS"; case PEER_LOCK: return "PEER"; - case BUF_WQ_LOCK: return "BUF_WQ"; case STRMS_LOCK: return "STRMS"; case SSL_LOCK: return "SSL"; case SSL_GEN_CERTS_LOCK: return "SSL_GEN_CERTS"; diff --git a/include/proto/applet.h b/include/proto/applet.h index b4e9396e0d..31b4d90252 100644 --- a/include/proto/applet.h +++ b/include/proto/applet.h @@ -75,7 +75,7 @@ static inline struct appctx *appctx_new(struct applet *applet, unsigned long thr } appctx->t->process = task_run_applet; appctx->t->context = appctx; - LIST_INIT(&appctx->buffer_wait.list); + MT_LIST_INIT(&appctx->buffer_wait.list); appctx->buffer_wait.target = appctx; appctx->buffer_wait.wakeup_cb = appctx_buf_available; _HA_ATOMIC_ADD(&nb_applets, 1); @@ -87,12 +87,8 @@ static inline struct appctx *appctx_new(struct applet *applet, unsigned long thr static inline void __appctx_free(struct appctx *appctx) { task_destroy(appctx->t); - if (!LIST_ISEMPTY(&appctx->buffer_wait.list)) { - HA_SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock); - LIST_DEL(&appctx->buffer_wait.list); - LIST_INIT(&appctx->buffer_wait.list); - HA_SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock); - } + if (MT_LIST_ADDED(&appctx->buffer_wait.list)) + MT_LIST_DEL(&appctx->buffer_wait.list); pool_free(pool_head_appctx, appctx); _HA_ATOMIC_SUB(&nb_applets, 1); diff --git a/include/proto/channel.h b/include/proto/channel.h 
index 5411a74fc2..88f4f25312 100644 --- a/include/proto/channel.h +++ b/include/proto/channel.h @@ -855,11 +855,8 @@ static inline int channel_alloc_buffer(struct channel *chn, struct buffer_wait * if (b_alloc_margin(&chn->buf, margin) != NULL) return 1; - if (LIST_ISEMPTY(&wait->list)) { - HA_SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock); - LIST_ADDQ(&buffer_wq, &wait->list); - HA_SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock); - } + if (!MT_LIST_ADDED(&wait->list)) + MT_LIST_ADDQ(&buffer_wq, &wait->list); return 0; } diff --git a/src/buffer.c b/src/buffer.c index 6211d147c5..9c1c9b4842 100644 --- a/src/buffer.c +++ b/src/buffer.c @@ -23,7 +23,7 @@ struct pool_head *pool_head_buffer; /* list of objects waiting for at least one buffer */ -struct list buffer_wq = LIST_HEAD_INIT(buffer_wq); +struct mt_list buffer_wq = MT_LIST_HEAD_INIT(buffer_wq); __decl_aligned_spinlock(buffer_wq_lock); /* perform minimal intializations, report 0 in case of error, 1 if OK. */ @@ -98,7 +98,8 @@ void buffer_dump(FILE *o, struct buffer *b, int from, int to) /* see offer_buffer() for details */ void __offer_buffer(void *from, unsigned int threshold) { - struct buffer_wait *wait, *bak; + struct buffer_wait *wait; + struct mt_list *elt1, elt2; int avail; /* For now, we consider that all objects need 1 buffer, so we can stop @@ -112,16 +113,14 @@ void __offer_buffer(void *from, unsigned int threshold) */ avail = pool_head_buffer->allocated - pool_head_buffer->used - global.tune.reserved_bufs / 2; - list_for_each_entry_safe(wait, bak, &buffer_wq, list) { + mt_list_for_each_entry_safe(wait, &buffer_wq, list, elt1, elt2) { if (avail <= threshold) break; if (wait->target == from || !wait->wakeup_cb(wait->target)) continue; - LIST_DEL(&wait->list); - LIST_INIT(&wait->list); - + MT_LIST_DEL_SAFE(&wait->list); avail--; } } diff --git a/src/flt_spoe.c b/src/flt_spoe.c index ecad5c4461..d54fcd4370 100644 --- a/src/flt_spoe.c +++ b/src/flt_spoe.c @@ -1988,7 +1988,7 @@ spoe_create_appctx(struct spoe_config *conf) 
SPOE_APPCTX(appctx)->buffer = BUF_NULL; SPOE_APPCTX(appctx)->cur_fpa = 0; - LIST_INIT(&SPOE_APPCTX(appctx)->buffer_wait.list); + MT_LIST_INIT(&SPOE_APPCTX(appctx)->buffer_wait.list); SPOE_APPCTX(appctx)->buffer_wait.target = appctx; SPOE_APPCTX(appctx)->buffer_wait.wakeup_cb = (int (*)(void *))spoe_wakeup_appctx; @@ -2834,31 +2834,21 @@ spoe_acquire_buffer(struct buffer *buf, struct buffer_wait *buffer_wait) if (buf->size) return 1; - if (!LIST_ISEMPTY(&buffer_wait->list)) { - HA_SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock); - LIST_DEL(&buffer_wait->list); - LIST_INIT(&buffer_wait->list); - HA_SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock); - } + if (MT_LIST_ADDED(&buffer_wait->list)) + MT_LIST_DEL(&buffer_wait->list); if (b_alloc_margin(buf, global.tune.reserved_bufs)) return 1; - HA_SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock); - LIST_ADDQ(&buffer_wq, &buffer_wait->list); - HA_SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock); + MT_LIST_ADDQ(&buffer_wq, &buffer_wait->list); return 0; } static void spoe_release_buffer(struct buffer *buf, struct buffer_wait *buffer_wait) { - if (!LIST_ISEMPTY(&buffer_wait->list)) { - HA_SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock); - LIST_DEL(&buffer_wait->list); - LIST_INIT(&buffer_wait->list); - HA_SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock); - } + if (MT_LIST_ADDED(&buffer_wait->list)) + MT_LIST_DEL(&buffer_wait->list); /* Release the buffer if needed */ if (buf->size) { @@ -2892,7 +2882,7 @@ spoe_create_context(struct stream *s, struct filter *filter) ctx->events = conf->agent->events; ctx->groups = &conf->agent->groups; ctx->buffer = BUF_NULL; - LIST_INIT(&ctx->buffer_wait.list); + MT_LIST_INIT(&ctx->buffer_wait.list); ctx->buffer_wait.target = ctx; ctx->buffer_wait.wakeup_cb = (int (*)(void *))spoe_wakeup_context; LIST_INIT(&ctx->list); diff --git a/src/mux_fcgi.c b/src/mux_fcgi.c index bfe44017ef..6a7a13f927 100644 --- a/src/mux_fcgi.c +++ b/src/mux_fcgi.c @@ -601,13 +601,11 @@ static inline struct buffer *fcgi_get_buf(struct fcgi_conn *fconn, struct 
buffer { struct buffer *buf = NULL; - if (likely(!LIST_ADDED(&fconn->buf_wait.list)) && + if (likely(!MT_LIST_ADDED(&fconn->buf_wait.list)) && unlikely((buf = b_alloc_margin(bptr, 0)) == NULL)) { fconn->buf_wait.target = fconn; fconn->buf_wait.wakeup_cb = fcgi_buf_available; - HA_SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock); - LIST_ADDQ(&buffer_wq, &fconn->buf_wait.list); - HA_SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock); + MT_LIST_ADDQ(&buffer_wq, &fconn->buf_wait.list); } return buf; } @@ -759,7 +757,7 @@ static int fcgi_init(struct connection *conn, struct proxy *px, struct session * br_init(fconn->mbuf, sizeof(fconn->mbuf) / sizeof(fconn->mbuf[0])); fconn->streams_by_id = EB_ROOT; LIST_INIT(&fconn->send_list); - LIST_INIT(&fconn->buf_wait.list); + MT_LIST_INIT(&fconn->buf_wait.list); conn->ctx = fconn; @@ -838,11 +836,8 @@ static void fcgi_release(struct fcgi_conn *fconn) TRACE_DEVEL("freeing fconn", FCGI_EV_FCONN_END, conn); - if (LIST_ADDED(&fconn->buf_wait.list)) { - HA_SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock); - LIST_DEL(&fconn->buf_wait.list); - HA_SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock); - } + if (MT_LIST_ADDED(&fconn->buf_wait.list)) + MT_LIST_DEL(&fconn->buf_wait.list); fcgi_release_buf(fconn, &fconn->dbuf); fcgi_release_mbuf(fconn); diff --git a/src/mux_h1.c b/src/mux_h1.c index 3128501c56..d135b9b8d7 100644 --- a/src/mux_h1.c +++ b/src/mux_h1.c @@ -415,13 +415,11 @@ static inline struct buffer *h1_get_buf(struct h1c *h1c, struct buffer *bptr) { struct buffer *buf = NULL; - if (likely(LIST_ISEMPTY(&h1c->buf_wait.list)) && + if (likely(!MT_LIST_ADDED(&h1c->buf_wait.list)) && unlikely((buf = b_alloc_margin(bptr, 0)) == NULL)) { h1c->buf_wait.target = h1c; h1c->buf_wait.wakeup_cb = h1_buf_available; - HA_SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock); - LIST_ADDQ(&buffer_wq, &h1c->buf_wait.list); - HA_SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock); + MT_LIST_ADDQ(&buffer_wq, &h1c->buf_wait.list); } return buf; } @@ -659,7 +657,7 @@ static int h1_init(struct connection 
*conn, struct proxy *proxy, struct session h1c->h1s = NULL; h1c->task = NULL; - LIST_INIT(&h1c->buf_wait.list); + MT_LIST_INIT(&h1c->buf_wait.list); h1c->wait_event.tasklet = tasklet_new(); if (!h1c->wait_event.tasklet) goto fail; @@ -747,12 +745,8 @@ static void h1_release(struct h1c *h1c) } - if (!LIST_ISEMPTY(&h1c->buf_wait.list)) { - HA_SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock); - LIST_DEL(&h1c->buf_wait.list); - LIST_INIT(&h1c->buf_wait.list); - HA_SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock); - } + if (MT_LIST_ADDED(&h1c->buf_wait.list)) + MT_LIST_DEL(&h1c->buf_wait.list); h1_release_buf(h1c, &h1c->ibuf); h1_release_buf(h1c, &h1c->obuf); diff --git a/src/mux_h2.c b/src/mux_h2.c index a73ab0538a..478f3c4fcf 100644 --- a/src/mux_h2.c +++ b/src/mux_h2.c @@ -679,13 +679,11 @@ static inline struct buffer *h2_get_buf(struct h2c *h2c, struct buffer *bptr) { struct buffer *buf = NULL; - if (likely(!LIST_ADDED(&h2c->buf_wait.list)) && + if (likely(!MT_LIST_ADDED(&h2c->buf_wait.list)) && unlikely((buf = b_alloc_margin(bptr, 0)) == NULL)) { h2c->buf_wait.target = h2c; h2c->buf_wait.wakeup_cb = h2_buf_available; - HA_SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock); - LIST_ADDQ(&buffer_wq, &h2c->buf_wait.list); - HA_SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock); + MT_LIST_ADDQ(&buffer_wq, &h2c->buf_wait.list); } return buf; } @@ -856,7 +854,7 @@ static int h2_init(struct connection *conn, struct proxy *prx, struct session *s LIST_INIT(&h2c->send_list); LIST_INIT(&h2c->fctl_list); LIST_INIT(&h2c->blocked_list); - LIST_INIT(&h2c->buf_wait.list); + MT_LIST_INIT(&h2c->buf_wait.list); conn->ctx = h2c; @@ -940,11 +938,8 @@ static void h2_release(struct h2c *h2c) TRACE_DEVEL("freeing h2c", H2_EV_H2C_END, conn); hpack_dht_free(h2c->ddht); - if (LIST_ADDED(&h2c->buf_wait.list)) { - HA_SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock); - LIST_DEL(&h2c->buf_wait.list); - HA_SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock); - } + if (MT_LIST_ADDED(&h2c->buf_wait.list)) + MT_LIST_DEL(&h2c->buf_wait.list); 
h2_release_buf(h2c, &h2c->dbuf); h2_release_mbuf(h2c); diff --git a/src/stream.c b/src/stream.c index b36d709659..9798c5f0f8 100644 --- a/src/stream.c +++ b/src/stream.c @@ -391,7 +391,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin) /* OK, we're keeping the stream, so let's properly initialize the stream */ LIST_INIT(&s->back_refs); - LIST_INIT(&s->buffer_wait.list); + MT_LIST_INIT(&s->buffer_wait.list); s->buffer_wait.target = s; s->buffer_wait.wakeup_cb = stream_buf_available; @@ -595,12 +595,9 @@ static void stream_free(struct stream *s) put_pipe(s->res.pipe); /* We may still be present in the buffer wait queue */ - if (!LIST_ISEMPTY(&s->buffer_wait.list)) { - HA_SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock); - LIST_DEL(&s->buffer_wait.list); - LIST_INIT(&s->buffer_wait.list); - HA_SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock); - } + if (MT_LIST_ADDED(&s->buffer_wait.list)) + MT_LIST_DEL(&s->buffer_wait.list); + if (s->req.buf.size || s->res.buf.size) { b_free(&s->req.buf); b_free(&s->res.buf); @@ -727,19 +724,13 @@ static void stream_free(struct stream *s) */ static int stream_alloc_work_buffer(struct stream *s) { - if (!LIST_ISEMPTY(&s->buffer_wait.list)) { - HA_SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock); - LIST_DEL(&s->buffer_wait.list); - LIST_INIT(&s->buffer_wait.list); - HA_SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock); - } + if (MT_LIST_ADDED(&s->buffer_wait.list)) + MT_LIST_DEL(&s->buffer_wait.list); if (b_alloc_margin(&s->res.buf, 0)) return 1; - HA_SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock); - LIST_ADDQ(&buffer_wq, &s->buffer_wait.list); - HA_SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock); + MT_LIST_ADDQ(&buffer_wq, &s->buffer_wait.list); return 0; } @@ -2788,7 +2779,7 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st chunk_appendf(&trash, " flags=0x%x, conn_retries=%d, srv_conn=%p, pend_pos=%p waiting=%d\n", strm->flags, strm->si[1].conn_retries, strm->srv_conn, strm->pend_pos, - 
!LIST_ISEMPTY(&strm->buffer_wait.list)); + MT_LIST_ADDED(&strm->buffer_wait.list)); chunk_appendf(&trash, " frontend=%s (id=%u mode=%s), listener=%s (id=%u)", -- 2.39.5