git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: fd/poller: turn polled_mask to group-local IDs
author: Willy Tarreau <w@1wt.eu>
Wed, 6 Jul 2022 08:37:31 +0000 (10:37 +0200)
committer: Willy Tarreau <w@1wt.eu>
Fri, 15 Jul 2022 18:16:30 +0000 (20:16 +0200)
This changes the meaning of each bit in the polled_mask so that each
bit now represents a local thread ID within the current group instead
of a global thread ID. As such, all tests now apply to ltid_bit instead
of tid_bit.

No particular check was made to verify that the FD's tgid matches the
current one because there should be no case where this is not true. A
check was added in epoll's __fd_clo() to confirm it never differs unless
expected (soft stop under thread isolation, or master in starting mode
going to exec mode), but a mismatch does not prevent the job from being
done: it only consists in finding, among the group's threads, those that
are still polling this FD and removing the FD from their pollers.

Some atomic loads were added at various locations, and most repetitive
references to polled_mask[fd].xx were replaced with a local copy, making
the code much clearer.

src/ev_epoll.c
src/ev_evports.c
src/ev_kqueue.c
src/ev_poll.c
src/ev_select.c

index ea2b24f0360744696d2c966cb47b72e03082a6a3..6a71650e8bbd1cf64c85c2070f13ba955280b0aa 100644 (file)
@@ -43,12 +43,25 @@ static int epoll_fd[MAX_THREADS] __read_mostly; // per-thread epoll_fd
 static void __fd_clo(int fd)
 {
        if (unlikely(fdtab[fd].state & FD_CLONED)) {
-               unsigned long m = polled_mask[fd].poll_recv | polled_mask[fd].poll_send;
+               unsigned long m = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_recv) | _HA_ATOMIC_LOAD(&polled_mask[fd].poll_send);
+               int tgrp = fd_tgid(fd);
                struct epoll_event ev;
                int i;
 
-               for (i = global.nbthread - 1; i >= 0; i--)
-                       if (m & (1UL << i))
+               if (!m)
+                       return;
+
+               /* since FDs may only be shared per group and are only closed
+                * once entirely reset, it should never happen that we have to
+                * close an FD for another group, unless we're stopping from the
+                * wrong thread or during startup, which is what we're checking
+                * for. Regardless, it is not a problem to do so.
+                */
+               if (unlikely(!(global.mode & MODE_STARTING)))
+                       CHECK_IF(tgid != tgrp && !thread_isolated());
+
+               for (i = ha_tgroup_info[tgrp-1].base; i < ha_tgroup_info[tgrp-1].base + ha_tgroup_info[tgrp-1].count; i++)
+                       if (m & ha_thread_info[i].ltid_bit)
                                epoll_ctl(epoll_fd[i], EPOLL_CTL_DEL, fd, &ev);
        }
 }
@@ -57,18 +70,21 @@ static void _update_fd(int fd)
 {
        int en, opcode;
        struct epoll_event ev = { };
+       ulong pr, ps;
 
        en = fdtab[fd].state;
+       pr = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_recv);
+       ps = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_send);
 
        /* Try to force EPOLLET on FDs that support it */
        if (fdtab[fd].state & FD_ET_POSSIBLE) {
                /* already done ? */
-               if (polled_mask[fd].poll_recv & polled_mask[fd].poll_send & tid_bit)
+               if (pr & ps & ti->ltid_bit)
                        return;
 
                /* enable ET polling in both directions */
-               _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
-               _HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
+               _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, ti->ltid_bit);
+               _HA_ATOMIC_OR(&polled_mask[fd].poll_send, ti->ltid_bit);
                opcode = EPOLL_CTL_ADD;
                ev.events = EPOLLIN | EPOLLRDHUP | EPOLLOUT | EPOLLET;
                goto done;
