]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MAJOR: fd: Get rid of the fd cache.
authorOlivier Houchard <ohouchard@haproxy.com>
Wed, 24 Jul 2019 16:07:06 +0000 (18:07 +0200)
committerOlivier Houchard <cognet@ci0.org>
Wed, 31 Jul 2019 12:12:55 +0000 (14:12 +0200)
Now that the architecture was changed so that attempts to receive/send data
always come from the upper layers, instead of them only trying to do so when
the lower layer let them know they could try, we can finally get rid of the
fd cache. We don't really need it anymore, and removing it gives us a small
performance boost.

13 files changed:
include/proto/fd.h
include/types/activity.h
src/cli.c
src/debug.c
src/ev_epoll.c
src/ev_evports.c
src/ev_kqueue.c
src/ev_poll.c
src/ev_select.c
src/fd.c
src/haproxy.c
src/mux_h1.c
src/session.c

index 444d1b011348ddc6c78683a4e2a0816e75743bd1..673eaae0f50a9c9144bacfd9aeb6037d58146620 100644 (file)
 
 /* public variables */
 
-extern volatile struct fdlist fd_cache;
-extern volatile struct fdlist fd_cache_local[MAX_THREADS];
-
 extern volatile struct fdlist update_list;
 
 extern unsigned long *polled_mask;
 
-extern unsigned long fd_cache_mask; // Mask of threads with events in the cache
-
 extern THREAD_LOCAL int *fd_updt;  // FD updates list
 extern THREAD_LOCAL int fd_nbupdt; // number of updates in the list
 
@@ -51,8 +46,6 @@ extern int poller_wr_pipe[MAX_THREADS];
 
 extern volatile int ha_used_fds; // Number of FDs we're currently using
 
-__decl_hathreads(extern HA_RWLOCK_T   __attribute__((aligned(64))) fdcache_lock);    /* global lock to protect fd_cache array */
-
 /* Deletes an FD from the fdsets.
  * The file descriptor is also closed.
  */
@@ -103,11 +96,6 @@ int list_pollers(FILE *out);
  */
 void run_poller();
 
-/* Scan and process the cached events. This should be called right after
- * the poller.
- */
-void fd_process_cached_events();
-
 void fd_add_to_fd_list(volatile struct fdlist *list, int fd, int off);
 void fd_rm_from_fd_list(volatile struct fdlist *list, int fd, int off);
 
@@ -165,45 +153,6 @@ static inline void done_update_polling(int fd)
        }
 }
 
-/* Allocates a cache entry for a file descriptor if it does not yet have one.
- * This can be done at any time.
- */
-static inline void fd_alloc_cache_entry(const int fd)
-{
-       _HA_ATOMIC_OR(&fd_cache_mask, fdtab[fd].thread_mask);
-       if (!(fdtab[fd].thread_mask & (fdtab[fd].thread_mask - 1)))
-               fd_add_to_fd_list(&fd_cache_local[my_ffsl(fdtab[fd].thread_mask) - 1], fd,  offsetof(struct fdtab, cache));
-       else
-               fd_add_to_fd_list(&fd_cache, fd,  offsetof(struct fdtab, cache));
-}
-
-/* Removes entry used by fd <fd> from the FD cache and replaces it with the
- * last one.
- * If the fd has no entry assigned, return immediately.
- */
-static inline void fd_release_cache_entry(const int fd)
-{
-       if (!(fdtab[fd].thread_mask & (fdtab[fd].thread_mask - 1)))
-               fd_rm_from_fd_list(&fd_cache_local[my_ffsl(fdtab[fd].thread_mask) - 1], fd, offsetof(struct fdtab, cache));
-       else
-               fd_rm_from_fd_list(&fd_cache, fd, offsetof(struct fdtab, cache));
-}
-
-/* This function automatically enables/disables caching for an entry depending
- * on its state. It is only called on state changes.
- */
-static inline void fd_update_cache(int fd)
-{
-       /* only READY and ACTIVE states (the two with both flags set) require a cache entry */
-       if (((fdtab[fd].state & (FD_EV_READY_R | FD_EV_ACTIVE_R)) == (FD_EV_READY_R | FD_EV_ACTIVE_R)) ||
-           ((fdtab[fd].state & (FD_EV_READY_W | FD_EV_ACTIVE_W)) == (FD_EV_READY_W | FD_EV_ACTIVE_W))) {
-               fd_alloc_cache_entry(fd);
-       }
-       else {
-               fd_release_cache_entry(fd);
-       }
-}
-
 /*
  * returns the FD's recv state (FD_EV_*)
  */
