]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MAJOR: ring: implement a waiting queue in front of the ring
authorWilly Tarreau <w@1wt.eu>
Mon, 11 Mar 2024 13:57:37 +0000 (14:57 +0100)
committerWilly Tarreau <w@1wt.eu>
Mon, 25 Mar 2024 17:34:19 +0000 (17:34 +0000)
The queue-based approach consists in forcing threads to wait away from
the work area so as not to disturb the current writer, and to prepare
the work by grouping them in a queue. The last arrived takes the head
of the queue by placing its preinitialized ring cell there, becomes the
queue's leader, informs itself about the amount of previously accumulated
bytes so that when its turn comes, it immediately knows how much room is
needed to be released.

It can then take the whole queue with it, leaving an empty one for new
threads to come while it's releasing the room needed to copy everything.

By doing so we're cascading contention areas so that multiple parts can
work in parallel.

Note that we must never leave a write counter set to 0xFF at tail, and
this happens when a message cannot fit and we give up, because in this
case we're writing back tail_ofs, and only later we restore the counter.

The solution here is to make a special case when we're going to drop
the messages, and to write the readers count before restoring tail.

This already shows a tremendous performance gain on ARM (385k -> 4.8M),
thanks to the fact that now all waiting threads wait on the queue's
head instead of polluting the tail lock. On x86_64, the EPYC sees a big
boost at 24C48T (1.88M -> 3.82M) and a slowdown at 3C6T (6.0->4.45)
though this one is much less of a concern as so few threads need less
bandwidth than bigger counts.

src/ring.c

