]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: fd/poller: turn update_mask to group-local IDs
authorWilly Tarreau <w@1wt.eu>
Tue, 5 Jul 2022 17:21:06 +0000 (19:21 +0200)
committerWilly Tarreau <w@1wt.eu>
Fri, 15 Jul 2022 18:16:30 +0000 (20:16 +0200)
From now on, the FD's update_mask only refers to local thread IDs. However,
there remains a limitation, in updt_fd_polling(), we temporarily have to
check and set shared FDs against .thread_mask, which still contains global
ones. As such, nbtgroups > 1 may break (but this is not yet supported without
special build options).

include/haproxy/fd.h
src/ev_epoll.c
src/ev_evports.c
src/ev_kqueue.c
src/ev_poll.c
src/ev_select.c
src/fd.c
src/stream.c

index 601419e660f9161e08a65094bb5b3eed439a9b3e..9321777767a04c557acb950b279e5130c802c7c2 100644 (file)
@@ -131,17 +131,17 @@ static inline void done_update_polling(int fd)
 {
        unsigned long update_mask;
 
-       update_mask = _HA_ATOMIC_AND_FETCH(&fdtab[fd].update_mask, ~tid_bit);
-       while ((update_mask & all_threads_mask)== 0) {
+       update_mask = _HA_ATOMIC_AND_FETCH(&fdtab[fd].update_mask, ~ti->ltid_bit);
+       while ((update_mask & tg->threads_enabled) == 0) {
                /* If we were the last one that had to update that entry, remove it from the list */
                fd_rm_from_fd_list(&update_list[tgid - 1], fd);
-               update_mask = (volatile unsigned long)fdtab[fd].update_mask;
-               if ((update_mask & all_threads_mask) != 0) {
+               update_mask = _HA_ATOMIC_LOAD(&fdtab[fd].update_mask);
+               if ((update_mask & tg->threads_enabled) != 0) {
                        /* Maybe it's been re-updated in the meanwhile, and we
                         * wrongly removed it from the list, if so, re-add it
                         */
                        fd_add_to_fd_list(&update_list[tgid - 1], fd);
-                       update_mask = (volatile unsigned long)(fdtab[fd].update_mask);
+                       update_mask = _HA_ATOMIC_LOAD(&fdtab[fd].update_mask);
                        /* And then check again, just in case after all it
                         * should be removed, even if it's very unlikely, given
                         * the current thread wouldn't have been able to take
index 6a71650e8bbd1cf64c85c2070f13ba955280b0aa..13ebc4f4d4a6b6608414309770e32f5d5005b996 100644 (file)
@@ -169,7 +169,7 @@ static void _do_poll(struct poller *p, int exp, int wake)
        for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
                fd = fd_updt[updt_idx];
 
-               _HA_ATOMIC_AND(&fdtab[fd].update_mask, ~tid_bit);
+               _HA_ATOMIC_AND(&fdtab[fd].update_mask, ~ti->ltid_bit);
                if (!fdtab[fd].owner) {
                        activity[tid].poll_drop_fd++;
                        continue;
@@ -188,7 +188,7 @@ static void _do_poll(struct poller *p, int exp, int wake)
                        fd = -fd -4;
                if (fd == -1)
                        break;
-               if (fdtab[fd].update_mask & tid_bit)
+               if (fdtab[fd].update_mask & ti->ltid_bit)
                        done_update_polling(fd);
                else
                        continue;
index 8129e9e4e940cd1e94735753b787bb61047f5742..a8fbc13fc477546335354d64b616ccc0221fc2f6 100644 (file)
@@ -126,7 +126,7 @@ static void _do_poll(struct poller *p, int exp, int wake)
        for (i = 0; i < fd_nbupdt; i++) {
                fd = fd_updt[i];
 
-               _HA_ATOMIC_AND(&fdtab[fd].update_mask, ~tid_bit);
+               _HA_ATOMIC_AND(&fdtab[fd].update_mask, ~ti->ltid_bit);
                if (fdtab[fd].owner == NULL) {
                        activity[tid].poll_drop_fd++;
                        continue;
@@ -145,7 +145,7 @@ static void _do_poll(struct poller *p, int exp, int wake)
                        fd = -fd -4;
                if (fd == -1)
                        break;
-               if (fdtab[fd].update_mask & tid_bit)
+               if (fdtab[fd].update_mask & ti->ltid_bit)
                        done_update_polling(fd);
                else
                        continue;
index 79ddf7d5179c54ffed87ef4dc9682f2d2a15a31d..70ee2ade1d4b895c4ac7a39981b8ea30930a11b2 100644 (file)
@@ -102,7 +102,7 @@ static void _do_poll(struct poller *p, int exp, int wake)
        for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
                fd = fd_updt[updt_idx];
 
-               _HA_ATOMIC_AND(&fdtab[fd].update_mask, ~tid_bit);
+               _HA_ATOMIC_AND(&fdtab[fd].update_mask, ~ti->ltid_bit);
                if (!fdtab[fd].owner) {
                        activity[tid].poll_drop_fd++;
                        continue;
@@ -119,7 +119,7 @@ static void _do_poll(struct poller *p, int exp, int wake)
                        fd = -fd -4;
                if (fd == -1)
                        break;
-               if (fdtab[fd].update_mask & tid_bit)
+               if (fdtab[fd].update_mask & ti->ltid_bit)
                        done_update_polling(fd);
                else
                        continue;
index a7fadeab9241c949212593399672f7248cf0f073..21478137048ca7a84ca147c6660b7f22ccc0e6a3 100644 (file)
@@ -116,7 +116,7 @@ static void _do_poll(struct poller *p, int exp, int wake)
        for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
                fd = fd_updt[updt_idx];
 
-               _HA_ATOMIC_AND(&fdtab[fd].update_mask, ~tid_bit);
+               _HA_ATOMIC_AND(&fdtab[fd].update_mask, ~ti->ltid_bit);
                if (!fdtab[fd].owner) {
                        activity[tid].poll_drop_fd++;
                        continue;
@@ -134,12 +134,12 @@ static void _do_poll(struct poller *p, int exp, int wake)
                        fd = -fd -4;
                if (fd == -1)
                        break;
-               if (fdtab[fd].update_mask & tid_bit) {
+               if (fdtab[fd].update_mask & ti->ltid_bit) {
                        /* Cheat a bit, as the state is global to all pollers
                         * we don't need every thread to take care of the
                         * update.
                         */
-                       _HA_ATOMIC_AND(&fdtab[fd].update_mask, ~all_threads_mask);
+                       _HA_ATOMIC_AND(&fdtab[fd].update_mask, ~tg->threads_enabled);
                        done_update_polling(fd);
                } else
                        continue;
index 2b39a81bb1d4ceab7bf2c283704a299db69d31b9..eadd5888abca57fcb8e67e14fc62ab6ea387225b 100644 (file)
@@ -108,7 +108,7 @@ static void _do_poll(struct poller *p, int exp, int wake)
        for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
                fd = fd_updt[updt_idx];
 
-               _HA_ATOMIC_AND(&fdtab[fd].update_mask, ~tid_bit);
+               _HA_ATOMIC_AND(&fdtab[fd].update_mask, ~ti->ltid_bit);
                if (!fdtab[fd].owner) {
                        activity[tid].poll_drop_fd++;
                        continue;
@@ -125,12 +125,12 @@ static void _do_poll(struct poller *p, int exp, int wake)
                        fd = -fd -4;
                if (fd == -1)
                        break;
-               if (fdtab[fd].update_mask & tid_bit) {
+               if (fdtab[fd].update_mask & ti->ltid_bit) {
                        /* Cheat a bit, as the state is global to all pollers
                         * we don't need every thread to take care of the
                         * update.
                         */
-                       _HA_ATOMIC_AND(&fdtab[fd].update_mask, ~all_threads_mask);
+                       _HA_ATOMIC_AND(&fdtab[fd].update_mask, ~tg->threads_enabled);
                        done_update_polling(fd);
                } else
                        continue;
index 6a7043c8962a237206ea3b5c46a89653bdff986c..45cc5f46e921e4c0f51ce2cab8e2abd45c098c96 100644 (file)
--- a/src/fd.c
+++ b/src/fd.c
@@ -459,14 +459,14 @@ int fd_takeover(int fd, void *expected_owner)
 void updt_fd_polling(const int fd)
 {
        if (all_threads_mask == 1UL || (fdtab[fd].thread_mask & all_threads_mask) == tid_bit) {
-               if (HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
+               if (HA_ATOMIC_BTS(&fdtab[fd].update_mask, ti->ltid))
                        return;
 
                fd_updt[fd_nbupdt++] = fd;
        } else {
                unsigned long update_mask = fdtab[fd].update_mask;
                do {
-                       if (update_mask == fdtab[fd].thread_mask)
+                       if (update_mask == fdtab[fd].thread_mask) // FIXME: this works only on thread-groups 1
                                return;
                } while (!_HA_ATOMIC_CAS(&fdtab[fd].update_mask, &update_mask, fdtab[fd].thread_mask));
 
@@ -525,7 +525,7 @@ int fd_update_events(int fd, uint evts)
                        activity[tid].poll_skip_fd++;
 
                        /* Let the poller know this FD was lost */
-                       if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
+                       if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, ti->ltid))
                                fd_updt[fd_nbupdt++] = fd;
 
                        fd_drop_tgid(fd);
@@ -603,10 +603,10 @@ int fd_update_events(int fd, uint evts)
        /* we had to stop this FD and it still must be stopped after the I/O
         * cb's changes, so let's program an update for this.
         */
-       if (must_stop && !(fdtab[fd].update_mask & tid_bit)) {
+       if (must_stop && !(fdtab[fd].update_mask & ti->ltid_bit)) {
                if (((must_stop & FD_POLL_IN)  && !fd_recv_active(fd)) ||
                    ((must_stop & FD_POLL_OUT) && !fd_send_active(fd)))
-                       if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
+                       if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, ti->ltid))
                                fd_updt[fd_nbupdt++] = fd;
        }
 
index 66529d4d77acd45bc447cb7b4e6ded00b11adbb5..4208885f9d3237f0764211e366540c51b7bd4191 100644 (file)
@@ -3330,7 +3330,7 @@ static int stats_dump_full_strm_to_buffer(struct stconn *sc, struct stream *strm
                                      conn->flags,
                                      conn_fd(conn),
                                      conn_fd(conn) >= 0 ? fdtab[conn->handle.fd].state : 0,
-                                     conn_fd(conn) >= 0 ? !!(fdtab[conn->handle.fd].update_mask & tid_bit) : 0,
+                                     conn_fd(conn) >= 0 ? !!(fdtab[conn->handle.fd].update_mask & ti->ltid_bit) : 0,
                                      conn_fd(conn) >= 0 ? fdtab[conn->handle.fd].thread_mask: 0);
 
                }
@@ -3368,7 +3368,7 @@ static int stats_dump_full_strm_to_buffer(struct stconn *sc, struct stream *strm
                                      conn->flags,
                                      conn_fd(conn),
                                      conn_fd(conn) >= 0 ? fdtab[conn->handle.fd].state : 0,
-                                     conn_fd(conn) >= 0 ? !!(fdtab[conn->handle.fd].update_mask & tid_bit) : 0,
+                                     conn_fd(conn) >= 0 ? !!(fdtab[conn->handle.fd].update_mask & ti->ltid_bit) : 0,
                                      conn_fd(conn) >= 0 ? fdtab[conn->handle.fd].thread_mask: 0);
 
                }