@@ -280,7 +229,6 @@ static inline int fd_active(const int fd)
 static inline void fd_stop_recv(int fd)
 {
        unsigned char old, new;
-       unsigned long locked;
 
        old = fdtab[fd].state;
        do {
@@ -292,20 +240,12 @@ static inline void fd_stop_recv(int fd)
 
        if ((old ^ new) & FD_EV_POLLED_R)
                updt_fd_polling(fd);
-
-       locked = atleast2(fdtab[fd].thread_mask);
-       if (locked)
-               HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
-       fd_update_cache(fd); /* need an update entry to change the state */
-       if (locked)
-               HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Disable processing send events on fd <fd> */
 static inline void fd_stop_send(int fd)
 {
        unsigned char old, new;
-       unsigned long locked;
 
        old = fdtab[fd].state;
        do {
@@ -317,20 +257,12 @@ static inline void fd_stop_send(int fd)
 
        if ((old ^ new) & FD_EV_POLLED_W)
                updt_fd_polling(fd);
-
-       locked = atleast2(fdtab[fd].thread_mask);
-       if (locked)
-               HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
-       fd_update_cache(fd); /* need an update entry to change the state */
-       if (locked)
-               HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Disable processing of events on fd <fd> for both directions. */
 static inline void fd_stop_both(int fd)
 {
        unsigned char old, new;
-       unsigned long locked;
 
        old = fdtab[fd].state;
        do {
@@ -342,20 +274,12 @@ static inline void fd_stop_both(int fd)
 
        if ((old ^ new) & FD_EV_POLLED_RW)
                updt_fd_polling(fd);
-
-       locked = atleast2(fdtab[fd].thread_mask);
-       if (locked)
-               HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
-       fd_update_cache(fd); /* need an update entry to change the state */
-       if (locked)
-               HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Report that FD <fd> cannot receive anymore without polling (EAGAIN detected). */
 static inline void fd_cant_recv(const int fd)
 {
        unsigned char old, new;
-       unsigned long locked;
 
        old = fdtab[fd].state;
        do {
@@ -368,31 +292,15 @@ static inline void fd_cant_recv(const int fd)
 
        if ((old ^ new) & FD_EV_POLLED_R)
                updt_fd_polling(fd);
-
-       locked = atleast2(fdtab[fd].thread_mask);
-       if (locked)
-               HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
-       fd_update_cache(fd); /* need an update entry to change the state */
-       if (locked)
-               HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Report that FD <fd> may receive again without polling. */
 static inline void fd_may_recv(const int fd)
 {
-       unsigned long locked;
-
        /* marking ready never changes polled status */
        if ((fdtab[fd].state & FD_EV_READY_R) ||
            HA_ATOMIC_BTS(&fdtab[fd].state, FD_EV_READY_R_BIT))
                return;
-
-       locked = atleast2(fdtab[fd].thread_mask);
-       if (locked)
-               HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
-       fd_update_cache(fd); /* need an update entry to change the state */
-       if (locked)
-               HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Disable readiness when polled. This is useful to interrupt reading when it
@@ -403,7 +311,6 @@ static inline void fd_may_recv(const int fd)
 static inline void fd_done_recv(const int fd)
 {
        unsigned char old, new;
-       unsigned long locked;
 
        old = fdtab[fd].state;
        do {
@@ -416,20 +323,12 @@ static inline void fd_done_recv(const int fd)
 
        if ((old ^ new) & FD_EV_POLLED_R)
                updt_fd_polling(fd);
-
-       locked = atleast2(fdtab[fd].thread_mask);
-       if (locked)
-               HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
-       fd_update_cache(fd); /* need an update entry to change the state */
-       if (locked)
-               HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Report that FD <fd> cannot send anymore without polling (EAGAIN detected). */
 static inline void fd_cant_send(const int fd)
 {
        unsigned char old, new;
-       unsigned long locked;
 
        old = fdtab[fd].state;
        do {
@@ -442,83 +341,47 @@ static inline void fd_cant_send(const int fd)
 
        if ((old ^ new) & FD_EV_POLLED_W)
                updt_fd_polling(fd);
-
-       locked = atleast2(fdtab[fd].thread_mask);
-       if (locked)
-               HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
-       fd_update_cache(fd); /* need an update entry to change the state */
-       if (locked)
-               HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Report that FD <fd> may send again without polling (EAGAIN not detected). */
 static inline void fd_may_send(const int fd)
 {
-       unsigned long locked;
-
        /* marking ready never changes polled status */
        if ((fdtab[fd].state & FD_EV_READY_W) ||
            HA_ATOMIC_BTS(&fdtab[fd].state, FD_EV_READY_W_BIT))
                return;
-
-       locked = atleast2(fdtab[fd].thread_mask);
-       if (locked)
-               HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
-       fd_update_cache(fd); /* need an update entry to change the state */
-       if (locked)
-               HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Prepare FD <fd> to try to receive */
 static inline void fd_want_recv(int fd)
 {
        unsigned char old, new;
-       unsigned long locked;
 
        old = fdtab[fd].state;
        do {
                if (old & FD_EV_ACTIVE_R)
                        return;
-               new = old | FD_EV_ACTIVE_R;
-               if (!(new & FD_EV_READY_R))
-                       new |= FD_EV_POLLED_R;
+               new = old | FD_EV_ACTIVE_R | FD_EV_POLLED_R;
        } while (unlikely(!_HA_ATOMIC_CAS(&fdtab[fd].state, &old, new)));
 
        if ((old ^ new) & FD_EV_POLLED_R)
                updt_fd_polling(fd);
-
-       locked = atleast2(fdtab[fd].thread_mask);
-       if (locked)
-               HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
-       fd_update_cache(fd); /* need an update entry to change the state */
-       if (locked)
-               HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Prepare FD <fd> to try to send */
 static inline void fd_want_send(int fd)
 {
        unsigned char old, new;
-       unsigned long locked;
 
        old = fdtab[fd].state;
        do {
                if (old & FD_EV_ACTIVE_W)
                        return;
-               new = old | FD_EV_ACTIVE_W;
-               if (!(new & FD_EV_READY_W))
-                       new |= FD_EV_POLLED_W;
+               new = old | FD_EV_ACTIVE_W | FD_EV_POLLED_W;
        } while (unlikely(!_HA_ATOMIC_CAS(&fdtab[fd].state, &old, new)));
 
        if ((old ^ new) & FD_EV_POLLED_W)
                updt_fd_polling(fd);
-
-       locked = atleast2(fdtab[fd].thread_mask);
-       if (locked)
-               HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
-       fd_update_cache(fd); /* need an update entry to change the state */
-       if (locked)
-               HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Update events seen for FD <fd> and its state if needed. This should be called
@@ -545,6 +408,9 @@ static inline void fd_update_events(int fd, int evts)
 
        if (fdtab[fd].ev & (FD_POLL_OUT | FD_POLL_ERR))
                fd_may_send(fd);
+
+       if (fdtab[fd].iocb)
+               fdtab[fd].iocb(fd);
 }
 
 /* Prepares <fd> for being polled */
index 37251502e2ff0f5b960a63c3f2883d342e6fabd3..01e484facd2745b804247db3690b5e916b222c3a 100644 (file)
@@ -32,7 +32,6 @@
  */
 struct activity {
        unsigned int loops;        // complete loops in run_poll_loop()
-       unsigned int wake_cache;   // active fd_cache prevented poll() from sleeping
        unsigned int wake_tasks;   // active tasks prevented poll() from sleeping
        unsigned int wake_signal;  // pending signal prevented poll() from sleeping
        unsigned int poll_exp;     // number of times poll() sees an expired timeout (includes wake_*)
index ab4d3690aeb44c0dc30655ca3807b14cc9e6aa14..16f6243a70f55499ca08064357baa4abb441d5aa 100644 (file)
--- a/src/cli.c
+++ b/src/cli.c
@@ -984,7 +984,7 @@ static int cli_io_handler_show_fd(struct appctx *appctx)
                        li = fdt.owner;
 
                chunk_printf(&trash,
-                            "  %5d : st=0x%02x(R:%c%c%c W:%c%c%c) ev=0x%02x(%c%c%c%c%c) [%c%c] cnext=%d cprev=%d tmask=0x%lx umask=0x%lx owner=%p iocb=%p(%s)",
+                            "  %5d : st=0x%02x(R:%c%c%c W:%c%c%c) ev=0x%02x(%c%c%c%c%c) [%c%c] tmask=0x%lx umask=0x%lx owner=%p iocb=%p(%s)",
                             fd,
                             fdt.state,
                             (fdt.state & FD_EV_POLLED_R) ? 'P' : 'p',
@@ -1001,8 +1001,6 @@ static int cli_io_handler_show_fd(struct appctx *appctx)
                             (fdt.ev & FD_POLL_IN)  ? 'I' : 'i',
                             fdt.linger_risk ? 'L' : 'l',
                             fdt.cloned ? 'C' : 'c',
-                            fdt.cache.next,
-                            fdt.cache.prev,
                             fdt.thread_mask, fdt.update_mask,
                             fdt.owner,
                             fdt.iocb,
@@ -1119,7 +1117,6 @@ static int cli_io_handler_show_activity(struct appctx *appctx)
        chunk_appendf(&trash, "thread_id: %u (%u..%u)\n", tid + 1, 1, global.nbthread);
        chunk_appendf(&trash, "date_now: %lu.%06lu\n", (long)now.tv_sec, (long)now.tv_usec);
        chunk_appendf(&trash, "loops:");        SHOW_TOT(thr, activity[thr].loops);
-       chunk_appendf(&trash, "wake_cache:");   SHOW_TOT(thr, activity[thr].wake_cache);
        chunk_appendf(&trash, "wake_tasks:");   SHOW_TOT(thr, activity[thr].wake_tasks);
        chunk_appendf(&trash, "wake_signal:");  SHOW_TOT(thr, activity[thr].wake_signal);
        chunk_appendf(&trash, "poll_exp:");     SHOW_TOT(thr, activity[thr].poll_exp);
index 059bc6b9754be50e2e6f8351c5ab6de404f9cac8..6b9d14988114910324b3462e1d5166a18cdbf267 100644 (file)
@@ -45,7 +45,7 @@ void ha_thread_dump(struct buffer *buf, int thr, int calling_tid)
 
        chunk_appendf(buf,
                      "%c%cThread %-2u: act=%d glob=%d wq=%d rq=%d tl=%d tlsz=%d rqsz=%d\n"
-                     "             stuck=%d fdcache=%d prof=%d",
+                     "             stuck=%d prof=%d",
                      (thr == calling_tid) ? '*' : ' ', stuck ? '>' : ' ', thr + 1,
                      thread_has_tasks(),
                      !!(global_tasks_mask & thr_bit),
@@ -55,7 +55,6 @@ void ha_thread_dump(struct buffer *buf, int thr, int calling_tid)
                      task_per_thread[thr].task_list_size,
                      task_per_thread[thr].rqueue_size,
                      stuck,
-                     !!(fd_cache_mask & thr_bit),
                      !!(task_profiling_mask & thr_bit));
 
        chunk_appendf(buf,
index 6c09c0498f7725b713dc1b37a57213e4cfbdb596..bd2d616cd15ec308f0691afc118a91bec92582aa 100644 (file)
@@ -169,6 +169,8 @@ REGPRM3 static void _do_poll(struct poller *p, int exp, int wake)
        tv_leaving_poll(wait_time, status);
 
        thread_harmless_end();
+       if (sleeping_thread_mask & tid_bit)
+               _HA_ATOMIC_AND(&sleeping_thread_mask, ~tid_bit);
 
        /* process polled events */
 
index eae72d0991b547b44293744ca257393b295a0ddf..7842bf242b81f85146606f69c82d91bc724bc220 100644 (file)
@@ -140,6 +140,8 @@ REGPRM3 static void _do_poll(struct poller *p, int exp, int wake)
        }
 
        thread_harmless_now();
+       if (sleeping_thread_mask & tid_bit)
+               _HA_ATOMIC_AND(&sleeping_thread_mask, ~tid_bit);
 
        /*
         * Determine how long to wait for events to materialise on the port.
index aea2ab7389dcbbac487f8d89cd3c925b9f57bce5..692437731b0c9551d6aec6dcf8dcb1407060d559 100644 (file)
@@ -164,6 +164,8 @@ REGPRM3 static void _do_poll(struct poller *p, int exp, int wake)
        tv_leaving_poll(wait_time, status);
 
        thread_harmless_end();
+       if (sleeping_thread_mask & tid_bit)
+               _HA_ATOMIC_AND(&sleeping_thread_mask, ~tid_bit);
 
        for (count = 0; count < status; count++) {
                unsigned int n = 0;
index 54812f53b85924b4397b15af1bf8785942af0129..b349c555f620ada825182cc2626743433b2c4e83 100644 (file)
@@ -159,6 +159,8 @@ REGPRM3 static void _do_poll(struct poller *p, int exp, int wake)
        } while (!_HA_ATOMIC_CAS(&maxfd, &old_maxfd, new_maxfd));
 
        thread_harmless_now();
+       if (sleeping_thread_mask & tid_bit)
+               _HA_ATOMIC_AND(&sleeping_thread_mask, ~tid_bit);
 
        fd_nbupdt = 0;
 
index 0ccf2f150c4c4c3598568144e015938601348e26..be88bc2bf414b4c7409da139b455045dfbeba628 100644 (file)
@@ -176,6 +176,8 @@ REGPRM3 static void _do_poll(struct poller *p, int exp, int wake)
        tv_leaving_poll(delta_ms, status);
 
        thread_harmless_end();
+       if (sleeping_thread_mask & tid_bit)
+               _HA_ATOMIC_AND(&sleeping_thread_mask, ~tid_bit);
 
        if (status <= 0)
                return;
index e71c2ee8d666aebc8d3876d44439d17c33ad5966..71df46e05ef18cf367863831b834c3cdbacb11f9 100644 (file)
--- a/src/fd.c
+++ b/src/fd.c
@@ -8,40 +8,6 @@
  * as published by the Free Software Foundation; either version
  * 2 of the License, or (at your option) any later version.
  *
- * This code implements an events cache for file descriptors. It remembers the
- * readiness of a file descriptor after a return from poll() and the fact that
- * an I/O attempt failed on EAGAIN. Events in the cache which are still marked
- * ready and active are processed just as if they were reported by poll().
- *
- * This serves multiple purposes. First, it significantly improves performance
- * by avoiding to subscribe to polling unless absolutely necessary, so most
- * events are processed without polling at all, especially send() which
- * benefits from the socket buffers. Second, it is the only way to support
- * edge-triggered pollers (eg: EPOLL_ET). And third, it enables I/O operations
- * that are backed by invisible buffers. For example, SSL is able to read a
- * whole socket buffer and not deliver it to the application buffer because
- * it's full. Unfortunately, it won't be reported by a poller anymore until
- * some new activity happens. The only way to call it again thus is to keep
- * this readiness information in the cache and to access it without polling
- * once the FD is enabled again.
- *
- * One interesting feature of the cache is that it maintains the principle
- * of speculative I/O introduced in haproxy 1.3 : the first time an event is
- * enabled, the FD is considered as ready so that the I/O attempt is performed
- * via the cache without polling. And the polling happens only when EAGAIN is
- * first met. This avoids polling for HTTP requests, especially when the
- * defer-accept mode is used. It also avoids polling for sending short data
- * such as requests to servers or short responses to clients.
- *
- * The cache consists in a list of active events and a list of updates.
- * Active events are events that are expected to come and that we must report
- * to the application until it asks to stop or asks to poll. Updates are new
- * requests for changing an FD state. Updates are the only way to create new
- * events. This is important because it means that the number of cached events
- * cannot increase between updates and will only grow one at a time while
- * processing updates. All updates must always be processed, though events
- * might be processed by small batches if required.
- *
  * There is no direct link between the FD and the updates list. There is only a
  * bit in the fdtab[] to indicate than a file descriptor is already present in
  * the updates list. Once an fd is present in the updates list, it will have to
  * is that unhandled events will still wake the poller up. Using an edge-
  * triggered poller such as EPOLL_ET will solve this issue though.
  *
- * Since we do not want to scan all the FD list to find cached I/O events,
- * we store them in a list consisting in a linear array holding only the FD
- * indexes right now. Note that a closed FD cannot exist in the cache, because
- * it is closed by fd_delete() which in turn calls fd_release_cache_entry()
- * which always removes it from the list.
- *
- * The FD array has to hold a back reference to the cache. This reference is
- * always valid unless the FD is not in the cache and is not updated, in which
- * case the reference points to index 0.
- *
  * The event state for an FD, as found in fdtab[].state, is maintained for each
  * direction. The state field is built this way, with R bits in the low nibble
  * and W bits in the high nibble for ease of access and debugging :
@@ -175,12 +131,8 @@ struct poller pollers[MAX_POLLERS];
 struct poller cur_poller;
 int nbpollers = 0;
 
-volatile struct fdlist fd_cache ; // FD events cache
-volatile struct fdlist fd_cache_local[MAX_THREADS]; // FD events local for each thread
 volatile struct fdlist update_list; // Global update list
 
-unsigned long fd_cache_mask = 0; // Mask of threads with events in the cache
-
 THREAD_LOCAL int *fd_updt  = NULL;  // FD updates list
 THREAD_LOCAL int  fd_nbupdt = 0;   // number of updates in the list
 THREAD_LOCAL int poller_rd_pipe = -1; // Pipe to wake the thread
@@ -379,7 +331,6 @@ static void fd_dodelete(int fd, int do_close)
        if (cur_poller.clo)
                cur_poller.clo(fd);
 
-       fd_release_cache_entry(fd);
        fdtab[fd].state = 0;
 
        port_range_release_port(fdinfo[fd].port_range, fdinfo[fd].local_port);
@@ -411,66 +362,6 @@ void fd_remove(int fd)
        fd_dodelete(fd, 0);
 }
 
-static inline void fdlist_process_cached_events(volatile struct fdlist *fdlist)
-{
-       int fd, old_fd, e;
-       unsigned long locked;
-
-       for (old_fd = fd = fdlist->first; fd != -1; fd = fdtab[fd].cache.next) {
-               if (fd == -2) {
-                       fd = old_fd;
-                       continue;
-               } else if (fd <= -3)
-                       fd = -fd - 4;
-               if (fd == -1)
-                       break;
-               old_fd = fd;
-               if (!(fdtab[fd].thread_mask & tid_bit))
-                       continue;
-               if (fdtab[fd].cache.next < -3)
-                       continue;
-
-               _HA_ATOMIC_OR(&fd_cache_mask, tid_bit);
-               locked = atleast2(fdtab[fd].thread_mask);
-               if (locked && HA_SPIN_TRYLOCK(FD_LOCK, &fdtab[fd].lock)) {
-                       activity[tid].fd_lock++;
-                       continue;
-               }
-
-               e = fdtab[fd].state;
-               fdtab[fd].ev &= FD_POLL_STICKY;
-
-               if ((e & (FD_EV_READY_R | FD_EV_ACTIVE_R)) == (FD_EV_READY_R | FD_EV_ACTIVE_R))
-                       fdtab[fd].ev |= FD_POLL_IN;
-
-               if ((e & (FD_EV_READY_W | FD_EV_ACTIVE_W)) == (FD_EV_READY_W | FD_EV_ACTIVE_W))
-                       fdtab[fd].ev |= FD_POLL_OUT;
-
-               if (fdtab[fd].iocb && fdtab[fd].owner && fdtab[fd].ev) {
-                       if (locked)
-                               HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
-                       fdtab[fd].iocb(fd);
-               }
-               else {
-                       fd_release_cache_entry(fd);
-                       if (locked)
-                               HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
-               }
-       }
-}
-
-/* Scan and process the cached events. This should be called right after
- * the poller. The loop may cause new entries to be created, for example
- * if a listener causes an accept() to initiate a new incoming connection
- * wanting to attempt an recv().
- */
-void fd_process_cached_events()
-{
-       _HA_ATOMIC_AND(&fd_cache_mask, ~tid_bit);
-       fdlist_process_cached_events(&fd_cache_local[tid]);
-       fdlist_process_cached_events(&fd_cache);
-}
-
 #if defined(USE_CLOSEFROM)
 void my_closefrom(int start)
 {
@@ -640,7 +531,6 @@ int init_pollers()
        if ((fdinfo = calloc(global.maxsock, sizeof(struct fdinfo))) == NULL)
                goto fail_info;
 
-       fd_cache.first = fd_cache.last = -1;
        update_list.first = update_list.last = -1;
 
        for (p = 0; p < global.maxsock; p++) {
@@ -649,8 +539,6 @@ int init_pollers()
                fdtab[p].cache.next = -3;
                fdtab[p].update.next = -3;
        }
-       for (p = 0; p < global.nbthread; p++)
-               fd_cache_local[p].first = fd_cache_local[p].last = -1;
 
        do {
                bp = NULL;
index dfd2819e1335f3b2e07ef0252d6a4247208b3b8c..4d58c532168c23c03df12ea7b9f3a429a705e014 100644 (file)
@@ -2496,9 +2496,7 @@ static void run_poll_loop()
 
                /* expire immediately if events are pending */
                wake = 1;
-               if (fd_cache_mask & tid_bit)
-                       activity[tid].wake_cache++;
-               else if (thread_has_tasks())
+               if (thread_has_tasks())
                        activity[tid].wake_tasks++;
                else if (signal_queue_len && tid == 0)
                        activity[tid].wake_signal++;
@@ -2514,9 +2512,6 @@ static void run_poll_loop()
 
                /* The poller will ensure it returns around <next> */
                cur_poller.poll(&cur_poller, next, wake);
-               if (sleeping_thread_mask & tid_bit)
-                       _HA_ATOMIC_AND(&sleeping_thread_mask, ~tid_bit);
-               fd_process_cached_events();
 
                activity[tid].loops++;
        }
index 8de76d736c0335799c35ccb52224e83e6ada3eaa..b8beaef24becf142e950764811c5f7e6f83d518d 100644 (file)
@@ -48,7 +48,6 @@
 #define H1C_F_CS_ERROR       0x00001000 /* connection must be closed ASAP because an error occurred */
 #define H1C_F_CS_SHUTW_NOW   0x00002000 /* connection must be shut down for writes ASAP */
 #define H1C_F_CS_SHUTDOWN    0x00004000 /* connection is shut down for read and writes */
-#define H1C_F_CS_WAIT_CONN   0x00008000 /* waiting for the connection establishment */
 
 #define H1C_F_WAIT_NEXT_REQ  0x00010000 /*  waiting for the next request to start, use keep-alive timeout */
 #define H1C_F_UPG_H2C        0x00020000 /* set if an upgrade to h2 should be done */
@@ -451,9 +450,6 @@ static int h1_init(struct connection *conn, struct proxy *proxy, struct session
                t->expire = tick_add(now_ms, h1c->timeout);
        }
 
-       if (!(conn->flags & CO_FL_CONNECTED) || (conn->flags & CO_FL_HANDSHAKE))
-               h1c->flags |= H1C_F_CS_WAIT_CONN;
-
        /* Always Create a new H1S */
        if (!h1s_create(h1c, conn->ctx, sess))
                goto fail;
@@ -1894,11 +1890,6 @@ static int h1_recv(struct h1c *h1c)
        if (h1c->wait_event.events & SUB_RETRY_RECV)
                return (b_data(&h1c->ibuf));
 
-       if (!(conn->flags & CO_FL_ERROR) && h1c->flags & H1C_F_CS_WAIT_CONN) {
-               conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event);
-               return 0;
-       }
-
        if (!h1_recv_allowed(h1c)) {
                rcvd = 1;
                goto end;
@@ -1982,12 +1973,6 @@ static int h1_send(struct h1c *h1c)
        if (conn->flags & CO_FL_ERROR)
                return 0;
 
-       if (h1c->flags & H1C_F_CS_WAIT_CONN) {
-               if (!(h1c->wait_event.events & SUB_RETRY_SEND))
-                       conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_SEND, &h1c->wait_event);
-               return 0;
-       }
-
        if (!b_data(&h1c->obuf))
                goto end;
 
@@ -2035,14 +2020,6 @@ static int h1_process(struct h1c * h1c)
        if (!conn->ctx)
                return -1;
 
-       if (h1c->flags & H1C_F_CS_WAIT_CONN) {
-               if (!(conn->flags & (CO_FL_CONNECTED|CO_FL_ERROR)) ||
-                   (!(conn->flags & CO_FL_ERROR) && (conn->flags & CO_FL_HANDSHAKE)))
-                       goto end;
-               h1c->flags &= ~H1C_F_CS_WAIT_CONN;
-               h1_wake_stream_for_send(h1s);
-       }
-
        if (!h1s) {
                if (h1c->flags & (H1C_F_CS_ERROR|H1C_F_CS_SHUTDOWN) ||
                    conn->flags & (CO_FL_ERROR | CO_FL_SOCK_WR_SH) ||
@@ -2103,10 +2080,7 @@ static struct task *h1_io_cb(struct task *t, void *ctx, unsigned short status)
 
 static void h1_reset(struct connection *conn)
 {
-       struct h1c *h1c = conn->ctx;
 
-       /* Reset the flags, and let the mux know we're waiting for a connection */
-       h1c->flags = H1C_F_CS_WAIT_CONN;
 }
 
 static int h1_wake(struct connection *conn)
@@ -2401,6 +2375,7 @@ static int h1_subscribe(struct conn_stream *cs, int event_type, void *param)
 {
        struct wait_event *sw;
        struct h1s *h1s = cs->ctx;
+       struct h1c *h1c = h1s->h1c;
 
        if (!h1s)
                return -1;
@@ -2417,6 +2392,17 @@ static int h1_subscribe(struct conn_stream *cs, int event_type, void *param)
                        BUG_ON(h1s->send_wait != NULL || (sw->events & SUB_RETRY_SEND));
                        sw->events |= SUB_RETRY_SEND;
                        h1s->send_wait = sw;
+                       /*
+                        * If the conn_stream attempt to subscribe, and the
+                        * mux isn't subscribed to the connection, then it
+                        * probably means the connection wasn't established
+                        * yet, so we have to subscribe.
+                        */
+                       if (!(h1c->wait_event.events & SUB_RETRY_SEND))
+                               h1c->conn->xprt->subscribe(h1c->conn,
+                                                          h1c->conn->xprt_ctx,
+                                                          SUB_RETRY_SEND,
+                                                          &h1c->wait_event);
                        return 0;
                default:
                        break;
@@ -2461,7 +2447,13 @@ static size_t h1_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t coun
                return 0;
 
        h1c = h1s->h1c;
-       if (h1c->flags & H1C_F_CS_WAIT_CONN)
+
+       /* If we're not connected yet, or we're waiting for a handshake, stop
+        * now, as we don't want to remove everything from the channel buffer
+        * before we're sure we can send it.
+        */
+       if (!(h1c->conn->flags & CO_FL_CONNECTED) ||
+           (h1c->conn->flags & CO_FL_HANDSHAKE))
                return 0;
 
        while (count) {
index 7def387347693fe023763692e88e120353e5a0b8..7b2564e8c53f81376d688141e1fff1826a81ecea 100644 (file)
@@ -175,7 +175,6 @@ int session_accept_fd(struct listener *l, int cfd, struct sockaddr_storage *addr
        if (l->options & LI_O_ACC_CIP)
                cli_conn->flags |= CO_FL_ACCEPT_CIP;
 
-       conn_xprt_want_recv(cli_conn);
        if (conn_xprt_init(cli_conn) < 0)
                goto out_free_conn;