]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: fd: rely more on fd_update_events() to detect changes
authorWilly Tarreau <w@1wt.eu>
Thu, 29 Jul 2021 14:57:19 +0000 (16:57 +0200)
committerWilly Tarreau <w@1wt.eu>
Fri, 30 Jul 2021 15:45:18 +0000 (17:45 +0200)
This function already performs a number of checks prior to calling the
IOCB, and detects the change of thread (FD migration). Half of the
controls are still in each poller, and these pollers also maintain
activity counters for various cases.

Note that the unreliable test on thread_mask was removed so that only
the one performed by fd_set_running() is now used, since this one is
reliable.

Let's centralize all that fd-specific logic into the function and make
it return a status among:

  FD_UPDT_DONE,        // update done, nothing else to be done
  FD_UPDT_DEAD,        // FD was already dead, ignore it
  FD_UPDT_CLOSED,      // FD was closed
  FD_UPDT_MIGRATED,    // FD was migrated, ignore it now

Some pollers already used to call it last and have nothing to do after
it, regardless of the result. epoll has to delete the FD in case a
migration is detected. Overall this removes more code than it adds.

include/haproxy/fd-t.h
include/haproxy/fd.h
src/ev_epoll.c
src/ev_evports.c
src/ev_kqueue.c
src/ev_poll.c
src/ev_select.c
src/fd.c

index 4759ef2c6c7980e2e9042488ea74fc6b25ff69f2..99231503a29fd3dab2e414a4a95cde883f7861fd 100644 (file)
@@ -110,6 +110,14 @@ enum {
 #define FD_EXPORTED         (1U << FD_EXPORTED_BIT)
 #define FD_EXCL_SYSCALL     (1U << FD_EXCL_SYSCALL_BIT)
 
+/* FD update status after fd_update_events() */
+enum {
+       FD_UPDT_DONE = 0,    // update done, nothing else to be done
+       FD_UPDT_DEAD,        // FD was already dead, ignore it
+       FD_UPDT_CLOSED,      // FD was closed
+       FD_UPDT_MIGRATED,    // FD was migrated, ignore it now
+};
+
 /* This is the value used to mark a file descriptor as dead. This value is
  * negative, this is important so that tests on fd < 0 properly match. It
  * also has the nice property of being highly negative but neither overflowing
index 7d9d0e649b1cbc2e749b98c6bc81215dc5fd69a5..8e1a442a32c823bec305b0f967abbf884483f652 100644 (file)
@@ -117,7 +117,7 @@ void run_poller();
 void fd_add_to_fd_list(volatile struct fdlist *list, int fd, int off);
 void fd_rm_from_fd_list(volatile struct fdlist *list, int fd, int off);
 void updt_fd_polling(const int fd);
-void fd_update_events(int fd, uint evts);
+int fd_update_events(int fd, uint evts);
 
 /* Called from the poller to acknowledge we read an entry from the global
  * update list, to remove our bit from the update_mask, and remove it from
index 8810b77857491ec54ea37330e35aa748179a47f1..1de2e0fc4cf7d4aa5ccc0a671b5dac1f387b7d36 100644 (file)
@@ -218,6 +218,7 @@ static void _do_poll(struct poller *p, int exp, int wake)
        for (count = 0; count < status; count++) {
                struct epoll_event ev;
                unsigned int n, e;
+               int ret;
 
                e = epoll_events[count].events;
                fd = epoll_events[count].data.fd;
@@ -228,27 +229,20 @@ static void _do_poll(struct poller *p, int exp, int wake)
 #ifdef DEBUG_FD
                _HA_ATOMIC_INC(&fdtab[fd].event_count);
 #endif
-               if (!fdtab[fd].owner) {
-                       activity[tid].poll_dead_fd++;
-                       continue;
-               }
-
-               if (!(fdtab[fd].thread_mask & tid_bit)) {
-                       /* FD has been migrated */
-                       activity[tid].poll_skip_fd++;
-                       epoll_ctl(epoll_fd[tid], EPOLL_CTL_DEL, fd, &ev);
-                       _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
-                       _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
-                       continue;
-               }
-
                n = ((e & EPOLLIN)    ? FD_EV_READY_R : 0) |
                    ((e & EPOLLOUT)   ? FD_EV_READY_W : 0) |
                    ((e & EPOLLRDHUP) ? FD_EV_SHUT_R  : 0) |
                    ((e & EPOLLHUP)   ? FD_EV_SHUT_RW : 0) |
                    ((e & EPOLLERR)   ? FD_EV_ERR_RW  : 0);
 
