static void __fd_clo(int fd)
{
if (unlikely(fdtab[fd].state & FD_CLONED)) {
- unsigned long m = polled_mask[fd].poll_recv | polled_mask[fd].poll_send;
+ unsigned long m = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_recv) | _HA_ATOMIC_LOAD(&polled_mask[fd].poll_send);
+ int tgrp = fd_tgid(fd);
struct epoll_event ev;
int i;
- for (i = global.nbthread - 1; i >= 0; i--)
- if (m & (1UL << i))
+ if (!m)
+ return;
+
+ /* since FDs may only be shared per group and are only closed
+ * once entirely reset, it should never happen that we have to
+ * close an FD for another group, unless we're stopping from the
+ * wrong thread or during startup, which is what we're checking
+ * for. Regardless, it is not a problem to do so.
+ */
+ if (unlikely(!(global.mode & MODE_STARTING)))
+ CHECK_IF(tgid != tgrp && !thread_isolated());
+
+ for (i = ha_tgroup_info[tgrp-1].base; i < ha_tgroup_info[tgrp-1].base + ha_tgroup_info[tgrp-1].count; i++)
+ if (m & ha_thread_info[i].ltid_bit)
epoll_ctl(epoll_fd[i], EPOLL_CTL_DEL, fd, &ev);
}
}
{
int en, opcode;
struct epoll_event ev = { };
+ ulong pr, ps;
en = fdtab[fd].state;
+ pr = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_recv);
+ ps = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_send);
/* Try to force EPOLLET on FDs that support it */
if (fdtab[fd].state & FD_ET_POSSIBLE) {
/* already done ? */
- if (polled_mask[fd].poll_recv & polled_mask[fd].poll_send & tid_bit)
+ if (pr & ps & ti->ltid_bit)
return;
/* enable ET polling in both directions */
- _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
- _HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, ti->ltid_bit);
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_send, ti->ltid_bit);
opcode = EPOLL_CTL_ADD;
ev.events = EPOLLIN | EPOLLRDHUP | EPOLLOUT | EPOLLET;
goto done;
* needlessly unsubscribe then re-subscribe it.
*/
if (!(en & FD_EV_READY_R) &&
- ((en & FD_EV_ACTIVE_W) ||
- ((polled_mask[fd].poll_send | polled_mask[fd].poll_recv) & tid_bit)))
+ ((en & FD_EV_ACTIVE_W) || ((ps | pr) & ti->ltid_bit)))
en |= FD_EV_ACTIVE_R;
- if ((polled_mask[fd].poll_send | polled_mask[fd].poll_recv) & tid_bit) {
+ if ((ps | pr) & ti->ltid_bit) {
if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_ACTIVE_RW)) {
/* fd removed from poll list */
opcode = EPOLL_CTL_DEL;
- if (polled_mask[fd].poll_recv & tid_bit)
- _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
- if (polled_mask[fd].poll_send & tid_bit)
- _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
+ if (pr & ti->ltid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~ti->ltid_bit);
+ if (ps & ti->ltid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~ti->ltid_bit);
}
else {
- if (((en & FD_EV_ACTIVE_R) != 0) ==
- ((polled_mask[fd].poll_recv & tid_bit) != 0) &&
- ((en & FD_EV_ACTIVE_W) != 0) ==
- ((polled_mask[fd].poll_send & tid_bit) != 0))
+ if (((en & FD_EV_ACTIVE_R) != 0) == ((pr & ti->ltid_bit) != 0) &&
+ ((en & FD_EV_ACTIVE_W) != 0) == ((ps & ti->ltid_bit) != 0))
return;
if (en & FD_EV_ACTIVE_R) {
- if (!(polled_mask[fd].poll_recv & tid_bit))
- _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
+ if (!(pr & ti->ltid_bit))
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, ti->ltid_bit);
} else {
- if (polled_mask[fd].poll_recv & tid_bit)
- _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
+ if (pr & ti->ltid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~ti->ltid_bit);
}
if (en & FD_EV_ACTIVE_W) {
- if (!(polled_mask[fd].poll_send & tid_bit))
- _HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
+ if (!(ps & ti->ltid_bit))
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_send, ti->ltid_bit);
} else {
- if (polled_mask[fd].poll_send & tid_bit)
- _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
+ if (ps & ti->ltid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~ti->ltid_bit);
}
/* fd status changed */
opcode = EPOLL_CTL_MOD;
/* new fd in the poll list */
opcode = EPOLL_CTL_ADD;
if (en & FD_EV_ACTIVE_R)
- _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, ti->ltid_bit);
if (en & FD_EV_ACTIVE_W)
- _HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_send, ti->ltid_bit);
}
else {
return;
{
int en;
int events;
+ ulong pr, ps;
en = fdtab[fd].state;
+ pr = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_recv);
+ ps = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_send);
if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_ACTIVE_RW)) {
- if (!(polled_mask[fd].poll_recv & tid_bit) &&
- !(polled_mask[fd].poll_send & tid_bit)) {
+ if (!((pr | ps) & ti->ltid_bit)) {
/* fd was not watched, it's still not */
return;
}
/* fd totally removed from poll list */
events = 0;
- if (polled_mask[fd].poll_recv & tid_bit)
- _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
- if (polled_mask[fd].poll_send & tid_bit)
- _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
+ if (pr & ti->ltid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~ti->ltid_bit);
+ if (ps & ti->ltid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~ti->ltid_bit);
}
else {
/* OK fd has to be monitored, it was either added or changed */
events = evports_state_to_events(en);
if (en & FD_EV_ACTIVE_R) {
- if (!(polled_mask[fd].poll_recv & tid_bit))
- _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
+ if (!(pr & ti->ltid_bit))
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, ti->ltid_bit);
} else {
- if (polled_mask[fd].poll_recv & tid_bit)
- _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
+ if (pr & ti->ltid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~ti->ltid_bit);
}
if (en & FD_EV_ACTIVE_W) {
- if (!(polled_mask[fd].poll_send & tid_bit))
- _HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
+ if (!(ps & ti->ltid_bit))
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_send, ti->ltid_bit);
} else {
- if (polled_mask[fd].poll_send & tid_bit)
- _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
+ if (ps & ti->ltid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~ti->ltid_bit);
}
}
{
int en;
int changes = start;
+ ulong pr, ps;
en = fdtab[fd].state;
+ pr = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_recv);
+ ps = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_send);
if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_ACTIVE_RW)) {
- if (!(polled_mask[fd].poll_recv & tid_bit) &&
- !(polled_mask[fd].poll_send & tid_bit)) {
+ if (!((pr | ps) & ti->ltid_bit)) {
/* fd was not watched, it's still not */
return changes;
}
/* fd totally removed from poll list */
EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
- if (polled_mask[fd].poll_recv & tid_bit)
- _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
- if (polled_mask[fd].poll_send & tid_bit)
- _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
+ if (pr & ti->ltid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~ti->ltid_bit);
+ if (ps & ti->ltid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~ti->ltid_bit);
}
else {
/* OK fd has to be monitored, it was either added or changed */
if (en & FD_EV_ACTIVE_R) {
- if (!(polled_mask[fd].poll_recv & tid_bit)) {
+ if (!(pr & ti->ltid_bit)) {
EV_SET(&kev[changes++], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
- _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, ti->ltid_bit);
}
}
- else if (polled_mask[fd].poll_recv & tid_bit) {
+ else if (pr & ti->ltid_bit) {
EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
- HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
+ HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~ti->ltid_bit);
}
if (en & FD_EV_ACTIVE_W) {
- if (!(polled_mask[fd].poll_send & tid_bit)) {
+ if (!(ps & ti->ltid_bit)) {
EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
- _HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_send, ti->ltid_bit);
}
}
- else if (polled_mask[fd].poll_send & tid_bit) {
+ else if (ps & ti->ltid_bit) {
EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
- _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~ti->ltid_bit);
}
}
static void _update_fd(int fd, int *max_add_fd)
{
int en;
+ ulong pr, ps;
en = fdtab[fd].state;
+ pr = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_recv);
+ ps = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_send);
/* we have a single state for all threads, which is why we
* don't check the tid_bit. First thread to see the update
* takes it for every other one.
*/
if (!(en & FD_EV_ACTIVE_RW)) {
- if (!(polled_mask[fd].poll_recv | polled_mask[fd].poll_send)) {
+ if (!(pr | ps)) {
/* fd was not watched, it's still not */
return;
}
/* OK fd has to be monitored, it was either added or changed */
if (!(en & FD_EV_ACTIVE_R)) {
hap_fd_clr(fd, fd_evts[DIR_RD]);
- if (polled_mask[fd].poll_recv & tid_bit)
- _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
+ if (pr & ti->ltid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~ti->ltid_bit);
} else {
hap_fd_set(fd, fd_evts[DIR_RD]);
- if (!(polled_mask[fd].poll_recv & tid_bit))
- _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
+ if (!(pr & ti->ltid_bit))
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, ti->ltid_bit);
}
if (!(en & FD_EV_ACTIVE_W)) {
hap_fd_clr(fd, fd_evts[DIR_WR]);
- if (polled_mask[fd].poll_send & tid_bit)
- _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
- }else {
+ if (ps & ti->ltid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~ti->ltid_bit);
+ } else {
hap_fd_set(fd, fd_evts[DIR_WR]);
- if (!(polled_mask[fd].poll_send & tid_bit))
- _HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
+ if (!(ps & ti->ltid_bit))
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_send, ti->ltid_bit);
}
if (fd > *max_add_fd)
static void _update_fd(int fd, int *max_add_fd)
{
int en;
+ ulong pr, ps;
en = fdtab[fd].state;
+ pr = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_recv);
+ ps = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_send);
/* we have a single state for all threads, which is why we
* don't check the tid_bit. First thread to see the update
* takes it for every other one.
*/
if (!(en & FD_EV_ACTIVE_RW)) {
- if (!(polled_mask[fd].poll_recv | polled_mask[fd].poll_send)) {
+ if (!(pr | ps)) {
/* fd was not watched, it's still not */
return;
}
/* OK fd has to be monitored, it was either added or changed */
if (!(en & FD_EV_ACTIVE_R)) {
hap_fd_clr(fd, fd_evts[DIR_RD]);
- if (polled_mask[fd].poll_recv & tid_bit)
- _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
+ if (pr & ti->ltid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~ti->ltid_bit);
} else {
hap_fd_set(fd, fd_evts[DIR_RD]);
- if (!(polled_mask[fd].poll_recv & tid_bit))
- _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
+ if (!(pr & ti->ltid_bit))
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, ti->ltid_bit);
}
if (!(en & FD_EV_ACTIVE_W)) {
hap_fd_clr(fd, fd_evts[DIR_WR]);
- if (polled_mask[fd].poll_send & tid_bit)
- _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
+ if (ps & ti->ltid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~ti->ltid_bit);
} else {
hap_fd_set(fd, fd_evts[DIR_WR]);
- if (!(polled_mask[fd].poll_send & tid_bit))
- _HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
+ if (!(ps & ti->ltid_bit))
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_send, ti->ltid_bit);
}
if (fd > *max_add_fd)