]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
BUG/MEDIUM: threads: Run the poll loop on the main thread too
authorChristopher Faulet <cfaulet@haproxy.com>
Fri, 27 Oct 2017 11:53:47 +0000 (13:53 +0200)
committerWilly Tarreau <w@1wt.eu>
Tue, 31 Oct 2017 12:58:33 +0000 (13:58 +0100)
There was a flaw in the way the threads was created. the main one was just used
to create all the others and just wait to exit. Now, it is used to run a poll
loop. So we only create nbthread-1 threads.

This also fixes a bug about the compression filter when there is only 1 thread
(nbthread == 1 or no threads support). The bug was in the way thread-local
resources was initialized. per-thread init/deinit callbacks were never called
for the main process. So, with nthread set to 1, some buffers remained
uninitialized.

include/common/chunk.h
src/buffer.c
src/chunk.c
src/ev_epoll.c
src/ev_kqueue.c
src/ev_poll.c
src/ev_select.c
src/fd.c
src/haproxy.c
src/log.c

index 82667733a952b06dcf4d21d3d825d15a8da8cfce..1a12d0d13032eff4bbb27197a2f171897ab8ed16 100644 (file)
@@ -52,7 +52,7 @@ int chunk_strcmp(const struct chunk *chk, const char *str);
 int chunk_strcasecmp(const struct chunk *chk, const char *str);
 struct chunk *get_trash_chunk(void);
 struct chunk *alloc_trash_chunk(void);
-int init_trash_buffers(void);
+int init_trash_buffers(int first);
 void deinit_trash_buffers(void);
 
 /*
index e892d1e4d4bf87295e719a64eaaa1e873f2fbe36..db2e053e721b8ece63b3c124cb9df9c9cb862468 100644 (file)
@@ -83,19 +83,13 @@ int init_buffer()
 
        pool_free2(pool2_buffer, buffer);
 
-       if (global.nbthread > 1) {
-               hap_register_per_thread_init(init_buffer_per_thread);
-               hap_register_per_thread_deinit(deinit_buffer_per_thread);
-       }
-       else if (!init_buffer_per_thread())
-                    return 0;
-
+       hap_register_per_thread_init(init_buffer_per_thread);
+       hap_register_per_thread_deinit(deinit_buffer_per_thread);
        return 1;
 }
 
 void deinit_buffer()
 {
-       deinit_buffer_per_thread();
        pool_destroy2(pool2_buffer);
 }
 
index 1fdcebb0389f0092b40eeea0efe2148e9c962b4a..15c681a1b2dcb2bdb0fd56b21c2158df30e93c94 100644 (file)
@@ -68,23 +68,37 @@ struct chunk *get_trash_chunk(void)
  */
 static int alloc_trash_buffers(int bufsize)
 {
+       chunk_init(&trash, my_realloc2(trash.str, bufsize), bufsize);
        trash_size = bufsize;
        trash_buf1 = (char *)my_realloc2(trash_buf1, bufsize);
        trash_buf2 = (char *)my_realloc2(trash_buf2, bufsize);
-       pool2_trash = create_pool("trash", sizeof(struct chunk) + bufsize, MEM_F_EXACT);
-       return trash_buf1 && trash_buf2 && pool2_trash;
+       return trash.str && trash_buf1 && trash_buf2;
+}
+
+static int init_trash_buffers_per_thread()
+{
+       return alloc_trash_buffers(global.tune.bufsize);
+}
+
+static void deinit_trash_buffers_per_thread()
+{
+       chunk_destroy(&trash);
+       free(trash_buf2);
+       free(trash_buf1);
+       trash_buf2 = NULL;
+       trash_buf1 = NULL;
 }
 
 /* Initialize the trash buffers. It returns 0 if an error occurred. */
