]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MAJOR: polling: centralize calls to I/O callbacks
authorWilly Tarreau <w@1wt.eu>
Wed, 19 Nov 2014 18:43:05 +0000 (19:43 +0100)
committerWilly Tarreau <w@1wt.eu>
Fri, 21 Nov 2014 19:37:32 +0000 (20:37 +0100)
In order for HTTP/2 not to eat too much memory, we'll have to support
on-the-fly buffer allocation, since most streams will have an empty
request buffer at some point. Supporting allocation on the fly means
being able to sleep inside I/O callbacks if a buffer is not available.

Till now, the I/O callbacks were called from two locations :
  - when processing the cached events
  - when processing the polled events from the poller

This change cleans up the design a bit further than what was started in
1.5. It now ensures that we never call any iocb from the poller itself
and that instead, events learned by the poller are put into the cache.
The benefit is important in terms of stability : we don't have to care
anymore about the risk that new events are added into the poller while
processing its events, and we're certain that updates are processed at
a single location.

To achieve this, we now modify all the fd_* functions so that instead of
creating updates, they add/remove the fd to/from the cache depending on
its state, and only create an update when the polling status reaches a
state where it will have to change. Since the pollers make use of these
functions to notify readiness (using fd_may_recv/fd_may_send), the cache
is always up to date with the poller.

Creating updates only when the polling status needs to change saves a
significant amount of work for the pollers : a benchmark showed that on
a typical TCP proxy test, the amount of updates per connection dropped
from 11 to 1 on average. This also means that the update list is smaller
and has more chances of not thrashing too many CPU cache lines. The first
observed benefit is a net 2% performance gain on the connection rate.

A second benefit is that when a connection is accepted, it's only when
we're processing the cache, and the recv event is automatically added
into the cache *after* the current one, resulting in this event to be
processed immediately during the same loop. Previously we used to have
a second run over the updates to detect if new events were added to
catch them before waking up tasks.

The next gain will be offered by the next steps on this subject consisting
in implementing an I/O queue containing all cached events ordered by priority
just like the run queue, and to be able to leave some events pending there
as long as needed. That will allow us *not* to perform some FD processing
if it's not the proper time for this (typically keep waiting for a buffer
to be allocated if none is available for an recv()). And by only processing
a small bunch of them, we'll allow priorities to take place even at the I/O
level.

As a result of this change, functions fd_alloc_or_release_cache_entry()
and fd_process_polled_events() have disappeared, and the code dedicated
to checking for new fd events after the callback during the poll() loop
was removed as well. Despite the patch looking large, it's mostly a
change of what function is falled upon fd_*() and almost nothing was
added.

include/proto/fd.h
src/ev_epoll.c
src/ev_kqueue.c
src/ev_poll.c
src/ev_select.c
src/fd.c

index b0a478e4c294bf7b5ce286a1aaf15e92a6bfffef..87309bf0a38a24790c9a3237bf2a21905f4cd9eb 100644 (file)
@@ -81,18 +81,10 @@ void run_poller();
  */
 void fd_process_cached_events();
 
-/* Check the events attached to a file descriptor, update its cache
- * accordingly, and call the associated I/O callback. If new updates are
- * detected, the function tries to process them as well in order to save
- * wakeups after accept().
+/* Mark fd <fd> as updated for polling and allocate an entry in the update list
+ * for this if it was not already there. This can be done at any time.
  */
-void fd_process_polled_events(int fd);
-
-
-/* Mark fd <fd> as updated and allocate an entry in the update list for this if
- * it was not already there. This can be done at any time.
- */
-static inline void updt_fd(const int fd)
+static inline void updt_fd_polling(const int fd)
 {
        if (fdtab[fd].updated)
                /* already scheduled for update */
@@ -157,15 +149,22 @@ static inline int fd_compute_new_polled_status(int state)
        return state;
 }
 
-/* Automatically allocates or releases a cache entry for fd <fd> depending on
- * its new state. This is meant to be used by pollers while processing updates.
+/* This function automatically enables/disables caching for an entry depending
+ * on its state, and also possibly creates an update entry so that the poller
+ * does its job as well. It is only called on state changes.
  */
