From 3372a2ea000dbfa29a1fcb5643e7e259603ec331 Mon Sep 17 00:00:00 2001 From: Olivier Houchard Date: Tue, 17 Dec 2024 13:30:46 +0000 Subject: [PATCH] BUG/MEDIUM: queues: Stricly respect maxconn for outgoing connections The "served" field of struct server is used to know how many connections are currently in use for a server. But served used to be incremented way after the server was picked, so there were race conditions that could lead more than maxconn connections to be allocated for one server. To fix this, increment served way earlier, and make sure at the time that it never goes past maxconn. We now should never have more outgoing connections than set by maxconn. --- src/backend.c | 62 ++++++++++++++++++++++++++++++++++----------------- src/queue.c | 28 +++++++++++++++++++++-- src/stream.c | 9 ++++++-- 3 files changed, 75 insertions(+), 24 deletions(-) diff --git a/src/backend.c b/src/backend.c index c8bb33e81c..55aeacd085 100644 --- a/src/backend.c +++ b/src/backend.c @@ -1029,28 +1029,50 @@ int assign_server_and_queue(struct stream *s) * is set on the server, we must also check that the server's queue is * not full, in which case we have to return FULL. */ - if (srv->maxconn && - (srv->queue.length || srv->served >= srv_dynamic_maxconn(srv))) { - - if (srv->maxqueue > 0 && srv->queue.length >= srv->maxqueue) - return SRV_STATUS_FULL; - - p = pendconn_add(s); - if (p) { - /* There's a TOCTOU here: it may happen that between the - * moment we decided to queue the request and the moment - * it was done, the last active request on the server - * ended and no new one will be able to dequeue that one. - * Since we already have our server we don't care, this - * will be handled by the caller which will check for - * this condition and will immediately dequeue it if - * possible. + if (srv->maxconn) { + int served; + int got_it = 0; + + /* + * Make sure that there's still a slot on the server. + * Try to increment its served, while making sure + * it is < maxconn. + */ + if (!srv->queue.length && + (served = srv->served) < srv_dynamic_maxconn(srv)) { + /* + * Attempt to increment served, while + * making sure it is always below maxconn */ - return SRV_STATUS_QUEUED; + + do { + got_it = _HA_ATOMIC_CAS(&srv->served, + &served, served + 1); + } while (!got_it && served < srv_dynamic_maxconn(srv) && + __ha_cpu_relax()); } - else - return SRV_STATUS_INTERNAL; - } + if (!got_it) { + if (srv->maxqueue > 0 && srv->queue.length >= srv->maxqueue) + return SRV_STATUS_FULL; + + p = pendconn_add(s); + if (p) { + /* There's a TOCTOU here: it may happen that between the + * moment we decided to queue the request and the moment + * it was done, the last active request on the server + * ended and no new one will be able to dequeue that one. + * Since we already have our server we don't care, this + * will be handled by the caller which will check for + * this condition and will immediately dequeue it if + * possible. + */ + return SRV_STATUS_QUEUED; + } + else + return SRV_STATUS_INTERNAL; + } + } else + _HA_ATOMIC_INC(&srv->served); /* OK, we can use this server. Let's reserve our place */ sess_change_server(s, srv); diff --git a/src/queue.c b/src/queue.c index c2c702522b..234b01a3b5 100644 --- a/src/queue.c +++ b/src/queue.c @@ -259,6 +259,9 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int struct pendconn *p = NULL; struct pendconn *pp = NULL; u32 pkey, ppkey; + int served; + int maxconn; + int got_it = 0; p = NULL; if (srv->queue.length) @@ -277,7 +280,25 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int if (!p && !pp) return 0; - else if (!pp) + + served = _HA_ATOMIC_LOAD(&srv->served); + maxconn = srv_dynamic_maxconn(srv); + + while (served < maxconn && !got_it) + got_it = _HA_ATOMIC_CAS(&srv->served, &served, served + 1); + + /* No more slot available, give up */ + if (!got_it) { + if (pp) + HA_SPIN_UNLOCK(QUEUE_LOCK, &px->queue.lock); + return 0; + } + + /* + * Now we know we'll have something available. + * Let's try to allocate a slot on the server. + */ + if (!pp) goto use_p; /* p != NULL */ else if (!p) goto use_pp; /* pp != NULL */ @@ -394,10 +415,13 @@ int process_srv_queue(struct server *s) HA_SPIN_LOCK(QUEUE_LOCK, &s->queue.lock); while (s->served < maxconn) { + /* + * pendconn_process_next_strm() will increment + * the served field, only if it is < maxconn. + */ stop = !pendconn_process_next_strm(s, p, px_ok); if (stop) break; - _HA_ATOMIC_INC(&s->served); done++; if (done >= global.tune.maxpollevents) break; diff --git a/src/stream.c b/src/stream.c index 73c182b686..9ffdf07d9f 100644 --- a/src/stream.c +++ b/src/stream.c @@ -2743,11 +2743,17 @@ void sess_change_server(struct stream *strm, struct server *newsrv) * invocation of sess_change_server(). */ + /* + * It is assumed if the stream has a non-NULL srv_conn, then its + * served field has been incremented, so we have to decrement it now. + */ + if (oldsrv) + _HA_ATOMIC_DEC(&oldsrv->served); + if (oldsrv == newsrv) return; if (oldsrv) { - _HA_ATOMIC_DEC(&oldsrv->served); _HA_ATOMIC_DEC(&oldsrv->proxy->served); __ha_barrier_atomic_store(); if (oldsrv->proxy->lbprm.server_drop_conn) @@ -2756,7 +2762,6 @@ void sess_change_server(struct stream *strm, struct server *newsrv) } if (newsrv) { - _HA_ATOMIC_INC(&newsrv->served); _HA_ATOMIC_INC(&newsrv->proxy->served); __ha_barrier_atomic_store(); if (newsrv->proxy->lbprm.server_take_conn) -- 2.39.5