*/
ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg)
{
+ struct ring_wait_cell **ring_queue_ptr = DISGUISE(&ring->queue[ti->ring_queue].ptr);
+ struct ring_wait_cell cell, *next_cell, *curr_cell;
size_t *tail_ptr = &ring->storage->tail;
size_t head_ofs, tail_ofs, new_tail_ofs;
size_t ring_size;
if (needed + 1 > ring_size)
goto leave;
- /* try to get exclusivity on the ring's tail. For this we set the
- * tail's highest bit, and the one that gets it wins. Many tests were
- * run on this and the approach below is optimal for armv8.1 atomics,
- * second-to-optimal with both x86_64 and second-to-optimal on armv8.0.
- * x86_64 would benefit slightly more from an xchg() which would
- * require the readers to loop during changes, and armv8.0 is slightly
- * better there as well (+5%). The CAS is bad for both (requires a
- * preload), though it might degrade better on large x86 compared to
- * a busy loop that the compiler would implement for the FETCH_OR.
- * Alternately we could kill 12 upper bits on a 64-bit tail ofs and
- * use XADD. Not tested, and would require to undo or watch for the
- * change (use it as a ticket).
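+ /* describe our message in a wait cell so that a leader thread (possibly
+ * ourselves) can write it into the ring for us
+ */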
+ cell.to_send_self = needed;
+ cell.needed_tot = 0; // the cell is only considered ready once this becomes non-zero.
+ cell.maxlen = msglen;
+ cell.pfx = pfx;
+ cell.npfx = npfx;
+ cell.msg = msg;
+ cell.nmsg = nmsg;
+
+ /* insert our cell into the queue before the previous one. We may have
+ * to wait a bit if the queue's leader is attempting an election to win
+ * the tail, hence the busy value (should be rare enough).
+ */
+ next_cell = HA_ATOMIC_XCHG(ring_queue_ptr, &cell);
+
+ /* let's add the cumulative size of the pending messages to ours */
+ cell.next = next_cell;
+ if (next_cell) {
+ size_t next_needed;
+
+ while ((next_needed = HA_ATOMIC_LOAD(&next_cell->needed_tot)) == 0)
+ __ha_cpu_relax_for_read();
+ needed += next_needed;
+ }
+
+ /* now <needed> will represent the size to store *all* messages. The
+ * atomic store may unlock a subsequent thread waiting for this one.
+ */
+ HA_ATOMIC_STORE(&cell.needed_tot, needed);
+
+ /* OK now we're the queue leader, so it's our job to try to get
+ * ownership of the tail. If we succeeded above, we don't even enter
+ * the loop. If we failed, we set ourselves at the top of the queue,
+ * waiting for the tail to be unlocked again. We stop doing that if
+ * another thread comes in and becomes the leader in turn.
*/
while (1) {
+ /* Wait for another thread to take the lead or for the tail to
+ * be available again. It's critical to be read-only in this
+ * loop so as not to lose time synchronizing cache lines. Also,
+ * we must detect a new leader ASAP so that the fewest possible
+ * threads check the tail.
+ */
+ while ((tail_ofs = HA_ATOMIC_LOAD(tail_ptr)) & RING_TAIL_LOCK) {
+ next_cell = HA_ATOMIC_LOAD(ring_queue_ptr);
+ if (next_cell != &cell)
+ goto wait_for_flush; // another thread arrived, we should go to wait now
+ __ha_cpu_relax_for_read();
+ }
+
+ /* the tail is available again and we're still the leader, try
+ * again.
+ */
+ if (HA_ATOMIC_LOAD(ring_queue_ptr) != &cell)
+ goto wait_for_flush; // another thread arrived, we should go to wait now
+
+ /* OK the queue is locked, let's attempt to get the tail lock */
tail_ofs = HA_ATOMIC_FETCH_OR(tail_ptr, RING_TAIL_LOCK);
- if (!(tail_ofs & RING_TAIL_LOCK))
+
+ /* did we get it ? */
+ if (!(tail_ofs & RING_TAIL_LOCK)) {
+ /* Yes! But are we still the legitimate leader? We'll know by
+ * trying to reset the queue head, which only succeeds if it
+ * still points to our cell.
+ */
+ curr_cell = &cell;
+ if (!HA_ATOMIC_CAS(ring_queue_ptr, &curr_cell, NULL)) {
+ /* oops, no, let's give it back to another thread and wait */
+ HA_ATOMIC_STORE(tail_ptr, tail_ofs);
+ goto wait_for_flush; // another thread arrived, we should go to wait now
+ }
+ /* Won! */
break;
- pl_wait_unlock_long(tail_ptr, RING_TAIL_LOCK);
+ }
}
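+ /* From here on we own the tail lock and the queue head was reset: we
+ * are the only writer, in charge of flushing our cell and all those
+ * linked from it.
+ */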
head_ofs = HA_ATOMIC_LOAD(&ring->storage->head);
/* this is the byte before tail, it contains the users count */
lock_ptr = (uint8_t*)ring_area + (tail_ofs > 0 ? tail_ofs - 1 : ring_size - 1);
- /* take the lock on the area. Normally we're alone */
+ /* Take the lock on the area. We're guaranteed to be the only writer
+ * here.
+ */
readers = HA_ATOMIC_XCHG(lock_ptr, RING_WRITING_SIZE);
- BUG_ON_HOT(readers == RING_WRITING_SIZE);
vp_ring_to_data(&v1, &v2, ring_area, ring_size, head_ofs, tail_ofs);
/* now let's update the buffer with the new tail if our message will fit */
new_tail_ofs = tail_ofs;
- if (vp_size(v1, v2) <= ring_size - needed - 1) {
+ if (vp_size(v1, v2) <= ring_size - needed - 1 - 1) {
vp_data_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs);
/* update the new space in the buffer */
/* reset next read counter before releasing writers */
HA_ATOMIC_STORE(ring_area + (new_tail_ofs > 0 ? new_tail_ofs - 1 : ring_size - 1), 0);
}
+ else {
+ /* release readers right now, before writing the tail, so as
+ * not to expose the readers count byte to another writer.
+ */
+ HA_ATOMIC_STORE(lock_ptr, readers);
+ }
/* and release other writers */
HA_ATOMIC_STORE(tail_ptr, new_tail_ofs);
- if (vp_size(v1, v2) > ring_size - needed - 1 - 1) {
- /* we had to stop due to readers blocking the head,
- * let's give up.
- */
- goto done_update_buf;
- }
-
- /* now focus on free room between the old and the new tail */
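+ /* now focus on the free room between the old and the new tail, where
+ * all queued messages will be written
+ */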
vp_ring_to_room(&v1, &v2, ring_area, ring_size, (new_tail_ofs > 0 ? new_tail_ofs - 1 : ring_size - 1), tail_ofs);
- /* let's write the message size */
- vp_put_varint(&v1, &v2, msglen);
+ /* The list stops on a NULL. The XCHG in the loop both advances to
+ * the next cell and marks the current one as flushed by storing its
+ * own pointer into ->next, which releases the thread waiting on it.
+ */
+ for (curr_cell = &cell; curr_cell; curr_cell = HA_ATOMIC_XCHG(&curr_cell->next, curr_cell)) {
+ if (unlikely(tail_ofs == new_tail_ofs)) {
+ /* report that this message was dropped.
+ * Note: for now this must not happen!
+ */
+ HA_ATOMIC_STORE(&curr_cell->to_send_self, 0);
+ continue;
+ }
- /* then write the messages */
- msglen = 0;
- for (i = 0; i < npfx; i++) {
- size_t len = pfx[i].len;
+ maxlen = curr_cell->maxlen;
+ pfx = curr_cell->pfx;
+ npfx = curr_cell->npfx;
+ msg = curr_cell->msg;
+ nmsg = curr_cell->nmsg;
+
+ /* let's write the message size */
+ vp_put_varint(&v1, &v2, maxlen);
+
+ /* then write the messages */
+ msglen = 0;
+ for (i = 0; i < npfx; i++) {
+ size_t len = pfx[i].len;
+
+ if (len + msglen > maxlen)
+ len = maxlen - msglen;
+ if (len)
+ vp_putblk(&v1, &v2, pfx[i].ptr, len);
+ msglen += len;
+ }
- if (len + msglen > maxlen)
- len = maxlen - msglen;
- if (len)
- vp_putblk(&v1, &v2, pfx[i].ptr, len);
- msglen += len;
- }
+ for (i = 0; i < nmsg; i++) {
+ size_t len = msg[i].len;
- for (i = 0; i < nmsg; i++) {
- size_t len = msg[i].len;
+ if (len + msglen > maxlen)
+ len = maxlen - msglen;
+ if (len)
+ vp_putblk(&v1, &v2, msg[i].ptr, len);
+ msglen += len;
+ }
- if (len + msglen > maxlen)
- len = maxlen - msglen;
- if (len)
- vp_putblk(&v1, &v2, msg[i].ptr, len);
- msglen += len;
+ /* for all but the last message we need to write the
+ * readers count byte.
+ */
+ if (curr_cell->next)
+ vp_putchr(&v1, &v2, 0);
}
- /* we must not write the read counter, it was already done,
- * plus we could ruin the one of the next writer.
+ /* we must not write the trailing read counter, it was already done,
+ * and we could corrupt the next writer's one. Let's just unlock the
+ * front.
*/
- sent = lenlen + msglen + 1;
- BUG_ON_HOT(sent != needed);
- done_update_buf:
/* unlock the message area */
- HA_ATOMIC_STORE(lock_ptr, readers);
+ if (new_tail_ofs != tail_ofs)
+ HA_ATOMIC_STORE(lock_ptr, readers);
+
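+ /* our own to_send_self was zeroed in the loop above if the message
+ * could not be written and had to be dropped
+ */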
+ sent = cell.to_send_self;
/* notify potential readers */
if (sent && HA_ATOMIC_LOAD(&ring->readers_count)) {
leave:
return sent;
+
+ wait_for_flush:
+ /* The leader will store our own cell pointer into our cell's ->next
+ * once it has handled our message, marking it as released. Let's
+ * wait for this.
+ */
+ do {
+ next_cell = HA_ATOMIC_LOAD(&cell.next);
+ } while (next_cell != &cell && __ha_cpu_relax_for_read());
+
+ /* OK, our message was handled. The sent size is kept in the cell so
+ * that the leader can zero it when it finally could not send the
+ * message (this should only happen when the ring buffer is too small
+ * to store all competing threads' messages at once), so re-read it
+ * to report what was really sent.
+ */
+ return HA_ATOMIC_LOAD(&cell.to_send_self);
}
/* Tries to attach appctx <appctx> as a new reader on ring <ring>. This is