From: Olivier Houchard Date: Thu, 5 Mar 2020 17:10:51 +0000 (+0100) Subject: MINOR: fd: Implement fd_takeover(). X-Git-Tag: v2.2-dev5~26 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=88516642934539d0103e3e5e0127b04a28152188;p=thirdparty%2Fhaproxy.git MINOR: fd: Implement fd_takeover(). Implement a new function, fd_takeover(), that lets you become the thread responsible for the fd. On architectures that do not have a double-width CAS, use a global rwlock. fd_set_running() was also changed to be able to compete with fd_takeover(), either using a dooble-width CAS on both running_mask and thread_mask, or by claiming a reader on the global rwlock. This extra operation should not have any measurable impact on modern architectures where threading is relevant. --- diff --git a/include/proto/fd.h b/include/proto/fd.h index 5ab05d9423..8fa7670f98 100644 --- a/include/proto/fd.h +++ b/include/proto/fd.h @@ -60,6 +60,16 @@ void fd_delete(int fd); */ void fd_remove(int fd); +/* + * Take over a FD belonging to another thread. + * Returns 0 on success, and -1 on failure. + */ +int fd_takeover(int fd, void *expected_owner); + +#ifndef HA_HAVE_CAS_DW +__decl_hathreads(HA_RWLOCK_T fd_mig_lock); +#endif + ssize_t fd_write_frag_line(int fd, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg, int nl); /* close all FDs starting from */ @@ -299,9 +309,35 @@ static inline void fd_want_send(int fd) updt_fd_polling(fd); } -static inline void fd_set_running(int fd) +/* Set the fd as currently running on the current thread. + * Retuns 0 if all goes well, or -1 if we no longer own the fd, and should + * do nothing with it. + */ +static inline int fd_set_running(int fd) { +#ifndef HA_HAVE_CAS_DW + HA_RWLOCK_RDLOCK(OTHER_LOCK, &fd_mig_lock); + if (!(fdtab[fd].thread_mask & tid_bit)) { + HA_RWLOCK_RDUNLOCK(OTHER_LOCK, &fd_mig_lock); + return -1; + } _HA_ATOMIC_OR(&fdtab[fd].running_mask, tid_bit); + HA_RWLOCK_RDUNLOCK(OTHER_LOCK, &fd_mig_lock); + return 0; +#else + unsigned long old_masks[2]; + unsigned long new_masks[2]; + old_masks[0] = fdtab[fd].running_mask; + old_masks[1] = fdtab[fd].thread_mask; + do { + if (!(old_masks[1] & tid_bit)) + return -1; + new_masks[0] = fdtab[fd].running_mask | tid_bit; + new_masks[1] = old_masks[1]; + + } while (!(HA_ATOMIC_DWCAS(&fdtab[fd].running_mask, &old_masks, &new_masks))); + return 0; +#endif } static inline void fd_set_running_excl(int fd) @@ -371,7 +407,8 @@ static inline void fd_update_events(int fd, unsigned char evts) fd_may_send(fd); if (fdtab[fd].iocb && fd_active(fd)) { - fd_set_running(fd); + if (fd_set_running(fd) == -1) + return; fdtab[fd].iocb(fd); fd_clr_running(fd); } diff --git a/src/fd.c b/src/fd.c index ecc99058e6..e790f3e8d2 100644 --- a/src/fd.c +++ b/src/fd.c @@ -302,6 +302,13 @@ static void fd_dodelete(int fd, int do_close) { int locked = fdtab[fd].running_mask != tid_bit; + /* We're just trying to protect against a concurrent fd_insert() + * here, not against fd_takeother(), because either we're called + * directly from the iocb(), and we're already locked, or we're + * called from the mux tasklet, but then the mux is responsible for + * making sure the tasklet does nothing, and the connection is never + * destroyed. + */ if (locked) fd_set_running_excl(fd); @@ -328,6 +335,66 @@ static void fd_dodelete(int fd, int do_close) fd_clr_running(fd); } +#ifndef HA_HAVE_CAS_DW +__decl_hathreads(__delc_rwlock(fd_mig_lock)); +#endif + +/* + * Take over a FD belonging to another thread. + * unexpected_conn is the expected owner of the fd. + * Returns 0 on success, and -1 on failure. + */ +int fd_takeover(int fd, void *expected_owner) +{ +#ifndef HA_HAVE_CAS_DW + int ret; + + HA_RWLOCK_WRLOCK(OTHER_LOCK, &fd_mig_lock); + _HA_ATOMIC_OR(&fdtab[fd].running_mask, tid_bit); + if (fdtab[fd].running_mask != tid_bit || fdtab[fd].owner != expected_owner) { + ret = -1; + _HA_ATOMIC_AND(&fdtab[fd].running_mask, ~tid_bit); + goto end; + } + fdtab[fd].thread_mask = tid_bit; + _HA_ATOMIC_AND(&fdtab[fd].running_mask, ~tid_bit); + ret = 0; +end: + HA_RWLOCK_WRUNLOCK(OTHER_LOCK, &fd_mig_lock); + /* Make sure the FD doesn't have the active bit. It is possible that + * the fd is polled by the thread that used to own it, the new thread + * is supposed to call subscribe() later, to activate polling. + */ + fd_stop_recv(fd); + return ret; +#else + unsigned long old_masks[2]; + unsigned long new_masks[2]; + + old_masks[0] = tid_bit; + old_masks[1] = fdtab[fd].thread_mask; + new_masks[0] = new_masks[1] = tid_bit; + /* protect ourself against a delete then an insert for the same fd, + * if it happens, then the owner will no longer be the expected + * connection. + */ + _HA_ATOMIC_OR(&fdtab[fd].running_mask, tid_bit); + if (fdtab[fd].owner != expected_owner) { + _HA_ATOMIC_AND(&fdtab[fd].running_mask, ~tid_bit); + return -1; + } + do { + if (old_masks[0] != tid_bit || !old_masks[1]) { + _HA_ATOMIC_AND(&fdtab[fd].running_mask, ~tid_bit); + return -1; + } + } while (!(_HA_ATOMIC_DWCAS(&fdtab[fd].running_mask, &old_masks, + &new_masks))); + _HA_ATOMIC_AND(&fdtab[fd].running_mask, ~tid_bit); + return 0; +#endif /* HW_HAVE_CAS_DW */ +} + /* Deletes an FD from the fdsets. * The file descriptor is also closed. */