MINOR: ring: avoid writes to cells during copy
author Willy Tarreau <w@1wt.eu>
Fri, 15 Mar 2024 15:10:55 +0000 (16:10 +0100)
committer Willy Tarreau <w@1wt.eu>
Mon, 25 Mar 2024 17:34:19 +0000 (17:34 +0000)
It has been found that performing a first pass that copies all the
messages, then a second one that notifies about releases, is more
efficient on AMD than updating all of them on the fly using a CAS,
despite making writers wait longer to be released.
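
A minimal sketch of this two-pass pattern, assuming a hypothetical
cell layout and using GCC's C11-style builtins in place of HAProxy's
HA_ATOMIC_* macros (all names and fields below are illustrative, not
the real ring.c ones):

    #include <stddef.h>
    #include <string.h>

    struct cell {
            struct cell *next;   /* NULL-terminated queue of waiting writers */
            const void  *buf;    /* payload to copy into the ring */
            size_t       len;
    };

    static void flush_cells(struct cell *head, char *ring_area, size_t *ofs)
    {
            struct cell *c, *next;

            /* pass 1: copy everything, touching the cells with loads
             * only (no read-modify-write on them)
             */
            for (c = head; c; c = __atomic_load_n(&c->next, __ATOMIC_ACQUIRE)) {
                    memcpy(ring_area + *ofs, c->buf, c->len);
                    *ofs += c->len;
            }

            /* pass 2: release each waiter by making its <next> pointer
             * reference its own cell, which is what it spins on
             */
            for (c = head; c; c = next) {
                    next = __atomic_load_n(&c->next, __ATOMIC_ACQUIRE);
                    __atomic_store_n(&c->next, c, __ATOMIC_RELEASE);
            }
    }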

Maybe it's related to the CPU's ability to prefetch the contents
during a simple load, which it wouldn't do for an XCHG; this is
unclear at this point. This will also later permit the use of relaxed
stores to release threads.
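
As an illustration of that follow-up (an assumption about the intent,
not code from this patch), with GCC builtins standing in for
HA_ATOMIC_STORE() the per-cell release could then become a relaxed
store, the subsequent tail unlock being what actually publishes the
copied data to readers:

    /* illustrative only: release a waiting writer with a relaxed store,
     * relying on the later tail unlock for publication to readers
     */
    __atomic_store_n(&curr_cell->next, curr_cell, __ATOMIC_RELAXED);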

On ARM the performance increased to 7.0M/s. If this patch is applied
before the removal of the intermediary step, performance instead drops
to 3.9M/s. This shows the dependency between such changes, which all
strive to limit the number of writes on the fast path.

On x86_64, the EPYC at 3C6T saw a small drop from 4.57M to 4.45M, but
the 24C48T setup saw a nice 33% boost from 3.33M to 4.44M, i.e. we
get stable perf at 3 and 24 cores, despite having 8 CCX involved and
fighting with each other.

Other possibilities are:
  - use of HA_ATOMIC_XCHG() instead of FETCH_OR()
    => slightly faster (4.62/7.37 vs 4.58/7.34). Problem: requires
       modifying the readers to wait much longer, since the tail value
       won't be valid during updates in this case, and they will have
       to wait by looping over it (see the sketch after this list).
  - use other conditions to release a cell
    => to be tested
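
To make the first alternative concrete, here is a hedged sketch of the
trade-off using C11-style GCC builtins and a hypothetical lock-bit
layout (not HAProxy's exact tail encoding):

    #include <stdint.h>

    #define TAIL_LOCK_BIT  (1ULL << 63)   /* hypothetical lock-bit position */

    /* FETCH_OR leaves the previous offset visible under the lock bit,
     * so readers can keep using (tail & ~TAIL_LOCK_BIT) while a writer
     * holds the lock.
     */
    static uint64_t lock_tail_or(uint64_t *tail_ptr)
    {
            return __atomic_fetch_or(tail_ptr, TAIL_LOCK_BIT, __ATOMIC_ACQUIRE);
    }

    /* XCHG replaces the tail with the bare lock bit for the whole
     * update: slightly faster, but readers see no valid offset and
     * must spin on the tail until the writer stores the new value.
     */
    static uint64_t lock_tail_xchg(uint64_t *tail_ptr)
    {
            return __atomic_exchange_n(tail_ptr, TAIL_LOCK_BIT, __ATOMIC_ACQUIRE);
    }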

src/ring.c

index 7998f09050a9fa38f061adbb1218d7f72a415a37..9f2be10fda9a2f0ad9d9342a8ce052aa13d7aaaf 100644
@@ -367,63 +367,70 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
 
        vp_ring_to_room(&v1, &v2, ring_area, ring_size, (new_tail_ofs > 0 ? new_tail_ofs - 1 : ring_size - 1), tail_ofs);
 
-       /* the list stops on a NULL */
-       for (curr_cell = &cell; curr_cell; curr_cell = HA_ATOMIC_XCHG(&curr_cell->next, curr_cell)) {
-               if (unlikely(tail_ofs == new_tail_ofs)) {
-                       /* report that this message was dropped.
-                        * Note: for now this must not happen!
+       if (likely(tail_ofs != new_tail_ofs)) {
+               /* the list stops on a NULL */
+               for (curr_cell = &cell; curr_cell; curr_cell = HA_ATOMIC_LOAD(&curr_cell->next)) {
+                       maxlen = curr_cell->maxlen;
+                       pfx = curr_cell->pfx;
+                       npfx = curr_cell->npfx;
+                       msg = curr_cell->msg;
+                       nmsg = curr_cell->nmsg;
+
+                       /* let's write the message size */
+                       vp_put_varint(&v1, &v2, maxlen);
+
+                       /* then write the messages */
+                       msglen = 0;
+                       for (i = 0; i < npfx; i++) {
+                               size_t len = pfx[i].len;
+
+                               if (len + msglen > maxlen)
+                                       len = maxlen - msglen;
+                               if (len)
+                                       vp_putblk(&v1, &v2, pfx[i].ptr, len);
+                               msglen += len;
+                       }
+
+                       for (i = 0; i < nmsg; i++) {
+                               size_t len = msg[i].len;
+
+                               if (len + msglen > maxlen)
+                                       len = maxlen - msglen;
+                               if (len)
+                                       vp_putblk(&v1, &v2, msg[i].ptr, len);
+                               msglen += len;
+                       }
+
+                       /* for all but the last message we need to write the
+                        * readers count byte.
                         */
-                       HA_ATOMIC_STORE(&curr_cell->to_send_self, 0);
-                       continue;
+                       if (curr_cell->next)
+                               vp_putchr(&v1, &v2, 0);
                }
 
-               maxlen = curr_cell->maxlen;
-               pfx = curr_cell->pfx;
-               npfx = curr_cell->npfx;
-               msg = curr_cell->msg;
-               nmsg = curr_cell->nmsg;
-
-               /* let's write the message size */
-               vp_put_varint(&v1, &v2, maxlen);
-
-               /* then write the messages */
-               msglen = 0;
-               for (i = 0; i < npfx; i++) {
-                       size_t len = pfx[i].len;
-
-                       if (len + msglen > maxlen)
-                               len = maxlen - msglen;
-                       if (len)
-                               vp_putblk(&v1, &v2, pfx[i].ptr, len);
-                       msglen += len;
+               /* now release */
+               for (curr_cell = &cell; curr_cell; curr_cell = next_cell) {
+                       next_cell = HA_ATOMIC_LOAD(&curr_cell->next);
+                       HA_ATOMIC_STORE(&curr_cell->next, curr_cell);
                }
 
-               for (i = 0; i < nmsg; i++) {
-                       size_t len = msg[i].len;
-
-                       if (len + msglen > maxlen)
-                               len = maxlen - msglen;
-                       if (len)
-                               vp_putblk(&v1, &v2, msg[i].ptr, len);
-                       msglen += len;
+               /* unlock the message area */
+               HA_ATOMIC_STORE(lock_ptr, readers);
+       } else {
+               /* messages were dropped, notify about this and release them  */
+               for (curr_cell = &cell; curr_cell; curr_cell = next_cell) {
+                       next_cell = HA_ATOMIC_LOAD(&curr_cell->next);
+                       HA_ATOMIC_STORE(&curr_cell->to_send_self, 0);
+                       HA_ATOMIC_STORE(&curr_cell->next, curr_cell);
                }
-
-               /* for all but the last message we need to write the
-                * readers count byte.
-                */
-               if (curr_cell->next)
-                       vp_putchr(&v1, &v2, 0);
        }
 
        /* we must not write the trailing read counter, it was already done,
-        * plus we could ruin the one of the next writer. Let's just unlock
-        * the front.
+        * plus we could ruin the one of the next writer. And the front was
+        * unlocked either at the top if the ring was full, or just above if it
+        * could be properly filled.
         */
 
-       /* unlock the message area */
-       if (new_tail_ofs != tail_ofs)
-               HA_ATOMIC_STORE(lock_ptr, readers);
-
        sent = cell.to_send_self;
 
        /* notify potential readers */