@@ -79,38 +95,35 @@ static void _update_fd(int fd)
         * needlessly unsubscribe then re-subscribe it.
         */
        if (!(en & FD_EV_READY_R) &&
-           ((en & FD_EV_ACTIVE_W) ||
-            ((polled_mask[fd].poll_send | polled_mask[fd].poll_recv) & tid_bit)))
+           ((en & FD_EV_ACTIVE_W) || ((ps | pr) & ti->ltid_bit)))
                en |= FD_EV_ACTIVE_R;
 
-       if ((polled_mask[fd].poll_send | polled_mask[fd].poll_recv) & tid_bit) {
+       if ((ps | pr) & ti->ltid_bit) {
                if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_ACTIVE_RW)) {
                        /* fd removed from poll list */
                        opcode = EPOLL_CTL_DEL;
-                       if (polled_mask[fd].poll_recv & tid_bit)
-                               _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
-                       if (polled_mask[fd].poll_send & tid_bit)
-                               _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
+                       if (pr & ti->ltid_bit)
+                               _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~ti->ltid_bit);
+                       if (ps & ti->ltid_bit)
+                               _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~ti->ltid_bit);
                }
                else {
-                       if (((en & FD_EV_ACTIVE_R) != 0) ==
-                           ((polled_mask[fd].poll_recv & tid_bit) != 0) &&
-                           ((en & FD_EV_ACTIVE_W) != 0) ==
-                           ((polled_mask[fd].poll_send & tid_bit) != 0))
+                       if (((en & FD_EV_ACTIVE_R) != 0) == ((pr & ti->ltid_bit) != 0) &&
+                           ((en & FD_EV_ACTIVE_W) != 0) == ((ps & ti->ltid_bit) != 0))
                                return;
                        if (en & FD_EV_ACTIVE_R) {
-                               if (!(polled_mask[fd].poll_recv & tid_bit))
-                                       _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
+                               if (!(pr & ti->ltid_bit))
+                                       _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, ti->ltid_bit);
                        } else {
-                               if (polled_mask[fd].poll_recv & tid_bit)
-                                       _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
+                               if (pr & ti->ltid_bit)
+                                       _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~ti->ltid_bit);
                        }
                        if (en & FD_EV_ACTIVE_W) {
-                               if (!(polled_mask[fd].poll_send & tid_bit))
-                                       _HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
+                               if (!(ps & ti->ltid_bit))
+                                       _HA_ATOMIC_OR(&polled_mask[fd].poll_send, ti->ltid_bit);
                        } else {
-                               if (polled_mask[fd].poll_send & tid_bit)
-                                       _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
+                               if (ps & ti->ltid_bit)
+                                       _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~ti->ltid_bit);
                        }
                        /* fd status changed */
                        opcode = EPOLL_CTL_MOD;
@@ -120,9 +133,9 @@ static void _update_fd(int fd)
                /* new fd in the poll list */
                opcode = EPOLL_CTL_ADD;
                if (en & FD_EV_ACTIVE_R)
-                       _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
+                       _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, ti->ltid_bit);
                if (en & FD_EV_ACTIVE_W)
