From: Aurelien DARRAGON Date: Mon, 22 Jul 2024 15:52:18 +0000 (+0200) Subject: OPTIM: sink: consider threads' current load when rebalancing applets X-Git-Tag: v3.1-dev4~1 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=2513bd257f2db7fc48fc50f7227a8fa13aeb2680;p=thirdparty%2Fhaproxy.git OPTIM: sink: consider threads' current load when rebalancing applets In c454296f0 ("OPTIM: sink: balance applets accross threads"), we already made sure to balance applets accross threads by picking a random thread to spawn the new applet. Also, thanks to the previous commit, we also have the ability to destroy the applet when a certain amount of messages were processed to help distribute the load during runtime. Let's improve that by trying up to 3 different threads in the hope to pick a non-overloaded one in the best scenario, and the least over loaded one in the worst case. This should help to better distribute the load over multiple threads when high loads are expected. Logic was greatly inspired from thread migration logic used by server health checks, but it was simpliflied for sink's use case. --- diff --git a/src/sink.c b/src/sink.c index 86356ad21f..1b09c04aeb 100644 --- a/src/sink.c +++ b/src/sink.c @@ -548,11 +548,43 @@ static struct appctx *sink_forward_session_create(struct sink *sink, struct sink { struct appctx *appctx; struct applet *applet = &sink_forward_applet; + uint best_tid, best_load; + int attempts, first; if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING) applet = &sink_forward_oc_applet; - appctx = appctx_new_on(applet, NULL, statistical_prng_range(global.nbthread)); + BUG_ON(!global.nbthread); + attempts = MIN(global.nbthread, 3); + first = 1; + + /* to shut gcc warning */ + best_tid = best_load = 0; + + /* to help spread the load over multiple threads, try to find a + * non-overloaded thread by picking a random thread and checking + * its load. If we fail to find a non-overloaded thread after 3 + * attempts, let's pick the least overloaded one. + */ + while (attempts-- > 0) { + uint cur_tid; + uint cur_load; + + cur_tid = statistical_prng_range(global.nbthread); + cur_load = HA_ATOMIC_LOAD(&ha_thread_ctx[cur_tid].rq_total); + + if (first || cur_load < best_load) { + best_tid = cur_tid; + best_load = cur_load; + } + first = 0; + + /* if we already found a non-overloaded thread, stop now */ + if (HA_ATOMIC_LOAD(&ha_thread_ctx[best_tid].rq_total) < 3) + break; + } + + appctx = appctx_new_on(applet, NULL, best_tid); if (!appctx) goto out_close; appctx->svcctx = (void *)sft;