]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
OPTIM: sink: consider threads' current load when rebalancing applets
authorAurelien DARRAGON <adarragon@haproxy.com>
Mon, 22 Jul 2024 15:52:18 +0000 (17:52 +0200)
committerAurelien DARRAGON <adarragon@haproxy.com>
Wed, 24 Jul 2024 15:59:18 +0000 (17:59 +0200)
In c454296f0 ("OPTIM: sink: balance applets accross threads"), we already
made sure to balance applets accross threads by picking a random thread
to spawn the new applet.

Also, thanks to the previous commit, we also have the ability to destroy
the applet when a certain amount of messages were processed to help
distribute the load during runtime.

Let's improve that by trying up to 3 different threads in the hope to pick
a non-overloaded one in the best scenario, and the least over loaded one
in the worst case. This should help to better distribute the load over
multiple threads when high loads are expected.

Logic was greatly inspired from thread migration logic used by server
health checks, but it was simpliflied for sink's use case.

src/sink.c

index 86356ad21f80cf444ac9ac6729b35e101253e798..1b09c04aeb7fdb88da78891511bdc755e9491da1 100644 (file)
@@ -548,11 +548,43 @@ static struct appctx *sink_forward_session_create(struct sink *sink, struct sink
 {
        struct appctx *appctx;
        struct applet *applet = &sink_forward_applet;
+       uint best_tid, best_load;
+       int attempts, first;
 
        if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING)
                applet = &sink_forward_oc_applet;
 
-       appctx = appctx_new_on(applet, NULL, statistical_prng_range(global.nbthread));
+       BUG_ON(!global.nbthread);
+       attempts = MIN(global.nbthread, 3);
+       first = 1;
+
+       /* to shut gcc warning */
+       best_tid = best_load = 0;
+
+       /* to help spread the load over multiple threads, try to find a
+        * non-overloaded thread by picking a random thread and checking
+        * its load. If we fail to find a non-overloaded thread after 3
+        * attempts, let's pick the least overloaded one.
+        */
+       while (attempts-- > 0) {
+               uint cur_tid;
+               uint cur_load;
+
+               cur_tid = statistical_prng_range(global.nbthread);
+               cur_load = HA_ATOMIC_LOAD(&ha_thread_ctx[cur_tid].rq_total);
+
+               if (first || cur_load < best_load) {
+                       best_tid = cur_tid;
+                       best_load = cur_load;
+               }
+               first = 0;
+
+               /* if we already found a non-overloaded thread, stop now */
+               if (HA_ATOMIC_LOAD(&ha_thread_ctx[best_tid].rq_total) < 3)
+                       break;
+       }
+
+       appctx = appctx_new_on(applet, NULL, best_tid);
        if (!appctx)
                goto out_close;
        appctx->svcctx = (void *)sft;