-static inline void fd_alloc_or_release_cache_entry(int fd, int new_state)
+static inline void fd_update_cache(int fd)
 {
-       /* READY and ACTIVE states (the two with both flags set) require a cache entry */
-
-       if (((new_state & (FD_EV_READY_R | FD_EV_ACTIVE_R)) == (FD_EV_READY_R | FD_EV_ACTIVE_R)) ||
-           ((new_state & (FD_EV_READY_W | FD_EV_ACTIVE_W)) == (FD_EV_READY_W | FD_EV_ACTIVE_W))) {
+       /* 3 states for each direction require a polling update */
+       if ((fdtab[fd].state & (FD_EV_POLLED_R |                 FD_EV_ACTIVE_R)) == FD_EV_POLLED_R ||
+           (fdtab[fd].state & (FD_EV_POLLED_R | FD_EV_READY_R | FD_EV_ACTIVE_R)) == FD_EV_ACTIVE_R ||
+           (fdtab[fd].state & (FD_EV_POLLED_W |                 FD_EV_ACTIVE_W)) == FD_EV_POLLED_W ||
+           (fdtab[fd].state & (FD_EV_POLLED_W | FD_EV_READY_W | FD_EV_ACTIVE_W)) == FD_EV_ACTIVE_W)
+               updt_fd_polling(fd);
+
+       /* only READY and ACTIVE states (the two with both flags set) require a cache entry */
+       if (((fdtab[fd].state & (FD_EV_READY_R | FD_EV_ACTIVE_R)) == (FD_EV_READY_R | FD_EV_ACTIVE_R)) ||
+           ((fdtab[fd].state & (FD_EV_READY_W | FD_EV_ACTIVE_W)) == (FD_EV_READY_W | FD_EV_ACTIVE_W))) {
                fd_alloc_cache_entry(fd);
        }
        else {
@@ -243,7 +242,7 @@ static inline void fd_stop_recv(int fd)
        if (!((unsigned int)fdtab[fd].state & FD_EV_ACTIVE_R))
                return; /* already disabled */
        fdtab[fd].state &= ~FD_EV_ACTIVE_R;
-       updt_fd(fd); /* need an update entry to change the state */
+       fd_update_cache(fd); /* need an update entry to change the state */
 }
 
 /* Disable processing send events on fd <fd> */
@@ -252,7 +251,7 @@ static inline void fd_stop_send(int fd)
        if (!((unsigned int)fdtab[fd].state & FD_EV_ACTIVE_W))
                return; /* already disabled */
        fdtab[fd].state &= ~FD_EV_ACTIVE_W;
-       updt_fd(fd); /* need an update entry to change the state */
+       fd_update_cache(fd); /* need an update entry to change the state */
 }
 
 /* Disable processing of events on fd <fd> for both directions. */
@@ -261,7 +260,7 @@ static inline void fd_stop_both(int fd)
        if (!((unsigned int)fdtab[fd].state & FD_EV_ACTIVE_RW))
                return; /* already disabled */
        fdtab[fd].state &= ~FD_EV_ACTIVE_RW;
-       updt_fd(fd); /* need an update entry to change the state */
+       fd_update_cache(fd); /* need an update entry to change the state */
 }
 
 /* Report that FD <fd> cannot receive anymore without polling (EAGAIN detected). */
@@ -270,7 +269,7 @@ static inline void fd_cant_recv(const int fd)
        if (!(((unsigned int)fdtab[fd].state) & FD_EV_READY_R))
                return; /* already marked as blocked */
        fdtab[fd].state &= ~FD_EV_READY_R;
-       updt_fd(fd);
+       fd_update_cache(fd);
 }
 
 /* Report that FD <fd> can receive anymore without polling. */
@@ -279,7 +278,7 @@ static inline void fd_may_recv(const int fd)
        if (((unsigned int)fdtab[fd].state) & FD_EV_READY_R)
                return; /* already marked as blocked */
        fdtab[fd].state |= FD_EV_READY_R;
-       updt_fd(fd);
+       fd_update_cache(fd);
 }
 
 /* Disable readiness when polled. This is useful to interrupt reading when it
@@ -299,7 +298,7 @@ static inline void fd_cant_send(const int fd)
        if (!(((unsigned int)fdtab[fd].state) & FD_EV_READY_W))
                return; /* already marked as blocked */
        fdtab[fd].state &= ~FD_EV_READY_W;
-       updt_fd(fd);
+       fd_update_cache(fd);
 }
 
 /* Report that FD <fd> can send anymore without polling (EAGAIN detected). */
@@ -308,7 +307,7 @@ static inline void fd_may_send(const int fd)
        if (((unsigned int)fdtab[fd].state) & FD_EV_READY_W)
                return; /* already marked as blocked */
        fdtab[fd].state |= FD_EV_READY_W;
-       updt_fd(fd);
+       fd_update_cache(fd);
 }
 
 /* Prepare FD <fd> to try to receive */
