/* returns the number of bytes in the ring */
static inline size_t ring_data(const struct ring *ring)
{
- return ((ring->storage->head <= ring->storage->tail) ?
- 0 : ring->storage->size) + ring->storage->tail - ring->storage->head;
+ size_t tail = HA_ATOMIC_LOAD(&ring->storage->tail) & ~RING_TAIL_LOCK;
+
+ return ((ring->storage->head <= tail) ?
+ 0 : ring->storage->size) + tail - ring->storage->head;
}
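+/* Example (illustrative): with size=1024, head=1000 and tail=10 the data has
+ * wrapped, so ring_data() returns 1024 + 10 - 1000 = 34 bytes; with head=10
+ * and tail=1000 it simply returns 1000 - 10 = 990 bytes.
+ */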
/* returns the head offset of the ring */
static inline size_t ring_head(const struct ring *ring)
{
return ring->storage->head;
}
-/* returns the tail offset of the ring */
+/* returns the ring's tail offset without the lock bit */
static inline size_t ring_tail(const struct ring *ring)
{
- return ring->storage->tail;
+ return HA_ATOMIC_LOAD(&ring->storage->tail) & ~RING_TAIL_LOCK;
}
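+/* Note: RING_TAIL_LOCK is assumed here to be the highest bit of the tail
+ * offset, i.e. something along the lines of:
+ *   #define RING_TAIL_LOCK (1ULL << ((sizeof(size_t) * 8) - 1))
+ * so masking it out always yields the real offset, whether or not a writer
+ * currently holds the bit.
+ */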
/* duplicates ring <src> over ring <dst> for no more than <max> bytes or no
ring_area = ring->storage->area;
ring_size = ring->storage->size;
- HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);
if (needed + 1 > ring_size)
goto leave;
- head_ofs = ring_head(ring);
- tail_ofs = ring_tail(ring);
+ /* try to get exclusivity on the ring's tail. For this we set the
+ * tail's highest bit, and the one that gets it wins. Many tests were
+ * run on this and the approach below is optimal for armv8.1 atomics,
+ * second-to-optimal on both x86_64 and armv8.0.
+ * x86_64 would benefit slightly more from an xchg() which would
+ * require the readers to loop during changes, and armv8.0 is slightly
+ * better there as well (+5%). The CAS is bad for both (requires a
+ * preload), though it might degrade more gracefully on large x86 systems
+ * than the busy loop the compiler would implement for the FETCH_OR.
+ * Alternatively we could drop the 12 upper bits of a 64-bit tail offset
+ * and use XADD. Not tested, and it would require either undoing the
+ * change or watching for it (using it as a ticket).
+ */
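+ /* note: pl_wait_unlock_long() is expected to pause with backoff until
+ * the RING_TAIL_LOCK bit is cleared again, so the FETCH_OR below is only
+ * retried once the previous owner has released the tail.
+ */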
+ while (1) {
+ tail_ofs = HA_ATOMIC_FETCH_OR(&ring->storage->tail, RING_TAIL_LOCK);
+ if (!(tail_ofs & RING_TAIL_LOCK))
+ break;
+ pl_wait_unlock_long(&ring->storage->tail, RING_TAIL_LOCK);
+ }
+
+ HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);
+
+ head_ofs = ring->storage->head;
/* this is the byte before tail, it contains the users count */
lock_ptr = (uint8_t*)ring_area + (tail_ofs > 0 ? tail_ofs - 1 : ring_size - 1);
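+ /* (if tail_ofs is 0, the byte before it wraps to the last byte of the
+ * area, at offset ring_size - 1) */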
/* update the new space in the buffer */
ring->storage->head = head_ofs;
- ring->storage->tail = tail_ofs;
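+ /* storing the new tail without RING_TAIL_LOCK also releases any writer
+ * spinning on that bit in the loop above */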
+ HA_ATOMIC_STORE(&ring->storage->tail, tail_ofs);
/* notify potential readers */
if (sent) {
appctx_wakeup(appctx);
}
- leave:
HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock);
+ leave:
return sent;
}
HA_RWLOCK_RDLOCK(RING_LOCK, &ring->lock);
- head_ofs = ring->storage->head;
- tail_ofs = ring->storage->tail;
+ head_ofs = ring_head(ring);
+ tail_ofs = ring_tail(ring);
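+ /* ring_tail() strips RING_TAIL_LOCK, so the reader sees a valid offset
+ * even while a writer currently holds the tail lock bit */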
/* explanation for the initialization below: it would be better to do
* this in the parsing function but this would occasionally result in
goto err;
}
+ if (size > RING_TAIL_LOCK) {
+ ha_alert("parsing [%s:%d] : too large size '%llu' for new sink buffer, the limit on this platform is %llu bytes.\n", file, linenum, (ullong)size, (ullong)RING_TAIL_LOCK);
+ err_code |= ERR_ALERT | ERR_FATAL;
+ goto err;
+ }
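+ /* e.g. with RING_TAIL_LOCK as the top bit of the tail offset, this limit
+ * is 2^63 bytes on 64-bit platforms and 2 GiB on 32-bit ones, since that
+ * bit is reserved for locking */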
+
if (cfg_sink->store) {
ha_alert("parsing [%s:%d] : cannot resize an already mapped file, please specify 'size' before 'backing-file'.\n", file, linenum);
err_code |= ERR_ALERT | ERR_FATAL;