]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: hathreads: implement a more flexible rendez-vous point
authorWilly Tarreau <w@1wt.eu>
Thu, 2 Aug 2018 08:16:17 +0000 (10:16 +0200)
committerWilly Tarreau <w@1wt.eu>
Thu, 2 Aug 2018 15:51:45 +0000 (17:51 +0200)
The current synchronization point enforces certain restrictions which
are hard to workaround in certain areas of the code. The fact that the
critical code can only be called from the sync point itself is a problem
for some callback-driven parts. The "show fd" command for example is
fragile regarding this.

Also it is expensive in terms of CPU usage because it wakes every other
thread just to be sure all of them join to the rendez-vous point. It's a
problem because the sleeping threads would not need to be woken up just
to know they're doing nothing.

Here we implement a different approach. We keep track of harmless threads,
which are defined as those either doing nothing, or doing harmless things.
The rendez-vous is used "for others" as a way for a thread to isolate itself.
A thread then requests to be alone using thread_isolate() when approaching
the dangerous area, and then waits until all other threads are either doing
the same or are doing something harmless (typically polling). The function
only returns once the thread is guaranteed to be alone, and the critical
section is terminated using thread_release().

include/common/hathreads.h
src/ev_epoll.c
src/ev_kqueue.c
src/ev_poll.c
src/ev_select.c
src/hathreads.c

index 4cf3db9bee839672d9c40eab7e8ead81cbe771e3..574547f3670b6746803a4f7c46462fd2a914a333 100644 (file)
@@ -135,6 +135,27 @@ static inline void __ha_barrier_full(void)
 {
 }
 
+static inline void thread_harmless_now()
+{
+}
+
+static inline void thread_harmless_end()
+{
+}
+
+static inline void thread_isolate()
+{
+}
+
+static inline void thread_release()
+{
+}
+
+static inline unsigned long thread_isolated()
+{
+       return 1;
+}
+
 #else /* USE_THREAD */
 
 #include <stdio.h>
@@ -272,10 +293,34 @@ void thread_enter_sync(void);
 void thread_exit_sync(void);
 int  thread_no_sync(void);
 int  thread_need_sync(void);
+void thread_harmless_till_end();
+void thread_isolate();
+void thread_release();
 
 extern THREAD_LOCAL unsigned int tid;     /* The thread id */
 extern THREAD_LOCAL unsigned long tid_bit; /* The bit corresponding to the thread id */
 extern volatile unsigned long all_threads_mask;
+extern volatile unsigned long threads_want_rdv_mask;
+extern volatile unsigned long threads_harmless_mask;
+
+/* explanation for threads_want_rdv_mask and threads_harmless_mask :
+ * - threads_want_rdv_mask is a bit field indicating all threads that have
+ *   requested a rendez-vous of other threads using thread_isolate().
+ * - threads_harmless_mask is a bit field indicating all threads that are
+ *   currently harmless in that they promise not to access a shared resource.
+ *
+ * For a given thread, its bits in want_rdv and harmless can be translated like
+ * this :
+ *
+ *  ----------+----------+----------------------------------------------------
+ *   want_rdv | harmless | description
+ *  ----------+----------+----------------------------------------------------
+ *       0    |     0    | thread not interested in RDV, possibly harmful
+ *       0    |     1    | thread not interested in RDV but harmless
+ *       1    |     1    | thread interested in RDV and waiting for its turn
+ *       1    |     0    | thread currently working isolated from others
+ *  ----------+----------+----------------------------------------------------
+ */
 
 #define ha_sigmask(how, set, oldset)  pthread_sigmask(how, set, oldset)
 
@@ -286,6 +331,38 @@ static inline void ha_set_tid(unsigned int data)
        tid_bit = (1UL << tid);
 }
 
+/* Marks the thread as harmless. Note: this must be true, i.e. the thread must
+ * not be touching any unprotected shared resource during this period. Usually
+ * this is called before poll(), but it may also be placed around very slow
+ * calls (eg: some crypto operations). Needs to be terminated using
+ * thread_harmless_end().
+ */
+static inline void thread_harmless_now()
+{
+       HA_ATOMIC_OR(&threads_harmless_mask, tid_bit);
+}
+
+/* Ends the harmless period started by thread_harmless_now(). Usually this is
+ * placed after the poll() call. If it is discovered that a job was running and
+ * is relying on the thread still being harmless, the thread waits for the
+ * other one to finish.
+ */
+static inline void thread_harmless_end()
+{
+       while (1) {
+               HA_ATOMIC_AND(&threads_harmless_mask, ~tid_bit);
+               if (likely((threads_want_rdv_mask & all_threads_mask) == 0))
+                       break;
+               thread_harmless_till_end();
+       }
+}
+
+/* an isolated thread has harmless cleared and want_rdv set */
+static inline unsigned long thread_isolated()
+{
+       return threads_want_rdv_mask & ~threads_harmless_mask & tid_bit;
+}
+
 
 #if defined(DEBUG_THREAD) || defined(DEBUG_FULL)
 
index abc22ba76e77188da44a690e05cb90faf003f838..f672c66cbf4a6f3d7fecd56644caf5d983878968 100644 (file)
@@ -17,6 +17,7 @@
 #include <common/config.h>
 #include <common/debug.h>
 #include <common/epoll.h>
+#include <common/hathreads.h>
 #include <common/standard.h>
 #include <common/ticks.h>
 #include <common/time.h>
@@ -141,6 +142,8 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
                _update_fd(fd);
        }
 
+       thread_harmless_now();
+
        /* compute the epoll_wait() timeout */
        if (!exp)
                wait_time = MAX_DELAY_MS;
@@ -161,6 +164,8 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
        tv_update_date(wait_time, status);
        measure_idle();
 
