From 200bd50b73f8e0e3f27e83361dc456a69747a2b8 Mon Sep 17 00:00:00 2001 From: Willy Tarreau Date: Thu, 29 Jul 2021 16:57:19 +0200 Subject: [PATCH] MEDIUM: fd: rely more on fd_update_events() to detect changes This function already performs a number of checks prior to calling the IOCB, and detects the change of thread (FD migration). Half of the controls are still in each poller, and these pollers also maintain activity counters for various cases. Note that the unreliable test on thread_mask was removed so that only the one performed by fd_set_running() is now used, since this one is reliable. Let's centralize all that fd-specific logic into the function and make it return a status among: FD_UPDT_DONE, // update done, nothing else to be done FD_UPDT_DEAD, // FD was already dead, ignore it FD_UPDT_CLOSED, // FD was closed FD_UPDT_MIGRATED, // FD was migrated, ignore it now Some pollers already used to call it last and have nothing to do after it, regardless of the result. epoll has to delete the FD in case a migration is detected. Overall this removes more code than it adds. --- include/haproxy/fd-t.h | 8 ++++++++ include/haproxy/fd.h | 2 +- src/ev_epoll.c | 24 +++++++++--------------- src/ev_evports.c | 29 +++++++++++++++-------------- src/ev_kqueue.c | 22 +++++++++------------- src/ev_poll.c | 22 ++++++++-------------- src/ev_select.c | 9 --------- src/fd.c | 20 ++++++++++++++++---- 8 files changed, 66 insertions(+), 70 deletions(-) diff --git a/include/haproxy/fd-t.h b/include/haproxy/fd-t.h index 4759ef2c6c..99231503a2 100644 --- a/include/haproxy/fd-t.h +++ b/include/haproxy/fd-t.h @@ -110,6 +110,14 @@ enum { #define FD_EXPORTED (1U << FD_EXPORTED_BIT) #define FD_EXCL_SYSCALL (1U << FD_EXCL_SYSCALL_BIT) +/* FD update status after fd_update_events() */ +enum { + FD_UPDT_DONE = 0, // update done, nothing else to be done + FD_UPDT_DEAD, // FD was already dead, ignore it + FD_UPDT_CLOSED, // FD was closed + FD_UPDT_MIGRATED, // FD was migrated, ignore it now +}; + /* This is the value used to mark a file descriptor as dead. This value is * negative, this is important so that tests on fd < 0 properly match. It * also has the nice property of being highly negative but neither overflowing diff --git a/include/haproxy/fd.h b/include/haproxy/fd.h index 7d9d0e649b..8e1a442a32 100644 --- a/include/haproxy/fd.h +++ b/include/haproxy/fd.h @@ -117,7 +117,7 @@ void run_poller(); void fd_add_to_fd_list(volatile struct fdlist *list, int fd, int off); void fd_rm_from_fd_list(volatile struct fdlist *list, int fd, int off); void updt_fd_polling(const int fd); -void fd_update_events(int fd, uint evts); +int fd_update_events(int fd, uint evts); /* Called from the poller to acknowledge we read an entry from the global * update list, to remove our bit from the update_mask, and remove it from diff --git a/src/ev_epoll.c b/src/ev_epoll.c index 8810b77857..1de2e0fc4c 100644 --- a/src/ev_epoll.c +++ b/src/ev_epoll.c @@ -218,6 +218,7 @@ static void _do_poll(struct poller *p, int exp, int wake) for (count = 0; count < status; count++) { struct epoll_event ev; unsigned int n, e; + int ret; e = epoll_events[count].events; fd = epoll_events[count].data.fd; @@ -228,27 +229,20 @@ static void _do_poll(struct poller *p, int exp, int wake) #ifdef DEBUG_FD _HA_ATOMIC_INC(&fdtab[fd].event_count); #endif - if (!fdtab[fd].owner) { - activity[tid].poll_dead_fd++; - continue; - } - - if (!(fdtab[fd].thread_mask & tid_bit)) { - /* FD has been migrated */ - activity[tid].poll_skip_fd++; - epoll_ctl(epoll_fd[tid], EPOLL_CTL_DEL, fd, &ev); - _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit); - _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit); - continue; - } - n = ((e & EPOLLIN) ? FD_EV_READY_R : 0) | ((e & EPOLLOUT) ? FD_EV_READY_W : 0) | ((e & EPOLLRDHUP) ? FD_EV_SHUT_R : 0) | ((e & EPOLLHUP) ? FD_EV_SHUT_RW : 0) | ((e & EPOLLERR) ? FD_EV_ERR_RW : 0); - fd_update_events(fd, n); + ret = fd_update_events(fd, n); + + if (ret == FD_UPDT_MIGRATED) { + /* FD has been migrated */ + epoll_ctl(epoll_fd[tid], EPOLL_CTL_DEL, fd, &ev); + _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit); + _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit); + } } /* the caller will take care of cached events */ } diff --git a/src/ev_evports.c b/src/ev_evports.c index 109e59c618..c7bf4f6f70 100644 --- a/src/ev_evports.c +++ b/src/ev_evports.c @@ -213,24 +213,14 @@ static void _do_poll(struct poller *p, int exp, int wake) for (i = 0; i < nevlist; i++) { unsigned int n = 0; int events, rebind_events; + int ret; + fd = evports_evlist[i].portev_object; events = evports_evlist[i].portev_events; #ifdef DEBUG_FD _HA_ATOMIC_INC(&fdtab[fd].event_count); #endif - if (fdtab[fd].owner == NULL) { - activity[tid].poll_dead_fd++; - continue; - } - - if (!(fdtab[fd].thread_mask & tid_bit)) { - activity[tid].poll_skip_fd++; - if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid)) - fd_updt[fd_nbupdt++] = fd; - continue; - } - /* * By virtue of receiving an event for this file descriptor, it * is no longer associated with the port in question. Store @@ -255,13 +245,24 @@ static void _do_poll(struct poller *p, int exp, int wake) * entry. If it changes, the fd will be placed on the updated * list for processing the next time we are called. */ - fd_update_events(fd, n); + ret = fd_update_events(fd, n); + + /* If the FD was already dead , skip it */ + if (ret == FD_UPDT_DEAD) + continue; + + /* disable polling on this instance if the FD was migrated */ + if (ret == FD_UPDT_MIGRATED) { + if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid)) + fd_updt[fd_nbupdt++] = fd; + continue; + } /* * This file descriptor was closed during the processing of * polled events. No need to reassociate. */ - if (fdtab[fd].owner == NULL) + if (ret == FD_UPDT_CLOSED) continue; /* diff --git a/src/ev_kqueue.c b/src/ev_kqueue.c index d51a833ed2..ce71484de3 100644 --- a/src/ev_kqueue.c +++ b/src/ev_kqueue.c @@ -181,23 +181,13 @@ static void _do_poll(struct poller *p, int exp, int wake) for (count = 0; count < status; count++) { unsigned int n = 0; + int ret; + fd = kev[count].ident; #ifdef DEBUG_FD _HA_ATOMIC_INC(&fdtab[fd].event_count); #endif - if (!fdtab[fd].owner) { - activity[tid].poll_dead_fd++; - continue; - } - - if (!(fdtab[fd].thread_mask & tid_bit)) { - activity[tid].poll_skip_fd++; - if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid)) - fd_updt[fd_nbupdt++] = fd; - continue; - } - if (kev[count].filter == EVFILT_READ) { if (kev[count].data || !(kev[count].flags & EV_EOF)) n |= FD_EV_READY_R; @@ -210,7 +200,13 @@ static void _do_poll(struct poller *p, int exp, int wake) n |= FD_EV_ERR_RW; } - fd_update_events(fd, n); + ret = fd_update_events(fd, n); + + if (ret == FD_UPDT_MIGRATED) { + /* FD was migrated, let's stop polling it */ + if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid)) + fd_updt[fd_nbupdt++] = fd; + } } } diff --git a/src/ev_poll.c b/src/ev_poll.c index 6bd0cf4731..bb9d8f87a8 100644 --- a/src/ev_poll.c +++ b/src/ev_poll.c @@ -215,6 +215,7 @@ static void _do_poll(struct poller *p, int exp, int wake) for (count = 0; status > 0 && count < nbfd; count++) { unsigned int n; + int ret; int e = poll_events[count].revents; fd = poll_events[count].fd; @@ -230,27 +231,20 @@ static void _do_poll(struct poller *p, int exp, int wake) /* ok, we found one active fd */ status--; - if (!fdtab[fd].owner) { - activity[tid].poll_dead_fd++; - continue; - } - - if (!(fdtab[fd].thread_mask & tid_bit)) { - activity[tid].poll_skip_fd++; - if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid)) - fd_updt[fd_nbupdt++] = fd; - continue; - } - n = ((e & POLLIN) ? FD_EV_READY_R : 0) | ((e & POLLOUT) ? FD_EV_READY_W : 0) | ((e & POLLRDHUP) ? FD_EV_SHUT_R : 0) | ((e & POLLHUP) ? FD_EV_SHUT_RW : 0) | ((e & POLLERR) ? FD_EV_ERR_RW : 0); - fd_update_events(fd, n); - } + ret = fd_update_events(fd, n); + if (ret == FD_UPDT_MIGRATED) { + /* FD was migrated, let's stop polling it */ + if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid)) + fd_updt[fd_nbupdt++] = fd; + } + } } diff --git a/src/ev_select.c b/src/ev_select.c index 3e5ee5a252..c143bd9168 100644 --- a/src/ev_select.c +++ b/src/ev_select.c @@ -209,15 +209,6 @@ static void _do_poll(struct poller *p, int exp, int wake) #ifdef DEBUG_FD _HA_ATOMIC_INC(&fdtab[fd].event_count); #endif - if (!fdtab[fd].owner) { - activity[tid].poll_dead_fd++; - continue; - } - - if (!(fdtab[fd].thread_mask & tid_bit)) { - activity[tid].poll_skip_fd++; - continue; - } fd_update_events(fd, n); } diff --git a/src/fd.c b/src/fd.c index febe451618..df9b5deb2b 100644 --- a/src/fd.c +++ b/src/fd.c @@ -448,9 +448,10 @@ void updt_fd_polling(const int fd) /* Update events seen for FD and its state if needed. This should be * called by the poller, passing FD_EV_*_{R,W,RW} in . FD_EV_ERR_* * doesn't need to also pass FD_EV_SHUT_*, it's implied. ERR and SHUT are - * allowed to be reported regardless of R/W readiness. + * allowed to be reported regardless of R/W readiness. Returns one of + * FD_UPDT_*. */ -void fd_update_events(int fd, uint evts) +int fd_update_events(int fd, uint evts) { unsigned long locked; uint old, new; @@ -458,9 +459,17 @@ void fd_update_events(int fd, uint evts) ti->flags &= ~TI_FL_STUCK; // this thread is still running + /* do nothing on remains of an old dead FD */ + if (!fdtab[fd].owner) { + activity[tid].poll_dead_fd++; + return FD_UPDT_DEAD; + } + /* do nothing if the FD was taken over under us */ - if (fd_set_running(fd) == -1) - return; + if (fd_set_running(fd) == -1) { + activity[tid].poll_skip_fd++; + return FD_UPDT_MIGRATED; + } locked = (fdtab[fd].thread_mask != tid_bit); @@ -523,6 +532,7 @@ void fd_update_events(int fd, uint evts) if ((fdtab[fd].running_mask & tid_bit) && fd_clr_running(fd) == 0 && !fdtab[fd].thread_mask) { _fd_delete_orphan(fd); + return FD_UPDT_CLOSED; } /* we had to stop this FD and it still must be stopped after the I/O @@ -534,6 +544,8 @@ void fd_update_events(int fd, uint evts) if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid)) fd_updt[fd_nbupdt++] = fd; } + + return FD_UPDT_DONE; } /* Tries to send parts from followed by parts from -- 2.39.5