-                       _HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
+                       _HA_ATOMIC_OR(&polled_mask[fd].poll_send, ti->ltid_bit);
        }
        else {
                return;
index 25cc79b8388d24d691ff27032518522789deae30..8129e9e4e940cd1e94735753b787bb61047f5742 100644 (file)
@@ -65,38 +65,40 @@ static void _update_fd(int fd)
 {
        int en;
        int events;
+       ulong pr, ps;
 
        en = fdtab[fd].state;
+       pr = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_recv);
+       ps = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_send);
 
        if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_ACTIVE_RW)) {
-               if (!(polled_mask[fd].poll_recv & tid_bit) &&
-                   !(polled_mask[fd].poll_send & tid_bit)) {
+               if (!((pr | ps) & ti->ltid_bit)) {
                        /* fd was not watched, it's still not */
                        return;
                }
                /* fd totally removed from poll list */
                events = 0;
-               if (polled_mask[fd].poll_recv & tid_bit)
-                       _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
-               if (polled_mask[fd].poll_send & tid_bit)
-                       _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
+               if (pr & ti->ltid_bit)
+                       _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~ti->ltid_bit);
+               if (ps & ti->ltid_bit)
+                       _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~ti->ltid_bit);
        }
        else {
                /* OK fd has to be monitored, it was either added or changed */
                events = evports_state_to_events(en);
                if (en & FD_EV_ACTIVE_R) {
-                       if (!(polled_mask[fd].poll_recv & tid_bit))
-                               _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
+                       if (!(pr & ti->ltid_bit))
+                               _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, ti->ltid_bit);
                } else {
-                       if (polled_mask[fd].poll_recv & tid_bit)
-                               _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
+                       if (pr & ti->ltid_bit)
+                               _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~ti->ltid_bit);
                }
                if (en & FD_EV_ACTIVE_W) {
-                       if (!(polled_mask[fd].poll_send & tid_bit))
-                               _HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
+                       if (!(ps & ti->ltid_bit))
+                               _HA_ATOMIC_OR(&polled_mask[fd].poll_send, ti->ltid_bit);
                } else {
-                       if (polled_mask[fd].poll_send & tid_bit)
-                               _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
+                       if (ps & ti->ltid_bit)
+                               _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~ti->ltid_bit);
                }
 
        }
index 42a1bc5bece45a6e066e843549f5f88e22bf369b..79ddf7d5179c54ffed87ef4dc9682f2d2a15a31d 100644 (file)
@@ -36,46 +36,48 @@ static int _update_fd(int fd, int start)
 {
        int en;
        int changes = start;
+       ulong pr, ps;
 
        en = fdtab[fd].state;
+       pr = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_recv);
+       ps = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_send);
 
        if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_ACTIVE_RW)) {
-               if (!(polled_mask[fd].poll_recv & tid_bit) &&
-                   !(polled_mask[fd].poll_send & tid_bit)) {
+               if (!((pr | ps) & ti->ltid_bit)) {
                        /* fd was not watched, it's still not */
                        return changes;
                }
                /* fd totally removed from poll list */
                EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
                EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
-               if (polled_mask[fd].poll_recv & tid_bit)
-                       _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
-               if (polled_mask[fd].poll_send & tid_bit)
-                       _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
+               if (pr & ti->ltid_bit)
+                       _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~ti->ltid_bit);
+               if (ps & ti->ltid_bit)
+                       _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~ti->ltid_bit);
        }
        else {
                /* OK fd has to be monitored, it was either added or changed */
 
                if (en & FD_EV_ACTIVE_R) {
-                       if (!(polled_mask[fd].poll_recv & tid_bit)) {
+                       if (!(pr & ti->ltid_bit)) {
                                EV_SET(&kev[changes++], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
-                               _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
+                               _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, ti->ltid_bit);
                        }
                }
-               else if (polled_mask[fd].poll_recv & tid_bit) {
+               else if (pr & ti->ltid_bit) {
                        EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
-                       HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
+                       HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~ti->ltid_bit);
                }
 
                if (en & FD_EV_ACTIVE_W) {
-                       if (!(polled_mask[fd].poll_send & tid_bit)) {
+                       if (!(ps & ti->ltid_bit)) {
                                EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
-                               _HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
+                               _HA_ATOMIC_OR(&polled_mask[fd].poll_send, ti->ltid_bit);
                        }
                }
-               else if (polled_mask[fd].poll_send & tid_bit) {
+               else if (ps & ti->ltid_bit) {
                        EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
-                       _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
+                       _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~ti->ltid_bit);
                }
 
        }
index 093184f6d868ea0e681330e87244c0e5948190dc..a7fadeab9241c949212593399672f7248cf0f073 100644 (file)
@@ -47,15 +47,18 @@ static void __fd_clo(int fd)
 static void _update_fd(int fd, int *max_add_fd)
 {
        int en;
+       ulong pr, ps;
 
        en = fdtab[fd].state;
+       pr = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_recv);
+       ps = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_send);
 
        /* we have a single state for all threads, which is why we
         * don't check the tid_bit. First thread to see the update
         * takes it for every other one.
         */
        if (!(en & FD_EV_ACTIVE_RW)) {
-               if (!(polled_mask[fd].poll_recv | polled_mask[fd].poll_send)) {
+               if (!(pr | ps)) {
                        /* fd was not watched, it's still not */
                        return;
                }
@@ -69,22 +72,22 @@ static void _update_fd(int fd, int *max_add_fd)
                /* OK fd has to be monitored, it was either added or changed */
                if (!(en & FD_EV_ACTIVE_R)) {
                        hap_fd_clr(fd, fd_evts[DIR_RD]);
-                       if (polled_mask[fd].poll_recv & tid_bit)
-                               _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
+                       if (pr & ti->ltid_bit)
+                               _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~ti->ltid_bit);
                } else {
                        hap_fd_set(fd, fd_evts[DIR_RD]);
-                       if (!(polled_mask[fd].poll_recv & tid_bit))
-                               _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
+                       if (!(pr & ti->ltid_bit))
+                               _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, ti->ltid_bit);
                }
 
                if (!(en & FD_EV_ACTIVE_W)) {
                        hap_fd_clr(fd, fd_evts[DIR_WR]);
-                       if (polled_mask[fd].poll_send & tid_bit)
-                               _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
-               }else {
+                       if (ps & ti->ltid_bit)
+                               _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~ti->ltid_bit);
+               } else {
                        hap_fd_set(fd, fd_evts[DIR_WR]);
-                       if (!(polled_mask[fd].poll_send & tid_bit))
-                               _HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
+                       if (!(ps & ti->ltid_bit))
+                               _HA_ATOMIC_OR(&polled_mask[fd].poll_send, ti->ltid_bit);
                }
 
                if (fd > *max_add_fd)
