]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: checks: search more aggressively for another thread on overload
authorWilly Tarreau <w@1wt.eu>
Wed, 23 Aug 2023 09:39:00 +0000 (11:39 +0200)
committerWilly Tarreau <w@1wt.eu>
Fri, 1 Sep 2023 06:26:06 +0000 (08:26 +0200)
When the current check is overloaded (more running checks than the
configured limit), we'll try more aggressively to find another thread.
Instead of just opportunistically looking for one half as loaded, now if
the current thread has more than 1% more active checks than another one,
or has more than a configured limit of concurrent running checks, it will
search for a more suitable thread among 3 other random ones in order to
migrate the check there. The number of migrations remains very low (~1%)
and the checks load very fair across all threads (~1% as well). The new
parameter is called tune.max-checks-per-thread.

doc/configuration.txt
include/haproxy/global-t.h
src/check.c

index 28ca4c6bceba03121b2760703a0b5091306e1487..78216b7e2ddfaa40b57985bf782d6fb85a323aab 100644 (file)
@@ -1192,6 +1192,7 @@ The following keywords are supported in the "global" section :
    - tune.lua.service-timeout
    - tune.lua.session-timeout
    - tune.lua.task-timeout
+   - tune.max-checks-per-thread
    - tune.maxaccept
    - tune.maxpollevents
    - tune.maxrewrite
@@ -3280,6 +3281,17 @@ tune.lua.task-timeout <timeout>
   remain alive during of the lifetime of HAProxy. For example, a task used to
   check servers.
 
+tune.max-checks-per-thread <number>
+  Sets the number of active checks per thread above which a thread will
+  actively try to search a less loaded thread to run the health check. The
+  default value is zero, meaning no such limit is set. It may be needed in
+  certain environments running an extremely large number of expensive checks
+  with many threads when the load appears unequal and may make health checks
+  to randomly time out on startup, typically when using OpenSSL 3.0 which is
+  about 20 times more CPU-intensive on health checks than older ones. This will
+  have for result to try to level the health check work across all threads. The
+  vast majority of configurations do not need to touch this parameter.
+
 tune.maxaccept <number>
   Sets the maximum number of consecutive connections a process may accept in a
   row before switching to other work. In single process mode, higher numbers
index 3523f631fb6386b9d4c5bfd1bbd12fd963917d87..3f025c59cbe62c26624c1c8d6614ba94a2e4584d 100644 (file)
@@ -169,6 +169,7 @@ struct global {
                unsigned short idle_timer; /* how long before an empty buffer is considered idle (ms) */
                int nb_stk_ctr;       /* number of stick counters, defaults to MAX_SESS_STKCTR */
                int default_shards; /* default shards for listeners, or -1 (by-thread) or -2 (by-group) */
+               uint max_checks_per_thread; /* if >0, no more than this concurrent checks per thread */
 #ifdef USE_QUIC
                unsigned int quic_backend_max_idle_timeout;
                unsigned int quic_frontend_max_idle_timeout;
index d19ec26d3e96fdfe94141d45230317e9fca4a8e0..290d432c98737a11847e8f702787b30de31e5ec0 100644 (file)
@@ -1138,6 +1138,24 @@ static inline int check_thread_cmp_load(int thr1, int thr2)
        return 0;
 }
 
+/* returns <0, 0, >0 if check thread 1's active checks count is respectively
+ * higher than, equal, or lower than thread 2's. This is made to decide on
+ * forced migrations upon overload, so only a very little margin is applied
+ * here (~1%). For ease of remembering the direction, consider this returns
+ * active1 - active2.
+ */
+static inline int check_thread_cmp_active(int thr1, int thr2)
+{
+       uint t1_act  = _HA_ATOMIC_LOAD(&ha_thread_ctx[thr1].active_checks);
+       uint t2_act  = _HA_ATOMIC_LOAD(&ha_thread_ctx[thr2].active_checks);
+
+       if (t1_act * 128 >= t2_act * 129)
+               return 1;
+       if (t2_act * 128 >= t1_act * 129)
+               return -1;
+       return 0;
+}
+
 
 /* manages a server health-check that uses a connection. Returns
  * the time the task accepts to wait, or TIME_ETERNITY for infinity.
@@ -1168,35 +1186,50 @@ struct task *process_chk_conn(struct task *t, void *context, unsigned int state)
                 * the task again because we're setting CHK_ST_READY indicating
                 * a migration.
                 */
+               uint run_checks = _HA_ATOMIC_LOAD(&th_ctx->running_checks);
                uint my_load = HA_ATOMIC_LOAD(&th_ctx->rq_total);