-               fd_update_events(fd, n);
+               ret = fd_update_events(fd, n);
+
+               if (ret == FD_UPDT_MIGRATED) {
+                       /* FD has been migrated */
+                       epoll_ctl(epoll_fd[tid], EPOLL_CTL_DEL, fd, &ev);
+                       _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
+                       _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
+               }
        }
        /* the caller will take care of cached events */
 }
index 109e59c618e1a52053860fa26b329b4999bbff6c..c7bf4f6f7057f897f254ddede96fabcb469b20bb 100644 (file)
@@ -213,24 +213,14 @@ static void _do_poll(struct poller *p, int exp, int wake)
        for (i = 0; i < nevlist; i++) {
                unsigned int n = 0;
                int events, rebind_events;
+               int ret;
+
                fd = evports_evlist[i].portev_object;
                events = evports_evlist[i].portev_events;
 
 #ifdef DEBUG_FD
                _HA_ATOMIC_INC(&fdtab[fd].event_count);
 #endif
-               if (fdtab[fd].owner == NULL) {
-                       activity[tid].poll_dead_fd++;
-                       continue;
-               }
-
-               if (!(fdtab[fd].thread_mask & tid_bit)) {
-                       activity[tid].poll_skip_fd++;
-                       if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
-                               fd_updt[fd_nbupdt++] = fd;
-                       continue;
-               }
-
                /*
                 * By virtue of receiving an event for this file descriptor, it
                 * is no longer associated with the port in question.  Store
@@ -255,13 +245,24 @@ static void _do_poll(struct poller *p, int exp, int wake)
                 * entry.  If it changes, the fd will be placed on the updated
                 * list for processing the next time we are called.
                 */
-               fd_update_events(fd, n);
+               ret = fd_update_events(fd, n);
+
+               /* If the FD was already dead , skip it */
+               if (ret == FD_UPDT_DEAD)
+                       continue;
+
+               /* disable polling on this instance if the FD was migrated */
+               if (ret == FD_UPDT_MIGRATED) {
+                       if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
+                               fd_updt[fd_nbupdt++] = fd;
+                       continue;
+               }
 
                /*
                 * This file descriptor was closed during the processing of
                 * polled events.  No need to reassociate.
                 */
-               if (fdtab[fd].owner == NULL)
+               if (ret == FD_UPDT_CLOSED)
                        continue;
 
                /*
index d51a833ed2f2df549cc9716dd682598e968ef75e..ce71484de330298c77c0ce5c5567388932c65200 100644 (file)
@@ -181,23 +181,13 @@ static void _do_poll(struct poller *p, int exp, int wake)
 
        for (count = 0; count < status; count++) {
                unsigned int n = 0;
+               int ret;
+
                fd = kev[count].ident;
 
 #ifdef DEBUG_FD
                _HA_ATOMIC_INC(&fdtab[fd].event_count);
 #endif
-               if (!fdtab[fd].owner) {
-                       activity[tid].poll_dead_fd++;
-                       continue;
-               }
-
-               if (!(fdtab[fd].thread_mask & tid_bit)) {
-                       activity[tid].poll_skip_fd++;
-                       if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
-                               fd_updt[fd_nbupdt++] = fd;
-                       continue;
-               }
-
                if (kev[count].filter == EVFILT_READ) {
                        if (kev[count].data || !(kev[count].flags & EV_EOF))
                                n |= FD_EV_READY_R;
@@ -210,7 +200,13 @@ static void _do_poll(struct poller *p, int exp, int wake)
                                n |= FD_EV_ERR_RW;
                }
 
-               fd_update_events(fd, n);
+               ret = fd_update_events(fd, n);
+
+               if (ret == FD_UPDT_MIGRATED) {
+                       /* FD was migrated, let's stop polling it */
+                       if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
+                               fd_updt[fd_nbupdt++] = fd;
+               }
        }
 }
 
