]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: threads: Adds a set of functions to handle sync-point
authorChristopher Faulet <cfaulet@haproxy.com>
Thu, 19 Oct 2017 09:59:15 +0000 (11:59 +0200)
committerWilly Tarreau <w@1wt.eu>
Tue, 31 Oct 2017 12:58:29 +0000 (13:58 +0100)
A sync-point is a protected area where you have the warranty that no concurrency
access is possible. It is implementated as a thread barrier to enter in the
sync-point and another one to exit from it. Inside the sync-point, all threads
that must do some syncrhonous processing will be called one after the other
while all other threads will wait. All threads will then exit from the
sync-point at the same time.

A sync-point will be evaluated only when necessary because it is a costly
operation. To limit the waiting time of each threads, we must have a mechanism
to wakeup all threads. This is done with a pipe shared by all threads. By
writting in this pipe, we will interrupt all threads blocked on a poller. The
pipe is then flushed before exiting from the sync-point.

include/common/hathreads.h
src/hathreads.c

index 6323e9a369dcc89d456fb044ff86b78b2a96cf02..257f09b8aeb8e8e3f2a3a092b47e546ffc55e4d1 100644 (file)
@@ -60,6 +60,15 @@ extern THREAD_LOCAL unsigned int tid_bit; /* The bit corresponding to the thread
                *(val);                                                 \
        })
 
+
+#define THREAD_SYNC_INIT(m)  do { /* do nothing */ } while(0)
+#define THREAD_SYNC_ENABLE() do { /* do nothing */ } while(0)
+#define THREAD_WANT_SYNC()   do { /* do nothing */ } while(0)
+#define THREAD_ENTER_SYNC()  do { /* do nothing */ } while(0)
+#define THREAD_EXIT_SYNC()   do { /* do nothing */ } while(0)
+#define THREAD_NO_SYNC()     ({ 0; })
+#define THREAD_NEED_SYNC()   ({ 1; })
+
 #define SPIN_INIT(l)         do { /* do nothing */ } while(0)
 #define SPIN_DESTROY(l)      do { /* do nothing */ } while(0)
 #define SPIN_LOCK(lbl, l)    do { /* do nothing */ } while(0)
@@ -109,10 +118,27 @@ extern THREAD_LOCAL unsigned int tid_bit; /* The bit corresponding to the thread
                (*val);                                                 \
        })
 
+#define THREAD_SYNC_INIT(m)   thread_sync_init(m)
+#define THREAD_SYNC_ENABLE()  thread_sync_enable()
+#define THREAD_WANT_SYNC()    thread_want_sync()
+#define THREAD_ENTER_SYNC()   thread_enter_sync()
+#define THREAD_EXIT_SYNC()    thread_exit_sync()
+#define THREAD_NO_SYNC()      thread_no_sync()
+#define THREAD_NEED_SYNC()    thread_need_sync()
+
+int  thread_sync_init(unsigned long mask);
+void thread_sync_enable(void);
+void thread_want_sync(void);
+void thread_enter_sync(void);
+void thread_exit_sync(void);
+int  thread_no_sync(void);
+int  thread_need_sync(void);
+
 #if defined(DEBUG_THREAD) || defined(DEBUG_FULL)
 
 enum lock_label {
-       LOCK_LABELS = 0
+       THREAD_SYNC_LOCK = 0,
+       LOCK_LABELS
 };
 struct lock_stat {
        uint64_t nsec_wait_for_write;
@@ -194,7 +220,7 @@ struct ha_rwlock {
 
 static inline void show_lock_stats()
 {
-       const char *labels[LOCK_LABELS] = {};
+       const char *labels[LOCK_LABELS] = {"THREAD_SYNC" };
        int lbl;
 
        for (lbl = 0; lbl < LOCK_LABELS; lbl++) {
index ea48ce58ee6e976d12202f2453467ce9cc47185e..4c795b12411a20518e84a5e72f05f6fd79c5fcbe 100644 (file)
  *
  */
 
+#include <unistd.h>
+#include <fcntl.h>
+
 #include <common/hathreads.h>
+#include <common/standard.h>
+#include <proto/fd.h>
 
 THREAD_LOCAL unsigned int tid     = 0;
 THREAD_LOCAL unsigned int tid_bit = (1UL << 0);
 
 #ifdef USE_THREAD
 
