* will not take new bits in its running_mask so we have the guarantee
* that the last thread eliminating running_mask is the one allowed to
* safely delete the FD. Most of the time it will be the current thread.
+ * We still need to set and check the one-shot flag FD_MUST_CLOSE
+ * to take care of the rare cases where a thread wakes up on late I/O
+ * before the thread_mask is zero, and sets its bit in the running_mask
+ * just after the current thread finishes clearing its own bit, hence
+ * the two threads see themselves as last ones (which they really are).
*/
HA_ATOMIC_OR(&fdtab[fd].running_mask, ti->ltid_bit);
+ HA_ATOMIC_OR(&fdtab[fd].state, FD_MUST_CLOSE);
HA_ATOMIC_STORE(&fdtab[fd].thread_mask, 0);
- if (fd_clr_running(fd) == ti->ltid_bit)
- _fd_delete_orphan(fd);
+ if (fd_clr_running(fd) == ti->ltid_bit) {
+ if (HA_ATOMIC_BTR(&fdtab[fd].state, FD_MUST_CLOSE_BIT)) {
+ _fd_delete_orphan(fd);
+ }
+ }
}
/* makes the new fd non-blocking and clears all other O_* flags; this is meant
return FD_UPDT_CLOSED;
}
- /* do nothing if the FD was taken over under us */
+ /* Do not take running_mask if not strictly needed (will trigger a
+ * cosmetic BUG_ON() in fd_insert() anyway if done).
+ */
+ tmask = _HA_ATOMIC_LOAD(&fdtab[fd].thread_mask);
+ if (!(tmask & ti->ltid_bit))
+ goto do_update;
+
+ HA_ATOMIC_OR(&fdtab[fd].running_mask, ti->ltid_bit);
+
+ /* From this point, our bit may possibly be in thread_mask, but it may
+ * still vanish, either because a takeover completed just before taking
+ * the bit above with the new owner deleting the FD, or because a
+ * takeover started just before taking the bit. In order to make sure a
+ * started takeover is complete, we need to verify that all bits of
+ * running_mask are present in thread_mask, since takeover first takes
+ * running then atomically replaces thread_mask. Once it's stable, if
+ * our bit remains there, no further takeover may happen because we
+ * hold running, but if our bit is not there it means we've lost the
+ * takeover race and have to decline touching the FD. Regarding the
+ * risk of deletion, our bit in running_mask prevents fd_delete() from
+ * finalizing the close, and the caller will leave the FD with a zero
+ * thread_mask and the FD_MUST_CLOSE flag set. It will then be our
+ * responsibility to close it.
+ */
do {
- /* make sure we read a synchronous copy of rmask and tmask
- * (tmask is only up to date if it reflects all of rmask's
- * bits).
- */
- do {
- rmask = _HA_ATOMIC_LOAD(&fdtab[fd].running_mask);
- tmask = _HA_ATOMIC_LOAD(&fdtab[fd].thread_mask);
- } while (rmask & ~tmask);
+ rmask = _HA_ATOMIC_LOAD(&fdtab[fd].running_mask);
+ tmask = _HA_ATOMIC_LOAD(&fdtab[fd].thread_mask);
+ rmask &= ~ti->ltid_bit;
+ } while (rmask & ~tmask);
- if (!(tmask & ti->ltid_bit)) {
- /* a takeover has started */
- activity[tid].poll_skip_fd++;
+ /* Now tmask is stable. Do nothing if the FD was taken over under us */
- /* Let the poller know this FD was lost */
- if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, ti->ltid))
- fd_updt[fd_nbupdt++] = fd;
+ if (!(tmask & ti->ltid_bit)) {
+ /* a takeover has started */
+ activity[tid].poll_skip_fd++;
- fd_drop_tgid(fd);
- return FD_UPDT_MIGRATED;
- }
- } while (!HA_ATOMIC_CAS(&fdtab[fd].running_mask, &rmask, rmask | ti->ltid_bit));
+ if (fd_clr_running(fd) == ti->ltid_bit)
+ goto closed_or_migrated;
+
+ goto do_update;
+ }
/* with running we're safe now, we can drop the reference */
fd_drop_tgid(fd);
/* another thread might have attempted to close this FD in the mean
* time (e.g. timeout task) striking on a previous thread and closing.
- * This is detected by both thread_mask and running_mask being 0 after
+ * This is detected by us being the last owners of a running_mask bit,
+ * and the thread_mask being zero. At the moment we release the running
+ * bit, a takeover may also happen, so in practice we check for our loss
+ * of the thread_mask bitboth thread_mask and running_mask being 0 after
* we remove ourselves last. There is no risk the FD gets reassigned
* to a different group since it's not released until the real close()
* in _fd_delete_orphan().
*/
- if (fd_clr_running(fd) == ti->ltid_bit && !tmask) {
- fd_drop_tgid(fd);
- _fd_delete_orphan(fd);
- return FD_UPDT_CLOSED;
- }
+ if (fd_clr_running(fd) == ti->ltid_bit && !(tmask & ti->ltid_bit))
+ goto closed_or_migrated;
/* we had to stop this FD and it still must be stopped after the I/O
* cb's changes, so let's program an update for this.
fd_drop_tgid(fd);
return FD_UPDT_DONE;
+
+ closed_or_migrated:
+ /* We only come here once we've last dropped running and the FD is
+ * not for us as per !(tmask & tid_bit). It may imply we're
+ * responsible for closing it. Otherwise it's just a migration.
+ */
+ if (HA_ATOMIC_BTR(&fdtab[fd].state, FD_MUST_CLOSE_BIT)) {
+ fd_drop_tgid(fd);
+ _fd_delete_orphan(fd);
+ return FD_UPDT_CLOSED;
+ }
+
+ /* So we were alone, no close bit, at best the FD was migrated, at
+ * worst it's in the process of being closed by another thread. We must
+ * be ultra-careful as it can be re-inserted by yet another thread as
+ * the result of socket() or accept(). Let's just tell the poller the
+ * FD was lost. If it was closed it was already removed and this will
+ * only cost an update for nothing.
+ */
+
+ do_update:
+ /* The FD is not closed but we don't want the poller to wake up for
+ * it anymore.
+ */
+ if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, ti->ltid))
+ fd_updt[fd_nbupdt++] = fd;
+
+ fd_drop_tgid(fd);
+ return FD_UPDT_MIGRATED;
}
/* This is used by pollers at boot time to re-register desired events for