From: Christopher Faulet Date: Fri, 27 Oct 2017 11:53:47 +0000 (+0200) Subject: BUG/MEDIUM: threads: Run the poll loop on the main thread too X-Git-Tag: v1.8-rc1~112 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=cd7879adc2c4;p=thirdparty%2Fhaproxy.git BUG/MEDIUM: threads: Run the poll loop on the main thread too There was a flaw in the way the threads were created: the main one was only used to create all the others and then wait for them to exit. Now, it is used to run a poll loop too, so we only create nbthread-1 threads. This also fixes a bug in the compression filter when there is only 1 thread (nbthread == 1 or no thread support). The bug was in the way thread-local resources were initialized: per-thread init/deinit callbacks were never called for the main process. So, with nbthread set to 1, some buffers remained uninitialized. --- diff --git a/include/common/chunk.h b/include/common/chunk.h index 82667733a9..1a12d0d130 100644 --- a/include/common/chunk.h +++ b/include/common/chunk.h @@ -52,7 +52,7 @@ int chunk_strcmp(const struct chunk *chk, const char *str); int chunk_strcasecmp(const struct chunk *chk, const char *str); struct chunk *get_trash_chunk(void); struct chunk *alloc_trash_chunk(void); -int init_trash_buffers(void); +int init_trash_buffers(int first); void deinit_trash_buffers(void); /* diff --git a/src/buffer.c b/src/buffer.c index e892d1e4d4..db2e053e72 100644 --- a/src/buffer.c +++ b/src/buffer.c @@ -83,19 +83,13 @@ int init_buffer() pool_free2(pool2_buffer, buffer); - if (global.nbthread > 1) { - hap_register_per_thread_init(init_buffer_per_thread); - hap_register_per_thread_deinit(deinit_buffer_per_thread); - } - else if (!init_buffer_per_thread()) - return 0; - + hap_register_per_thread_init(init_buffer_per_thread); + hap_register_per_thread_deinit(deinit_buffer_per_thread); return 1; } void deinit_buffer() { - deinit_buffer_per_thread(); pool_destroy2(pool2_buffer); } diff --git a/src/chunk.c b/src/chunk.c index
1fdcebb038..15c681a1b2 100644 --- a/src/chunk.c +++ b/src/chunk.c @@ -68,23 +68,37 @@ struct chunk *get_trash_chunk(void) */ static int alloc_trash_buffers(int bufsize) { + chunk_init(&trash, my_realloc2(trash.str, bufsize), bufsize); trash_size = bufsize; trash_buf1 = (char *)my_realloc2(trash_buf1, bufsize); trash_buf2 = (char *)my_realloc2(trash_buf2, bufsize); - pool2_trash = create_pool("trash", sizeof(struct chunk) + bufsize, MEM_F_EXACT); - return trash_buf1 && trash_buf2 && pool2_trash; + return trash.str && trash_buf1 && trash_buf2; +} + +static int init_trash_buffers_per_thread() +{ + return alloc_trash_buffers(global.tune.bufsize); +} + +static void deinit_trash_buffers_per_thread() +{ + chunk_destroy(&trash); + free(trash_buf2); + free(trash_buf1); + trash_buf2 = NULL; + trash_buf1 = NULL; } /* Initialize the trash buffers. It returns 0 if an error occurred. */ -int init_trash_buffers() +int init_trash_buffers(int first) { - if (global.nbthread > 1 && tid == (unsigned int)(-1)) { - hap_register_per_thread_init(init_trash_buffers); - hap_register_per_thread_deinit(deinit_trash_buffers); + if (!first) { + hap_register_per_thread_init(init_trash_buffers_per_thread); + hap_register_per_thread_deinit(deinit_trash_buffers_per_thread); } - - chunk_init(&trash, my_realloc2(trash.str, global.tune.bufsize), global.tune.bufsize); - if (!trash.str || !alloc_trash_buffers(global.tune.bufsize)) + pool_destroy2(pool2_trash); + pool2_trash = create_pool("trash", sizeof(struct chunk) + global.tune.bufsize, MEM_F_EXACT); + if (!pool2_trash || !alloc_trash_buffers(global.tune.bufsize)) return 0; return 1; } @@ -94,11 +108,7 @@ int init_trash_buffers() */ void deinit_trash_buffers(void) { - chunk_destroy(&trash); - free(trash_buf2); - free(trash_buf1); - trash_buf2 = NULL; - trash_buf1 = NULL; + pool_destroy2(pool2_trash); } /* diff --git a/src/ev_epoll.c b/src/ev_epoll.c index 642b4d61f6..d26c3a21f1 100644 --- a/src/ev_epoll.c +++ b/src/ev_epoll.c @@ -176,6 +176,7 @@ 
static int init_epoll_per_thread() static void deinit_epoll_per_thread() { free(epoll_events); + epoll_events = NULL; } /* @@ -191,18 +192,11 @@ REGPRM1 static int _do_init(struct poller *p) if (epoll_fd < 0) goto fail_fd; - if (global.nbthread > 1) { - hap_register_per_thread_init(init_epoll_per_thread); - hap_register_per_thread_deinit(deinit_epoll_per_thread); - } - else if (!init_epoll_per_thread()) - goto fail_ee; + hap_register_per_thread_init(init_epoll_per_thread); + hap_register_per_thread_deinit(deinit_epoll_per_thread); return 1; - fail_ee: - close(epoll_fd); - epoll_fd = -1; fail_fd: p->pref = 0; return 0; @@ -214,14 +208,11 @@ REGPRM1 static int _do_init(struct poller *p) */ REGPRM1 static void _do_term(struct poller *p) { - free(epoll_events); - if (epoll_fd >= 0) { close(epoll_fd); epoll_fd = -1; } - epoll_events = NULL; p->private = NULL; p->pref = 0; } diff --git a/src/ev_kqueue.c b/src/ev_kqueue.c index 8f20db26a5..2ae27748a1 100644 --- a/src/ev_kqueue.c +++ b/src/ev_kqueue.c @@ -154,6 +154,7 @@ static int init_kqueue_per_thread() static void deinit_kqueue_per_thread() { free(kev); + kev = NULL; } /* @@ -169,18 +170,10 @@ REGPRM1 static int _do_init(struct poller *p) if (kqueue_fd < 0) goto fail_fd; - if (global.nbthread > 1) { - hap_register_per_thread_init(init_kqueue_per_thread); - hap_register_per_thread_deinit(deinit_kqueue_per_thread); - } - else if (!init_kqueue_per_thread()) - goto fail_kev; - + hap_register_per_thread_init(init_kqueue_per_thread); + hap_register_per_thread_deinit(deinit_kqueue_per_thread); return 1; - fail_kev: - close(kqueue_fd); - kqueue_fd = -1; fail_fd: p->pref = 0; return 0; @@ -192,8 +185,6 @@ REGPRM1 static int _do_init(struct poller *p) */ REGPRM1 static void _do_term(struct poller *p) { - free(kev); - if (kqueue_fd >= 0) { close(kqueue_fd); kqueue_fd = -1; diff --git a/src/ev_poll.c b/src/ev_poll.c index 455c4e1e91..f7632b61fb 100644 --- a/src/ev_poll.c +++ b/src/ev_poll.c @@ -191,6 +191,7 @@ static int 
init_poll_per_thread() static void deinit_poll_per_thread() { free(poll_events); + poll_events = NULL; } /* @@ -200,31 +201,26 @@ static void deinit_poll_per_thread() */ REGPRM1 static int _do_init(struct poller *p) { - __label__ fail_swevt, fail_srevt, fail_pe; + __label__ fail_swevt, fail_srevt; int fd_evts_bytes; p->private = NULL; fd_evts_bytes = (global.maxsock + sizeof(**fd_evts) - 1) / sizeof(**fd_evts) * sizeof(**fd_evts); - if (global.nbthread > 1) { - hap_register_per_thread_init(init_poll_per_thread); - hap_register_per_thread_deinit(deinit_poll_per_thread); - } - else if (!init_poll_per_thread()) - goto fail_pe; - if ((fd_evts[DIR_RD] = calloc(1, fd_evts_bytes)) == NULL) goto fail_srevt; if ((fd_evts[DIR_WR] = calloc(1, fd_evts_bytes)) == NULL) goto fail_swevt; + hap_register_per_thread_init(init_poll_per_thread); + hap_register_per_thread_deinit(deinit_poll_per_thread); + return 1; fail_swevt: free(fd_evts[DIR_RD]); fail_srevt: free(poll_events); - fail_pe: p->pref = 0; return 0; } @@ -237,7 +233,6 @@ REGPRM1 static void _do_term(struct poller *p) { free(fd_evts[DIR_WR]); free(fd_evts[DIR_RD]); - free(poll_events); p->private = NULL; p->pref = 0; } diff --git a/src/ev_select.c b/src/ev_select.c index 49e980f008..4f5a2d1126 100644 --- a/src/ev_select.c +++ b/src/ev_select.c @@ -173,8 +173,8 @@ static int init_select_per_thread() static void deinit_select_per_thread() { - free(tmp_evts[DIR_WR]); - free(tmp_evts[DIR_RD]); + free(tmp_evts[DIR_WR]); tmp_evts[DIR_WR] = NULL; + free(tmp_evts[DIR_RD]); tmp_evts[DIR_RD] = NULL; } /* @@ -193,18 +193,15 @@ REGPRM1 static int _do_init(struct poller *p) goto fail_revt; fd_set_bytes = sizeof(fd_set) * (global.maxsock + FD_SETSIZE - 1) / FD_SETSIZE; - if (global.nbthread > 1) { - hap_register_per_thread_init(init_select_per_thread); - hap_register_per_thread_deinit(deinit_select_per_thread); - } - else if (!init_select_per_thread()) - goto fail_revt; if ((fd_evts[DIR_RD] = (fd_set *)calloc(1, fd_set_bytes)) == NULL) 
goto fail_srevt; if ((fd_evts[DIR_WR] = (fd_set *)calloc(1, fd_set_bytes)) == NULL) goto fail_swevt; + hap_register_per_thread_init(init_select_per_thread); + hap_register_per_thread_deinit(deinit_select_per_thread); + return 1; fail_swevt: @@ -225,8 +222,6 @@ REGPRM1 static void _do_term(struct poller *p) { free(fd_evts[DIR_WR]); free(fd_evts[DIR_RD]); - free(tmp_evts[DIR_WR]); - free(tmp_evts[DIR_RD]); p->private = NULL; p->pref = 0; } diff --git a/src/fd.c b/src/fd.c index 58bc863819..ea5d683b80 100644 --- a/src/fd.c +++ b/src/fd.c @@ -325,18 +325,12 @@ int init_pollers() if ((fd_cache = calloc(global.maxsock, sizeof(*fd_cache))) == NULL) goto fail_cache; - if (global.nbthread > 1) { - hap_register_per_thread_init(init_pollers_per_thread); - hap_register_per_thread_deinit(deinit_pollers_per_thread); - } - else if (!init_pollers_per_thread()) - goto fail_updt; + hap_register_per_thread_init(init_pollers_per_thread); + hap_register_per_thread_deinit(deinit_pollers_per_thread); for (p = 0; p < global.maxsock; p++) SPIN_INIT(&fdtab[p].lock); - //memset(fd_cache, -1, global.maxsock); - SPIN_INIT(&fdtab_lock); RWLOCK_INIT(&fdcache_lock); SPIN_INIT(&poll_lock); @@ -356,8 +350,6 @@ int init_pollers() } while (!bp || bp->pref == 0); return 0; - fail_updt: - free(fd_cache); fail_cache: free(fdinfo); fail_info: @@ -384,7 +376,6 @@ void deinit_pollers() { bp->term(bp); } - free(fd_updt); fd_updt = NULL; free(fd_cache); fd_cache = NULL; free(fdinfo); fdinfo = NULL; free(fdtab); fdtab = NULL; diff --git a/src/haproxy.c b/src/haproxy.c index f4353e1654..2d2d877239 100644 --- a/src/haproxy.c +++ b/src/haproxy.c @@ -1179,7 +1179,7 @@ static void init(int argc, char **argv) global.mode = MODE_STARTING; next_argv = copy_argv(argc, argv); - if (!init_trash_buffers()) { + if (!init_trash_buffers(1)) { Alert("failed to initialize trash buffers.\n"); exit(1); } @@ -1196,10 +1196,10 @@ static void init(int argc, char **argv) /* * Initialize the previously static variables. 
*/ - + totalconn = actconn = maxfd = listeners = stopping = 0; killed = 0; - + #ifdef HAPROXY_MEMMAX global.rlimit_memmax_all = HAPROXY_MEMMAX; @@ -1760,16 +1760,11 @@ static void init(int argc, char **argv) global.nbthread = 1; /* Realloc trash buffers because global.tune.bufsize may have changed */ - if (!init_trash_buffers()) { + if (!init_trash_buffers(0)) { Alert("failed to initialize trash buffers.\n"); exit(1); } - if (!init_log_buffers()) { - Alert("failed to initialize log buffers.\n"); - exit(1); - } - /* * Note: we could register external pollers here. * Built-in pollers have been registered before main(). @@ -2194,7 +2189,6 @@ void deinit(void) pool_destroy2(pool2_stream); pool_destroy2(pool2_session); pool_destroy2(pool2_connection); - pool_destroy2(pool2_trash); pool_destroy2(pool2_requri); pool_destroy2(pool2_task); pool_destroy2(pool2_capture); @@ -2291,7 +2285,6 @@ static void run_poll_loop() } } -#ifdef USE_THREAD static void *run_thread_poll_loop(void *data) { struct per_thread_init_fct *ptif; @@ -2318,9 +2311,12 @@ static void *run_thread_poll_loop(void *data) list_for_each_entry(ptdf, &per_thread_deinit_list, list) ptdf->fct(); - pthread_exit(NULL); -} +#ifdef USE_THREAD + if (tid > 0) + pthread_exit(NULL); #endif + return NULL; +} /* This is the global management task for listeners. It enables listeners waiting * for global resources when there are enough free resource, or at least once in @@ -2804,17 +2800,24 @@ int main(int argc, char **argv) /* * That's it : the central polling loop. Run until we stop. */ - if (global.nbthread > 1) { #ifdef USE_THREAD + { unsigned int *tids = calloc(global.nbthread, sizeof(unsigned int)); pthread_t *threads = calloc(global.nbthread, sizeof(pthread_t)); int i; - THREAD_SYNC_INIT((1UL << global.nbthread) - 1); - for (i = 0; i < global.nbthread; i++) { + /* Init tids array */ + for (i = 0; i < global.nbthread; i++) tids[i] = i; + + /* Create nbthread-1 thread. 
The first thread is the current process */ + threads[0] = pthread_self(); + for (i = 1; i < global.nbthread; i++) pthread_create(&threads[i], NULL, &run_thread_poll_loop, &tids[i]); + #ifdef USE_CPU_AFFINITY + /* Now the CPU affinity for all threads */ + for (i = 0; i < global.nbthread; i++) { if (global.cpu_map[relative_pid-1]) global.thread_map[relative_pid-1][i] &= global.cpu_map[relative_pid-1]; @@ -2822,9 +2825,14 @@ int main(int argc, char **argv) global.thread_map[relative_pid-1][i]) /* only do this if the thread has a THREAD map */ pthread_setaffinity_np(threads[i], sizeof(unsigned long), (void *)&global.thread_map[relative_pid-1][i]); -#endif } - for (i = 0; i < global.nbthread; i++) +#endif /* !USE_CPU_AFFINITY */ + + /* Finally, start the poll loop for the first thread */ + run_thread_poll_loop(&tids[0]); + + /* Wait the end of other threads */ + for (i = 1; i < global.nbthread; i++) pthread_join(threads[i], NULL); free(tids); @@ -2833,31 +2841,13 @@ int main(int argc, char **argv) #if defined(DEBUG_THREAD) || defined(DEBUG_FULL) show_lock_stats(); #endif - -#endif /* USE_THREAD */ } - else { - tid = 0; +#else /* ! 
USE_THREAD */ -#ifdef USE_THREAD -#ifdef USE_CPU_AFFINITY - if (global.cpu_map[relative_pid-1]) - global.thread_map[relative_pid-1][tid] &= global.cpu_map[relative_pid-1]; + run_thread_poll_loop((int []){0}); - if (global.thread_map[relative_pid-1][tid]) /* only do this if the thread has a THREAD map */ - pthread_setaffinity_np(pthread_self(), - sizeof(unsigned long), (void *)&global.thread_map[relative_pid-1][tid]); -#endif #endif - if (global.mode & MODE_MWORKER) - mworker_pipe_register(mworker_pipe); - - protocol_enable_all(); - - run_poll_loop(); - } - /* Do some cleanup */ deinit(); diff --git a/src/log.c b/src/log.c index 864560b5c4..773662b2b8 100644 --- a/src/log.c +++ b/src/log.c @@ -1344,14 +1344,19 @@ void init_log() } } +static int init_log_buffers_per_thread() +{ + return init_log_buffers(); +} + +static void deinit_log_buffers_per_thread() +{ + deinit_log_buffers(); +} + /* Initialize log buffers used for syslog messages */ int init_log_buffers() { - if (global.nbthread > 1 && tid == (unsigned int)(-1)) { - hap_register_per_thread_init(init_log_buffers); - hap_register_per_thread_deinit(deinit_log_buffers); - } - logheader = my_realloc2(logheader, global.max_syslog_len + 1); logheader_rfc5424 = my_realloc2(logheader_rfc5424, global.max_syslog_len + 1); logline = my_realloc2(logline, global.max_syslog_len + 1); @@ -2413,6 +2418,8 @@ static struct cli_kw_list cli_kws = {{ },{ __attribute__((constructor)) static void __log_init(void) { + hap_register_per_thread_init(init_log_buffers_per_thread); + hap_register_per_thread_deinit(deinit_log_buffers_per_thread); cli_register_kw(&cli_kws); } /*