]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: pollers: Drop fd events after a takeover to another tgid.
authorOlivier Houchard <ohouchard@haproxy.com>
Tue, 25 Feb 2025 17:28:45 +0000 (18:28 +0100)
committerOlivier Houchard <cognet@ci0.org>
Wed, 26 Feb 2025 12:00:18 +0000 (13:00 +0100)
In pollers that support it, provide the generation number in addition to
the fd, and, when an event happened, if the generation number is the
same, but the tgid changed, then assumed the fd was taken over by a
thread from another thread group, and just delete the event from the
current thread's poller, as we no longer want to hear about it.

src/ev_epoll.c
src/ev_evports.c
src/ev_kqueue.c

index 694b538a5cfc15218682f75b80e6d30a983f4680..1f99e7cebc2c4d0b31f45d2f58ea505408ed9143 100644 (file)
@@ -315,7 +315,15 @@ static void _do_poll(struct poller *p, int exp, int wake)
                                COUNT_IF(1, "epoll report of event on a just closed fd (harmless)");
                        }
                        continue;
-               }
+               } else if (fd_tgid(fd) != tgid) {
+                       struct epoll_event ev;
+                       /*
+                        * We've been taken over by another thread from
+                        * another thread group, give up.
+                        */
+                       epoll_ctl(epoll_fd[tid], EPOLL_CTL_DEL, fd, &ev);
+                       continue;
+                }
 
                if ((e & EPOLLRDHUP) && !(cur_poller.flags & HAP_POLL_F_RDHUP))
                        _HA_ATOMIC_OR(&cur_poller.flags, HAP_POLL_F_RDHUP);
index 645f3edb1213e577fa45a916aa574092dd5835da..ddbd6acbb798616feeb58ea1011ee29ba39e92bb 100644 (file)
@@ -58,7 +58,7 @@ static inline void evports_resync_fd(int fd, int events)
        if (events == 0)
                port_dissociate(evports_fd[tid], PORT_SOURCE_FD, fd);
        else
-               port_associate(evports_fd[tid], PORT_SOURCE_FD, fd, events, NULL);
+               port_associate(evports_fd[tid], PORT_SOURCE_FD, fd, events, (void *)(uintptr_t)fdtab[fd].generation);
 }
 
 static void _update_fd(int fd)
@@ -251,10 +251,21 @@ static void _do_poll(struct poller *p, int exp, int wake)
 
        for (i = 0; i < nevlist; i++) {
                unsigned int n = 0;
+               unsigned int generation = (unsigned int)(uintptr_t)evports_evlist[i].portev_user;
                int events, rebind_events;
                int ret;
 
                fd = evports_evlist[i].portev_object;
+
+               if (generation == fdtab[fd].generation && fd_tgid(fd) != tgid) {
+                       /*
+                        * The FD was taken over by another tgid, forget about
+                        * it.
+                        */
+                       port_dissociate(evports_fd[tid], PORT_SOURCE_FD, fd);
+                       continue;
+               }
+
                events = evports_evlist[i].portev_events;
 
 #ifdef DEBUG_FD
index 125f3cd52fe0426f672ebcb688ea4fd54772fe1b..bf84fa61d139bb7b05362c8a788f385e0342e44f 100644 (file)
@@ -60,23 +60,23 @@ static int _update_fd(int fd, int start)
 
                if (en & FD_EV_ACTIVE_R) {
                        if (!(pr & ti->ltid_bit)) {
-                               EV_SET(&kev[changes++], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
+                               EV_SET(&kev[changes++], fd, EVFILT_READ, EV_ADD, 0, 0, (void *)(uintptr_t)fdtab[fd].generation);
                                _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, ti->ltid_bit);
                        }
                }
                else if (pr & ti->ltid_bit) {
-                       EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
+                       EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, (void *)(uintptr_t)fdtab[fd].generation);
                        HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~ti->ltid_bit);
                }
 
                if (en & FD_EV_ACTIVE_W) {
                        if (!(ps & ti->ltid_bit)) {
-                               EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
+                               EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_ADD, 0, 0, (void *)(uintptr_t)fdtab[fd].generation);
                                _HA_ATOMIC_OR(&polled_mask[fd].poll_send, ti->ltid_bit);
                        }
                }
                else if (ps & ti->ltid_bit) {
-                       EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
+                       EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, (void *)(uintptr_t)fdtab[fd].generation);
                        _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~ti->ltid_bit);
                }
 
@@ -208,12 +208,26 @@ static void _do_poll(struct poller *p, int exp, int wake)
 
        for (count = 0; count < status; count++) {
                unsigned int n = 0;
+               unsigned int generation = (unsigned int)(uintptr_t)kev[count].udata;
 
                fd = kev[count].ident;
 
 #ifdef DEBUG_FD
                _HA_ATOMIC_INC(&fdtab[fd].event_count);
 #endif
+               if (generation == fdtab[fd].generation && fd_tgid(fd) != tgid) {
+                       struct kevent tmpkev[2];
+
+                       /*
+                        * The FD was taken over by another tgid, forget about
+                        * it.
+                        */
+                       EV_SET(&tmpkev[0], fd,  EVFILT_READ, EV_DELETE, 0, 0, (void *)(uintptr_t)fdtab[fd].generation);
+                       EV_SET(&tmpkev[1], fd,  EVFILT_WRITE, EV_DELETE, 0, 0, (void *)(uintptr_t)fdtab[fd].generation);
+                       kevent(kqueue_fd[tid], tmpkev, 2, NULL, 0, &timeout_ts);
+                       continue;
+               }
+
                if (kev[count].filter == EVFILT_READ) {
                        if (kev[count].data || !(kev[count].flags & EV_EOF))
                                n |= FD_EV_READY_R;