From 6b96f7289c2f401deef4bdc6e20792360807dde4 Mon Sep 17 00:00:00 2001
From: Olivier Houchard
Date: Wed, 25 Apr 2018 16:58:25 +0200
Subject: [PATCH] BUG/MEDIUM: pollers: Use a global list for fds shared between
 threads.

With the old model, any fd shared by multiple threads, such as listeners
or DNS sockets, would only be updated on one thread, which could lead to
missed events or spurious wakeups. To avoid this, add a global list for
fds that are shared, using the same implementation as the fd cache, and
only remove entries from this list once every thread has updated its
poller.

[wt: this will need to be backported to 1.8 but differently, so this
patch must not be backported as-is]
---
 include/common/hathreads.h |  2 +
 include/proto/fd.h         | 56 +++++++++++++++++++---
 include/types/fd.h         |  1 +
 src/ev_epoll.c             | 92 +++++++++++++++++++++++-------------
 src/ev_kqueue.c            | 84 ++++++++++++++++++++++-----------
 src/ev_poll.c              | 95 ++++++++++++++++++++++++-------------
 src/ev_select.c            | 97 +++++++++++++++++++++++++-------------
 src/fd.c                   |  5 +-
 src/hathreads.c            |  2 +-
 9 files changed, 299 insertions(+), 135 deletions(-)

diff --git a/include/common/hathreads.h b/include/common/hathreads.h
index 0f10b48ca0..e27ecc63f6 100644
--- a/include/common/hathreads.h
+++ b/include/common/hathreads.h
@@ -256,6 +256,8 @@ void thread_exit_sync(void);
 int thread_no_sync(void);
 int thread_need_sync(void);
 
+extern unsigned long all_threads_mask;
+
 #if defined(DEBUG_THREAD) || defined(DEBUG_FULL)
 
 /* WARNING!!! if you update this enum, please also keep lock_label() up to date below */
diff --git a/include/proto/fd.h b/include/proto/fd.h
index 543a42007e..da09731d4d 100644
--- a/include/proto/fd.h
+++ b/include/proto/fd.h
@@ -36,6 +36,8 @@
 extern volatile struct fdlist fd_cache;
 extern volatile struct fdlist fd_cache_local[MAX_THREADS];
 
+extern volatile struct fdlist update_list;
+
 extern unsigned long fd_cache_mask; // Mask of threads with events in the cache
 
 extern THREAD_LOCAL int *fd_updt;  // FD updates list
@@ -101,15 +103,57 @@ void fd_rm_from_fd_list(volatile struct fdlist *list, int fd, int off);
  */
 static inline void updt_fd_polling(const int fd)
 {
-	unsigned int oldupdt;
+	if (fdtab[fd].thread_mask == tid_bit) {
+		unsigned int oldupdt;
+
+		/* note: we don't have a test-and-set yet in hathreads */
+
+		if (HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
+			return;
+
+		oldupdt = HA_ATOMIC_ADD(&fd_nbupdt, 1) - 1;
+		fd_updt[oldupdt] = fd;
+	} else {
+		unsigned long update_mask = fdtab[fd].update_mask;
+		do {
+			if (update_mask == fdtab[fd].thread_mask)
+				return;
+		} while (!HA_ATOMIC_CAS(&fdtab[fd].update_mask, &update_mask,
+			 fdtab[fd].thread_mask));
+		fd_add_to_fd_list(&update_list, fd, offsetof(struct fdtab, update));
+	}
 
-	/* note: we don't have a test-and-set yet in hathreads */
+}
 
-	if (HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
-		return;
+/* Called from the poller to acknowledge that we read an entry from the
+ * global update list, to remove our bit from the update_mask, and to
+ * remove the fd from the list if we were the last one.
+ */
+static inline void done_update_polling(int fd)
+{
+	unsigned long update_mask;
+
+	update_mask = HA_ATOMIC_AND(&fdtab[fd].update_mask, ~tid_bit);
+	while ((update_mask & all_threads_mask) == 0) {
+		/* If we were the last one that had to update that entry, remove it from the list */
+		fd_rm_from_fd_list(&update_list, fd, offsetof(struct fdtab, update));
+		if (update_list.first == fd)
+			abort();
+		update_mask = (volatile unsigned long)fdtab[fd].update_mask;
+		if ((update_mask & all_threads_mask) != 0) {
+			/* Maybe it was re-updated in the meantime and we
+			 * wrongly removed it from the list; if so, re-add it
+			 */
+			fd_add_to_fd_list(&update_list, fd, offsetof(struct fdtab, update));
+			update_mask = (volatile unsigned long)(fdtab[fd].update_mask);
+			/* Then check again, just in case it should be
+			 * removed after all, however unlikely, given that
+			 * the current thread could not have taken care
+			 * of it yet */
+		} else
+			break;
 
-	oldupdt = HA_ATOMIC_ADD(&fd_nbupdt, 1) - 1;
-	fd_updt[oldupdt] = fd;
+	}
 }
 
 /* Allocates a cache entry for a file descriptor if it does not yet have one.
diff --git a/include/types/fd.h b/include/types/fd.h
index 0902e7fc45..aa18ebefc4 100644
--- a/include/types/fd.h
+++ b/include/types/fd.h
@@ -117,6 +117,7 @@ struct fdtab {
 	unsigned long polled_mask;  /* mask of thread IDs currently polling this fd */
 	unsigned long update_mask;  /* mask of thread IDs having an update for fd */
 	struct fdlist_entry cache;  /* Entry in the fdcache */
+	struct fdlist_entry update; /* Entry in the global update list */
 	void (*iocb)(int fd);       /* I/O handler */
 	void *owner;                /* the connection or listener associated with this fd, NULL if closed */
 	unsigned char state;        /* FD state for read and write directions (2*3 bits) */
diff --git a/src/ev_epoll.c b/src/ev_epoll.c
index a8e57973fb..584bf64c92 100644
--- a/src/ev_epoll.c
+++ b/src/ev_epoll.c
@@ -59,16 +59,55 @@ REGPRM1 static void __fd_clo(int fd)
 	}
 }
 
+static void _update_fd(int fd)
+{
+	int en, opcode;
+
+	en = fdtab[fd].state;
+
+	if (fdtab[fd].polled_mask & tid_bit) {
+		if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
+			/* fd removed from poll list */
+			opcode = EPOLL_CTL_DEL;
+			HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
+		}
+		else {
+			/* fd status changed */
+			opcode = EPOLL_CTL_MOD;
+		}
+	}
+	else if ((fdtab[fd].thread_mask & tid_bit) && (en & FD_EV_POLLED_RW)) {
+		/* new fd in the poll list */
+		opcode = EPOLL_CTL_ADD;
+		HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+	}
+	else {
+		return;
+	}
+
+	/* construct the epoll events based on new state */
+	ev.events = 0;
+	if (en & FD_EV_POLLED_R)
+		ev.events |= EPOLLIN | EPOLLRDHUP;
+
+	if (en & FD_EV_POLLED_W)
+		ev.events |= EPOLLOUT;
+
+	ev.data.fd = fd;
+	epoll_ctl(epoll_fd[tid], opcode, fd, &ev);
+}
+
 /*
  * Linux epoll() poller
  */
 REGPRM2 static void _do_poll(struct poller *p, int exp)
 {
-	int status, en;
-	int fd, opcode;
+	int status;
+	int fd;
 	int count;
 	int updt_idx;
 	int wait_time;
+	int old_fd;
 
 	/* first, scan the update list to find polling changes */
 	for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
@@ -80,40 +119,27 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
 			continue;
 		}
 
-		en = fdtab[fd].state;
-
-		if (fdtab[fd].polled_mask & tid_bit) {
-			if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
-				/* fd removed from poll list */
-				opcode = EPOLL_CTL_DEL;
-				HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
-			}
-			else {
-				/* fd status changed */
-				opcode = EPOLL_CTL_MOD;
-			}
-		}
-		else if ((fdtab[fd].thread_mask & tid_bit) && (en & FD_EV_POLLED_RW)) {
-			/* new fd in the poll list */
-			opcode = EPOLL_CTL_ADD;
-			HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
-		}
-		else {
+		_update_fd(fd);
+	}
+	fd_nbupdt = 0;
+	/* Scan the global update list */
+	for (old_fd = fd = update_list.first; fd != -1; fd = fdtab[fd].update.next) {
+		if (fd == -2) {
+			fd = old_fd;
 			continue;
 		}
-
-		/* construct the epoll events based on new state */
-		ev.events = 0;
-		if (en & FD_EV_POLLED_R)
-			ev.events |= EPOLLIN | EPOLLRDHUP;
-
-		if (en & FD_EV_POLLED_W)
-			ev.events |= EPOLLOUT;
-
-		ev.data.fd = fd;
-		epoll_ctl(epoll_fd[tid], opcode, fd, &ev);
+		else if (fd <= -3)
+			fd = -fd - 4;
+		if (fd == -1)
+			break;
+		if (fdtab[fd].update_mask & tid_bit)
+			done_update_polling(fd);
+		else
+			continue;
+		if (!fdtab[fd].owner)
+			continue;
+		_update_fd(fd);
 	}
-	fd_nbupdt = 0;
 
 	/* compute the epoll_wait() timeout */
 	if (!exp)
diff --git a/src/ev_kqueue.c b/src/ev_kqueue.c
index ebfd5d210b..926f77c743 100644
--- a/src/ev_kqueue.c
+++ b/src/ev_kqueue.c
@@ -33,6 +33,41 @@ static int kqueue_fd[MAX_THREADS]; // per-thread kqueue_fd
 static THREAD_LOCAL struct kevent *kev = NULL;
 static struct kevent *kev_out = NULL; // Trash buffer for kevent() to write the eventlist in
 
+static int _update_fd(int fd)
+{
+	int en;
+	int changes = 0;
+
+	en = fdtab[fd].state;
+
+	if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
+		if (!(fdtab[fd].polled_mask & tid_bit)) {
+			/* fd was not watched, it's still not */
+			return 0;
+		}
+		/* fd totally removed from poll list */
+		EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
+		EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
+		HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
+	}
+	else {
+		/* OK fd has to be monitored, it was either added or changed */
+
+		if (en & FD_EV_POLLED_R)
+			EV_SET(&kev[changes++], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
+		else if (fdtab[fd].polled_mask & tid_bit)
+			EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
+
+		if (en & FD_EV_POLLED_W)
+			EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
+		else if (fdtab[fd].polled_mask & tid_bit)
+			EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
+
+		HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+	}
+	return changes;
+}
+
 /*
  * kqueue() poller
  */
@@ -41,8 +76,9 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
 	int status;
 	int count, fd, delta_ms;
 	struct timespec timeout;
-	int updt_idx, en;
+	int updt_idx;
 	int changes = 0;
+	int old_fd;
 
 	timeout.tv_sec = 0;
 	timeout.tv_nsec = 0;
@@ -55,35 +91,27 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
 			activity[tid].poll_drop++;
 			continue;
 		}
-
-		en = fdtab[fd].state;
-
-		if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
-			if (!(fdtab[fd].polled_mask & tid_bit)) {
-				/* fd was not watched, it's still not */
-				continue;
-			}
-			/* fd totally removed from poll list */
-			EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
-			EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
-			HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
-		}
-		else {
-			/* OK fd has to be monitored, it was either added or changed */
-
-			if (en & FD_EV_POLLED_R)
-				EV_SET(&kev[changes++], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
-			else if (fdtab[fd].polled_mask & tid_bit)
-				EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
-
-			if (en & FD_EV_POLLED_W)
-				EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
-			else if (fdtab[fd].polled_mask & tid_bit)
-				EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
-
-			HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+		changes += _update_fd(fd);
+	}
+	/* Scan the global update list */
+	for (old_fd = fd = update_list.first; fd != -1; fd = fdtab[fd].update.next) {
+		if (fd == -2) {
+			fd = old_fd;
+			continue;
 		}
+		else if (fd <= -3)
+			fd = -fd - 4;
+		if (fd == -1)
+			break;
+		if (fdtab[fd].update_mask & tid_bit)
+			done_update_polling(fd);
+		else
+			continue;
+		if (!fdtab[fd].owner)
+			continue;
+		changes += _update_fd(fd);
 	}
+
 	if (changes) {
 #ifdef EV_RECEIPT
 		kev[0].flags |= EV_RECEIPT;
diff --git a/src/ev_poll.c b/src/ev_poll.c
index 6093b652bb..155ac821da 100644
--- a/src/ev_poll.c
+++ b/src/ev_poll.c
@@ -45,6 +45,44 @@ REGPRM1 static void __fd_clo(int fd)
 	hap_fd_clr(fd, fd_evts[DIR_WR]);
 }
 
+static void _update_fd(int fd, int *max_add_fd)
+{
+	int en;
+
+	en = fdtab[fd].state;
+
+	/* we have a single state for all threads, which is why we
+	 * don't check the tid_bit. First thread to see the update
+	 * takes it for every other one.
+	 */
+	if (!(en & FD_EV_POLLED_RW)) {
+		if (!fdtab[fd].polled_mask) {
+			/* fd was not watched, it's still not */
+			return;
+		}
+		/* fd totally removed from poll list */
+		hap_fd_clr(fd, fd_evts[DIR_RD]);
+		hap_fd_clr(fd, fd_evts[DIR_WR]);
+		HA_ATOMIC_AND(&fdtab[fd].polled_mask, 0);
+	}
+	else {
+		/* OK fd has to be monitored, it was either added or changed */
+		if (!(en & FD_EV_POLLED_R))
+			hap_fd_clr(fd, fd_evts[DIR_RD]);
+		else
+			hap_fd_set(fd, fd_evts[DIR_RD]);
+
+		if (!(en & FD_EV_POLLED_W))
+			hap_fd_clr(fd, fd_evts[DIR_WR]);
+		else
+			hap_fd_set(fd, fd_evts[DIR_WR]);
+
+		HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+		if (fd > *max_add_fd)
+			*max_add_fd = fd;
+	}
+}
+
 /*
  * Poll() poller
  */
@@ -53,11 +91,12 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
 	int status;
 	int fd;
 	int wait_time;
-	int updt_idx, en;
+	int updt_idx;
 	int fds, count;
 	int sr, sw;
 	int old_maxfd, new_maxfd, max_add_fd;
 	unsigned rn, wn; /* read new, write new */
+	int old_fd;
 
 	max_add_fd = -1;
 
@@ -70,39 +109,31 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
 			activity[tid].poll_drop++;
 			continue;
 		}
+		_update_fd(fd, &max_add_fd);
+	}
 
-		en = fdtab[fd].state;
-
-		/* we have a single state for all threads, which is why we
-		 * don't check the tid_bit. First thread to see the update
-		 * takes it for every other one.
-		 */
-		if (!(en & FD_EV_POLLED_RW)) {
-			if (!fdtab[fd].polled_mask) {
-				/* fd was not watched, it's still not */
-				continue;
-			}
-			/* fd totally removed from poll list */
-			hap_fd_clr(fd, fd_evts[DIR_RD]);
-			hap_fd_clr(fd, fd_evts[DIR_WR]);
-			HA_ATOMIC_AND(&fdtab[fd].polled_mask, 0);
-		}
-		else {
-			/* OK fd has to be monitored, it was either added or changed */
-			if (!(en & FD_EV_POLLED_R))
-				hap_fd_clr(fd, fd_evts[DIR_RD]);
-			else
-				hap_fd_set(fd, fd_evts[DIR_RD]);
-
-			if (!(en & FD_EV_POLLED_W))
-				hap_fd_clr(fd, fd_evts[DIR_WR]);
-			else
-				hap_fd_set(fd, fd_evts[DIR_WR]);
-
-			HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
-			if (fd > max_add_fd)
-				max_add_fd = fd;
+	/* Now scan the global update list */
+	for (old_fd = fd = update_list.first; fd != -1; fd = fdtab[fd].update.next) {
+		if (fd == -2) {
+			fd = old_fd;
+			continue;
 		}
+		else if (fd <= -3)
+			fd = -fd - 4;
+		if (fd == -1)
+			break;
+		if (fdtab[fd].update_mask & tid_bit) {
+			/* Cheat a bit: as the state is global to all pollers,
+			 * we don't need every thread to take care of the
+			 * update.
+			 */
+			HA_ATOMIC_AND(&fdtab[fd].update_mask, ~all_threads_mask);
+			done_update_polling(fd);
+		} else
+			continue;
+		if (!fdtab[fd].owner)
+			continue;
+		_update_fd(fd, &max_add_fd);
 	}
 
 	/* maybe we added at least one fd larger than maxfd */
diff --git a/src/ev_select.c b/src/ev_select.c
index 163a45839a..ac4a360640 100644
--- a/src/ev_select.c
+++ b/src/ev_select.c
@@ -36,6 +36,44 @@ REGPRM1 static void __fd_clo(int fd)
 	hap_fd_clr(fd, fd_evts[DIR_WR]);
 }
 
+static void _update_fd(int fd, int *max_add_fd)
+{
+	int en;
+
+	en = fdtab[fd].state;
+
+	/* we have a single state for all threads, which is why we
+	 * don't check the tid_bit. First thread to see the update
+	 * takes it for every other one.
+	 */
+	if (!(en & FD_EV_POLLED_RW)) {
+		if (!fdtab[fd].polled_mask) {
+			/* fd was not watched, it's still not */
+			return;
+		}
+		/* fd totally removed from poll list */
+		hap_fd_clr(fd, fd_evts[DIR_RD]);
+		hap_fd_clr(fd, fd_evts[DIR_WR]);
+		HA_ATOMIC_AND(&fdtab[fd].polled_mask, 0);
+	}
+	else {
+		/* OK fd has to be monitored, it was either added or changed */
+		if (!(en & FD_EV_POLLED_R))
+			hap_fd_clr(fd, fd_evts[DIR_RD]);
+		else
+			hap_fd_set(fd, fd_evts[DIR_RD]);
+
+		if (!(en & FD_EV_POLLED_W))
+			hap_fd_clr(fd, fd_evts[DIR_WR]);
+		else
+			hap_fd_set(fd, fd_evts[DIR_WR]);
+
+		HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+		if (fd > *max_add_fd)
+			*max_add_fd = fd;
+	}
+}
+
 /*
  * Select() poller
  */
@@ -46,10 +84,11 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
 	struct timeval delta;
 	int delta_ms;
 	int fds;
-	int updt_idx, en;
+	int updt_idx;
 	char count;
 	int readnotnull, writenotnull;
 	int old_maxfd, new_maxfd, max_add_fd;
+	int old_fd;
 
 	max_add_fd = -1;
 
@@ -62,41 +101,33 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
 			activity[tid].poll_drop++;
 			continue;
 		}
-
-		en = fdtab[fd].state;
-
-		/* we have a single state for all threads, which is why we
-		 * don't check the tid_bit. First thread to see the update
-		 * takes it for every other one.
-		 */
-		if (!(en & FD_EV_POLLED_RW)) {
-			if (!fdtab[fd].polled_mask) {
-				/* fd was not watched, it's still not */
-				continue;
-			}
-			/* fd totally removed from poll list */
-			hap_fd_clr(fd, fd_evts[DIR_RD]);
-			hap_fd_clr(fd, fd_evts[DIR_WR]);
-			HA_ATOMIC_AND(&fdtab[fd].polled_mask, 0);
-		}
-		else {
-			/* OK fd has to be monitored, it was either added or changed */
-			if (!(en & FD_EV_POLLED_R))
-				hap_fd_clr(fd, fd_evts[DIR_RD]);
-			else
-				hap_fd_set(fd, fd_evts[DIR_RD]);
-
-			if (!(en & FD_EV_POLLED_W))
-				hap_fd_clr(fd, fd_evts[DIR_WR]);
-			else
-				hap_fd_set(fd, fd_evts[DIR_WR]);
-
-			HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
-			if (fd > max_add_fd)
-				max_add_fd = fd;
+		_update_fd(fd, &max_add_fd);
+	}
+
+	/* Now scan the global update list */
+	for (old_fd = fd = update_list.first; fd != -1; fd = fdtab[fd].update.next) {
+		if (fd == -2) {
+			fd = old_fd;
+			continue;
 		}
+		else if (fd <= -3)
+			fd = -fd - 4;
+		if (fd == -1)
+			break;
+		if (fdtab[fd].update_mask & tid_bit) {
+			/* Cheat a bit: as the state is global to all pollers,
+			 * we don't need every thread to take care of the
+			 * update.
+			 */
+			HA_ATOMIC_AND(&fdtab[fd].update_mask, ~all_threads_mask);
+			done_update_polling(fd);
+		} else
+			continue;
+		if (!fdtab[fd].owner)
+			continue;
+		_update_fd(fd, &max_add_fd);
 	}
+
 	/* maybe we added at least one fd larger than maxfd */
 	for (old_maxfd = maxfd; old_maxfd <= max_add_fd; ) {
 		if (HA_ATOMIC_CAS(&maxfd, &old_maxfd, max_add_fd + 1))
diff --git a/src/fd.c b/src/fd.c
index 01de0e1ff1..4e88d308fe 100644
--- a/src/fd.c
+++ b/src/fd.c
@@ -169,6 +169,7 @@ int nbpollers = 0;
 
 volatile struct fdlist fd_cache ; // FD events cache
 volatile struct fdlist fd_cache_local[MAX_THREADS]; // FD events local for each thread
+volatile struct fdlist update_list; // Global update list
 
 unsigned long fd_cache_mask = 0; // Mask of threads with events in the cache
 
@@ -244,7 +245,6 @@ void fd_rm_from_fd_list(volatile struct fdlist *list, int fd, int off)
 	int prev;
 	int next;
 	int last;
-
 lock_self:
 #if (defined(HA_CAS_IS_8B) || defined(HA_HAVE_CAS_DW))
 	next_list.next = next_list.prev = -2;
@@ -492,6 +492,7 @@ int init_pollers()
 		goto fail_info;
 
 	fd_cache.first = fd_cache.last = -1;
+	update_list.first = update_list.last = -1;
 
 	hap_register_per_thread_init(init_pollers_per_thread);
 	hap_register_per_thread_deinit(deinit_pollers_per_thread);
@@ -499,7 +500,7 @@ int init_pollers()
 		HA_SPIN_INIT(&fdtab[p].lock);
 		/* Mark the fd as out of the fd cache */
 		fdtab[p].cache.next = -3;
-		fdtab[p].cache.next = -3;
+		fdtab[p].update.next = -3;
 	}
 	for (p = 0; p < global.nbthread; p++)
 		fd_cache_local[p].first = fd_cache_local[p].last = -1;
diff --git a/src/hathreads.c b/src/hathreads.c
index 0d690f3831..5db3c21978 100644
--- a/src/hathreads.c
+++ b/src/hathreads.c
@@ -31,7 +31,7 @@ void thread_sync_io_handler(int fd)
 static HA_SPINLOCK_T sync_lock;
 static int threads_sync_pipe[2];
 static unsigned long threads_want_sync = 0;
-static unsigned long all_threads_mask = 0;
+unsigned long all_threads_mask = 0;
 
 #if defined(DEBUG_THREAD) || defined(DEBUG_FULL)
 struct lock_stat lock_stats[LOCK_LABELS];
-- 
2.39.5
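
Note on the sentinel values in the scan loops above: fd_add_to_fd_list() and
fd_rm_from_fd_list() maintain a lock-free linked list, and the pollers walk it
without taking a lock, so the value read from fdtab[fd].update.next may be a
sentinel rather than a valid fd: -1 marks the end of the list, -2 marks a link
a writer is busy changing (the walker backs up to old_fd and re-reads), and
any value <= -3 marks an entry removed during the walk, whose former successor
is recovered as -next - 4 (so -3 decodes back to -1, the end marker). The
patch never spells this convention out, so here is a minimal, single-threaded
sketch of the decoding step. It is illustrative only: demo_next[], the macros
and the fd numbers are invented for the demo, with demo_next[] standing in for
fdtab[fd].update.next.

/* Illustrative sketch only, not part of the patch: decoding the sentinels
 * while walking a shared fd list.
 */
#include <stdio.h>

#define END_OF_LIST (-1)           /* no successor */
#define LOCKED      (-2)           /* link being modified by a writer */
#define REMOVED(n)  (-4 - (n))     /* removed entry whose successor was <n> */

static int demo_next[8];           /* stand-in for fdtab[fd].update.next */

int main(void)
{
	int fd, old_fd;

	/* build the list 5 -> 2 -> 7 */
	demo_next[5] = 2;
	demo_next[2] = 7;
	demo_next[7] = END_OF_LIST;

	/* pretend another thread removed entry 2 while we walk: its slot
	 * now encodes "removed, successor was 7" as -4 - 7 = -11; note
	 * that REMOVED(END_OF_LIST) is -3, hence the fd <= -3 test below.
	 */
	demo_next[2] = REMOVED(7);

	for (old_fd = fd = 5; fd != END_OF_LIST; fd = demo_next[fd]) {
		if (fd == LOCKED) {
			/* writer busy: back up and re-read the link */
			fd = old_fd;
			continue;
		}
		else if (fd <= -3)
			fd = -fd - 4;  /* decode the removed entry's successor */
		if (fd == END_OF_LIST)
			break;
		printf("visiting fd %d\n", fd);  /* prints 5, then 2, then 7 */
	}
	return 0;
}

Entry 2 is still visited here because the walker had already reached it before
the removal; in the pollers above it is the update_mask test together with
done_update_polling() that keeps an entry from being processed twice, not the
list structure itself.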