From: Willy Tarreau
Date: Mon, 27 Jun 2022 14:02:24 +0000 (+0200)
Subject: MEDIUM: tinfo: add a dynamic thread-group context
X-Git-Tag: v2.7-dev2~121
X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=03f9b35114bec1efaa9cbca21c62b0a588ca3036;p=thirdparty%2Fhaproxy.git

MEDIUM: tinfo: add a dynamic thread-group context

The thread group info is not sufficient to represent a thread group's
current state as it's read-only. We also need something comparable to the
thread context to represent the aggregate state of the threads in that
group. This patch introduces ha_tgroup_ctx[] and tg_ctx for this. It's
indexed on the group id and must be cache-line aligned.

The thread masks that were global and that do not need to remain global
were moved there (want_rdv, harmless, idle). Given that all the masks
placed there now become group-specific, the associated thread mask
(tid_bit) now switches to the thread's local bit (ltid_bit). Both are the
same for nbtgroups 1 but will differ for other values.

There's also a tg_ctx pointer in the thread so that it can be reached
from other threads.
---
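
As a reading aid, here is a minimal standalone sketch (not part of the patch)
of the relationship described above between a thread's group-local bit and its
group context. It uses C11 <stdatomic.h> in place of HAProxy's _HA_ATOMIC_*
wrappers, the structures are reduced to the fields relevant here, and sizes
and initialization are illustrative only:

    /* model of the per-group dynamic context introduced by this patch */
    #include <stdatomic.h>
    #include <stdio.h>

    #define MAX_TGROUPS 4

    struct tgroup_ctx {
            atomic_ulong threads_want_rdv;  /* group-local mask: threads requesting a rendez-vous */
            atomic_ulong threads_harmless;  /* group-local mask: threads promising not to touch shared data */
            atomic_ulong threads_idle;      /* group-local mask: threads idling in the poller */
    } __attribute__((aligned(64)));         /* cache-line aligned, as in the patch */

    struct thread_info {
            unsigned int tgid;              /* thread group id, starts at 1 */
            unsigned int ltid;              /* thread id local to the group, starts at 0 */
            unsigned long ltid_bit;         /* 1UL << ltid: the only bit this thread ever touches */
            struct tgroup_ctx *tg_ctx;      /* back-pointer so other threads can reach the group ctx */
    };

    static struct tgroup_ctx ha_tgroup_ctx[MAX_TGROUPS];

    /* rough equivalents of thread_harmless_now()/thread_harmless_end() */
    static void mark_harmless(const struct thread_info *t)
    {
            atomic_fetch_or(&t->tg_ctx->threads_harmless, t->ltid_bit);
    }

    static void clear_harmless(const struct thread_info *t)
    {
            atomic_fetch_and(&t->tg_ctx->threads_harmless, ~t->ltid_bit);
    }

    int main(void)
    {
            /* two threads with the same group-local rank in different groups:
             * they share the same ltid_bit but never touch the same mask.
             */
            struct thread_info t0 = { .tgid = 1, .ltid = 0, .ltid_bit = 1UL << 0,
                                      .tg_ctx = &ha_tgroup_ctx[0] };
            struct thread_info t1 = { .tgid = 2, .ltid = 0, .ltid_bit = 1UL << 0,
                                      .tg_ctx = &ha_tgroup_ctx[1] };

            mark_harmless(&t0);
            mark_harmless(&t1);
            printf("group 1 harmless=%#lx, group 2 harmless=%#lx\n",
                   atomic_load(&ha_tgroup_ctx[0].threads_harmless),
                   atomic_load(&ha_tgroup_ctx[1].threads_harmless));
            clear_harmless(&t0);
            return 0;
    }

With nbtgroup 1 the two layouts coincide (ltid_bit equals tid_bit), which is
why the patch can switch the masks to be group-local without changing
behaviour for the default configuration.
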
diff --git a/include/haproxy/thread.h b/include/haproxy/thread.h
index d9c280da9c..5f7f515178 100644
--- a/include/haproxy/thread.h
+++ b/include/haproxy/thread.h
@@ -57,9 +57,6 @@ extern int thread_cpus_enabled_at_boot;
  */
 enum { all_threads_mask = 1UL };
 enum { all_tgroups_mask = 1UL };
-enum { threads_harmless_mask = 0 };
-enum { threads_idle_mask = 0 };
-enum { threads_want_rdv_mask = 0 };
 enum { tid_bit = 1UL };
 enum { tid = 0 };
 enum { tgid = 1 };
@@ -176,17 +173,14 @@ unsigned long long ha_get_pthread_id(unsigned int thr);
 
 extern volatile unsigned long all_threads_mask;
 extern volatile unsigned long all_tgroups_mask;
-extern volatile unsigned long threads_harmless_mask;
-extern volatile unsigned long threads_idle_mask;
-extern volatile unsigned long threads_want_rdv_mask;
 extern THREAD_LOCAL unsigned long tid_bit; /* The bit corresponding to the thread id */
 extern THREAD_LOCAL unsigned int tid;      /* The thread id */
 extern THREAD_LOCAL unsigned int tgid;     /* The thread group id (starts at 1) */
 
-/* explanation for threads_want_rdv_mask, and threads_harmless_mask:
- * - threads_want_rdv_mask is a bit field indicating all threads that have
+/* explanation for tg_ctx->threads_want_rdv, and tg_ctx->threads_harmless:
+ * - tg_ctx->threads_want_rdv is a bit field indicating all threads that have
  *   requested a rendez-vous of other threads using thread_isolate().
- * - threads_harmless_mask is a bit field indicating all threads that are
+ * - tg_ctx->threads_harmless is a bit field indicating all threads that are
  *   currently harmless in that they promise not to access a shared resource.
  *
  * For a given thread, its bits in want_rdv and harmless can be translated like
@@ -221,6 +215,7 @@ static inline void ha_set_thread(const struct thread_info *thr)
 		tgid    = thr->tgid;
 		tid_bit = 1UL << tid; /* FIXME: must become thr->ltid_bit */
 		th_ctx  = &ha_thread_ctx[tid];
+		tg_ctx  = &ha_tgroup_ctx[tgid-1];
 	} else {
 		tgid = 1;
 		tid  = 0;
@@ -228,6 +223,7 @@ static inline void ha_set_thread(const struct thread_info *thr)
 		ti      = &ha_thread_info[0];
 		tg      = &ha_tgroup_info[0];
 		th_ctx  = &ha_thread_ctx[0];
+		tg_ctx  = &ha_tgroup_ctx[0];
 	}
 }
 
@@ -240,7 +236,7 @@ static inline void ha_set_thread(const struct thread_info *thr)
  */
 static inline void thread_idle_now()
 {
-	HA_ATOMIC_OR(&threads_idle_mask, tid_bit);
+	HA_ATOMIC_OR(&tg_ctx->threads_idle, ti->ltid_bit);
 }
 
 /* Ends the harmless period started by thread_idle_now(), i.e. the thread is
@@ -257,7 +253,7 @@ static inline void thread_idle_now()
  */
 static inline void thread_idle_end()
 {
-	HA_ATOMIC_AND(&threads_idle_mask, ~tid_bit);
+	HA_ATOMIC_AND(&tg_ctx->threads_idle, ~ti->ltid_bit);
 }
 
 
@@ -269,7 +265,7 @@ static inline void thread_idle_end()
  */
 static inline void thread_harmless_now()
 {
-	HA_ATOMIC_OR(&threads_harmless_mask, tid_bit);
+	HA_ATOMIC_OR(&tg_ctx->threads_harmless, ti->ltid_bit);
 }
 
 /* Ends the harmless period started by thread_harmless_now(). Usually this is
@@ -280,8 +276,9 @@ static inline void thread_harmless_now()
 static inline void thread_harmless_end()
 {
 	while (1) {
-		HA_ATOMIC_AND(&threads_harmless_mask, ~tid_bit);
-		if (likely((threads_want_rdv_mask & all_threads_mask & ~tid_bit) == 0))
+		HA_ATOMIC_AND(&tg_ctx->threads_harmless, ~tid_bit);
+		if (likely((_HA_ATOMIC_LOAD(&tg_ctx->threads_want_rdv) &
+		            tg->threads_enabled & ~ti->ltid_bit) == 0))
 			break;
 		thread_harmless_till_end();
 	}
@@ -290,7 +287,8 @@ static inline void thread_harmless_end()
 /* an isolated thread has harmless cleared and want_rdv set */
 static inline unsigned long thread_isolated()
 {
-	return threads_want_rdv_mask & ~threads_harmless_mask & tid_bit;
+	return _HA_ATOMIC_LOAD(&tg_ctx->threads_want_rdv) &
+	       ~_HA_ATOMIC_LOAD(&tg_ctx->threads_harmless) & ti->ltid_bit;
 }
 
 /* Returns 1 if the cpu set is currently restricted for the process else 0.
diff --git a/include/haproxy/tinfo-t.h b/include/haproxy/tinfo-t.h
index f7a7494b69..7eb798e6b4 100644
--- a/include/haproxy/tinfo-t.h
+++ b/include/haproxy/tinfo-t.h
@@ -62,6 +62,18 @@ struct tgroup_info {
 	char __end[0] __attribute__((aligned(64)));
 };
 
+/* This structure describes the group-specific context (e.g. active threads
+ * etc). It uses one cache line per thread to limit false sharing.
+ */
+struct tgroup_ctx {
+	ulong threads_want_rdv;              /* mask of threads that want a rendez-vous */
+	ulong threads_harmless;              /* mask of threads that are not modifying anything */
+	ulong threads_idle;                  /* mask of threads idling in the poller */
+	/* pad to cache line (64B) */
+	char __pad[0];                       /* unused except to check remaining room */
+	char __end[0] __attribute__((aligned(64)));
+};
+
 /* This structure describes all the per-thread info we need. When threads are
  * disabled, it contains the same info for the single running thread. This is
  * stable across all of a thread's life, and is being pointed to by the
@@ -69,6 +81,7 @@ struct tgroup_info {
  */
 struct thread_info {
 	const struct tgroup_info *tg;   /* config of the thread-group this thread belongs to */
+	struct tgroup_ctx *tg_ctx;      /* context of the thread-group this thread belongs to */
 	uint tid, ltid;                 /* process-wide and group-wide thread ID (start at 0) */
 	ulong ltid_bit;                 /* bit masks for the tid/ltid */
 	uint tgid;                      /* ID of the thread group this thread belongs to (starts at 1; 0=unset) */
diff --git a/include/haproxy/tinfo.h b/include/haproxy/tinfo.h
index 6b654f9873..ee62bbae18 100644
--- a/include/haproxy/tinfo.h
+++ b/include/haproxy/tinfo.h
@@ -32,6 +32,9 @@ extern THREAD_LOCAL const struct tgroup_info *tg;
 extern struct thread_info ha_thread_info[MAX_THREADS];
 extern THREAD_LOCAL const struct thread_info *ti; /* thread_info for the current thread */
 
+extern struct tgroup_ctx ha_tgroup_ctx[MAX_TGROUPS];
+extern THREAD_LOCAL struct tgroup_ctx *tg_ctx; /* ha_tgroup_ctx for the current thread */
+
 extern struct thread_ctx ha_thread_ctx[MAX_THREADS];
 extern THREAD_LOCAL struct thread_ctx *th_ctx; /* ha_thread_ctx for the current thread */
diff --git a/src/debug.c b/src/debug.c
index 50d239e154..7b0711e087 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -162,6 +162,7 @@ void ha_thread_dump(struct buffer *buf, int thr, int calling_tid)
 	unsigned long long p = ha_thread_ctx[thr].prev_cpu_time;
 	unsigned long long n = now_cpu_time_thread(thr);
 	int stuck = !!(ha_thread_ctx[thr].flags & TH_FL_STUCK);
+	int tgrp  = ha_thread_info[thr].tgid;
 
 	chunk_appendf(buf,
 	              "%c%cThread %-2u: id=0x%llx act=%d glob=%d wq=%d rq=%d tl=%d tlsz=%d rqsz=%d\n"
@@ -184,7 +185,7 @@ void ha_thread_dump(struct buffer *buf, int thr, int calling_tid)
 
 	chunk_appendf(buf,
 	              " harmless=%d wantrdv=%d",
-	              !!(threads_harmless_mask & thr_bit),
+	              !!(_HA_ATOMIC_LOAD(&ha_tgroup_ctx[tgrp-1].threads_harmless) & thr_bit),
 	              !!(th_ctx->flags & TH_FL_TASK_PROFILING));
 
 	chunk_appendf(buf, "\n");
@@ -1365,7 +1366,7 @@ void debug_handler(int sig, siginfo_t *si, void *arg)
 	/* mark the current thread as stuck to detect it upon next invocation
 	 * if it didn't move.
 	 */
-	if (!(threads_harmless_mask & tid_bit) &&
+	if (!(_HA_ATOMIC_LOAD(&tg_ctx->threads_harmless) & ti->ltid_bit) &&
 	    !(_HA_ATOMIC_LOAD(&th_ctx->flags) & TH_FL_SLEEPING))
 		_HA_ATOMIC_OR(&th_ctx->flags, TH_FL_STUCK);
 }
diff --git a/src/thread.c b/src/thread.c
index 042ad965ed..cc18b76d2f 100644
--- a/src/thread.c
+++ b/src/thread.c
@@ -55,14 +55,14 @@ THREAD_LOCAL const struct tgroup_info *tg = &ha_tgroup_info[0];
 struct thread_info ha_thread_info[MAX_THREADS] = { };
 THREAD_LOCAL const struct thread_info *ti = &ha_thread_info[0];
 
+struct tgroup_ctx ha_tgroup_ctx[MAX_TGROUPS] = { };
+THREAD_LOCAL struct tgroup_ctx *tg_ctx = &ha_tgroup_ctx[0];
+
 struct thread_ctx ha_thread_ctx[MAX_THREADS] = { };
 THREAD_LOCAL struct thread_ctx *th_ctx = &ha_thread_ctx[0];
 
 #ifdef USE_THREAD
 
-volatile unsigned long threads_want_rdv_mask __read_mostly = 0;
-volatile unsigned long threads_harmless_mask = 0;
-volatile unsigned long threads_idle_mask = 0;
 volatile unsigned long all_threads_mask __read_mostly = 1; // nbthread 1 assumed by default
 volatile unsigned long all_tgroups_mask __read_mostly = 1; // nbtgroup 1 assumed by default
 THREAD_LOCAL unsigned int tgid = 1; // thread ID starts at 1
@@ -79,8 +79,8 @@ static pthread_t ha_pthread[MAX_THREADS] = { };
  */
 void thread_harmless_till_end()
 {
-	_HA_ATOMIC_OR(&threads_harmless_mask, tid_bit);
-	while (threads_want_rdv_mask & all_threads_mask & ~tid_bit) {
+	_HA_ATOMIC_OR(&tg_ctx->threads_harmless, ti->ltid_bit);
+	while (_HA_ATOMIC_LOAD(&tg_ctx->threads_want_rdv) & tg->threads_enabled & ~ti->ltid_bit) {
 		ha_thread_relax();
 	}
 }
@@ -88,23 +88,23 @@ void thread_harmless_till_end()
 
 /* Isolates the current thread : request the ability to work while all other
  * threads are harmless, as defined by thread_harmless_now() (i.e. they're not
  * going to touch any visible memory area). Only returns once all of them are
- * harmless, with the current thread's bit in threads_harmless_mask cleared.
+ * harmless, with the current thread's bit in &tg_ctx->threads_harmless cleared.
  * Needs to be completed using thread_release().
  */
 void thread_isolate()
 {
 	unsigned long old;
 
-	_HA_ATOMIC_OR(&threads_harmless_mask, tid_bit);
+	_HA_ATOMIC_OR(&tg_ctx->threads_harmless, ti->ltid_bit);
 	__ha_barrier_atomic_store();
-	_HA_ATOMIC_OR(&threads_want_rdv_mask, tid_bit);
+	_HA_ATOMIC_OR(&tg_ctx->threads_want_rdv, ti->ltid_bit);
 
 	/* wait for all threads to become harmless */
-	old = threads_harmless_mask;
+	old = _HA_ATOMIC_LOAD(&tg_ctx->threads_harmless);
 	while (1) {
-		if (unlikely((old & all_threads_mask) != all_threads_mask))
-			old = threads_harmless_mask;
-		else if (_HA_ATOMIC_CAS(&threads_harmless_mask, &old, old & ~tid_bit))
+		if (unlikely((old & tg->threads_enabled) != tg->threads_enabled))
+			old = _HA_ATOMIC_LOAD(&tg_ctx->threads_harmless);
+		else if (_HA_ATOMIC_CAS(&tg_ctx->threads_harmless, &old, old & ~ti->ltid_bit))
 			break;
 
 		ha_thread_relax();
@@ -118,7 +118,7 @@ void thread_isolate()
 /* Isolates the current thread : request the ability to work while all other
  * threads are idle, as defined by thread_idle_now(). It only returns once
  * all of them are both harmless and idle, with the current thread's bit in
- * threads_harmless_mask and idle_mask cleared. Needs to be completed using
+ * &tg_ctx->threads_harmless and idle_mask cleared. Needs to be completed using
  * thread_release(). By doing so the thread also engages in being safe against
  * any actions that other threads might be about to start under the same
  * conditions. This specifically targets destruction of any internal structure,
@@ -133,20 +133,20 @@ void thread_isolate_full()
 {
 	unsigned long old;
 
-	_HA_ATOMIC_OR(&threads_idle_mask, tid_bit);
-	_HA_ATOMIC_OR(&threads_harmless_mask, tid_bit);
+	_HA_ATOMIC_OR(&tg_ctx->threads_idle, ti->ltid_bit);
+	_HA_ATOMIC_OR(&tg_ctx->threads_harmless, ti->ltid_bit);
 	__ha_barrier_atomic_store();
-	_HA_ATOMIC_OR(&threads_want_rdv_mask, tid_bit);
+	_HA_ATOMIC_OR(&tg_ctx->threads_want_rdv, ti->ltid_bit);
 
 	/* wait for all threads to become harmless */
-	old = threads_harmless_mask;
+	old = _HA_ATOMIC_LOAD(&tg_ctx->threads_harmless);
 	while (1) {
-		unsigned long idle = _HA_ATOMIC_LOAD(&threads_idle_mask);
+		unsigned long idle = _HA_ATOMIC_LOAD(&tg_ctx->threads_idle);
 
-		if (unlikely((old & all_threads_mask) != all_threads_mask))
-			old = _HA_ATOMIC_LOAD(&threads_harmless_mask);
-		else if ((idle & all_threads_mask) == all_threads_mask &&
-			 _HA_ATOMIC_CAS(&threads_harmless_mask, &old, old & ~tid_bit))
+		if (unlikely((old & tg->threads_enabled) != tg->threads_enabled))
+			old = _HA_ATOMIC_LOAD(&tg_ctx->threads_harmless);
+		else if ((idle & tg->threads_enabled) == tg->threads_enabled &&
+			 _HA_ATOMIC_CAS(&tg_ctx->threads_harmless, &old, old & ~ti->ltid_bit))
 			break;
 
 		ha_thread_relax();
@@ -156,17 +156,17 @@
 	 * condition will need to wait until out next pass to the poller, or
 	 * our next call to thread_isolate_full().
 	 */
-	_HA_ATOMIC_AND(&threads_idle_mask, ~tid_bit);
+	_HA_ATOMIC_AND(&tg_ctx->threads_idle, ~ti->ltid_bit);
 }
 
 /* Cancels the effect of thread_isolate() by releasing the current thread's bit
- * in threads_want_rdv_mask. This immediately allows other threads to expect be
+ * in &tg_ctx->threads_want_rdv. This immediately allows other threads to expect be
  * executed, though they will first have to wait for this thread to become
  * harmless again (possibly by reaching the poller again).
  */
 void thread_release()
 {
-	_HA_ATOMIC_AND(&threads_want_rdv_mask, ~tid_bit);
+	_HA_ATOMIC_AND(&tg_ctx->threads_want_rdv, ~ti->ltid_bit);
 }
 
 /* Sets up threads, signals and masks, and starts threads 2 and above.
@@ -1036,6 +1036,7 @@ int thread_map_to_groups()
 		ha_tgroup_info[g].count++;
 		ha_thread_info[t].tgid = g + 1;
 		ha_thread_info[t].tg = &ha_tgroup_info[g];
+		ha_thread_info[t].tg_ctx = &ha_tgroup_ctx[g];
 
 		ut--;
 		/* switch to next unassigned thread */
@@ -1248,6 +1249,7 @@ static int cfg_parse_thread_group(char **args, int section_type, struct proxy *c
 			if (ha_thread_info[tnum-1].tg == &ha_tgroup_info[tgroup-1]) {
 				ha_thread_info[tnum-1].tg = NULL;
 				ha_thread_info[tnum-1].tgid = 0;
+				ha_thread_info[tnum-1].tg_ctx = NULL;
 			}
 		}
 		ha_tgroup_info[tgroup-1].count = ha_tgroup_info[tgroup-1].base = 0;
@@ -1289,6 +1291,7 @@ static int cfg_parse_thread_group(char **args, int section_type, struct proxy *c
 
 			ha_thread_info[tnum-1].tgid = tgroup;
 			ha_thread_info[tnum-1].tg = &ha_tgroup_info[tgroup-1];
+			ha_thread_info[tnum-1].tg_ctx = &ha_tgroup_ctx[tgroup-1];
 			tot++;
 		}
 	}
diff --git a/src/wdt.c b/src/wdt.c
index 20297bcb9c..4e9b7c3adf 100644
--- a/src/wdt.c
+++ b/src/wdt.c
@@ -54,7 +54,7 @@ void wdt_handler(int sig, siginfo_t *si, void *arg)
 {
 	unsigned long long n, p;
 	ulong thr_bit;
-	int thr;
+	int thr, tgrp;
 
 	switch (si->si_code) {
 	case SI_TIMER:
@@ -72,6 +72,7 @@ void wdt_handler(int sig, siginfo_t *si, void *arg)
 		if (thr < 0 || thr >= global.nbthread)
 			break;
 
+		tgrp    = ha_thread_info[thr].tgid;
 		thr_bit = ha_thread_info[thr].ltid_bit;
 		p = ha_thread_ctx[thr].prev_cpu_time;
 		n = now_cpu_time_thread(thr);
@@ -83,7 +84,7 @@ void wdt_handler(int sig, siginfo_t *si, void *arg)
 			goto update_and_leave;
 
 		if ((_HA_ATOMIC_LOAD(&th_ctx->flags) & TH_FL_SLEEPING) &&
-		    ((threads_harmless_mask|threads_to_dump) & thr_bit)) {
+		    ((_HA_ATOMIC_LOAD(&ha_tgroup_ctx[tgrp-1].threads_harmless) | threads_to_dump) & thr_bit)) {
 			/* This thread is currently doing exactly nothing
 			 * waiting in the poll loop (unlikely but possible),
 			 * waiting for all other threads to join the rendez-vous
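
For completeness, a second standalone sketch (again not HAProxy code, using
C11 atomics instead of _HA_ATOMIC_*, and omitting the memory barrier and
ha_thread_relax() calls of the real implementation) of the group-local
rendez-vous that thread_isolate() performs after this patch: the wait loop
compares the group's harmless mask against tg->threads_enabled rather than
the global mask against all_threads_mask, and clears only the caller's
ltid_bit:

    #include <stdatomic.h>
    #include <stdio.h>

    struct tgroup_info { unsigned long threads_enabled; };  /* static config side */
    struct tgroup_ctx  {                                     /* dynamic side added by the patch */
            atomic_ulong threads_want_rdv;
            atomic_ulong threads_harmless;
    };

    /* wait until every enabled thread of the group is harmless, then clear
     * our own harmless bit so we are the only one left running
     */
    static void model_thread_isolate(struct tgroup_ctx *ctx,
                                     const struct tgroup_info *grp,
                                     unsigned long ltid_bit)
    {
            unsigned long old;

            atomic_fetch_or(&ctx->threads_harmless, ltid_bit);
            atomic_fetch_or(&ctx->threads_want_rdv, ltid_bit);

            old = atomic_load(&ctx->threads_harmless);
            while (1) {
                    if ((old & grp->threads_enabled) != grp->threads_enabled)
                            old = atomic_load(&ctx->threads_harmless);  /* not everyone is harmless yet */
                    else if (atomic_compare_exchange_weak(&ctx->threads_harmless,
                                                          &old, old & ~ltid_bit))
                            break;                                      /* we run isolated now */
            }
    }

    int main(void)
    {
            struct tgroup_info grp = { .threads_enabled = 0x1UL };  /* single-thread group */
            struct tgroup_ctx ctx = { 0 };

            model_thread_isolate(&ctx, &grp, 0x1UL);
            printf("want_rdv=%#lx harmless=%#lx\n",
                   atomic_load(&ctx.threads_want_rdv),
                   atomic_load(&ctx.threads_harmless));
            return 0;
    }

On a CAS failure, atomic_compare_exchange_weak() refreshes the expected value,
so the loop re-evaluates the group state on the next iteration, which mirrors
the structure of the patched thread_isolate().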