MEDIUM: listener: switch bind_thread from global to group-local
author    Willy Tarreau <w@1wt.eu>
          Tue, 28 Jun 2022 06:30:43 +0000 (08:30 +0200)
committer Willy Tarreau <w@1wt.eu>
          Fri, 15 Jul 2022 18:16:30 +0000 (20:16 +0200)
This requires both adapting the parser and changing the algorithm
that redispatches incoming traffic, so that local thread IDs may
always be used.

The internal structures now only reference thread group IDs and
group-local masks, which are compatible with those used by the FD
layer and the rest of the code.
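
As an illustration only (not part of the commit), here is a minimal
standalone sketch of that representation: a group carries a base, a
count and a group-local threads_enabled mask, and a global thread ID
maps to a (group, local bit) pair. The field and function names are
assumptions mirroring the diff below, not HAProxy's exact API.

    #include <stdio.h>

    /* assumed, simplified stand-in for ha_tgroup_info[] */
    struct tgroup_info {
            int base;                      /* first global thread ID of the group */
            int count;                     /* number of threads in the group */
            unsigned long threads_enabled; /* group-local mask of enabled threads */
    };

    /* map a global (0-based) thread ID to a group ID and group-local bit */
    static void global_to_local(const struct tgroup_info *grp, int ngroups,
                                int tid, int *group, unsigned long *bit)
    {
            for (int g = 0; g < ngroups; g++) {
                    if (tid >= grp[g].base && tid < grp[g].base + grp[g].count) {
                            *group = g + 1;
                            *bit = 1UL << (tid - grp[g].base);
                            return;
                    }
            }
            *group = 0; /* not found */
            *bit = 0;
    }

    int main(void)
    {
            struct tgroup_info grp[2] = {
                    { .base = 0, .count = 4, .threads_enabled = 0xf },
                    { .base = 4, .count = 4, .threads_enabled = 0xf },
            };
            int group;
            unsigned long bit;

            global_to_local(grp, 2, 5, &group, &bit);
            /* prints: global tid 5 -> group 2, local mask 0x2 */
            printf("global tid 5 -> group %d, local mask 0x%lx\n", group, bit);
            return 0;
    }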

src/cfgparse.c
src/listener.c

diff --git a/src/cfgparse.c b/src/cfgparse.c
index 8fb866b7829ac1aefe5e4c04addf4c51c28a2b32..2bd498ac763f821c8efd7085c06854045dc3769f 100644
--- a/src/cfgparse.c
+++ b/src/cfgparse.c
@@ -2645,17 +2645,19 @@ int check_config_validity()
                                           curproxy->id, err, bind_conf->arg, bind_conf->file, bind_conf->line);
                                free(err);
                                cfgerr++;
-                       } else if (!((mask = bind_conf->bind_thread) & all_threads_mask)) {
+                       } else if (!((mask = bind_conf->bind_thread) & ha_tgroup_info[bind_conf->bind_tgroup-1].threads_enabled)) {
                                unsigned long new_mask = 0;
+                               ulong thr_mask = ha_tgroup_info[bind_conf->bind_tgroup-1].threads_enabled;
 
                                while (mask) {
-                                       new_mask |= mask & all_threads_mask;
-                                       mask >>= global.nbthread;
+                                       new_mask |= mask & thr_mask;
+                                       mask >>= ha_tgroup_info[bind_conf->bind_tgroup-1].count;
                                }
 
                                bind_conf->bind_thread = new_mask;
-                               ha_warning("Proxy '%s': the thread range specified on the 'thread' directive of 'bind %s' at [%s:%d] only refers to thread numbers out of the range defined by the global 'nbthread' directive. The thread numbers were remapped to existing threads instead (mask 0x%lx).\n",
-                                          curproxy->id, bind_conf->arg, bind_conf->file, bind_conf->line, new_mask);
+                               ha_warning("Proxy '%s': the thread range specified on the 'thread' directive of 'bind %s' at [%s:%d] only refers to thread numbers out of the range supported by thread group %d (%d). The thread numbers were remapped to existing threads instead (mask 0x%lx).\n",
+                                          curproxy->id, bind_conf->arg, bind_conf->file, bind_conf->line,
+                                          bind_conf->bind_tgroup, ha_tgroup_info[bind_conf->bind_tgroup-1].count, new_mask);
                        }
 
                        /* apply thread masks and groups to all receivers */
@@ -4102,17 +4104,19 @@ out_uri_auth_compat:
                                                         curpeers->peers_fe->id, err, bind_conf->arg, bind_conf->file, bind_conf->line);
                                                free(err);
                                                cfgerr++;
-                                       } else if (!((mask = bind_conf->bind_thread) & all_threads_mask)) {
+                                       } else if (!((mask = bind_conf->bind_thread) & ha_tgroup_info[bind_conf->bind_tgroup-1].threads_enabled)) {
                                                unsigned long new_mask = 0;
+                                               ulong thr_mask = ha_tgroup_info[bind_conf->bind_tgroup-1].threads_enabled;
 
                                                while (mask) {
-                                                       new_mask |= mask & all_threads_mask;
-                                                       mask >>= global.nbthread;
+                                                       new_mask |= mask & thr_mask;
+                                                       mask >>= ha_tgroup_info[bind_conf->bind_tgroup-1].count;
                                                }
 
                                                bind_conf->bind_thread = new_mask;
-                                               ha_warning("Peers section '%s': the thread range specified on the 'thread' directive of 'bind %s' at [%s:%d] only refers to thread numbers out of the range defined by the global 'nbthread' directive. The thread numbers were remapped to existing threads instead (mask 0x%lx).\n",
-                                                          curpeers->peers_fe->id, bind_conf->arg, bind_conf->file, bind_conf->line, new_mask);
+                                               ha_warning("Peers section '%s': the thread range specified on the 'thread' directive of 'bind %s' at [%s:%d] only refers to thread numbers out of the range supported by thread group %d (%d). The thread numbers were remapped to existing threads instead (mask 0x%lx).\n",
+                                                          curpeers->peers_fe->id, bind_conf->arg, bind_conf->file, bind_conf->line,
+                                                          bind_conf->bind_tgroup, ha_tgroup_info[bind_conf->bind_tgroup-1].count, new_mask);
                                        }
 
                                        /* apply thread masks and groups to all receivers */
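
A worked example of the remapping in the two hunks above, assuming
group 1 has count = 4 and threads_enabled = 0xf: a 'thread 5-6' bind
line yields mask 0x30, which does not intersect 0xf, so the fold loop
shifts it down by the group size until it lands on enabled threads.
This is a standalone sketch with illustrative constants, not HAProxy
code.

    #include <stdio.h>

    int main(void)
    {
            unsigned long thr_mask = 0xf; /* threads_enabled of a 4-thread group */
            int count = 4;                /* group size */
            unsigned long mask = 0x30;    /* user asked for threads 5-6 */
            unsigned long new_mask = 0;

            while (mask) {
                    new_mask |= mask & thr_mask; /* keep bits inside the group */
                    mask >>= count;              /* fold the rest down */
            }
            /* prints 0x3: remapped to local threads 1 and 2 */
            printf("remapped mask: 0x%lx\n", new_mask);
            return 0;
    }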
diff --git a/src/listener.c b/src/listener.c
index 5b91faf3eac90359cd22dfd5e36a41aa80a0f077..6f8d4ad6206eaf27a40b3dc9ae7c82b447a565ea 100644
--- a/src/listener.c
+++ b/src/listener.c
@@ -992,10 +992,11 @@ void listener_accept(struct listener *l)
                if (l->rx.flags & RX_F_LOCAL_ACCEPT)
                        goto local_accept;
 
-               mask = l->rx.bind_thread & all_threads_mask;
+               mask = l->rx.bind_thread & tg->threads_enabled;
                if (atleast2(mask) && (global.tune.options & GTUNE_LISTENER_MQ) && !stopping) {
                        struct accept_queue_ring *ring;
                        unsigned int t, t0, t1, t2;
+                       int base = tg->base;
 
                        /* The principle is that we have two running indexes,
                         * each visiting in turn all threads bound to this
@@ -1042,11 +1043,11 @@ void listener_accept(struct listener *l)
                                }
 
                                /* now we have two distinct thread IDs belonging to the mask */
-                               q1 = accept_queue_rings[t1].tail - accept_queue_rings[t1].head + ACCEPT_QUEUE_SIZE;
+                               q1 = accept_queue_rings[base + t1].tail - accept_queue_rings[base + t1].head + ACCEPT_QUEUE_SIZE;
                                if (q1 >= ACCEPT_QUEUE_SIZE)
                                        q1 -= ACCEPT_QUEUE_SIZE;
 
-                               q2 = accept_queue_rings[t2].tail - accept_queue_rings[t2].head + ACCEPT_QUEUE_SIZE;
+                               q2 = accept_queue_rings[base + t2].tail - accept_queue_rings[base + t2].head + ACCEPT_QUEUE_SIZE;
                                if (q2 >= ACCEPT_QUEUE_SIZE)
                                        q2 -= ACCEPT_QUEUE_SIZE;
 
@@ -1062,8 +1063,8 @@ void listener_accept(struct listener *l)
                                 *             than t2.
                                 */
 
-                               q1 += l->thr_conn[t1];
-                               q2 += l->thr_conn[t2];
+                               q1 += l->thr_conn[base + t1];
+                               q2 += l->thr_conn[base + t2];
 
                                if (q1 - q2 < 0) {
                                        t = t1;
@@ -1092,16 +1093,16 @@ void listener_accept(struct listener *l)
                         * performing model, likely due to better cache locality
                         * when processing this loop.
                         */
-                       ring = &accept_queue_rings[t];
+                       ring = &accept_queue_rings[base + t];
                        if (accept_queue_push_mp(ring, cli_conn)) {
-                               _HA_ATOMIC_INC(&activity[t].accq_pushed);
+                               _HA_ATOMIC_INC(&activity[base + t].accq_pushed);
                                tasklet_wakeup(ring->tasklet);
                                continue;
                        }
                        /* If the ring is full we do a synchronous accept on
                         * the local thread here.
                         */
-                       _HA_ATOMIC_INC(&activity[t].accq_full);
+                       _HA_ATOMIC_INC(&activity[base + t].accq_full);
                }
 #endif // USE_THREAD
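
The other half of the change is visible in the indexing: t, t1 and t2
are now group-local, so every access to the process-wide arrays
(accept_queue_rings[], activity[], l->thr_conn[]) is offset by the
group's first global thread ID, tg->base. A minimal sketch of that
addressing with illustrative values, not HAProxy code:

    #include <stdio.h>

    #define MAX_THREADS 8

    static int accq_pushed[MAX_THREADS]; /* stand-in for activity[].accq_pushed */

    int main(void)
    {
            int base = 4; /* this group's first global thread ID (tg->base) */
            int t = 1;    /* group-local thread picked by the redispatch */

            accq_pushed[base + t]++; /* updates the slot of global thread 5 */
            printf("pushed on global slot %d\n", base + t);
            return 0;
    }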