+       thread_harmless_end();
+
        /* process polled events */
 
        for (count = 0; count < status; count++) {
index bf7f666dc8147213626a6ff206e7dd48a9885a8f..087a07e7832db2d469e3dad1f4a010cf865af28a 100644 (file)
@@ -19,6 +19,7 @@
 
 #include <common/compat.h>
 #include <common/config.h>
+#include <common/hathreads.h>
 #include <common/ticks.h>
 #include <common/time.h>
 #include <common/tools.h>
@@ -112,6 +113,8 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
                changes = _update_fd(fd, changes);
        }
 
+       thread_harmless_now();
+
        if (changes) {
 #ifdef EV_RECEIPT
                kev[0].flags |= EV_RECEIPT;
@@ -154,6 +157,8 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
        tv_update_date(delta_ms, status);
        measure_idle();
 
+       thread_harmless_end();
+
        for (count = 0; count < status; count++) {
                unsigned int n = 0;
                fd = kev[count].ident;
index a2e8798e4a519d850382540baaf6aea34e2c9d1e..712bdf9918a947fa06ff4012f2cf5528a65df40f 100644 (file)
@@ -19,6 +19,7 @@
 
 #include <common/compat.h>
 #include <common/config.h>
+#include <common/hathreads.h>
 #include <common/ticks.h>
 #include <common/time.h>
 
@@ -156,6 +157,8 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
                        break;
        } while (!HA_ATOMIC_CAS(&maxfd, &old_maxfd, new_maxfd));
 
+       thread_harmless_now();
+
        fd_nbupdt = 0;
 
        nbfd = 0;
@@ -207,6 +210,8 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
        tv_update_date(wait_time, status);
        measure_idle();
 
+       thread_harmless_end();
+
        for (count = 0; status > 0 && count < nbfd; count++) {
                unsigned int n;
                int e = poll_events[count].revents;
index 4890c49ded9fc7017c39274b98ec8aac69cc8e2f..d248d6d44a1d53bf23e5e5f66abf0507ebfbcd29 100644 (file)
@@ -16,6 +16,7 @@
 
 #include <common/compat.h>
 #include <common/config.h>
+#include <common/hathreads.h>
 #include <common/ticks.h>
 #include <common/time.h>
 
@@ -148,6 +149,8 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
                        break;
        } while (!HA_ATOMIC_CAS(&maxfd, &old_maxfd, new_maxfd));
 
+       thread_harmless_now();
+
        fd_nbupdt = 0;
 
        /* let's restore fdset state */
@@ -186,6 +189,8 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
        tv_update_date(delta_ms, status);
        measure_idle();
 
+       thread_harmless_end();
+
        if (status <= 0)
                return;
 
index 595a71717c4015212a6cf3218b74a75716060143..69dcab9dcde3332a4d19ee82836612eafbbdaf09 100644 (file)
@@ -30,6 +30,8 @@ void thread_sync_io_handler(int fd)
 static HA_SPINLOCK_T sync_lock;
 static int           threads_sync_pipe[2];
 static unsigned long threads_want_sync = 0;
+volatile unsigned long threads_want_rdv_mask = 0;
+volatile unsigned long threads_harmless_mask = 0;
 volatile unsigned long all_threads_mask  = 1; // nbthread 1 assumed by default
 THREAD_LOCAL unsigned int  tid           = 0;
 THREAD_LOCAL unsigned long tid_bit       = (1UL << 0);
@@ -160,6 +162,68 @@ void thread_exit_sync()
        thread_sync_barrier(&barrier);
 }
 
+/* Marks the thread as harmless until the last thread using the rendez-vous
+ * point quits. Given that we can wait for a long time, sched_yield() is used
+ * when available to offer the CPU resources to competing threads if needed.
+ */
+void thread_harmless_till_end()
+{
+               HA_ATOMIC_OR(&threads_harmless_mask, tid_bit);
+               while (threads_want_rdv_mask & all_threads_mask) {
+#if _POSIX_PRIORITY_SCHEDULING
+                       sched_yield();
+#else
+                       pl_cpu_relax();
+#endif
+               }
+}
+
+/* Isolates the current thread : request the ability to work while all other
+ * threads are harmless. Only returns once all of them are harmless, with the
+ * current thread's bit in threads_harmless_mask cleared. Needs to be completed
+ * using thread_release().
+ */
+void thread_isolate()
+{
+       unsigned long old;
+
+       HA_ATOMIC_OR(&threads_harmless_mask, tid_bit);
+       __ha_barrier_store();
+       HA_ATOMIC_OR(&threads_want_rdv_mask, tid_bit);
+
+       /* wait for all threads to become harmless */
+       old = threads_harmless_mask;
+       while (1) {
+               if (unlikely((old & all_threads_mask) != all_threads_mask))
+                       old = threads_harmless_mask;
+               else if (HA_ATOMIC_CAS(&threads_harmless_mask, &old, old & ~tid_bit))
+                       break;
+
+#if _POSIX_PRIORITY_SCHEDULING
+               sched_yield();
+#else
+               pl_cpu_relax();
+#endif
+       }
+       /* one thread gets released at a time here, with its harmess bit off.
+        * The loss of this bit makes the other one continue to spin while the
+        * thread is working alone.
+        */
+}
+
+/* Cancels the effect of thread_isolate() by releasing the current thread's bit
+ * in threads_want_rdv_mask and by marking this thread as harmless until the
+ * last worker finishes.
+ */
+void thread_release()
+{
+       while (1) {
+               HA_ATOMIC_AND(&threads_want_rdv_mask, ~tid_bit);
+               if (!(threads_want_rdv_mask & all_threads_mask))
+                       break;
+               thread_harmless_till_end();
+       }
+}
 
 __attribute__((constructor))
 static void __hathreads_init(void)