]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: pollers: Add a way to wake a thread sleeping in the poller.
authorOlivier Houchard <ohouchard@haproxy.com>
Thu, 26 Jul 2018 15:55:11 +0000 (17:55 +0200)
committerWilly Tarreau <w@1wt.eu>
Thu, 26 Jul 2018 17:09:50 +0000 (19:09 +0200)
Add a new pipe, one per thread, so that we can write on it to wake a thread
sleeping in a poller, and use it to wake threads supposed to take care of a
task, if they are all sleeping.

include/proto/fd.h
include/types/global.h
src/cli.c
src/fd.c
src/haproxy.c
src/task.c

index c5a03f775b95fd2627f0dc68e1b8f6d215040341..a4cee3220ef33aff29d9a1c81ae763c0d33e6a95 100644 (file)
@@ -45,6 +45,8 @@ extern unsigned long fd_cache_mask; // Mask of threads with events in the cache
 extern THREAD_LOCAL int *fd_updt;  // FD updates list
 extern THREAD_LOCAL int fd_nbupdt; // number of updates in the list
 
+extern int poller_wr_pipe[MAX_THREADS];
+
 __decl_hathreads(extern HA_RWLOCK_T   __attribute__((aligned(64))) fdcache_lock);    /* global lock to protect fd_cache array */
 
 /* Deletes an FD from the fdsets.
@@ -60,6 +62,8 @@ void fd_remove(int fd);
 /* disable the specified poller */
 void disable_poller(const char *poller_name);
 
+void poller_pipe_io_handler(int fd);
+
 /*
  * Initialize the pollers till the best one is found.
  * If none works, returns 0, otherwise 1.
@@ -516,6 +520,13 @@ static inline unsigned int hap_fd_isset(int fd, unsigned int *evts)
        return evts[fd / (8*sizeof(*evts))] & (1U << (fd & (8*sizeof(*evts) - 1)));
 }
 
+static inline void wake_thread(int tid)
+{
+       char c = 'c';
+
+       shut_your_big_mouth_gcc(write(poller_wr_pipe[tid], &c, 1));
+}
+
 
 #endif /* _PROTO_FD_H */
 
index a684ea6dd74c3a32398900e90c69ed03ea6a5bc2..616e8d3e5818b547404a5eadd40f3caec2a157a1 100644 (file)
@@ -219,6 +219,7 @@ extern char localpeer[MAX_HOSTNAME_LEN];
 extern struct list global_listener_queue; /* list of the temporarily limited listeners */
 extern struct task *global_listener_queue_task;
 extern unsigned int warned;     /* bitfield of a few warnings to emit just once */
+extern volatile unsigned long sleeping_thread_mask;
 
 /* bit values to go with "warned" above */
 #define WARN_BLOCK_DEPRECATED       0x00000001
index d028429605fdf8e537cf94bf8a6a11a59d37b6e5..739107dab633419d04af9d3585682deef1d9d827 100644 (file)
--- a/src/cli.c
+++ b/src/cli.c
@@ -890,6 +890,7 @@ static int cli_io_handler_show_fd(struct appctx *appctx)
                             (fdt.iocb == dgram_fd_handler) ? "dgram_fd_handler" :
                             (fdt.iocb == listener_accept)  ? "listener_accept" :
                             (fdt.iocb == thread_sync_io_handler) ? "thread_sync_io_handler" :
+                            (fdt.iocb == poller_pipe_io_handler) ? "poller_pipe_io_handler" :
                             "unknown");
 
                if (fdt.iocb == conn_fd_handler) {
index 3b023a8b05bd4ef14f03f2bfcfbf815120982969..cbb7b478ecb4d68bad8375bad751b3aa4000dddd 100644 (file)
--- a/src/fd.c
+++ b/src/fd.c
 #include <stdio.h>
 #include <string.h>
 #include <unistd.h>
+#include <fcntl.h>
 #include <sys/types.h>
 
 #include <common/compat.h>
@@ -176,6 +177,8 @@ unsigned long fd_cache_mask = 0; // Mask of threads with events in the cache
 
 THREAD_LOCAL int *fd_updt  = NULL;  // FD updates list
 THREAD_LOCAL int  fd_nbupdt = 0;   // number of updates in the list
+THREAD_LOCAL int poller_rd_pipe = -1; // Pipe to wake the thread
+int poller_wr_pipe[MAX_THREADS]; // Pipe to wake the threads
 
 #define _GET_NEXT(fd, off) ((struct fdlist_entry *)(void *)((char *)(&fdtab[fd]) + off))->next
 #define _GET_PREV(fd, off) ((struct fdlist_entry *)(void *)((char *)(&fdtab[fd]) + off))->prev
@@ -461,11 +464,31 @@ void disable_poller(const char *poller_name)
                        pollers[p].pref = 0;
 }
 
