*/
void fd_process_cached_events();
-/* Check the events attached to a file descriptor, update its cache
- * accordingly, and call the associated I/O callback. If new updates are
- * detected, the function tries to process them as well in order to save
- * wakeups after accept().
+/* Mark fd <fd> as updated for polling and allocate an entry in the update
+ * list for it if one was not already present. This can be done at any time.
*/
-void fd_process_polled_events(int fd);
-
-
-/* Mark fd <fd> as updated and allocate an entry in the update list for this if
- * it was not already there. This can be done at any time.
- */
-static inline void updt_fd(const int fd)
+static inline void updt_fd_polling(const int fd)
{
if (fdtab[fd].updated)
/* already scheduled for update */
return;
}
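The body above is truncated at the early return; for reference, a minimal sketch of the complete helper, assuming the fd_updt[] array and fd_nbupdt counter that the pollers below drain:

static inline void updt_fd_polling(const int fd)
{
    if (fdtab[fd].updated)
        /* already scheduled for update */
        return;
    fdtab[fd].updated = 1;
    fd_updt[fd_nbupdt++] = fd;
}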
-/* Automatically allocates or releases a cache entry for fd <fd> depending on
- * its new state. This is meant to be used by pollers while processing updates.
+/* This function automatically enables/disables caching for an entry depending
+ * on its state, and also possibly creates an update entry so that the poller
+ * does its job as well. It is only called on state changes.
*/
-static inline void fd_alloc_or_release_cache_entry(int fd, int new_state)
+static inline void fd_update_cache(int fd)
{
- /* READY and ACTIVE states (the two with both flags set) require a cache entry */
-
- if (((new_state & (FD_EV_READY_R | FD_EV_ACTIVE_R)) == (FD_EV_READY_R | FD_EV_ACTIVE_R)) ||
- ((new_state & (FD_EV_READY_W | FD_EV_ACTIVE_W)) == (FD_EV_READY_W | FD_EV_ACTIVE_W))) {
+ /* 3 states for each direction require a polling update */
+ if ((fdtab[fd].state & (FD_EV_POLLED_R | FD_EV_ACTIVE_R)) == FD_EV_POLLED_R ||
+ (fdtab[fd].state & (FD_EV_POLLED_R | FD_EV_READY_R | FD_EV_ACTIVE_R)) == FD_EV_ACTIVE_R ||
+ (fdtab[fd].state & (FD_EV_POLLED_W | FD_EV_ACTIVE_W)) == FD_EV_POLLED_W ||
+ (fdtab[fd].state & (FD_EV_POLLED_W | FD_EV_READY_W | FD_EV_ACTIVE_W)) == FD_EV_ACTIVE_W)
+ updt_fd_polling(fd);
+
+ /* only READY and ACTIVE states (the two with both flags set) require a cache entry */
+ if (((fdtab[fd].state & (FD_EV_READY_R | FD_EV_ACTIVE_R)) == (FD_EV_READY_R | FD_EV_ACTIVE_R)) ||
+ ((fdtab[fd].state & (FD_EV_READY_W | FD_EV_ACTIVE_W)) == (FD_EV_READY_W | FD_EV_ACTIVE_W))) {
fd_alloc_cache_entry(fd);
}
else {
if (!((unsigned int)fdtab[fd].state & FD_EV_ACTIVE_R))
return; /* already disabled */
fdtab[fd].state &= ~FD_EV_ACTIVE_R;
- updt_fd(fd); /* need an update entry to change the state */
+ fd_update_cache(fd); /* need an update entry to change the state */
}
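The two polling-update conditions in fd_update_cache() are easier to audit as a truth table. This sketch enumerates the read direction, assuming POLLED, READY and ACTIVE are independent bits; the write direction is symmetrical, and the three "yes" rows are the "3 states" the comment refers to:

/*  POLLED_R  READY_R  ACTIVE_R  ->  polling update?
 *     0         -        0          no  (idle, poller not involved)
 *     1         -        0          yes (still polled but inactive: unregister)
 *     0         0        1          yes (active but not ready: must poll)
 *     0         1        1          no  (served from the cache instead)
 *     1         -        1          no  (already polled as required)
 */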
/* Disable processing of send events on fd <fd> */
if (!((unsigned int)fdtab[fd].state & FD_EV_ACTIVE_W))
return; /* already disabled */
fdtab[fd].state &= ~FD_EV_ACTIVE_W;
- updt_fd(fd); /* need an update entry to change the state */
+ fd_update_cache(fd); /* need an update entry to change the state */
}
/* Disable processing of events on fd <fd> for both directions. */
if (!((unsigned int)fdtab[fd].state & FD_EV_ACTIVE_RW))
return; /* already disabled */
fdtab[fd].state &= ~FD_EV_ACTIVE_RW;
- updt_fd(fd); /* need an update entry to change the state */
+ fd_update_cache(fd); /* need an update entry to change the state */
}
/* Report that FD <fd> cannot receive anymore without polling (EAGAIN detected). */
if (!(((unsigned int)fdtab[fd].state) & FD_EV_READY_R))
return; /* already marked as blocked */
fdtab[fd].state &= ~FD_EV_READY_R;
- updt_fd(fd);
+ fd_update_cache(fd);
}
/* Report that FD <fd> can again receive without polling. */
if (((unsigned int)fdtab[fd].state) & FD_EV_READY_R)
return; /* already marked as ready */
fdtab[fd].state |= FD_EV_READY_R;
- updt_fd(fd);
+ fd_update_cache(fd);
}
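To illustrate the intended call pattern around these helpers, here is a hypothetical receive callback; demo_recv_iocb and its buffer handling are illustrative names, not part of this patch:

#include <errno.h>
#include <sys/socket.h>

/* drain the socket, then report the blocking state to the fd layer */
static int demo_recv_iocb(int fd)
{
    char buf[4096];
    ssize_t ret;

    while ((ret = recv(fd, buf, sizeof(buf), 0)) > 0)
        ; /* ... consume <ret> bytes ... */

    if (ret < 0 && errno == EAGAIN)
        fd_cant_recv(fd); /* clears FD_EV_READY_R, schedules an update */
    return 0;
}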
/* Report that FD <fd> cannot send anymore without polling (EAGAIN detected). */
if (!(((unsigned int)fdtab[fd].state) & FD_EV_READY_W))
return; /* already marked as blocked */
fdtab[fd].state &= ~FD_EV_READY_W;
- updt_fd(fd);
+ fd_update_cache(fd);
}
/* Report that FD <fd> can again send without polling. */
if (((unsigned int)fdtab[fd].state) & FD_EV_READY_W)
return; /* already marked as ready */
fdtab[fd].state |= FD_EV_READY_W;
- updt_fd(fd);
+ fd_update_cache(fd);
}
/* Prepare FD <fd> to try to receive */
if (((unsigned int)fdtab[fd].state & FD_EV_ACTIVE_R))
return; /* already enabled */
fdtab[fd].state |= FD_EV_ACTIVE_R;
- updt_fd(fd); /* need an update entry to change the state */
+ fd_update_cache(fd); /* need an update entry to change the state */
}
/* Prepare FD <fd> to try to send */
if (((unsigned int)fdtab[fd].state & FD_EV_ACTIVE_W))
return; /* already enabled */
fdtab[fd].state |= FD_EV_ACTIVE_W;
- updt_fd(fd); /* need an update entry to change the state */
+ fd_update_cache(fd); /* need an update entry to change the state */
}
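A hypothetical setup fragment for a freshly accepted connection shows how these enablers combine with the fdtab fields used throughout; demo_conn_init and its parameters are illustrative names:

static void demo_conn_init(int fd, int (*iocb)(int fd), void *owner)
{
    fdtab[fd].owner = owner;
    fdtab[fd].iocb  = iocb;
    fd_want_recv(fd);  /* always interested in incoming data */
    /* fd_want_send(fd) would only follow once output data is pending */
}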
/* Prepares <fd> for being polled */
int updt_idx;
int wait_time;
- /* first, scan the update list to find changes */
+ /* first, scan the update list to find polling changes */
for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
fd = fd_updt[updt_idx];
fdtab[fd].updated = 0;
ev.data.fd = fd;
epoll_ctl(epoll_fd, opcode, fd, &ev);
}
-
- fd_alloc_or_release_cache_entry(fd, en);
}
fd_nbupdt = 0;
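The diff elides how <opcode> and <ev.events> are derived before the epoll_ctl() call above. A plausible reconstruction from the old (eo) and new (en) polled status, assuming the FD_EV_POLLED_* flags from fd.h:

if ((eo ^ en) & FD_EV_POLLED_RW) {
    if (!(en & FD_EV_POLLED_RW))
        opcode = EPOLL_CTL_DEL;   /* no direction polled anymore */
    else if (!(eo & FD_EV_POLLED_RW))
        opcode = EPOLL_CTL_ADD;   /* fd newly enters the poll set */
    else
        opcode = EPOLL_CTL_MOD;   /* the direction set changed */

    ev.events = 0;
    if (en & FD_EV_POLLED_R)
        ev.events |= EPOLLIN;
    if (en & FD_EV_POLLED_W)
        ev.events |= EPOLLOUT;
    ev.data.fd = fd;
    epoll_ctl(epoll_fd, opcode, fd, &ev);
}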
n |= FD_POLL_HUP;
fdtab[fd].ev |= n;
- fd_process_polled_events(fd);
+ if (n & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
+ fd_may_recv(fd);
+
+ if (n & (FD_POLL_OUT | FD_POLL_ERR))
+ fd_may_send(fd);
}
/* the caller will take care of cached events */
}
}
}
}
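Similarly, the <n> mask merged into fdtab[fd].ev before the new fd_may_recv()/fd_may_send() calls is typically built from the raw epoll events; a sketch, assuming the FD_POLL_* flags used throughout:

n = ((e & EPOLLIN)  ? FD_POLL_IN  : 0) |
    ((e & EPOLLOUT) ? FD_POLL_OUT : 0) |
    ((e & EPOLLERR) ? FD_POLL_ERR : 0) |
    ((e & EPOLLHUP) ? FD_POLL_HUP : 0);
fdtab[fd].ev |= n;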
-
- fd_alloc_or_release_cache_entry(fd, en);
}
if (changes)
kevent(kqueue_fd, kev, changes, NULL, 0, NULL);
fdtab[fd].ev |= FD_POLL_OUT;
}
- fd_process_polled_events(fd);
+ if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
+ fd_may_recv(fd);
+
+ if (fdtab[fd].ev & (FD_POLL_OUT | FD_POLL_ERR))
+ fd_may_send(fd);
}
}
else if ((en & ~eo) & FD_EV_POLLED_W)
hap_fd_set(fd, fd_evts[DIR_WR]);
}
-
- fd_alloc_or_release_cache_entry(fd, en);
}
fd_nbupdt = 0;
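The hap_fd_set() helper referenced above maintains the fd_evts[] tables as plain bit arrays. A sketch of a matching pair of definitions; the actual implementation in ev_poll.c may differ:

static inline void hap_fd_set(int fd, unsigned int *evts)
{
    evts[(unsigned)fd / 32] |= 1U << ((unsigned)fd & 31);
}

static inline void hap_fd_clr(int fd, unsigned int *evts)
{
    evts[(unsigned)fd / 32] &= ~(1U << ((unsigned)fd & 31));
}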
((e & POLLHUP) ? FD_POLL_HUP : 0);
}
- fd_process_polled_events(fd);
+ if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
+ fd_may_recv(fd);
+
+ if (fdtab[fd].ev & (FD_POLL_OUT | FD_POLL_ERR))
+ fd_may_send(fd);
}
}
else if ((en & ~eo) & FD_EV_POLLED_W)
FD_SET(fd, fd_evts[DIR_WR]);
}
-
- fd_alloc_or_release_cache_entry(fd, en);
}
fd_nbupdt = 0;
if (FD_ISSET(fd, tmp_evts[DIR_WR]))
fdtab[fd].ev |= FD_POLL_OUT;
- fd_process_polled_events(fd);
+ if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
+ fd_may_recv(fd);
+
+ if (fdtab[fd].ev & (FD_POLL_OUT | FD_POLL_ERR))
+ fd_may_send(fd);
}
}
}
}
/* Scan and process the cached events. This should be called right after
- * the poller.
+ * the poller. The loop may cause new entries to be created, for example
+ * if a listener causes an accept() to initiate a new incoming connection
+ * wanting to attempt a recv().
*/
void fd_process_cached_events()
{
fd = fd_cache[entry];
e = fdtab[fd].state;
- /* Principle: events which are marked FD_EV_ACTIVE are processed
- * with their usual I/O callback. The callback may remove the
- * events from the cache or tag them for polling. Changes will be
- * applied on next round. Cache entries with no more activity are
- * automatically scheduled for removal.
- */
fdtab[fd].ev &= FD_POLL_STICKY;
if ((e & (FD_EV_READY_R | FD_EV_ACTIVE_R)) == (FD_EV_READY_R | FD_EV_ACTIVE_R))
if (fdtab[fd].iocb && fdtab[fd].owner && fdtab[fd].ev)
fdtab[fd].iocb(fd);
else
- updt_fd(fd);
+ fd_release_cache_entry(fd);
/* If the fd was removed from the cache, it has been
* replaced by the next one that we don't want to skip !
}
}
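Pieced together from the fragments above, the whole loop plausibly reads as follows. This is a sketch, assuming fd_cache_num tracks the number of cached entries; it also shows why <entry> must not advance when an entry is released:

void fd_process_cached_events()
{
    int fd, entry, e;

    for (entry = 0; entry < fd_cache_num; ) {
        fd = fd_cache[entry];
        e = fdtab[fd].state;

        fdtab[fd].ev &= FD_POLL_STICKY;

        if ((e & (FD_EV_READY_R | FD_EV_ACTIVE_R)) == (FD_EV_READY_R | FD_EV_ACTIVE_R))
            fdtab[fd].ev |= FD_POLL_IN;

        if ((e & (FD_EV_READY_W | FD_EV_ACTIVE_W)) == (FD_EV_READY_W | FD_EV_ACTIVE_W))
            fdtab[fd].ev |= FD_POLL_OUT;

        if (fdtab[fd].iocb && fdtab[fd].owner && fdtab[fd].ev)
            fdtab[fd].iocb(fd);
        else
            fd_release_cache_entry(fd);

        /* a released entry is replaced by the last one, so only
         * advance when the current slot still holds the same fd */
        if (entry < fd_cache_num && fd_cache[entry] == fd)
            entry++;
    }
}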
-/* Check the events attached to a file descriptor, update its cache
- * accordingly, and call the associated I/O callback. If new updates are
- * detected, the function tries to process them as well in order to save
- * wakeups after accept().
- */
-void fd_process_polled_events(int fd)
-{
- int new_updt, old_updt;
-
- /* First thing to do is to mark the reported events as ready, in order
- * for them to later be continued from the cache without polling if
- * they have to be interrupted (eg: recv fills a buffer).
- */
- if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
- fd_may_recv(fd);
-
- if (fdtab[fd].ev & (FD_POLL_OUT | FD_POLL_ERR))
- fd_may_send(fd);
-
- if (fdtab[fd].cache) {
- /* This fd is already cached, no need to process it now. */
- return;
- }
-
- if (unlikely(!fdtab[fd].iocb || !fdtab[fd].ev)) {
- /* nothing to do */
- return;
- }
-
- /* Save number of updates to detect creation of new FDs. */
- old_updt = fd_nbupdt;
- fdtab[fd].iocb(fd);
-
- /* One or more fd might have been created during the iocb().
- * This mainly happens with new incoming connections that have
- * just been accepted, so we'd like to process them immediately
- * for better efficiency, as it saves one useless task wakeup.
- * Second benefit, if at the end the fds are disabled again, we can
- * safely destroy their update entry to reduce the scope of later
- * scans. This is the reason we scan the new entries backwards.
- */
- for (new_updt = fd_nbupdt; new_updt > old_updt; new_updt--) {
- fd = fd_updt[new_updt - 1];
- if (!fdtab[fd].new)
- continue;
-
- fdtab[fd].new = 0;
- fdtab[fd].ev &= FD_POLL_STICKY;
-
- if ((fdtab[fd].state & FD_EV_STATUS_R) == (FD_EV_READY_R | FD_EV_ACTIVE_R))
- fdtab[fd].ev |= FD_POLL_IN;
-
- if ((fdtab[fd].state & FD_EV_STATUS_W) == (FD_EV_READY_W | FD_EV_ACTIVE_W))
- fdtab[fd].ev |= FD_POLL_OUT;
-
- if (fdtab[fd].ev && fdtab[fd].iocb && fdtab[fd].owner)
- fdtab[fd].iocb(fd);
-
- /* we can remove this update entry if it's the last one and is
- * unused, otherwise we don't touch anything, especially given
- * that the FD might have been closed already.
- */
- if (new_updt == fd_nbupdt && !fd_recv_active(fd) && !fd_send_active(fd)) {
- fdtab[fd].updated = 0;
- fd_nbupdt--;
- }
- }
-}
-
/* disable the specified poller */
void disable_poller(const char *poller_name)
{