MEDIUM: servers/proxies: Switch to using per-tgroup queues.
author    Olivier Houchard <ohouchard@haproxy.com>
          Wed, 15 Jan 2025 15:44:05 +0000 (16:44 +0100)
committer Olivier Houchard <cognet@ci0.org>
          Tue, 28 Jan 2025 11:49:41 +0000 (12:49 +0100)
For both servers and proxies, use one connection queue per thread-group,
instead of only one. Having a single queue can lead to severe performance
issues on NUMA machines; it is actually trivial to get the watchdog to
trigger on an AMD machine with a server whose maxconn is 96 and an
injector that uses 160 concurrent connections.
We now have one queue per thread-group; however, when dequeuing, we
dequeue MAX_SELF_USE_QUEUE (currently 9) pendconns from our own queue
before dequeuing one from another thread group, if available, to make
sure everybody keeps running.
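
As an illustration of the policy above, here is a minimal, standalone
sketch of the group-selection step, with the locking, atomics and actual
pendconn handling stripped out. The helper name pick_tgrp and its exact
parameters are hypothetical; only MAX_SELF_USE_QUEUE, the 1-based
thread-group ids and the non-empty-groups bitmask come from the patch
itself:

#define MAX_SELF_USE_QUEUE 9

/* Pick the 1-based thread-group id to dequeue from next. tgid is our own
 * group, nbtgroups the total number of groups, self_served counts how many
 * entries we already took from our own queue, non_empty_tgids is a bitmask
 * of groups whose queue still has entries, and *last_other remembers the
 * last foreign group we served, for round-robin. Returns 0 if every queue
 * is empty.
 */
static int pick_tgrp(int tgid, int nbtgroups, unsigned int self_served,
                     unsigned long non_empty_tgids, int *last_other)
{
	unsigned long self_bit = 1UL << (tgid - 1);
	unsigned int phase = self_served % (MAX_SELF_USE_QUEUE + 1);
	int next;

	if (!non_empty_tgids)
		return 0;

	/* stay on our own queue unless it is empty, or we already took
	 * MAX_SELF_USE_QUEUE entries in a row from it and another group's
	 * queue has work too
	 */
	if ((non_empty_tgids & self_bit) &&
	    (phase < MAX_SELF_USE_QUEUE || non_empty_tgids == self_bit))
		return tgid;

	/* round-robin over the other non-empty groups, skipping our own;
	 * this terminates because at least one other bit is set here
	 */
	next = *last_other;
	do {
		next = (next % nbtgroups) + 1;
	} while (next == tgid || !(non_empty_tgids & (1UL << (next - 1))));
	*last_other = next;
	return next;
}

In the patch the equivalent decision is taken concurrently: self_served
and last_other_tgrp_served live in the per-tgroup server data and are
updated with _HA_ATOMIC_*/CAS, and non_empty_tgids starts as
all_tgroups_mask and loses a bit whenever a group's queue turns out to be
empty or is already being dequeued by another thread.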

include/haproxy/defaults.h
include/haproxy/proxy-t.h
include/haproxy/server-t.h
src/backend.c
src/proxy.c
src/queue.c
src/server.c

diff --git a/include/haproxy/defaults.h b/include/haproxy/defaults.h
index 88ab35b467d3ca4095866bb181fa1b1e9eb276c3..f70bf430961405517eda55fdd33719796a326da3 100644
 # define DEBUG_MEMORY_POOLS 1
 #endif
 
+#ifndef MAX_SELF_USE_QUEUE
+#define MAX_SELF_USE_QUEUE 9
+#endif
+
 #endif /* _HAPROXY_DEFAULTS_H */
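
Since the new default is guarded by #ifndef, it can be overridden at
build time without touching the source; for example (assuming the usual
DEFINE hook of HAProxy's Makefile):

    make TARGET=linux-glibc DEFINE="-DMAX_SELF_USE_QUEUE=16"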
diff --git a/include/haproxy/proxy-t.h b/include/haproxy/proxy-t.h
index d6f9cef4d9a26dc54bfb390f637b9f6d392126ef..d243d072e874b4202f9418b70b3669d5f7628eb7 100644
@@ -362,7 +362,6 @@ struct proxy {
        __decl_thread(HA_RWLOCK_T lock);        /* may be taken under the server's lock */
 
        char *id, *desc;                        /* proxy id (name) and description */
-       struct queue queue;                     /* queued requests (pendconns) */
        struct proxy_per_tgroup *per_tgrp;      /* array of per-tgroup stuff such as queues */
        unsigned int queueslength;              /* Sum of the length of each queue */
        int totpend;                            /* total number of pending connections on this instance (for stats) */
diff --git a/include/haproxy/server-t.h b/include/haproxy/server-t.h
index ee266c2077c54db3e75517da20135085d3f9f61b..d75171d937cd8a5eae6a358e3a844ca9fa8efc11 100644
@@ -369,9 +369,7 @@ struct server {
        unsigned int max_used_conns;            /* Max number of used connections (the counter is reset at each connection purge) */
        unsigned int est_need_conns;            /* Estimate on the number of needed connections (max of curr and previous max_used) */
 
-       struct queue queue;                     /* pending connections */
        struct mt_list sess_conns;              /* list of private conns managed by a session on this server */
-       unsigned int dequeuing;                 /* non-zero = dequeuing in progress (atomic) */
 
        /* Elements below are used by LB algorithms and must be doable in
         * parallel to other threads reusing connections above.
diff --git a/src/backend.c b/src/backend.c
index f0dba4d18247a20f684537c6d543ef0f094eebe2..b1703e295140a3aea2107c1c8f91be6571652963 100644
@@ -1029,6 +1029,7 @@ int assign_server_and_queue(struct stream *s)
                 * not full, in which case we have to return FULL.
                 */
                if (srv->maxconn) {
+                       struct queue *queue = &srv->per_tgrp[tgid - 1].queue;
                        int served;
                        int got_it = 0;
 
@@ -1037,7 +1038,7 @@ int assign_server_and_queue(struct stream *s)
                         * Try to increment its served, while making sure
                         * it is < maxconn.
                         */
-                       if (!srv->queue.length &&
+                       if (!queue->length &&
                            (served = srv->served) < srv_dynamic_maxconn(srv)) {
                                /*
                                 * Attempt to increment served, while
diff --git a/src/proxy.c b/src/proxy.c
index a1a4ee7343e67b45b86014b35bc293fb20fc3724..a1143fc686315fe2739576d68fd1439a6c48e3a4 100644
@@ -1408,7 +1408,6 @@ void init_new_proxy(struct proxy *p)
 {
        memset(p, 0, sizeof(struct proxy));
        p->obj_type = OBJ_TYPE_PROXY;
-       queue_init(&p->queue, p, NULL);
        LIST_INIT(&p->acl);
        LIST_INIT(&p->http_req_rules);
        LIST_INIT(&p->http_res_rules);
diff --git a/src/queue.c b/src/queue.c
index f5ea31de4998d3341b09686d7b46737935e213c6..0bfdd09f026e5de6794f6233d1f01ffaac8b6db3 100644
@@ -254,7 +254,7 @@ static struct pendconn *pendconn_first(struct eb_root *pendconns)
  * When a pending connection is dequeued, this function returns 1 if a pendconn
  * is dequeued, otherwise 0.
  */
-static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int px_ok)
+static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int px_ok, int tgrp)
 {
        struct pendconn *p = NULL;
        struct pendconn *pp = NULL;
@@ -264,18 +264,18 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int
        int got_it = 0;
 
        p = NULL;
-       if (srv->queue.length)
-               p = pendconn_first(&srv->queue.head);
+       if (srv->per_tgrp[tgrp - 1].queue.length)
+               p = pendconn_first(&srv->per_tgrp[tgrp - 1].queue.head);
 
        pp = NULL;
-       if (px_ok && px->queue.length) {
+       if (px_ok && px->per_tgrp[tgrp - 1].queue.length) {
                /* the lock only remains held as long as the pp is
                 * in the proxy's queue.
                 */
-               HA_SPIN_LOCK(QUEUE_LOCK,  &px->queue.lock);
-               pp = pendconn_first(&px->queue.head);
+               HA_SPIN_LOCK(QUEUE_LOCK,  &px->per_tgrp[tgrp - 1].queue.lock);
+               pp = pendconn_first(&px->per_tgrp[tgrp - 1].queue.head);
                if (!pp)
-                       HA_SPIN_UNLOCK(QUEUE_LOCK,  &px->queue.lock);
+                       HA_SPIN_UNLOCK(QUEUE_LOCK,  &px->per_tgrp[tgrp - 1].queue.lock);
        }
 
        if (!p && !pp)
@@ -290,7 +290,7 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int
        /* No more slot available, give up */
        if (!got_it) {
                if (pp)
-                       HA_SPIN_UNLOCK(QUEUE_LOCK, &px->queue.lock);
+                       HA_SPIN_UNLOCK(QUEUE_LOCK, &px->per_tgrp[tgrp - 1].queue.lock);
                return 0;
        }
 
@@ -332,7 +332,7 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int
 
        /* now the element won't go, we can release the proxy */
        __pendconn_unlink_prx(pp);
-       HA_SPIN_UNLOCK(QUEUE_LOCK, &px->queue.lock);
+       HA_SPIN_UNLOCK(QUEUE_LOCK, &px->per_tgrp[tgrp - 1].queue.lock);
 
        pp->strm_flags |= SF_ASSIGNED;
        pp->target = srv;
@@ -346,15 +346,15 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int
        task_wakeup(pp->strm->task, TASK_WOKEN_RES);
        HA_SPIN_UNLOCK(QUEUE_LOCK, &pp->del_lock);
 
-       _HA_ATOMIC_DEC(&px->queue.length);
-       _HA_ATOMIC_INC(&px->queue.idx);
+       _HA_ATOMIC_DEC(&px->per_tgrp[tgrp - 1].queue.length);
+       _HA_ATOMIC_INC(&px->per_tgrp[tgrp - 1].queue.idx);
        _HA_ATOMIC_DEC(&px->queueslength);
        return 1;
 
  use_p:
        /* we don't need the px queue lock anymore, we have the server's lock */
        if (pp)
-               HA_SPIN_UNLOCK(QUEUE_LOCK, &px->queue.lock);
+               HA_SPIN_UNLOCK(QUEUE_LOCK, &px->per_tgrp[tgrp - 1].queue.lock);
 
        p->strm_flags |= SF_ASSIGNED;
        p->target = srv;
@@ -368,8 +368,8 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int
        task_wakeup(p->strm->task, TASK_WOKEN_RES);
        __pendconn_unlink_srv(p);
 
-       _HA_ATOMIC_DEC(&srv->queue.length);
-       _HA_ATOMIC_INC(&srv->queue.idx);
+       _HA_ATOMIC_DEC(&srv->per_tgrp[tgrp - 1].queue.length);
+       _HA_ATOMIC_INC(&srv->per_tgrp[tgrp - 1].queue.idx);
        _HA_ATOMIC_DEC(&srv->queueslength);
        return 1;
 }
@@ -381,10 +381,11 @@ int process_srv_queue(struct server *s)
 {
        struct server *ref = s->track ? s->track : s;
        struct proxy  *p = s->proxy;
+       uint64_t non_empty_tgids = all_tgroups_mask;
        int maxconn;
-       int stop = 0;
        int done = 0;
        int px_ok;
+       int cur_tgrp;
 
        /* if a server is not usable or backup and must not be used
         * to dequeue backend requests.
@@ -409,27 +410,82 @@ int process_srv_queue(struct server *s)
         * and would occasionally leave entries in the queue that are never
         * dequeued. Nobody else uses the dequeuing flag so when seeing it
         * non-null, we're certain that another thread is waiting on it.
+        *
+        * We'll dequeue MAX_SELF_USE_QUEUE items from the queue corresponding
+        * to our thread group, then we'll get one from a different one, to
+        * be sure those actually get processed too.
         */
-       while (!stop && (done < global.tune.maxpollevents || !s->served) &&
+       while (non_empty_tgids != 0
+              && (done < global.tune.maxpollevents || !s->served) &&
               s->served < (maxconn = srv_dynamic_maxconn(s))) {
-               if (HA_ATOMIC_XCHG(&s->dequeuing, 1))
-                       break;
+              int self_served;
+              int to_dequeue;
+
+              /*
+               * self_served contains the number of times we dequeued items
+               * from our own thread-group queue.
+               */
+              self_served = _HA_ATOMIC_LOAD(&s->per_tgrp[tgid - 1].self_served) % (MAX_SELF_USE_QUEUE + 1);
+              if ((self_served == MAX_SELF_USE_QUEUE && non_empty_tgids != (1UL << (tgid - 1))) ||
+                   !(non_empty_tgids & (1UL << (tgid - 1)))) {
+                       int old_served, new_served;
 
-               HA_SPIN_LOCK(QUEUE_LOCK, &s->queue.lock);
-               while (s->served < maxconn) {
+                       /*
+                        * We want to dequeue from another queue. The last
+                        * one we used is stored in last_other_tgrp_served.
+                        */
+                       old_served = _HA_ATOMIC_LOAD(&s->per_tgrp[tgid - 1].last_other_tgrp_served);
+                       do {
+                               new_served = old_served + 1;
+
+                               /*
+                                * Find the next tgrp to dequeue from.
+                                * If we're here then we know there is
+                                * at least one tgrp that is not the current
+                                * tgrp that we can dequeue from, so that
+                                * loop will end eventually.
+                                */
+                               while (new_served == tgid ||
+                                      new_served == global.nbtgroups + 1 ||
+                                      !(non_empty_tgids & (1UL << (new_served - 1)))) {
+                                       if (new_served == global.nbtgroups + 1)
+                                               new_served = 1;
+                                       else
+                                               new_served++;
+                               }
+                       } while (!_HA_ATOMIC_CAS(&s->per_tgrp[tgid - 1].last_other_tgrp_served, &old_served, new_served));
+                       cur_tgrp = new_served;
+                       to_dequeue = 1;
+               } else {
+                       cur_tgrp = tgid;
+                       if (self_served == MAX_SELF_USE_QUEUE)
+                               self_served = 0;
+                       to_dequeue = MAX_SELF_USE_QUEUE - self_served;
+               }
+               if (HA_ATOMIC_XCHG(&s->per_tgrp[cur_tgrp - 1].dequeuing, 1)) {
+                       non_empty_tgids &= ~(1UL << (cur_tgrp - 1));
+                       continue;
+               }
+
+               HA_SPIN_LOCK(QUEUE_LOCK, &s->per_tgrp[cur_tgrp - 1].queue.lock);
+               while (to_dequeue > 0 && s->served < maxconn) {
                        /*
                         * pendconn_process_next_strm() will increment
                         * the served field, only if it is < maxconn.
                         */
-                       stop = !pendconn_process_next_strm(s, p, px_ok);
-                       if (stop)
+                       if (!pendconn_process_next_strm(s, p, px_ok, cur_tgrp)) {
+                               non_empty_tgids &= ~(1UL << (cur_tgrp - 1));
                                break;
+                       }
+                       to_dequeue--;
+                       if (cur_tgrp == tgid)
+                               _HA_ATOMIC_INC(&s->per_tgrp[tgid - 1].self_served);
                        done++;
                        if (done >= global.tune.maxpollevents)
                                break;
                }
-               HA_ATOMIC_STORE(&s->dequeuing, 0);
-               HA_SPIN_UNLOCK(QUEUE_LOCK, &s->queue.lock);
+               HA_ATOMIC_STORE(&s->per_tgrp[cur_tgrp - 1].dequeuing, 0);
+               HA_SPIN_UNLOCK(QUEUE_LOCK, &s->per_tgrp[cur_tgrp - 1].queue.lock);
        }
 
        if (done) {
@@ -440,6 +496,8 @@ int process_srv_queue(struct server *s)
                        p->lbprm.server_take_conn(s);
        }
        if (s->served == 0 && p->served == 0 && !HA_ATOMIC_LOAD(&p->ready_srv)) {
+               int i;
+
                /*
                 * If there is no task running on the server, and the proxy,
                 * let it known that we are ready, there is a small race
@@ -454,10 +512,13 @@ int process_srv_queue(struct server *s)
                 * checked, but before we set ready_srv so it would not see it,
                 * just in case try to run one more stream.
                 */
-               if (pendconn_process_next_strm(s, p, px_ok)) {
-                       _HA_ATOMIC_SUB(&p->totpend, 1);
-                       _HA_ATOMIC_ADD(&p->served, 1);
-                       done++;
+               for (i = 0; i < global.nbtgroups; i++) {
+                       if (pendconn_process_next_strm(s, p, px_ok, i + 1)) {
+                               _HA_ATOMIC_SUB(&p->totpend, 1);
+                               _HA_ATOMIC_ADD(&p->served, 1);
+                               done++;
+                               break;
+                       }
                }
        }
        return done;
@@ -510,12 +571,12 @@ struct pendconn *pendconn_add(struct stream *strm)
                srv = NULL;
 
        if (srv) {
-               q = &srv->queue;
+               q = &srv->per_tgrp[tgid - 1].queue;
                max_ptr = &srv->counters.nbpend_max;
                queueslength = &srv->queueslength;
        }
        else {
-               q = &px->queue;
+               q = &px->per_tgrp[tgid - 1].queue;
                max_ptr = &px->be_counters.nbpend_max;
                queueslength = &px->queueslength;
        }
@@ -550,6 +611,7 @@ int pendconn_redistribute(struct server *s)
        struct proxy *px = s->proxy;
        int px_xferred = 0;
        int xferred = 0;
+       int i;
 
        /* The REDISP option was specified. We will ignore cookie and force to
         * balance or use the dispatcher.
@@ -558,26 +620,33 @@ int pendconn_redistribute(struct server *s)
            (s->proxy->options & (PR_O_REDISP|PR_O_PERSIST)) != PR_O_REDISP)
                goto skip_srv_queue;
 
-       HA_SPIN_LOCK(QUEUE_LOCK, &s->queue.lock);
-       for (node = eb32_first(&s->queue.head); node; node = nodeb) {
-               nodeb = eb32_next(node);
+       for (i = 0; i < global.nbtgroups; i++) {
+               struct queue *queue = &s->per_tgrp[i].queue;
+               int local_xferred = 0;
 
-               p = eb32_entry(node, struct pendconn, node);
-               if (p->strm_flags & SF_FORCE_PRST)
-                       continue;
+               HA_SPIN_LOCK(QUEUE_LOCK, &queue->lock);
+               for (node = eb32_first(&queue->head); node; node = nodeb) {
+                       nodeb = eb32_next(node);
 
-               /* it's left to the dispatcher to choose a server */
-               __pendconn_unlink_srv(p);
-               if (!(s->proxy->options & PR_O_REDISP))
-                       p->strm_flags &= ~(SF_DIRECT | SF_ASSIGNED);
+                       p = eb32_entry(node, struct pendconn, node);
+                       if (p->strm_flags & SF_FORCE_PRST)
+                               continue;
 
-               task_wakeup(p->strm->task, TASK_WOKEN_RES);
-               xferred++;
+                       /* it's left to the dispatcher to choose a server */
+                       __pendconn_unlink_srv(p);
+                       if (!(s->proxy->options & PR_O_REDISP))
+                               p->strm_flags &= ~(SF_DIRECT | SF_ASSIGNED);
+
+                       task_wakeup(p->strm->task, TASK_WOKEN_RES);
+                       local_xferred++;
+               }
+               HA_SPIN_UNLOCK(QUEUE_LOCK, &queue->lock);
+               xferred += local_xferred;
+               if (local_xferred)
+                       _HA_ATOMIC_SUB(&queue->length, local_xferred);
        }
-       HA_SPIN_UNLOCK(QUEUE_LOCK, &s->queue.lock);
 
        if (xferred) {
-               _HA_ATOMIC_SUB(&s->queue.length, xferred);
                _HA_ATOMIC_SUB(&s->queueslength, xferred);
                _HA_ATOMIC_SUB(&s->proxy->totpend, xferred);
        }
@@ -586,24 +655,31 @@ int pendconn_redistribute(struct server *s)
        if (px->lbprm.tot_wact || px->lbprm.tot_wbck)
                goto done;
 
-       HA_SPIN_LOCK(QUEUE_LOCK, &px->queue.lock);
-       for (node = eb32_first(&px->queue.head); node; node = nodeb) {
-               nodeb = eb32_next(node);
-               p = eb32_entry(node, struct pendconn, node);
+       for (i = 0; i < global.nbtgroups; i++) {
+               struct queue *queue = &px->per_tgrp[i].queue;
+               int local_xferred = 0;
 
-               /* force-persist streams may occasionally appear in the
-                * proxy's queue, and we certainly don't want them here!
-                */
-               p->strm_flags &= ~SF_FORCE_PRST;
-               __pendconn_unlink_prx(p);
+               HA_SPIN_LOCK(QUEUE_LOCK, &queue->lock);
+               for (node = eb32_first(&queue->head); node; node = nodeb) {
+                       nodeb = eb32_next(node);
+                       p = eb32_entry(node, struct pendconn, node);
+
+                       /* force-persist streams may occasionally appear in the
+                        * proxy's queue, and we certainly don't want them here!
+                        */
+                       p->strm_flags &= ~SF_FORCE_PRST;
+                       __pendconn_unlink_prx(p);
 
-               task_wakeup(p->strm->task, TASK_WOKEN_RES);
-               px_xferred++;
+                       task_wakeup(p->strm->task, TASK_WOKEN_RES);
+                       local_xferred++;
+               }
+               HA_SPIN_UNLOCK(QUEUE_LOCK, &queue->lock);
+               if (local_xferred)
+                       _HA_ATOMIC_SUB(&queue->length, local_xferred);
+               px_xferred += local_xferred;
        }
-       HA_SPIN_UNLOCK(QUEUE_LOCK, &px->queue.lock);
 
        if (px_xferred) {
-               _HA_ATOMIC_SUB(&px->queue.length, px_xferred);
                _HA_ATOMIC_SUB(&px->queueslength, px_xferred);
                _HA_ATOMIC_SUB(&px->totpend, px_xferred);
        }
diff --git a/src/server.c b/src/server.c
index 6d15de3714d6e5425b21edf280effa337b22a564..82f237dc8602672f3f987e7492b6c4f54ce5e95e 100644
@@ -2961,7 +2961,6 @@ struct server *new_server(struct proxy *proxy)
 
        srv->obj_type = OBJ_TYPE_SERVER;
        srv->proxy = proxy;
-       queue_init(&srv->queue, proxy, srv);
        MT_LIST_APPEND(&servers_list, &srv->global_list);
        LIST_INIT(&srv->srv_rec_item);
        LIST_INIT(&srv->ip_rec_item);