index 6bd0cf47317c19548d4e6e82bfa29762837ff20e..bb9d8f87a8bd02c633facc5dfbcbbe8612ccf082 100644 (file)
@@ -215,6 +215,7 @@ static void _do_poll(struct poller *p, int exp, int wake)
 
        for (count = 0; status > 0 && count < nbfd; count++) {
                unsigned int n;
+               int ret;
                int e = poll_events[count].revents;
                fd = poll_events[count].fd;
 
@@ -230,27 +231,20 @@ static void _do_poll(struct poller *p, int exp, int wake)
                /* ok, we found one active fd */
                status--;
 
-               if (!fdtab[fd].owner) {
-                       activity[tid].poll_dead_fd++;
-                       continue;
-               }
-
-               if (!(fdtab[fd].thread_mask & tid_bit)) {
-                       activity[tid].poll_skip_fd++;
-                       if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
-                               fd_updt[fd_nbupdt++] = fd;
-                       continue;
-               }
-
                n = ((e & POLLIN)    ? FD_EV_READY_R : 0) |
                    ((e & POLLOUT)   ? FD_EV_READY_W : 0) |
                    ((e & POLLRDHUP) ? FD_EV_SHUT_R  : 0) |
                    ((e & POLLHUP)   ? FD_EV_SHUT_RW : 0) |
                    ((e & POLLERR)   ? FD_EV_ERR_RW  : 0);
 
-               fd_update_events(fd, n);
-       }
+               ret = fd_update_events(fd, n);
 
+               if (ret == FD_UPDT_MIGRATED) {
+                       /* FD was migrated, let's stop polling it */
+                       if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
+                               fd_updt[fd_nbupdt++] = fd;
+               }
+       }
 }
 
 
index 3e5ee5a25244271d0729d2e407444a0992c3b4c1..c143bd916897f7cd2c72ccf11c730ef788aa8a10 100644 (file)
@@ -209,15 +209,6 @@ static void _do_poll(struct poller *p, int exp, int wake)
 #ifdef DEBUG_FD
                        _HA_ATOMIC_INC(&fdtab[fd].event_count);
 #endif
-                       if (!fdtab[fd].owner) {
-                               activity[tid].poll_dead_fd++;
-                               continue;
-                       }
-
-                       if (!(fdtab[fd].thread_mask & tid_bit)) {
-                               activity[tid].poll_skip_fd++;
-                               continue;
-                       }
 
                        fd_update_events(fd, n);
                }
index febe45161828a56db7309b84c737c44ab014bc5b..df9b5deb2bf5bfed27ab1774415bb478a6958c00 100644 (file)
--- a/src/fd.c
+++ b/src/fd.c
@@ -448,9 +448,10 @@ void updt_fd_polling(const int fd)
 /* Update events seen for FD <fd> and its state if needed. This should be
  * called by the poller, passing FD_EV_*_{R,W,RW} in <evts>. FD_EV_ERR_*
  * doesn't need to also pass FD_EV_SHUT_*, it's implied. ERR and SHUT are
- * allowed to be reported regardless of R/W readiness.
+ * allowed to be reported regardless of R/W readiness. Returns one of
+ * FD_UPDT_*.
  */
-void fd_update_events(int fd, uint evts)
+int fd_update_events(int fd, uint evts)
 {
        unsigned long locked;
        uint old, new;
@@ -458,9 +459,17 @@ void fd_update_events(int fd, uint evts)
 
        ti->flags &= ~TI_FL_STUCK; // this thread is still running
 
+       /* do nothing on remains of an old dead FD */
+       if (!fdtab[fd].owner) {
+               activity[tid].poll_dead_fd++;
+               return FD_UPDT_DEAD;
+       }
+
        /* do nothing if the FD was taken over under us */
-       if (fd_set_running(fd) == -1)
-               return;
+       if (fd_set_running(fd) == -1) {
+               activity[tid].poll_skip_fd++;
+               return FD_UPDT_MIGRATED;
+       }
 
        locked = (fdtab[fd].thread_mask != tid_bit);
 
@@ -523,6 +532,7 @@ void fd_update_events(int fd, uint evts)
        if ((fdtab[fd].running_mask & tid_bit) &&
            fd_clr_running(fd) == 0 && !fdtab[fd].thread_mask) {
                _fd_delete_orphan(fd);
+               return FD_UPDT_CLOSED;
        }
 
        /* we had to stop this FD and it still must be stopped after the I/O
@@ -534,6 +544,8 @@ void fd_update_events(int fd, uint evts)
                        if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
                                fd_updt[fd_nbupdt++] = fd;
        }
+
+       return FD_UPDT_DONE;
 }
 
 /* Tries to send <npfx> parts from <prefix> followed by <nmsg> parts from <msg>