index 86a89c72b159ecf4a4b424ca5bdb81db5951d15f..2b39a81bb1d4ceab7bf2c283704a299db69d31b9 100644 (file)
@@ -38,15 +38,18 @@ static void __fd_clo(int fd)
 static void _update_fd(int fd, int *max_add_fd)
 {
        int en;
+       ulong pr, ps;
 
        en = fdtab[fd].state;
+       pr = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_recv);
+       ps = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_send);
 
        /* we have a single state for all threads, which is why we
         * don't check the tid_bit. First thread to see the update
         * takes it for every other one.
         */
        if (!(en & FD_EV_ACTIVE_RW)) {
-               if (!(polled_mask[fd].poll_recv | polled_mask[fd].poll_send)) {
+               if (!(pr | ps)) {
                        /* fd was not watched, it's still not */
                        return;
                }
@@ -60,22 +63,22 @@ static void _update_fd(int fd, int *max_add_fd)
                /* OK fd has to be monitored, it was either added or changed */
                if (!(en & FD_EV_ACTIVE_R)) {
                        hap_fd_clr(fd, fd_evts[DIR_RD]);
-                       if (polled_mask[fd].poll_recv & tid_bit)
-                               _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
+                       if (pr & ti->ltid_bit)
+                               _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~ti->ltid_bit);
                } else {
                        hap_fd_set(fd, fd_evts[DIR_RD]);
-                       if (!(polled_mask[fd].poll_recv & tid_bit))
-                               _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
+                       if (!(pr & ti->ltid_bit))
+                               _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, ti->ltid_bit);
                }
 
                if (!(en & FD_EV_ACTIVE_W)) {
                        hap_fd_clr(fd, fd_evts[DIR_WR]);
-                       if (polled_mask[fd].poll_send & tid_bit)
-                               _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
+                       if (ps & ti->ltid_bit)
+                               _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~ti->ltid_bit);
                } else {
                        hap_fd_set(fd, fd_evts[DIR_WR]);
-                       if (!(polled_mask[fd].poll_send & tid_bit))
-                               _HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
+                       if (!(ps & ti->ltid_bit))
+                               _HA_ATOMIC_OR(&polled_mask[fd].poll_send, ti->ltid_bit);
                }
 
                if (fd > *max_add_fd)