+static HA_SPINLOCK_T sync_lock;
+static int           threads_sync_pipe[2];
+static unsigned long threads_want_sync = 0;
+static unsigned long all_threads_mask  = 0;
+
 #if defined(DEBUG_THREAD) || defined(DEBUG_FULL)
 struct lock_stat lock_stats[LOCK_LABELS];
 #endif
 
+/* Dummy I/O handler used by the sync pipe.*/
+static void thread_sync_io_handler(int fd) { }
+
+/* Initializes the sync point. It creates a pipe used by threads to wakup all
+ * others when a sync is requested. It also initialize the mask of all create
+ * threads. It returns 0 on success and -1 if an error occurred.
+ */
+int thread_sync_init(unsigned long mask)
+{
+       int rfd;
+
+       if (pipe(threads_sync_pipe) < 0)
+               return -1;
+
+       rfd = threads_sync_pipe[0];
+       fcntl(rfd, F_SETFL, O_NONBLOCK);
+
+       fdtab[rfd].owner = NULL;
+       fdtab[rfd].iocb = thread_sync_io_handler;
+       fd_insert(rfd);
+
+       all_threads_mask = mask;
+       return 0;
+}
+
+/* Enables the sync point. */
+void thread_sync_enable(void)
+{
+       fd_want_recv(threads_sync_pipe[0]);
+}
+
+/* Called when a thread want to pass into the sync point. It subscribes the
+ * current thread in threads waiting for sync by update a bit-field. It this is
+ * the first one, it wakeup all other threads by writing on the sync pipe.
+ */
+void thread_want_sync()
+{
+       if (all_threads_mask) {
+               if (HA_ATOMIC_OR(&threads_want_sync, tid_bit) == tid_bit)
+                       shut_your_big_mouth_gcc(write(threads_sync_pipe[1], "S", 1));
+       }
+       else {
+               threads_want_sync = 1;
+       }
+}
+
+/* Returns 1 if no thread has requested a sync. Otherwise, it returns 0. */
+int thread_no_sync()
+{
+       return (threads_want_sync == 0);
+}
+
+/* Returns 1 if the current thread has requested a sync. Otherwise, it returns
+ * 0.
+ */
+int thread_need_sync()
+{
+       return (threads_want_sync & tid_bit);
+}
+
+/* Thread barrier. Synchronizes all threads at the barrier referenced by
+ * <barrier>. The calling thread shall block until all other threads have called
+ * thread_sync_barrier specifying the same barrier.
+ *
+ * If you need to use several barriers at differnt points, you need to use a
+ * different <barrier> for each point.
+ */
+static inline void thread_sync_barrier(volatile unsigned long *barrier)
+{
+       unsigned long old = all_threads_mask;
+
+       HA_ATOMIC_CAS(barrier, &old, 0);
+       HA_ATOMIC_OR(barrier, tid_bit;
+       while (*barrier != all_threads_mask)
+               pl_cpu_relax();
+}
+
+/* Enter into the sync point and lock it if the current thread has requested a
+ * sync. */
+void thread_enter_sync()
+{
+       static volatile unsigned long barrier = 0;
+
+       if (!all_threads_mask)
+               return;
+
+       thread_sync_barrier(&barrier);
+       if (threads_want_sync & tid_bit)
+               SPIN_LOCK(THREAD_SYNC_LOCK, &sync_lock);
+}
+
+/* Exit from the sync point and unlock it if it was previously locked. If the
+ * current thread is the last one to have requested a sync, the sync pipe is
+ * flushed.
+ */
+void thread_exit_sync()
+{
+       static volatile unsigned long barrier = 0;
+
+       if (!all_threads_mask)
+               return;
+
+       if (threads_want_sync & tid_bit)
+               SPIN_UNLOCK(THREAD_SYNC_LOCK, &sync_lock);
+
+       if (HA_ATOMIC_AND(&threads_want_sync, ~tid_bit) == 0) {
+               char c;
+
+               shut_your_big_mouth_gcc(read(threads_sync_pipe[0], &c, 1));
+               fd_done_recv(threads_sync_pipe[0]);
+       }
+
+       thread_sync_barrier(&barrier);
+}
+
+
 __attribute__((constructor))
 static void __hathreads_init(void)
 {
-
+       SPIN_INIT(&sync_lock);
 #if defined(DEBUG_THREAD) || defined(DEBUG_FULL)
        memset(lock_stats, 0, sizeof(lock_stats));
 #endif
-
 }
 
 #endif