]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MAJOR: fd/threads: Make the fdcache mostly lockless.
authorOlivier Houchard <ohouchard@haproxy.com>
Wed, 24 Jan 2018 17:17:56 +0000 (18:17 +0100)
committerWilly Tarreau <w@1wt.eu>
Mon, 5 Feb 2018 15:02:22 +0000 (16:02 +0100)
Create a local, per-thread, fdcache, for file descriptors that only belongs
to one thread, and make the global fd cache mostly lockless, as we can get
a lot of contention on the fd cache lock.

include/common/hathreads.h
include/proto/fd.h
include/types/fd.h
src/cli.c
src/fd.c
src/stream.c

index 1dabf3cc23a2f0c220629b4e10d338a32a02c979..a8fdf150a15842bfdfa29bdb826b5680480dc88f 100644 (file)
@@ -259,7 +259,6 @@ int  thread_need_sync(void);
 /* WARNING!!! if you update this enum, please also keep lock_label() up to date below */
 enum lock_label {
        THREAD_SYNC_LOCK = 0,
-       FDCACHE_LOCK,
        FD_LOCK,
        TASK_RQ_LOCK,
        TASK_WQ_LOCK,
@@ -376,7 +375,6 @@ static inline const char *lock_label(enum lock_label label)
 {
        switch (label) {
        case THREAD_SYNC_LOCK:     return "THREAD_SYNC";
-       case FDCACHE_LOCK:         return "FDCACHE";
        case FD_LOCK:              return "FD";
        case TASK_RQ_LOCK:         return "TASK_RQ";
        case TASK_WQ_LOCK:         return "TASK_WQ";
index a7e70b7fd4655822c02faddcab5a686be2330edf..595af90f2b7ed3d6f93319cb5c36da5913612cfb 100644 (file)
@@ -33,8 +33,9 @@
 
 /* public variables */
 
-extern unsigned int *fd_cache;      // FD events cache
-extern int fd_cache_num;            // number of events in the cache
+extern volatile struct fdlist fd_cache;
+extern volatile struct fdlist fd_cache_local[MAX_THREADS];
+
 extern unsigned long fd_cache_mask; // Mask of threads with events in the cache
 
 extern THREAD_LOCAL int *fd_updt;  // FD updates list
@@ -105,44 +106,223 @@ static inline void updt_fd_polling(const int fd)
 }
 
 
+#define _GET_NEXT(fd) fdtab[fd].fdcache_entry.next
+#define _GET_PREV(fd) fdtab[fd].fdcache_entry.prev
+
+static inline void fd_add_to_fd_list(volatile struct fdlist *list, int fd)
+{
+       int next;
+       int new;
+       int old;
+       int last;
+
+redo_next:
+       next = _GET_NEXT(fd);
+       /*
+        * Check that we're not already in the cache, and if not, lock us.
+        * <= -3 means not in the cache, -2 means locked, -1 means we're
+        * in the cache, and the last element, >= 0 gives the FD of the next
+        * in the cache.
+        */
+       if (next >= -2)
+               goto done;
+       if (!HA_ATOMIC_CAS(&_GET_NEXT(fd), &next, -2))
+               goto redo_next;
+       __ha_barrier_store();
+redo_last:
+       /* First, insert in the linked list */
+       last = list->last;
+       old = -1;
+       new = fd;
+       if (unlikely(last == -1)) {
+               /* list is empty, try to add ourselves alone so that list->last=fd */
+
+               _GET_PREV(fd) = last;
+
+               /* Make sure the "prev" store is visible before we update the last entry */
+               __ha_barrier_store();
+               if (unlikely(!HA_ATOMIC_CAS(&list->last, &old, new)))
+                           goto redo_last;
+
+               /* list->first was necessary -1, we're guaranteed to be alone here */
+               list->first = fd;
+
+               /* since we're alone at the end of the list and still locked(-2),
+                * we know noone tried to add past us. Mark the end of list.
+                */
+               _GET_NEXT(fd) = -1;
+               goto done; /* We're done ! */
+       } else {
+               /* non-empty list, add past the tail */
+               do {
+                       new = fd;
+                       old = -1;
+                       _GET_PREV(fd) = last;
+
+                       __ha_barrier_store();
+
+                       /* adding ourselves past the last element
+                        * The CAS will only succeed if its next is -1,
+                        * which means it's in the cache, and the last element.
+                        */
+                       if (likely(HA_ATOMIC_CAS(&_GET_NEXT(last), &old, new)))
+                               break;
+                       goto redo_last;
+               } while (1);
+       }
+       /* Then, update the last entry */
+redo_fd_cache:
+       last = list->last;
+       __ha_barrier_load();
+
+       if (unlikely(!HA_ATOMIC_CAS(&list->last, &last, fd)))
+               goto redo_fd_cache;
+       __ha_barrier_store();
+       _GET_NEXT(fd) = -1;
+       __ha_barrier_store();
+done:
+       return;
+}
+
 /* Allocates a cache entry for a file descriptor if it does not yet have one.
  * This can be done at any time.
  */
 static inline void fd_alloc_cache_entry(const int fd)
 {
-       HA_RWLOCK_WRLOCK(FDCACHE_LOCK, &fdcache_lock);
-       if (fdtab[fd].cache)
-               goto end;
-       fd_cache_num++;
-       fd_cache_mask |= fdtab[fd].thread_mask;
-       fdtab[fd].cache = fd_cache_num;
-       fd_cache[fd_cache_num-1] = fd;
-  end:
-       HA_RWLOCK_WRUNLOCK(FDCACHE_LOCK, &fdcache_lock);
+       if (!(fdtab[fd].thread_mask & (fdtab[fd].thread_mask - 1)))
+               fd_add_to_fd_list(&fd_cache_local[my_ffsl(fdtab[fd].thread_mask) - 1], fd);
+       else
+               fd_add_to_fd_list(&fd_cache, fd);
+ }
+
+static inline void fd_rm_from_fd_list(volatile struct fdlist *list, int fd)
+{
+#if defined(HA_HAVE_CAS_DW) || defined(HA_CAS_IS_8B)
+       volatile struct fdlist_entry cur_list, next_list;
+#endif
+       int old;
+       int new = -2;
+       volatile int prev;
+       volatile int next;
+       int last;
+
+lock_self:
+#if (defined(HA_CAS_IS_8B) || defined(HA_HAVE_CAS_DW))
+       next_list.next = next_list.prev = -2;
+       cur_list.prev = _GET_PREV(fd);
+       cur_list.next = _GET_NEXT(fd);
+       /* First, attempt to lock our own entries */
+       do {
+               /* The FD is not in the FD cache, give up */
+               if (unlikely(cur_list.next <= -3))
+                       return;
+               if (unlikely(cur_list.prev == -2 || cur_list.next == -2))
+                       goto lock_self;
+       } while (
+#ifdef HA_CAS_IS_8B
+           unlikely(!HA_ATOMIC_CAS(((void **)(void *)&_GET_NEXT(fd)), ((void **)(void *)&cur_list), (*(void **)(void *)&next_list))))
+#else
+           unlikely(!__ha_cas_dw((void *)&_GET_NEXT(fd), (void *)&cur_list, (void *)&next_list)))
+#endif
+           ;
+       next = cur_list.next;
+       prev = cur_list.prev;
+
+#else
+lock_self_next:
+       next = _GET_NEXT(fd);
+       if (next == -2)
+               goto lock_self_next;
+       if (next <= -3)
+               goto done;
+       if (unlikely(!HA_ATOMIC_CAS(&_GET_NEXT(fd), &next, -2)))
+               goto lock_self_next;
+lock_self_prev:
+       prev = _GET_PREV(fd);
+       if (prev == -2)
+               goto lock_self_prev;
+       if (unlikely(!HA_ATOMIC_CAS(&_GET_PREV(fd), &prev, -2)))
+               goto lock_self_prev;
+#endif
+       __ha_barrier_store();
+
+       /* Now, lock the entries of our neighbours */
+       if (likely(prev != -1)) {
+redo_prev:
+               old = fd;
+
+               if (unlikely(!HA_ATOMIC_CAS(&_GET_NEXT(prev), &old, new))) {
+                       if (unlikely(old == -2)) {
+                               /* Neighbour already locked, give up and
+                                * retry again once he's done
+                                */
+                               _GET_PREV(fd) = prev;
+                               __ha_barrier_store();
+                               _GET_NEXT(fd) = next;
+                               __ha_barrier_store();
+                               goto lock_self;
+                       }
+                       goto redo_prev;
+               }
+       }
+       if (likely(next != -1)) {
+redo_next:
+               old = fd;
+               if (unlikely(!HA_ATOMIC_CAS(&_GET_PREV(next), &old, new))) {
+                       if (unlikely(old == -2)) {
+                               /* Neighbour already locked, give up and
+                                * retry again once he's done
+                                */
+                               if (prev != -1) {
+                                       _GET_NEXT(prev) = fd;
+                                       __ha_barrier_store();
+                               }
+                               _GET_PREV(fd) = prev;
+                               __ha_barrier_store();
+                               _GET_NEXT(fd) = next;
+                               __ha_barrier_store();
+                               goto lock_self;
+                       }
+                       goto redo_next;
+               }
+       }
+       if (list->first == fd)
+               list->first = next;
+       __ha_barrier_store();
+       last = list->last;
+       while (unlikely(last == fd && (!HA_ATOMIC_CAS(&list->last, &last, prev))))
+               __ha_compiler_barrier();
+       /* Make sure we let other threads know we're no longer in cache,
+        * before releasing our neighbours.
+        */
+       __ha_barrier_store();
+       if (likely(prev != -1))
+               _GET_NEXT(prev) = next;
+       __ha_barrier_store();
+       if (likely(next != -1))
+               _GET_PREV(next) = prev;
+       __ha_barrier_store();
+       /* Ok, now we're out of the fd cache */
+       _GET_NEXT(fd) = -(next + 4);
+       __ha_barrier_store();
+done:
+       return;
 }
 
+#undef _GET_NEXT
+#undef _GET_PREV
+
+
 /* Removes entry used by fd <fd> from the FD cache and replaces it with the
- * last one. The fdtab.cache is adjusted to match the back reference if needed.
+ * last one.
  * If the fd has no entry assigned, return immediately.
  */
 static inline void fd_release_cache_entry(int fd)
 {
-       unsigned int pos;
-
-       HA_RWLOCK_WRLOCK(FDCACHE_LOCK, &fdcache_lock);
-       pos = fdtab[fd].cache;
-       if (!pos)
-               goto end;
-       fdtab[fd].cache = 0;
-       fd_cache_num--;
-       if (likely(pos <= fd_cache_num)) {
-               /* was not the last entry */
-               fd = fd_cache[fd_cache_num];
-               fd_cache[pos - 1] = fd;
-               fdtab[fd].cache = pos;
-       }
-  end:
-       HA_RWLOCK_WRUNLOCK(FDCACHE_LOCK, &fdcache_lock);
+       if (!(fdtab[fd].thread_mask & (fdtab[fd].thread_mask - 1)))
+               fd_rm_from_fd_list(&fd_cache_local[my_ffsl(fdtab[fd].thread_mask) - 1], fd);
+       else
+               fd_rm_from_fd_list(&fd_cache, fd);
 }
 
 /* Computes the new polled status based on the active and ready statuses, for
@@ -402,7 +582,6 @@ static inline void fd_insert(int fd, void *owner, void (*iocb)(int fd), unsigned
        fdtab[fd].update_mask &= ~tid_bit;
        fdtab[fd].linger_risk = 0;
        fdtab[fd].cloned = 0;
-       fdtab[fd].cache = 0;
        fdtab[fd].thread_mask = thread_mask;
        /* note: do not reset polled_mask here as it indicates which poller
         * still knows this FD from a possible previous round.
index e04ea675f8f6a15ac97d189e5fc1d6d2d43a604b..8edf26bf1bc9909f9cbf15d8346f81ff5739158d 100644 (file)
@@ -90,15 +90,25 @@ enum fd_states {
  */
 #define DEAD_FD_MAGIC 0xFDDEADFD
 
+struct fdlist_entry {
+       volatile int next;
+       volatile int prev;
+} __attribute__ ((aligned(8)));
+
+struct fdlist {
+       volatile int first;
+       volatile int last;
+} __attribute__ ((aligned(8)));
+
 /* info about one given fd */
 struct fdtab {
        __decl_hathreads(HA_SPINLOCK_T lock);
        unsigned long thread_mask;           /* mask of thread IDs authorized to process the task */
        unsigned long polled_mask;           /* mask of thread IDs currently polling this fd */
        unsigned long update_mask;           /* mask of thread IDs having an update for fd */
+       struct fdlist_entry fdcache_entry;   /* Entry in the fdcache */
        void (*iocb)(int fd);                /* I/O handler */
        void *owner;                         /* the connection or listener associated with this fd, NULL if closed */
-       unsigned int  cache;                 /* position+1 in the FD cache. 0=not in cache. */
        unsigned char state;                 /* FD state for read and write directions (2*3 bits) */
        unsigned char ev;                    /* event seen in return of poll() : FD_POLL_* */
        unsigned char linger_risk:1;         /* 1 if we must kill lingering before closing */
index ed8cc5bff54db331c1b5167d5b8e04febb336f30..85d35678b1ed31c2dc52473b48256a55e5e0f95d 100644 (file)
--- a/src/cli.c
+++ b/src/cli.c
@@ -811,7 +811,7 @@ static int cli_io_handler_show_fd(struct appctx *appctx)
                             (fdt.ev & FD_POLL_IN)  ? 'I' : 'i',
                             fdt.linger_risk ? 'L' : 'l',
                             fdt.cloned ? 'C' : 'c',
-                            fdt.cache,
+                            fdt.fdcache_entry.next >= -2 ? 1 : 0,
                             fdt.owner,
                             fdt.iocb,
                             (fdt.iocb == conn_fd_handler)  ? "conn_fd_handler" :
index 0995040d6b29af9a812154ed10b30642b68ddc71..2cd79fb9fcd9f02150d3dd9c9418489b0d7d27c6 100644 (file)
--- a/src/fd.c
+++ b/src/fd.c
@@ -167,15 +167,14 @@ struct poller pollers[MAX_POLLERS];
 struct poller cur_poller;
 int nbpollers = 0;
 
-unsigned int *fd_cache = NULL; // FD events cache
-int fd_cache_num = 0;          // number of events in the cache
+volatile struct fdlist fd_cache ; // FD events cache
+volatile struct fdlist fd_cache_local[MAX_THREADS]; // FD events local for each thread
+
 unsigned long fd_cache_mask = 0; // Mask of threads with events in the cache
 
 THREAD_LOCAL int *fd_updt  = NULL;  // FD updates list
 THREAD_LOCAL int  fd_nbupdt = 0;   // number of updates in the list
 
-__decl_hathreads(HA_RWLOCK_T   fdcache_lock);     /* global lock to protect fd_cache array */
-
 /* Deletes an FD from the fdsets.
  * The file descriptor is also closed.
  */
@@ -221,33 +220,30 @@ void fd_remove(int fd)
        fd_dodelete(fd, 0);
 }
 
-/* Scan and process the cached events. This should be called right after
- * the poller. The loop may cause new entries to be created, for example
- * if a listener causes an accept() to initiate a new incoming connection
- * wanting to attempt an recv().
- */
-void fd_process_cached_events()
+static inline void fdlist_process_cached_events(volatile struct fdlist *fdlist)
 {
-       int fd, entry, e;
+       int fd, old_fd, e;
 
-       HA_RWLOCK_RDLOCK(FDCACHE_LOCK, &fdcache_lock);
-       fd_cache_mask &= ~tid_bit;
-       for (entry = 0; entry < fd_cache_num; ) {
-               fd = fd_cache[entry];
-
-               if (!(fdtab[fd].thread_mask & tid_bit)) {
-                       activity[tid].fd_skip++;
-                       goto next;
-               }
+       for (old_fd = fd = fdlist->first; fd != -1; fd = fdtab[fd].fdcache_entry.next) {
+               if (fd == -2) {
+                       fd = old_fd;
+                       continue;
+               } else if (fd <= -3)
+                       fd = -fd - 4;
+               if (fd == -1)
+                       break;
+               old_fd = fd;
+               if (!(fdtab[fd].thread_mask & tid_bit))
+                       continue;
+               if (fdtab[fd].fdcache_entry.next < -3)
+                       continue;
 
-               fd_cache_mask |= tid_bit;
+               HA_ATOMIC_OR(&fd_cache_mask, tid_bit);
                if (HA_SPIN_TRYLOCK(FD_LOCK, &fdtab[fd].lock)) {
                        activity[tid].fd_lock++;
-                       goto next;
+                       continue;
                }
 
-               HA_RWLOCK_RDUNLOCK(FDCACHE_LOCK, &fdcache_lock);
-
                e = fdtab[fd].state;
                fdtab[fd].ev &= FD_POLL_STICKY;
 
@@ -265,19 +261,19 @@ void fd_process_cached_events()
                        fd_release_cache_entry(fd);
                        HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
                }
-
-               HA_RWLOCK_RDLOCK(FDCACHE_LOCK, &fdcache_lock);
-               /* If the fd was removed from the cache, it has been
-                * replaced by the next one that we don't want to skip !
-                */
-               if (entry < fd_cache_num && fd_cache[entry] != fd) {
-                       activity[tid].fd_del++;
-                       continue;
-               }
-         next:
-               entry++;
        }
-       HA_RWLOCK_RDUNLOCK(FDCACHE_LOCK, &fdcache_lock);
+}
+
+/* Scan and process the cached events. This should be called right after
+ * the poller. The loop may cause new entries to be created, for example
+ * if a listener causes an accept() to initiate a new incoming connection
+ * wanting to attempt an recv().
+ */
+void fd_process_cached_events()
+{
+       HA_ATOMIC_AND(&fd_cache_mask, ~tid_bit);
+       fdlist_process_cached_events(&fd_cache_local[tid]);
+       fdlist_process_cached_events(&fd_cache);
 }
 
 /* disable the specified poller */
@@ -320,16 +316,19 @@ int init_pollers()
        if ((fdinfo = calloc(global.maxsock, sizeof(struct fdinfo))) == NULL)
                goto fail_info;
 
-       if ((fd_cache = calloc(global.maxsock, sizeof(*fd_cache))) == NULL)
-               goto fail_cache;
-
+       fd_cache.first = fd_cache.last = -1;
        hap_register_per_thread_init(init_pollers_per_thread);
        hap_register_per_thread_deinit(deinit_pollers_per_thread);
 
-       for (p = 0; p < global.maxsock; p++)
+       for (p = 0; p < global.maxsock; p++) {
                HA_SPIN_INIT(&fdtab[p].lock);
+               /* Mark the fd as out of the fd cache */
+               fdtab[p].fdcache_entry.next = -3;
+               fdtab[p].fdcache_entry.next = -3;
+       }
+       for (p = 0; p < global.nbthread; p++)
+               fd_cache_local[p].first = fd_cache_local[p].last = -1;
 
-       HA_RWLOCK_INIT(&fdcache_lock);
        do {
                bp = NULL;
                for (p = 0; p < nbpollers; p++)
@@ -372,11 +371,8 @@ void deinit_pollers() {
                        bp->term(bp);
        }
 
-       free(fd_cache); fd_cache = NULL;
        free(fdinfo);   fdinfo   = NULL;
        free(fdtab);    fdtab    = NULL;
-
-       HA_RWLOCK_DESTROY(&fdcache_lock);
 }
 
 /*
index 92f9c0a64c24181675b37d803e1bd3df5a71bf89..e710d0d39ea5c0a48d31dcfd24ab0fa174c1a990 100644 (file)
@@ -2905,7 +2905,7 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st
                                      conn->flags,
                                      conn->handle.fd,
                                      conn->handle.fd >= 0 ? fdtab[conn->handle.fd].state : 0,
-                                     conn->handle.fd >= 0 ? fdtab[conn->handle.fd].cache : 0,
+                                     conn->handle.fd >= 0 ? fdtab[conn->handle.fd].fdcache_entry.next >= -2 : 0,
                                      conn->handle.fd >= 0 ? !!(fdtab[conn->handle.fd].update_mask & tid_bit) : 0,
                                      conn->handle.fd >= 0 ? fdtab[conn->handle.fd].thread_mask: 0);
                }
@@ -2938,7 +2938,7 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st
                                      conn->flags,
                                      conn->handle.fd,
                                      conn->handle.fd >= 0 ? fdtab[conn->handle.fd].state : 0,
-                                     conn->handle.fd >= 0 ? fdtab[conn->handle.fd].cache : 0,
+                                     conn->handle.fd >= 0 ? fdtab[conn->handle.fd].fdcache_entry.next >= -2 : 0,
                                      conn->handle.fd >= 0 ? !!(fdtab[conn->handle.fd].update_mask & tid_bit) : 0,
                                      conn->handle.fd >= 0 ? fdtab[conn->handle.fd].thread_mask: 0);
                }