@@ -317,7 +316,7 @@ static inline void fd_want_recv(int fd)
        if (((unsigned int)fdtab[fd].state & FD_EV_ACTIVE_R))
                return; /* already enabled */
        fdtab[fd].state |= FD_EV_ACTIVE_R;
-       updt_fd(fd); /* need an update entry to change the state */
+       fd_update_cache(fd); /* need an update entry to change the state */
 }
 
 /* Prepare FD <fd> to try to send */
@@ -326,7 +325,7 @@ static inline void fd_want_send(int fd)
        if (((unsigned int)fdtab[fd].state & FD_EV_ACTIVE_W))
                return; /* already enabled */
        fdtab[fd].state |= FD_EV_ACTIVE_W;
-       updt_fd(fd); /* need an update entry to change the state */
+       fd_update_cache(fd); /* need an update entry to change the state */
 }
 
 /* Prepares <fd> for being polled */
index 9d359b2ab5d20036b27e8d2a3ee58f74d3c5a07f..755c6fa58182c2a365c0ca591a9d08a583728185 100644 (file)
@@ -69,7 +69,7 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
        int updt_idx;
        int wait_time;
 
-       /* first, scan the update list to find changes */
+       /* first, scan the update list to find polling changes */
        for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
                fd = fd_updt[updt_idx];
                fdtab[fd].updated = 0;
@@ -109,8 +109,6 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
                        ev.data.fd = fd;
                        epoll_ctl(epoll_fd, opcode, fd, &ev);
                }
-
-               fd_alloc_or_release_cache_entry(fd, en);
        }
        fd_nbupdt = 0;
 
@@ -175,7 +173,11 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
                        n |= FD_POLL_HUP;
 
                fdtab[fd].ev |= n;
-               fd_process_polled_events(fd);
+               if (n & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
+                       fd_may_recv(fd);
+
+               if (n & (FD_POLL_OUT | FD_POLL_ERR))
+                       fd_may_send(fd);
        }
        /* the caller will take care of cached events */
 }
index 06ccaee292ab17a10e9427b91e64ae94be5f046a..2af94b647c10a44541c128ea6d42a88c77ac17a9 100644 (file)
@@ -84,8 +84,6 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
                                }
                        }
                }
-
-               fd_alloc_or_release_cache_entry(fd, en);
        }
        if (changes)
                kevent(kqueue_fd, kev, changes, NULL, 0, NULL);
@@ -138,7 +136,11 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
                                fdtab[fd].ev |= FD_POLL_OUT;
                }
 
-               fd_process_polled_events(fd);
+               if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
+                       fd_may_recv(fd);
+
+               if (fdtab[fd].ev & (FD_POLL_OUT | FD_POLL_ERR))
+                       fd_may_send(fd);
        }
 }
 
index 2f6e56d3c43d4a6abe9c30982f0864f59e3b7b7f..866906c3a3515a14bcd9c2090e247d7e33099b1b 100644 (file)
@@ -93,8 +93,6 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
                        else if ((en & ~eo) & FD_EV_POLLED_W)
                                hap_fd_set(fd, fd_evts[DIR_WR]);
                }
-
-               fd_alloc_or_release_cache_entry(fd, en);
        }
        fd_nbupdt = 0;
 
@@ -164,7 +162,11 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
                                ((e & POLLHUP) ? FD_POLL_HUP : 0);
                }
 
-               fd_process_polled_events(fd);
+               if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
+                       fd_may_recv(fd);
+
+               if (fdtab[fd].ev & (FD_POLL_OUT | FD_POLL_ERR))
+                       fd_may_send(fd);
        }
 
 }
index 5a76d44c074af54b93f791ab4693e3d67d291e05..73fe327aa838413632d08de5f1546ff8d1659549 100644 (file)
@@ -76,8 +76,6 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
                        else if ((en & ~eo) & FD_EV_POLLED_W)
                                FD_SET(fd, fd_evts[DIR_WR]);
                }
-
-               fd_alloc_or_release_cache_entry(fd, en);
        }
        fd_nbupdt = 0;
 
@@ -148,7 +146,11 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
                        if (FD_ISSET(fd, tmp_evts[DIR_WR]))
                                fdtab[fd].ev |= FD_POLL_OUT;
 
-                       fd_process_polled_events(fd);
+                       if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
+                               fd_may_recv(fd);
+
+                       if (fdtab[fd].ev & (FD_POLL_OUT | FD_POLL_ERR))
+                               fd_may_send(fd);
                }
        }
 }
