]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: fd: Implement fd_takeover().
authorOlivier Houchard <ohouchard@haproxy.com>
Thu, 5 Mar 2020 17:10:51 +0000 (18:10 +0100)
committerOlivier Houchard <cognet@ci0.org>
Thu, 19 Mar 2020 21:07:33 +0000 (22:07 +0100)
Implement a new function, fd_takeover(), that lets you become the thread
responsible for the fd. On architectures that do not have a double-width CAS,
use a global rwlock.
fd_set_running() was also changed to be able to compete with fd_takeover(),
either using a dooble-width CAS on both running_mask and thread_mask, or
by claiming a reader on the global rwlock. This extra operation should not
have any measurable impact on modern architectures where threading is
relevant.

include/proto/fd.h
src/fd.c

index 5ab05d942396a26e0d4b04f058b644f001006ca1..8fa7670f98480d1ff1fb4bbc83b2ba691e010500 100644 (file)
@@ -60,6 +60,16 @@ void fd_delete(int fd);
  */
 void fd_remove(int fd);
 
+/*
+ * Take over a FD belonging to another thread.
+ * Returns 0 on success, and -1 on failure.
+ */
+int fd_takeover(int fd, void *expected_owner);
+
+#ifndef HA_HAVE_CAS_DW
+__decl_hathreads(HA_RWLOCK_T fd_mig_lock);
+#endif
+
 ssize_t fd_write_frag_line(int fd, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg, int nl);
 
 /* close all FDs starting from <start> */
@@ -299,9 +309,35 @@ static inline void fd_want_send(int fd)
        updt_fd_polling(fd);
 }
 
-static inline void fd_set_running(int fd)
+/* Set the fd as currently running on the current thread.
+ * Retuns 0 if all goes well, or -1 if we no longer own the fd, and should
+ * do nothing with it.
+ */
+static inline int fd_set_running(int fd)
 {
+#ifndef HA_HAVE_CAS_DW
+       HA_RWLOCK_RDLOCK(OTHER_LOCK, &fd_mig_lock);
+       if (!(fdtab[fd].thread_mask & tid_bit)) {
+               HA_RWLOCK_RDUNLOCK(OTHER_LOCK, &fd_mig_lock);
+               return -1;
+       }
        _HA_ATOMIC_OR(&fdtab[fd].running_mask, tid_bit);
+       HA_RWLOCK_RDUNLOCK(OTHER_LOCK, &fd_mig_lock);
+       return 0;
+#else
+       unsigned long old_masks[2];
+       unsigned long new_masks[2];
+       old_masks[0] = fdtab[fd].running_mask;
+       old_masks[1] = fdtab[fd].thread_mask;
+       do {
+               if (!(old_masks[1] & tid_bit))
+                       return -1;
+               new_masks[0] = fdtab[fd].running_mask | tid_bit;
+               new_masks[1] = old_masks[1];
+
+       } while (!(HA_ATOMIC_DWCAS(&fdtab[fd].running_mask, &old_masks, &new_masks)));
+       return 0;
+#endif
 }
 
 static inline void fd_set_running_excl(int fd)
@@ -371,7 +407,8 @@ static inline void fd_update_events(int fd, unsigned char evts)
                fd_may_send(fd);
 
        if (fdtab[fd].iocb && fd_active(fd)) {
-               fd_set_running(fd);
+               if (fd_set_running(fd) == -1)
+                       return;
                fdtab[fd].iocb(fd);
                fd_clr_running(fd);
        }
index ecc99058e620900ec33e659889d986aa3454cac9..e790f3e8d2e8bc0fffd6f77eb533dae4934f7798 100644 (file)
--- a/src/fd.c
+++ b/src/fd.c
@@ -302,6 +302,13 @@ static void fd_dodelete(int fd, int do_close)
 {
        int locked = fdtab[fd].running_mask != tid_bit;
 
+       /* We're just trying to protect against a concurrent fd_insert()
+        * here, not against fd_takeother(), because either we're called
+        * directly from the iocb(), and we're already locked, or we're
+        * called from the mux tasklet, but then the mux is responsible for
+        * making sure the tasklet does nothing, and the connection is never
+        * destroyed.
+        */
        if (locked)
                fd_set_running_excl(fd);
 
@@ -328,6 +335,66 @@ static void fd_dodelete(int fd, int do_close)
                fd_clr_running(fd);
 }
 
+#ifndef HA_HAVE_CAS_DW
+__decl_hathreads(__delc_rwlock(fd_mig_lock));
+#endif
+
+/*
+ * Take over a FD belonging to another thread.
+ * unexpected_conn is the expected owner of the fd.
+ * Returns 0 on success, and -1 on failure.
+ */
+int fd_takeover(int fd, void *expected_owner)
+{
+#ifndef HA_HAVE_CAS_DW
+       int ret;
+
+       HA_RWLOCK_WRLOCK(OTHER_LOCK, &fd_mig_lock);
+       _HA_ATOMIC_OR(&fdtab[fd].running_mask, tid_bit);
+       if (fdtab[fd].running_mask != tid_bit || fdtab[fd].owner != expected_owner) {
+               ret = -1;
+               _HA_ATOMIC_AND(&fdtab[fd].running_mask, ~tid_bit);
+               goto end;
+       }
+       fdtab[fd].thread_mask = tid_bit;
+       _HA_ATOMIC_AND(&fdtab[fd].running_mask, ~tid_bit);
+       ret = 0;
+end:
+       HA_RWLOCK_WRUNLOCK(OTHER_LOCK, &fd_mig_lock);
+       /* Make sure the FD doesn't have the active bit. It is possible that
+        * the fd is polled by the thread that used to own it, the new thread
+        * is supposed to call subscribe() later, to activate polling.
+        */
+       fd_stop_recv(fd);
+       return ret;
+#else
+       unsigned long old_masks[2];
+       unsigned long new_masks[2];
+
+       old_masks[0] = tid_bit;
+       old_masks[1] = fdtab[fd].thread_mask;
+       new_masks[0] = new_masks[1] = tid_bit;
+       /* protect ourself against a delete then an insert for the same fd,
+        * if it happens, then the owner will no longer be the expected
+        * connection.
+        */
+       _HA_ATOMIC_OR(&fdtab[fd].running_mask, tid_bit);
+       if (fdtab[fd].owner != expected_owner) {
+               _HA_ATOMIC_AND(&fdtab[fd].running_mask, ~tid_bit);
+               return -1;
+       }
+       do {
+               if (old_masks[0] != tid_bit || !old_masks[1]) {
+                       _HA_ATOMIC_AND(&fdtab[fd].running_mask, ~tid_bit);
+                       return -1;
+               }
+       } while (!(_HA_ATOMIC_DWCAS(&fdtab[fd].running_mask, &old_masks,
+                  &new_masks)));
+       _HA_ATOMIC_AND(&fdtab[fd].running_mask, ~tid_bit);
+       return 0;
+#endif /* HW_HAVE_CAS_DW */
+}
+
 /* Deletes an FD from the fdsets.
  * The file descriptor is also closed.
  */