-int init_trash_buffers()
+int init_trash_buffers(int first)
 {
-       if (global.nbthread > 1 && tid == (unsigned int)(-1)) {
-               hap_register_per_thread_init(init_trash_buffers);
-               hap_register_per_thread_deinit(deinit_trash_buffers);
+       if (!first) {
+               hap_register_per_thread_init(init_trash_buffers_per_thread);
+               hap_register_per_thread_deinit(deinit_trash_buffers_per_thread);
        }
-
-       chunk_init(&trash, my_realloc2(trash.str, global.tune.bufsize), global.tune.bufsize);
-       if (!trash.str || !alloc_trash_buffers(global.tune.bufsize))
+       pool_destroy2(pool2_trash);
+       pool2_trash = create_pool("trash", sizeof(struct chunk) + global.tune.bufsize, MEM_F_EXACT);
+       if (!pool2_trash || !alloc_trash_buffers(global.tune.bufsize))
                return 0;
        return 1;
 }
@@ -94,11 +108,7 @@ int init_trash_buffers()
  */
 void deinit_trash_buffers(void)
 {
-       chunk_destroy(&trash);
-       free(trash_buf2);
-       free(trash_buf1);
-       trash_buf2 = NULL;
-       trash_buf1 = NULL;
+       pool_destroy2(pool2_trash);
 }
 
 /*
index 642b4d61f6f0bae02e7ca56cc9c177f8726d093b..d26c3a21f12f13eded54e61fc2cf7e69bc30c703 100644 (file)
@@ -176,6 +176,7 @@ static int init_epoll_per_thread()
 static void deinit_epoll_per_thread()
 {
        free(epoll_events);
+       epoll_events = NULL;
 }
 
 /*
@@ -191,18 +192,11 @@ REGPRM1 static int _do_init(struct poller *p)
        if (epoll_fd < 0)
                goto fail_fd;
 
-       if (global.nbthread > 1) {
-               hap_register_per_thread_init(init_epoll_per_thread);
-               hap_register_per_thread_deinit(deinit_epoll_per_thread);
-       }
-       else if (!init_epoll_per_thread())
-               goto fail_ee;
+       hap_register_per_thread_init(init_epoll_per_thread);
+       hap_register_per_thread_deinit(deinit_epoll_per_thread);
 
        return 1;
 
- fail_ee:
-       close(epoll_fd);
-       epoll_fd = -1;
  fail_fd:
        p->pref = 0;
        return 0;
@@ -214,14 +208,11 @@ REGPRM1 static int _do_init(struct poller *p)
  */
 REGPRM1 static void _do_term(struct poller *p)
 {
-       free(epoll_events);
-
        if (epoll_fd >= 0) {
                close(epoll_fd);
                epoll_fd = -1;
        }
 
-       epoll_events = NULL;
        p->private = NULL;
        p->pref = 0;
 }
index 8f20db26a53880df2281fde7c44d629e9460617f..2ae27748a16742212e9d2de42401af48e4b4416b 100644 (file)
@@ -154,6 +154,7 @@ static int init_kqueue_per_thread()
 static void deinit_kqueue_per_thread()
 {
        free(kev);
+       kev = NULL;
 }
 
 /*
@@ -169,18 +170,10 @@ REGPRM1 static int _do_init(struct poller *p)
        if (kqueue_fd < 0)
                goto fail_fd;
 
-       if (global.nbthread > 1) {
-               hap_register_per_thread_init(init_kqueue_per_thread);
-               hap_register_per_thread_deinit(deinit_kqueue_per_thread);
-       }
-       else if (!init_kqueue_per_thread())
-               goto fail_kev;
-
+       hap_register_per_thread_init(init_kqueue_per_thread);
+       hap_register_per_thread_deinit(deinit_kqueue_per_thread);
        return 1;
 
- fail_kev:
-       close(kqueue_fd);
-       kqueue_fd = -1;
  fail_fd:
        p->pref = 0;
        return 0;
@@ -192,8 +185,6 @@ REGPRM1 static int _do_init(struct poller *p)
  */
 REGPRM1 static void _do_term(struct poller *p)
 {
-       free(kev);
-
        if (kqueue_fd >= 0) {
                close(kqueue_fd);
                kqueue_fd = -1;
index 455c4e1e916cef43fb3c58ac8bb30f30d3181e6f..f7632b61fbdf8cc11a41ef3f6692c86825a633cc 100644 (file)
@@ -191,6 +191,7 @@ static int init_poll_per_thread()
 static void deinit_poll_per_thread()
 {
        free(poll_events);
+       poll_events = NULL;
 }
 
 /*
@@ -200,31 +201,26 @@ static void deinit_poll_per_thread()
  */
 REGPRM1 static int _do_init(struct poller *p)
 {
-       __label__ fail_swevt, fail_srevt, fail_pe;
+       __label__ fail_swevt, fail_srevt;
        int fd_evts_bytes;
 
        p->private = NULL;
        fd_evts_bytes = (global.maxsock + sizeof(**fd_evts) - 1) / sizeof(**fd_evts) * sizeof(**fd_evts);
 
-       if (global.nbthread > 1) {
-               hap_register_per_thread_init(init_poll_per_thread);
-               hap_register_per_thread_deinit(deinit_poll_per_thread);
-       }
-       else if (!init_poll_per_thread())
-               goto fail_pe;
-
        if ((fd_evts[DIR_RD] = calloc(1, fd_evts_bytes)) == NULL)
                goto fail_srevt;
        if ((fd_evts[DIR_WR] = calloc(1, fd_evts_bytes)) == NULL)
                goto fail_swevt;
 
+       hap_register_per_thread_init(init_poll_per_thread);
+       hap_register_per_thread_deinit(deinit_poll_per_thread);
+
        return 1;
 
  fail_swevt:
        free(fd_evts[DIR_RD]);
  fail_srevt:
        free(poll_events);
- fail_pe:
        p->pref = 0;
        return 0;
 }
@@ -237,7 +233,6 @@ REGPRM1 static void _do_term(struct poller *p)
 {
        free(fd_evts[DIR_WR]);
        free(fd_evts[DIR_RD]);
-       free(poll_events);
        p->private = NULL;
        p->pref = 0;
 }
index 49e980f00807ac79e3c82a729614430ab0f06926..4f5a2d1126c520e0b7b56f883268e8b2cbd9ee1b 100644 (file)
@@ -173,8 +173,8 @@ static int init_select_per_thread()
 
 static void deinit_select_per_thread()
 {
-       free(tmp_evts[DIR_WR]);
-       free(tmp_evts[DIR_RD]);
+       free(tmp_evts[DIR_WR]); tmp_evts[DIR_WR] = NULL;
+       free(tmp_evts[DIR_RD]); tmp_evts[DIR_RD] = NULL;
 }
 
 /*
@@ -193,18 +193,15 @@ REGPRM1 static int _do_init(struct poller *p)
                goto fail_revt;
 
        fd_set_bytes = sizeof(fd_set) * (global.maxsock + FD_SETSIZE - 1) / FD_SETSIZE;
-       if (global.nbthread > 1) {
-               hap_register_per_thread_init(init_select_per_thread);
-               hap_register_per_thread_deinit(deinit_select_per_thread);
-       }
-       else if (!init_select_per_thread())
-               goto fail_revt;
 
        if ((fd_evts[DIR_RD] = (fd_set *)calloc(1, fd_set_bytes)) == NULL)
                goto fail_srevt;
        if ((fd_evts[DIR_WR] = (fd_set *)calloc(1, fd_set_bytes)) == NULL)
                goto fail_swevt;
 
+       hap_register_per_thread_init(init_select_per_thread);
+       hap_register_per_thread_deinit(deinit_select_per_thread);
+
        return 1;
 
  fail_swevt:
@@ -225,8 +222,6 @@ REGPRM1 static void _do_term(struct poller *p)
 {
        free(fd_evts[DIR_WR]);
        free(fd_evts[DIR_RD]);
-       free(tmp_evts[DIR_WR]);
-       free(tmp_evts[DIR_RD]);
        p->private = NULL;
        p->pref = 0;
 }
index 58bc8638194f16f62070c0352b58620d5c655611..ea5d683b802e7a7ef7cb586421ac61c813af88f6 100644 (file)
--- a/src/fd.c
+++ b/src/fd.c
@@ -325,18 +325,12 @@ int init_pollers()
        if ((fd_cache = calloc(global.maxsock, sizeof(*fd_cache))) == NULL)
                goto fail_cache;
 
-       if (global.nbthread > 1) {
-               hap_register_per_thread_init(init_pollers_per_thread);
-               hap_register_per_thread_deinit(deinit_pollers_per_thread);
-       }
-       else if (!init_pollers_per_thread())
-               goto fail_updt;
+       hap_register_per_thread_init(init_pollers_per_thread);
+       hap_register_per_thread_deinit(deinit_pollers_per_thread);
 
        for (p = 0; p < global.maxsock; p++)
                SPIN_INIT(&fdtab[p].lock);
 
-       //memset(fd_cache, -1, global.maxsock);
-
        SPIN_INIT(&fdtab_lock);
        RWLOCK_INIT(&fdcache_lock);
        SPIN_INIT(&poll_lock);
@@ -356,8 +350,6 @@ int init_pollers()
        } while (!bp || bp->pref == 0);
        return 0;
 
- fail_updt:
-       free(fd_cache);
  fail_cache:
        free(fdinfo);
  fail_info:
@@ -384,7 +376,6 @@ void deinit_pollers() {
                        bp->term(bp);
        }
 
-       free(fd_updt);  fd_updt  = NULL;
        free(fd_cache); fd_cache = NULL;
        free(fdinfo);   fdinfo   = NULL;
        free(fdtab);    fdtab    = NULL;
index f4353e16549b9e39386813fdee24851995a9ce5c..2d2d8772398b43eb5f7893a30c9f15c4cb8935ac 100644 (file)
@@ -1179,7 +1179,7 @@ static void init(int argc, char **argv)
        global.mode = MODE_STARTING;
        next_argv = copy_argv(argc, argv);
 
-       if (!init_trash_buffers()) {
+       if (!init_trash_buffers(1)) {
                Alert("failed to initialize trash buffers.\n");
                exit(1);
        }
@@ -1196,10 +1196,10 @@ static void init(int argc, char **argv)
        /*
         * Initialize the previously static variables.
         */
-    
+
        totalconn = actconn = maxfd = listeners = stopping = 0;
        killed = 0;
-    
+
 
 #ifdef HAPROXY_MEMMAX
        global.rlimit_memmax_all = HAPROXY_MEMMAX;
@@ -1760,16 +1760,11 @@ static void init(int argc, char **argv)
                global.nbthread = 1;
 
        /* Realloc trash buffers because global.tune.bufsize may have changed */
-       if (!init_trash_buffers()) {
+       if (!init_trash_buffers(0)) {
                Alert("failed to initialize trash buffers.\n");
                exit(1);
        }
 
-       if (!init_log_buffers()) {
-               Alert("failed to initialize log buffers.\n");
-               exit(1);
-       }
-
        /*
         * Note: we could register external pollers here.
         * Built-in pollers have been registered before main().
@@ -2194,7 +2189,6 @@ void deinit(void)
        pool_destroy2(pool2_stream);
        pool_destroy2(pool2_session);
        pool_destroy2(pool2_connection);
-       pool_destroy2(pool2_trash);
        pool_destroy2(pool2_requri);
        pool_destroy2(pool2_task);
        pool_destroy2(pool2_capture);
@@ -2291,7 +2285,6 @@ static void run_poll_loop()
        }
 }
 
-#ifdef USE_THREAD
 static void *run_thread_poll_loop(void *data)
 {
        struct per_thread_init_fct   *ptif;
@@ -2318,9 +2311,12 @@ static void *run_thread_poll_loop(void *data)
        list_for_each_entry(ptdf, &per_thread_deinit_list, list)
                ptdf->fct();
 
-       pthread_exit(NULL);
-}
+#ifdef USE_THREAD
+       if (tid > 0)
+               pthread_exit(NULL);
 #endif
+       return NULL;
+}
 
 /* This is the global management task for listeners. It enables listeners waiting
  * for global resources when there are enough free resource, or at least once in
@@ -2804,17 +2800,24 @@ int main(int argc, char **argv)
        /*
         * That's it : the central polling loop. Run until we stop.
         */
-       if (global.nbthread > 1) {
 #ifdef USE_THREAD
+       {
                unsigned int *tids    = calloc(global.nbthread, sizeof(unsigned int));
                pthread_t    *threads = calloc(global.nbthread, sizeof(pthread_t));
                int          i;
 
-               THREAD_SYNC_INIT((1UL << global.nbthread) - 1);
-               for (i = 0; i < global.nbthread; i++) {
+               /* Init tids array */
+               for (i = 0; i < global.nbthread; i++)
                        tids[i] = i;
+
+               /* Create nbthread-1 thread. The first thread is the current process */
+               threads[0] = pthread_self();
+               for (i = 1; i < global.nbthread; i++)
                        pthread_create(&threads[i], NULL, &run_thread_poll_loop, &tids[i]);
+
 #ifdef USE_CPU_AFFINITY
+               /* Now the CPU affinity for all threads */
+               for (i = 0; i < global.nbthread; i++) {
                        if (global.cpu_map[relative_pid-1])
                                global.thread_map[relative_pid-1][i] &= global.cpu_map[relative_pid-1];
 
@@ -2822,9 +2825,14 @@ int main(int argc, char **argv)
                            global.thread_map[relative_pid-1][i]) /* only do this if the thread has a THREAD map */
                                pthread_setaffinity_np(threads[i],
                                                       sizeof(unsigned long), (void *)&global.thread_map[relative_pid-1][i]);
-#endif
                }
-               for (i = 0; i < global.nbthread; i++)
+#endif /* !USE_CPU_AFFINITY */
+
+               /* Finally, start the poll loop for the first thread */
+               run_thread_poll_loop(&tids[0]);
+
+               /* Wait the end of other threads */
+               for (i = 1; i < global.nbthread; i++)
                        pthread_join(threads[i], NULL);
 
                free(tids);
@@ -2833,31 +2841,13 @@ int main(int argc, char **argv)
 #if defined(DEBUG_THREAD) || defined(DEBUG_FULL)
                show_lock_stats();
 #endif
-
-#endif /* USE_THREAD */
        }
-       else {
-               tid = 0;
+#else /* ! USE_THREAD */
 
-#ifdef USE_THREAD
-#ifdef USE_CPU_AFFINITY
-               if (global.cpu_map[relative_pid-1])
-                       global.thread_map[relative_pid-1][tid] &= global.cpu_map[relative_pid-1];
+       run_thread_poll_loop((int []){0});
 
-               if (global.thread_map[relative_pid-1][tid]) /* only do this if the thread has a THREAD map */
-                       pthread_setaffinity_np(pthread_self(),
-                                              sizeof(unsigned long), (void *)&global.thread_map[relative_pid-1][tid]);
-#endif
 #endif
 
-               if (global.mode & MODE_MWORKER)
-                       mworker_pipe_register(mworker_pipe);
-
-               protocol_enable_all();
-
-               run_poll_loop();
-       }
-
        /* Do some cleanup */
        deinit();
 
index 864560b5c4e9b27077360da8f877127fdf0280f1..773662b2b88b7b90073a204d1ffa8aae98063031 100644 (file)
--- a/src/log.c
+++ b/src/log.c
@@ -1344,14 +1344,19 @@ void init_log()
        }
 }
 
+static int init_log_buffers_per_thread()
+{
+       return init_log_buffers();
+}
+
+static void deinit_log_buffers_per_thread()
+{
+       deinit_log_buffers();
+}
+
 /* Initialize log buffers used for syslog messages */
 int init_log_buffers()
 {
-       if (global.nbthread > 1 && tid == (unsigned int)(-1)) {
-               hap_register_per_thread_init(init_log_buffers);
-               hap_register_per_thread_deinit(deinit_log_buffers);
-       }
-
        logheader = my_realloc2(logheader, global.max_syslog_len + 1);
        logheader_rfc5424 = my_realloc2(logheader_rfc5424, global.max_syslog_len + 1);
        logline = my_realloc2(logline, global.max_syslog_len + 1);
@@ -2413,6 +2418,8 @@ static struct cli_kw_list cli_kws = {{ },{
 __attribute__((constructor))
 static void __log_init(void)
 {
+       hap_register_per_thread_init(init_log_buffers_per_thread);
+       hap_register_per_thread_deinit(deinit_log_buffers_per_thread);
        cli_register_kw(&cli_kws);
 }
 /*