index c238bc880f5f19dfe11815ae29f186f02c97709a..2a6179f9fee55b070d8c70b52e4d3778ff073364 100644 (file)
--- a/src/fd.c
+++ b/src/fd.c
@@ -199,7 +199,9 @@ void fd_delete(int fd)
 }
 
 /* Scan and process the cached events. This should be called right after
- * the poller.
+ * the poller. The loop may cause new entries to be created, for example
+ * if a listener causes an accept() to initiate a new incoming connection
+ * wanting to attempt an recv().
  */
 void fd_process_cached_events()
 {
@@ -209,12 +211,6 @@ void fd_process_cached_events()
                fd = fd_cache[entry];
                e = fdtab[fd].state;
 
-               /* Principle: events which are marked FD_EV_ACTIVE are processed
-                * with their usual I/O callback. The callback may remove the
-                * events from the cache or tag them for polling. Changes will be
-                * applied on next round. Cache entries with no more activity are
-                * automatically scheduled for removal.
-                */
                fdtab[fd].ev &= FD_POLL_STICKY;
 
                if ((e & (FD_EV_READY_R | FD_EV_ACTIVE_R)) == (FD_EV_READY_R | FD_EV_ACTIVE_R))
@@ -226,7 +222,7 @@ void fd_process_cached_events()
                if (fdtab[fd].iocb && fdtab[fd].owner && fdtab[fd].ev)
                        fdtab[fd].iocb(fd);
                else
-                       updt_fd(fd);
+                       fd_release_cache_entry(fd);
 
                /* If the fd was removed from the cache, it has been
                 * replaced by the next one that we don't want to skip !
@@ -237,75 +233,6 @@ void fd_process_cached_events()
        }
 }
 
-/* Check the events attached to a file descriptor, update its cache
- * accordingly, and call the associated I/O callback. If new updates are
- * detected, the function tries to process them as well in order to save
- * wakeups after accept().
- */
-void fd_process_polled_events(int fd)
-{
-       int new_updt, old_updt;
-
-       /* First thing to do is to mark the reported events as ready, in order
-        * for them to later be continued from the cache without polling if
-        * they have to be interrupted (eg: recv fills a buffer).
-        */
-       if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
-               fd_may_recv(fd);
-
-       if (fdtab[fd].ev & (FD_POLL_OUT | FD_POLL_ERR))
-               fd_may_send(fd);
-
-       if (fdtab[fd].cache) {
-               /* This fd is already cached, no need to process it now. */
-               return;
-       }
-
-       if (unlikely(!fdtab[fd].iocb || !fdtab[fd].ev)) {
-               /* nothing to do */
-               return;
-       }
-
-       /* Save number of updates to detect creation of new FDs. */
-       old_updt = fd_nbupdt;
-       fdtab[fd].iocb(fd);
-
-       /* One or more fd might have been created during the iocb().
-        * This mainly happens with new incoming connections that have
-        * just been accepted, so we'd like to process them immediately
-        * for better efficiency, as it saves one useless task wakeup.
-        * Second benefit, if at the end the fds are disabled again, we can
-        * safely destroy their update entry to reduce the scope of later
-        * scans. This is the reason we scan the new entries backwards.
-        */
-       for (new_updt = fd_nbupdt; new_updt > old_updt; new_updt--) {
-               fd = fd_updt[new_updt - 1];
-               if (!fdtab[fd].new)
-                       continue;
-
-               fdtab[fd].new = 0;
-               fdtab[fd].ev &= FD_POLL_STICKY;
-
-               if ((fdtab[fd].state & FD_EV_STATUS_R) == (FD_EV_READY_R | FD_EV_ACTIVE_R))
-                       fdtab[fd].ev |= FD_POLL_IN;
-
-               if ((fdtab[fd].state & FD_EV_STATUS_W) == (FD_EV_READY_W | FD_EV_ACTIVE_W))
-                       fdtab[fd].ev |= FD_POLL_OUT;
-
-               if (fdtab[fd].ev && fdtab[fd].iocb && fdtab[fd].owner)
-                       fdtab[fd].iocb(fd);
-
-               /* we can remove this update entry if it's the last one and is
-                * unused, otherwise we don't touch anything, especially given
-                * that the FD might have been closed already.
-                */
-               if (new_updt == fd_nbupdt && !fd_recv_active(fd) && !fd_send_active(fd)) {
-                       fdtab[fd].updated = 0;
-                       fd_nbupdt--;
-               }
-       }
-}
-
 /* disable the specified poller */
 void disable_poller(const char *poller_name)
 {