+               uint attempts = MIN(global.nbthread, 3);
 
                if (check->state & CHK_ST_READY) {
                        /* check was migrated, active already counted */
                        activity[tid].check_adopted++;
                }
-               else if (my_load >= 3 && th_ctx->active_checks >= 3) {
-                       uint new_tid  = statistical_prng_range(global.nbthread);
-
-                       if (check_thread_cmp_load(tid, new_tid) > 0) {
-                               /* Found one. Let's migrate the task over there. We have to
-                                * remove it from the WQ first and kill its expire time
-                                * otherwise the scheduler will reinsert it and trigger a
-                                * BUG_ON() as we're not allowed to call task_queue() for a
-                                * foreign thread. The recipient will restore the expiration.
-                                */
-                               check->state |= CHK_ST_READY;
-                               HA_ATOMIC_INC(&ha_thread_ctx[new_tid].active_checks);
-                               task_unlink_wq(t);
-                               t->expire = TICK_ETERNITY;
-                               task_set_thread(t, new_tid);
-                               task_wakeup(t, TASK_WOKEN_MSG);
-                               TRACE_LEAVE(CHK_EV_TASK_WAKE, check);
-                               return t;
-                       }
-                       /* check just woke up, count it as active */
-                       _HA_ATOMIC_INC(&th_ctx->active_checks);
-               }
                else {
+                       /* first wakeup, let's check if another thread is less loaded
+                        * than this one in order to smooth the load. If the current
+                        * thread is not yet overloaded, we attempt an opportunistic
+                        * migration to another thread that is not full and that is
+                        * significantly less loaded. And if the current thread is
+                        * already overloaded, we attempt a forced migration to a
+                        * thread with less active checks. We try at most 3 random
+                        * other thread.
+                        */
+                       while (attempts-- > 0 &&
+                              my_load >= 3 && _HA_ATOMIC_LOAD(&th_ctx->active_checks) >= 3) {
+                               uint new_tid  = statistical_prng_range(global.nbthread);
+
+                               if (new_tid == tid)
+                                       continue;
+
+                               if (check_thread_cmp_active(tid, new_tid) > 0 &&
+                                   (run_checks >= global.tune.max_checks_per_thread ||
+                                    check_thread_cmp_load(tid, new_tid) > 0)) {
+                                       /* Found one. Let's migrate the task over there. We have to
+                                        * remove it from the WQ first and kill its expire time
+                                        * otherwise the scheduler will reinsert it and trigger a
+                                        * BUG_ON() as we're not allowed to call task_queue() for a
+                                        * foreign thread. The recipient will restore the expiration.
+                                        */
+                                       check->state |= CHK_ST_READY;
+                                       HA_ATOMIC_INC(&ha_thread_ctx[new_tid].active_checks);
+                                       task_unlink_wq(t);
+                                       t->expire = TICK_ETERNITY;
+                                       task_set_thread(t, new_tid);
+                                       task_wakeup(t, TASK_WOKEN_MSG);
+                                       TRACE_LEAVE(CHK_EV_TASK_WAKE, check);
+                                       return t;
+                               }
+                       }
                        /* check just woke up, count it as active */
                        _HA_ATOMIC_INC(&th_ctx->active_checks);
                }
@@ -2501,6 +2534,26 @@ static int srv_parse_check_port(char **args, int *cur_arg, struct proxy *curpx,
        goto out;
 }
 
+/* config parser for global "tune.max-checks-per-thread" */
+static int check_parse_global_max_checks(char **args, int section_type, struct proxy *curpx,
+                                       const struct proxy *defpx, const char *file, int line,
+                                       char **err)
+{
+       if (too_many_args(1, args, err, NULL))
+               return -1;
+       global.tune.max_checks_per_thread = atoi(args[1]);
+       return 0;
+}
+
+/* register "global" section keywords */
+static struct cfg_kw_list chk_cfg_kws = {ILH, {
+       { CFG_GLOBAL, "tune.max-checks-per-thread", check_parse_global_max_checks },
+       { 0, NULL, NULL }
+}};
+
+INITCALL1(STG_REGISTER, cfg_register_keywords, &chk_cfg_kws);
+
+/* register "server" line keywords */
 static struct srv_kw_list srv_kws = { "CHK", { }, {
        { "addr",                srv_parse_addr,                1,  1,  1 }, /* IP address to send health to or to probe from agent-check */
        { "agent-addr",          srv_parse_agent_addr,          1,  1,  1 }, /* Enable an auxiliary agent check */