+void poller_pipe_io_handler(int fd)
+{
+       char buf[1024];
+       /* Flush the pipe */
+       while (read(fd, buf, sizeof(buf)) > 0);
+       fd_cant_recv(fd);
+}
+
 /* Initialize the pollers per thread */
 static int init_pollers_per_thread()
 {
+       int mypipe[2];
        if ((fd_updt = calloc(global.maxsock, sizeof(*fd_updt))) == NULL)
                return 0;
+       if (pipe(mypipe) < 0) {
+               free(fd_updt);
+               fd_updt = NULL;
+               return 0;
+       }
+       poller_rd_pipe = mypipe[0];
+       poller_wr_pipe[tid] = mypipe[1];
+       fcntl(poller_rd_pipe, F_SETFL, O_NONBLOCK);
+       fd_insert(poller_rd_pipe, poller_pipe_io_handler, poller_pipe_io_handler,
+           tid_bit);
+       fd_want_recv(poller_rd_pipe);
        return 1;
 }
 
index e0e8791f07babfdfb4d0d38ff43191063ccbf1aa..a1743916315abe9c432e937c7ec0b9004dbdd99a 100644 (file)
@@ -123,6 +123,7 @@ int  pid;                   /* current process id */
 int  relative_pid = 1;         /* process id starting at 1 */
 unsigned long pid_bit = 1;      /* bit corresponding to the process id */
 
+volatile unsigned long sleeping_thread_mask; /* Threads that are about to sleep in poll() */
 /* global options */
 struct global global = {
        .hard_stop_after = TICK_ETERNITY,
@@ -2427,11 +2428,20 @@ static void run_poll_loop()
                        activity[tid].wake_tasks++;
                else if (signal_queue_len && tid == 0)
                        activity[tid].wake_signal++;
-               else
-                       exp = next;
+               else {
+                       HA_ATOMIC_OR(&sleeping_thread_mask, tid_bit);
+                       __ha_barrier_store();
+                       if (active_tasks_mask & tid_bit) {
+                               activity[tid].wake_tasks++;
+                               HA_ATOMIC_AND(&sleeping_thread_mask, ~tid_bit);
+                       } else
+                               exp = next;
+               }
 
                /* The poller will ensure it returns around <next> */
                cur_poller.poll(&cur_poller, exp);
+               if (sleeping_thread_mask & tid_bit)
+                       HA_ATOMIC_AND(&sleeping_thread_mask, ~tid_bit);
                fd_process_cached_events();
 
 
index 672730b8ec3a2919efe990b76bef1edae8897112..6e7441f1bf689fe43a80d1744f3d8a97bee4bce4 100644 (file)
@@ -23,6 +23,7 @@
 #include <proto/proxy.h>
 #include <proto/stream.h>
 #include <proto/task.h>
+#include <proto/fd.h>
 
 struct pool_head *pool_head_task;
 struct pool_head *pool_head_tasklet;
@@ -70,6 +71,7 @@ void __task_wakeup(struct task *t, struct eb_root *root)
 {
        void *expected = NULL;
        int *rq_size;
+       unsigned long old_active_mask;
 
 #ifdef USE_THREAD
        if (root == &rqueue) {
@@ -125,6 +127,7 @@ redo:
                __ha_barrier_store();
        }
 #endif
+       old_active_mask = active_tasks_mask;
        HA_ATOMIC_OR(&active_tasks_mask, t->thread_mask);
        t->rq.key = HA_ATOMIC_ADD(&rqueue_ticks, 1);
 
@@ -152,6 +155,13 @@ redo:
 
                rqueue_size[nb]++;
        }
+       /* If all threads that are supposed to handle this task are sleeping,
+        * wake one.
+        */
+       if ((((t->thread_mask & all_threads_mask) & sleeping_thread_mask) ==
+           (t->thread_mask & all_threads_mask)) &&
+           !(t->thread_mask & old_active_mask))
+               wake_thread(my_ffsl((t->thread_mask & all_threads_mask) &~ tid_bit) - 1);
        return;
 }