index 4118a645dfee53a88e7123bab43fe9fc87566687..c445a23cc458ba18ce7a90b7aa133a91f2f8db92 100644 (file)
@@ -175,6 +175,8 @@ void ring_free(struct ring *ring)
  */
 ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg)
 {
+       struct ring_wait_cell **ring_queue_ptr = DISGUISE(&ring->queue[ti->ring_queue].ptr);
+       struct ring_wait_cell cell, *next_cell, *curr_cell;
        size_t *tail_ptr = &ring->storage->tail;
        size_t head_ofs, tail_ofs, new_tail_ofs;
        size_t ring_size;
@@ -228,24 +230,78 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
        if (needed + 1 > ring_size)
                goto leave;
 
-       /* try to get exclusivity on the ring's tail. For this we set the
-        * tail's highest bit, and the one that gets it wins. Many tests were
-        * run on this and the approach below is optimal for armv8.1 atomics,
-        * second-to-optimal with both x86_64 and second-to-optimal on armv8.0.
-        * x86_64 would benefit slightly more from an xchg() which would
-        * require the readers to loop during changes, and armv8.0 is slightly
-        * better there as well (+5%). The CAS is bad for both (requires a
-        * preload), though it might degrade better on large x86 compared to
-        * a busy loop that the compiler would implement for the FETCH_OR.
-        * Alternately we could kill 12 upper bits on a 64-bit tail ofs and
-        * use XADD. Not tested, and would require to undo or watch for the
-        * change (use it as a ticket).
+       cell.to_send_self = needed;
+       cell.needed_tot = 0; // only when non-zero the cell is considered ready.
+       cell.maxlen = msglen;
+       cell.pfx = pfx;
+       cell.npfx = npfx;
+       cell.msg = msg;
+       cell.nmsg = nmsg;
+
+       /* insert our cell into the queue before the previous one. We may have
+        * to wait a bit if the queue's leader is attempting an election to win
+        * the tail, hence the busy value (should be rare enough).
+        */
+       next_cell = HA_ATOMIC_XCHG(ring_queue_ptr, &cell);
+
+       /* let's add the cumulated size of pending messages to ours */
+       cell.next = next_cell;
+       if (next_cell) {
+               size_t next_needed;
+
+               while ((next_needed = HA_ATOMIC_LOAD(&next_cell->needed_tot)) == 0)
+                       __ha_cpu_relax_for_read();
+               needed += next_needed;
+       }
+
+       /* now <needed> will represent the size to store *all* messages. The
+        * atomic store may unlock a subsequent thread waiting for this one.
+        */
+       HA_ATOMIC_STORE(&cell.needed_tot, needed);
+
+       /* OK now we're the queue leader, it's our job to try to get ownership
+        * of the tail, if we succeeded above, we don't even enter the loop. If
+        * we failed, we set ourselves at the top the queue, waiting for the
+        * tail to be unlocked again. We stop doing that if another thread
+        * comes in and becomes the leader in turn.
         */
        while (1) {
+               /* Wait for another thread to take the lead or for the tail to
+                * be available again. It's critical to be read-only in this
+                * loop so as not to lose time synchronizing cache lines. Also,
+                * we must detect a new leader ASAP so that the fewest possible
+                * threads check the tail.
+                */
+               while ((tail_ofs = HA_ATOMIC_LOAD(tail_ptr)) & RING_TAIL_LOCK) {
+                       next_cell = HA_ATOMIC_LOAD(ring_queue_ptr);
+                       if (next_cell != &cell)
+                               goto wait_for_flush; // another thread arrived, we should go to wait now
+                       __ha_cpu_relax_for_read();
+               }
+
+               /* the tail is available again and we're still the leader, try
+                * again.
+                */
+               if (HA_ATOMIC_LOAD(ring_queue_ptr) != &cell)
+                       goto wait_for_flush; // another thread arrived, we should go to wait now
+
+               /* OK the queue is locked, let's attempt to get the tail lock */
                tail_ofs = HA_ATOMIC_FETCH_OR(tail_ptr, RING_TAIL_LOCK);
-               if (!(tail_ofs & RING_TAIL_LOCK))
+
+               /* did we get it ? */
+               if (!(tail_ofs & RING_TAIL_LOCK)) {
+                       /* Yes! Are we still legitimate to get it ? We'll know by
+                        * trying to reset the head and replace it with ourselves.
+                        */
+                       curr_cell = &cell;
+                       if (!HA_ATOMIC_CAS(ring_queue_ptr, &curr_cell, NULL)) {
+                               /* oops, no, let's give it back to another thread and wait */
+                               HA_ATOMIC_STORE(tail_ptr, tail_ofs);
+                               goto wait_for_flush; // another thread arrived, we should go to wait now
+                       }
+                       /* Won! */
                        break;
-               pl_wait_unlock_long(tail_ptr, RING_TAIL_LOCK);
+               }
        }
 
        head_ofs = HA_ATOMIC_LOAD(&ring->storage->head);
@@ -253,9 +309,10 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
        /* this is the byte before tail, it contains the users count */
        lock_ptr = (uint8_t*)ring_area + (tail_ofs > 0 ? tail_ofs - 1 : ring_size - 1);
 
-       /* take the lock on the area. Normally we're alone */
+       /* Take the lock on the area. We're guaranteed to be the only writer
+        * here.
+        */
        readers = HA_ATOMIC_XCHG(lock_ptr, RING_WRITING_SIZE);
-       BUG_ON_HOT(readers == RING_WRITING_SIZE);
 
        vp_ring_to_data(&v1, &v2, ring_area, ring_size, head_ofs, tail_ofs);
 
@@ -278,7 +335,7 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
 
        /* now let's update the buffer with the new tail if our message will fit */
        new_tail_ofs = tail_ofs;
-       if (vp_size(v1, v2) <= ring_size - needed - 1) {
+       if (vp_size(v1, v2) <= ring_size - needed - 1 - 1) {
                vp_data_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs);
 
                /* update the new space in the buffer */
@@ -292,54 +349,76 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
                /* reset next read counter before releasing writers */
                HA_ATOMIC_STORE(ring_area + (new_tail_ofs > 0 ? new_tail_ofs - 1 : ring_size - 1), 0);
        }
+       else {
+               /* release readers right now, before writing the tail, so as
+                * not to expose the readers count byte to another writer.
+                */
+               HA_ATOMIC_STORE(lock_ptr, readers);
+       }
 
        /* and release other writers */
        HA_ATOMIC_STORE(tail_ptr, new_tail_ofs);
 
-       if (vp_size(v1, v2) > ring_size - needed - 1 - 1) {
-               /* we had to stop due to readers blocking the head,
-                * let's give up.
-                */
-               goto done_update_buf;
-       }
-
-       /* now focus on free room between the old and the new tail */
        vp_ring_to_room(&v1, &v2, ring_area, ring_size, (new_tail_ofs > 0 ? new_tail_ofs - 1 : ring_size - 1), tail_ofs);
 
-       /* let's write the message size */
-       vp_put_varint(&v1, &v2, msglen);
+       /* the list stops on a NULL */
+       for (curr_cell = &cell; curr_cell; curr_cell = HA_ATOMIC_XCHG(&curr_cell->next, curr_cell)) {
+               if (unlikely(tail_ofs == new_tail_ofs)) {
+                       /* report that this message was dropped.
+                        * Note: for now this must not happen!
+                        */
+                       HA_ATOMIC_STORE(&curr_cell->to_send_self, 0);
+                       continue;
+               }
 
-       /* then write the messages */
-       msglen = 0;
-       for (i = 0; i < npfx; i++) {
-               size_t len = pfx[i].len;
+               maxlen = curr_cell->maxlen;
+               pfx = curr_cell->pfx;
+               npfx = curr_cell->npfx;
+               msg = curr_cell->msg;
+               nmsg = curr_cell->nmsg;
+
+               /* let's write the message size */
+               vp_put_varint(&v1, &v2, maxlen);
+
+               /* then write the messages */
+               msglen = 0;
+               for (i = 0; i < npfx; i++) {
+                       size_t len = pfx[i].len;
+
+                       if (len + msglen > maxlen)
+                               len = maxlen - msglen;
+                       if (len)
+                               vp_putblk(&v1, &v2, pfx[i].ptr, len);
+                       msglen += len;
+               }
 
-               if (len + msglen > maxlen)
-                       len = maxlen - msglen;
-               if (len)
-                       vp_putblk(&v1, &v2, pfx[i].ptr, len);
-               msglen += len;
-       }
+               for (i = 0; i < nmsg; i++) {
+                       size_t len = msg[i].len;
 
-       for (i = 0; i < nmsg; i++) {
-               size_t len = msg[i].len;
+                       if (len + msglen > maxlen)
+                               len = maxlen - msglen;
+                       if (len)
+                               vp_putblk(&v1, &v2, msg[i].ptr, len);
+                       msglen += len;
+               }
 
-               if (len + msglen > maxlen)
-                       len = maxlen - msglen;
-               if (len)
-                       vp_putblk(&v1, &v2, msg[i].ptr, len);
-               msglen += len;
+               /* for all but the last message we need to write the
+                * readers count byte.
+                */
+               if (curr_cell->next)
+                       vp_putchr(&v1, &v2, 0);
        }
 
-       /* we must not write the read counter, it was already done,
-        * plus we could ruin the one of the next writer.
+       /* we must not write the trailing read counter, it was already done,
+        * plus we could ruin the one of the next writer. Let's just unlock
+        * the front.
         */
-       sent = lenlen + msglen + 1;
-       BUG_ON_HOT(sent != needed);
 
- done_update_buf:
        /* unlock the message area */
-       HA_ATOMIC_STORE(lock_ptr, readers);
+       if (new_tail_ofs != tail_ofs)
+               HA_ATOMIC_STORE(lock_ptr, readers);
+
+       sent = cell.to_send_self;
 
        /* notify potential readers */
        if (sent && HA_ATOMIC_LOAD(&ring->readers_count)) {
@@ -357,6 +436,21 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
 
  leave:
        return sent;
+
+ wait_for_flush:
+       /* The leader will write our own pointer in the cell's next to
+        * mark it as released. Let's wait for this.
+        */
+       do {
+               next_cell = HA_ATOMIC_LOAD(&cell.next);
+       } while (next_cell != &cell && __ha_cpu_relax_for_read());
+
+       /* OK our message was queued. Retrieving the sent size in the ring cell
+        * allows another leader thread to zero it if it finally couldn't send
+        * it (should only happen when using too small ring buffers to store
+        * all competing threads' messages at once).
+        */
+       return HA_ATOMIC_LOAD(&cell.to_send_self);
 }
 
 /* Tries to attach appctx <appctx> as a new reader on ring <ring>. This is