#define FD_EXPORTED (1U << FD_EXPORTED_BIT)
#define FD_EXCL_SYSCALL (1U << FD_EXCL_SYSCALL_BIT)
+/* FD update status after fd_update_events() */
+enum {
+ FD_UPDT_DONE = 0, // update done, nothing else to be done
+ FD_UPDT_DEAD, // FD was already dead, ignore it
+ FD_UPDT_CLOSED, // FD was closed
+ FD_UPDT_MIGRATED, // FD was migrated, ignore it now
+};
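+/* Note for pollers: FD_UPDT_MIGRATED means the calling thread must stop
+ * polling this FD, while FD_UPDT_DEAD and FD_UPDT_CLOSED mean the fdtab
+ * entry must not be touched anymore.
+ */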
+
/* This is the value used to mark a file descriptor as dead. This value is
* negative, this is important so that tests on fd < 0 properly match. It
* also has the nice property of being highly negative but neither overflowing
void fd_add_to_fd_list(volatile struct fdlist *list, int fd, int off);
void fd_rm_from_fd_list(volatile struct fdlist *list, int fd, int off);
void updt_fd_polling(const int fd);
-void fd_update_events(int fd, uint evts);
+int fd_update_events(int fd, uint evts);
/* Called from the poller to acknowledge we read an entry from the global
* update list, to remove our bit from the update_mask, and remove it from
for (count = 0; count < status; count++) {
struct epoll_event ev;
unsigned int n, e;
+ int ret;
e = epoll_events[count].events;
fd = epoll_events[count].data.fd;
#ifdef DEBUG_FD
_HA_ATOMIC_INC(&fdtab[fd].event_count);
#endif
- if (!fdtab[fd].owner) {
- activity[tid].poll_dead_fd++;
- continue;
- }
-
- if (!(fdtab[fd].thread_mask & tid_bit)) {
- /* FD has been migrated */
- activity[tid].poll_skip_fd++;
- epoll_ctl(epoll_fd[tid], EPOLL_CTL_DEL, fd, &ev);
- _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
- _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
- continue;
- }
-
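+ /* translate the epoll event bits into the internal FD_EV_* flags */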
n = ((e & EPOLLIN) ? FD_EV_READY_R : 0) |
((e & EPOLLOUT) ? FD_EV_READY_W : 0) |
((e & EPOLLRDHUP) ? FD_EV_SHUT_R : 0) |
((e & EPOLLHUP) ? FD_EV_SHUT_RW : 0) |
((e & EPOLLERR) ? FD_EV_ERR_RW : 0);
- fd_update_events(fd, n);
+ ret = fd_update_events(fd, n);
+
+ if (ret == FD_UPDT_MIGRATED) {
+ /* FD has been migrated: remove it from this thread's epoll
+ * set and clear our bits in polled_mask, since another
+ * thread owns it now
+ */
+ epoll_ctl(epoll_fd[tid], EPOLL_CTL_DEL, fd, &ev);
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
+ }
}
/* the caller will take care of cached events */
}
for (i = 0; i < nevlist; i++) {
unsigned int n = 0;
int events, rebind_events;
+ int ret;
+
fd = evports_evlist[i].portev_object;
events = evports_evlist[i].portev_events;
#ifdef DEBUG_FD
_HA_ATOMIC_INC(&fdtab[fd].event_count);
#endif
- if (fdtab[fd].owner == NULL) {
- activity[tid].poll_dead_fd++;
- continue;
- }
-
- if (!(fdtab[fd].thread_mask & tid_bit)) {
- activity[tid].poll_skip_fd++;
- if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
- fd_updt[fd_nbupdt++] = fd;
- continue;
- }
-
/*
* By virtue of receiving an event for this file descriptor, it
* is no longer associated with the port in question. Store
* entry. If it changes, the fd will be placed on the updated
* list for processing the next time we are called.
*/
- fd_update_events(fd, n);
+ ret = fd_update_events(fd, n);
+
+ /* If the FD was already dead, skip it */
+ if (ret == FD_UPDT_DEAD)
+ continue;
+
+ /* disable polling on this instance if the FD was migrated */
+ if (ret == FD_UPDT_MIGRATED) {
+ if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
+ fd_updt[fd_nbupdt++] = fd;
+ continue;
+ }
/*
* This file descriptor was closed during the processing of
* polled events. No need to reassociate.
*/
- if (fdtab[fd].owner == NULL)
+ if (ret == FD_UPDT_CLOSED)
continue;
/*
for (count = 0; count < status; count++) {
unsigned int n = 0;
+ int ret;
+
fd = kev[count].ident;
#ifdef DEBUG_FD
_HA_ATOMIC_INC(&fdtab[fd].event_count);
#endif
- if (!fdtab[fd].owner) {
- activity[tid].poll_dead_fd++;
- continue;
- }
-
- if (!(fdtab[fd].thread_mask & tid_bit)) {
- activity[tid].poll_skip_fd++;
- if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
- fd_updt[fd_nbupdt++] = fd;
- continue;
- }
-
if (kev[count].filter == EVFILT_READ) {
if (kev[count].data || !(kev[count].flags & EV_EOF))
n |= FD_EV_READY_R;
n |= FD_EV_ERR_RW;
}
- fd_update_events(fd, n);
+ ret = fd_update_events(fd, n);
+
+ if (ret == FD_UPDT_MIGRATED) {
+ /* FD was migrated, let's stop polling it */
+ if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
+ fd_updt[fd_nbupdt++] = fd;
+ }
}
}
for (count = 0; status > 0 && count < nbfd; count++) {
unsigned int n;
+ int ret;
int e = poll_events[count].revents;
fd = poll_events[count].fd;
/* ok, we found one active fd */
status--;
- if (!fdtab[fd].owner) {
- activity[tid].poll_dead_fd++;
- continue;
- }
-
- if (!(fdtab[fd].thread_mask & tid_bit)) {
- activity[tid].poll_skip_fd++;
- if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
- fd_updt[fd_nbupdt++] = fd;
- continue;
- }
-
n = ((e & POLLIN) ? FD_EV_READY_R : 0) |
((e & POLLOUT) ? FD_EV_READY_W : 0) |
((e & POLLRDHUP) ? FD_EV_SHUT_R : 0) |
((e & POLLHUP) ? FD_EV_SHUT_RW : 0) |
((e & POLLERR) ? FD_EV_ERR_RW : 0);
- fd_update_events(fd, n);
- }
+ ret = fd_update_events(fd, n);
+ if (ret == FD_UPDT_MIGRATED) {
+ /* FD was migrated, let's stop polling it */
+ if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
+ fd_updt[fd_nbupdt++] = fd;
+ }
+ }
}
#ifdef DEBUG_FD
_HA_ATOMIC_INC(&fdtab[fd].event_count);
#endif
- if (!fdtab[fd].owner) {
- activity[tid].poll_dead_fd++;
- continue;
- }
-
- if (!(fdtab[fd].thread_mask & tid_bit)) {
- activity[tid].poll_skip_fd++;
- continue;
- }
fd_update_events(fd, n);
}
/* Update events seen for FD <fd> and its state if needed. This should be
* called by the poller, passing FD_EV_*_{R,W,RW} in <evts>. FD_EV_ERR_*
* doesn't need to also pass FD_EV_SHUT_*, it's implied. ERR and SHUT are
- * allowed to be reported regardless of R/W readiness.
+ * allowed to be reported regardless of R/W readiness. Returns one of
+ * FD_UPDT_*.
*/
-void fd_update_events(int fd, uint evts)
+int fd_update_events(int fd, uint evts)
{
unsigned long locked;
uint old, new;
ti->flags &= ~TI_FL_STUCK; // this thread is still running
+ /* do nothing on remains of an old dead FD */
+ if (!fdtab[fd].owner) {
+ activity[tid].poll_dead_fd++;
+ return FD_UPDT_DEAD;
+ }
+
/* do nothing if the FD was taken over under us */
- if (fd_set_running(fd) == -1)
- return;
+ if (fd_set_running(fd) == -1) {
+ activity[tid].poll_skip_fd++;
+ return FD_UPDT_MIGRATED;
+ }
locked = (fdtab[fd].thread_mask != tid_bit);
if ((fdtab[fd].running_mask & tid_bit) &&
fd_clr_running(fd) == 0 && !fdtab[fd].thread_mask) {
_fd_delete_orphan(fd);
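+ /* the FD was closed while we were using it and we were its
+ * last user, so the entry has just been released and must
+ * not be touched anymore
+ */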
+ return FD_UPDT_CLOSED;
}
/* we had to stop this FD and it still must be stopped after the I/O
if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
fd_updt[fd_nbupdt++] = fd;
}
+
+ return FD_UPDT_DONE;
}
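
With the dead/migrated/closed checks centralized in fd_update_events(), every poller now follows the same calling convention. Below is a minimal sketch of that convention, assuming HAProxy's internal fd and threading API is in scope; handle_ready_fd() and translate_events() are hypothetical names standing in for the poller loop body and the backend-specific event mapping (e.g. EPOLLIN to FD_EV_READY_R):

/* hypothetical sketch, not part of the patch */
static void handle_ready_fd(int fd, unsigned int backend_events)
{
	uint n = translate_events(backend_events); /* hypothetical helper */
	int ret = fd_update_events(fd, n);

	if (ret == FD_UPDT_DEAD || ret == FD_UPDT_CLOSED) {
		/* the fdtab entry is stale or was just released: it
		 * must not be touched anymore
		 */
		return;
	}

	if (ret == FD_UPDT_MIGRATED) {
		/* another thread took the FD over: queue an update so
		 * that this thread stops polling it
		 */
		if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
			fd_updt[fd_nbupdt++] = fd;
		return;
	}

	/* FD_UPDT_DONE: events were accounted for; cached events
	 * remain the caller's job, as before
	 */
}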
/* Tries to send <npfx> parts from <prefix> followed by <nmsg> parts from <msg>