HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);
if (ofs != ~0) {
/* reader was still attached */
- char *area = ring_area(ring);
+ uint8_t *area = (uint8_t *)ring_area(ring);
+ uint8_t readers;
BUG_ON(ofs >= ring_size(ring));
LIST_DEL_INIT(&appctx->wait_entry);
- HA_ATOMIC_DEC(area + ofs);
+
+ /* dec readers count */
+ do {
+ readers = _HA_ATOMIC_LOAD(area + ofs);
+ } while ((readers > RING_MAX_READERS ||
+ !_HA_ATOMIC_CAS(area + ofs, &readers, readers - 1)) && __ha_cpu_relax());
}
HA_ATOMIC_DEC(&ring->readers_count);
HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock);
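
The hunk above replaces the single HA_ATOMIC_DEC() on the reader-count byte with a load/CAS retry loop: the byte is only modified while its current value is at most RING_MAX_READERS, otherwise the reader keeps spinning (via __ha_cpu_relax()) until the value looks like a plain count again. The following is a minimal standalone sketch of that pattern using C11 atomics; adjust_readers(), MAX_READERS and the assumption that values above the limit mean "temporarily owned by a writer" are illustrative stand-ins, not HAProxy's actual API.

    #include <stdatomic.h>
    #include <stdint.h>

    #define MAX_READERS 254   /* illustrative stand-in for RING_MAX_READERS */

    /* Add <delta> (+1 or -1) to the refcount byte at <slot>, but only while
     * its value looks like a valid reader count; larger values are assumed
     * to mean the byte is momentarily owned by a writer, so we just reload
     * and retry until it becomes a count again.
     */
    static void adjust_readers(_Atomic uint8_t *slot, int delta)
    {
        uint8_t cur;

        do {
            cur = atomic_load_explicit(slot, memory_order_relaxed);
        } while (cur > MAX_READERS ||
                 !atomic_compare_exchange_weak(slot, &cur, (uint8_t)(cur + delta)));
    }

With such a helper, the detach path above boils down to adjust_readers(area + ofs, -1), and the attach path in the next hunk to adjust_readers(ring_area + head_ofs, +1).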
{
size_t head_ofs, tail_ofs;
size_t ring_size;
- char *ring_area;
+ uint8_t *ring_area;
struct ist v1, v2;
uint64_t msg_len;
size_t len, cnt;
ssize_t copied;
+ uint8_t readers;
int ret;
- ring_area = ring->storage->area;
+ ring_area = (uint8_t *)ring->storage->area;
ring_size = ring->storage->size;
HA_RWLOCK_RDLOCK(RING_LOCK, &ring->lock);
/* make ctx->ofs relative to the beginning of the buffer now */
*ofs_ptr = head_ofs;
- /* and reserve our slot here */
- HA_ATOMIC_INC(ring_area + head_ofs);
+ /* and reserve our slot here (inc readers count) */
+ do {
+ readers = _HA_ATOMIC_LOAD(ring_area + head_ofs);
+ } while ((readers > RING_MAX_READERS ||
+ !_HA_ATOMIC_CAS(ring_area + head_ofs, &readers, readers + 1)) && __ha_cpu_relax());
}
/* we have the guarantee we can restart from our own head */
head_ofs = *ofs_ptr;
BUG_ON(head_ofs >= ring_size);
- HA_ATOMIC_DEC(ring_area + head_ofs);
+ /* dec readers count */
+ do {
+ readers = _HA_ATOMIC_LOAD(ring_area + head_ofs);
+ } while ((readers > RING_MAX_READERS ||
+ !_HA_ATOMIC_CAS(ring_area + head_ofs, &readers, readers - 1)) && __ha_cpu_relax());
/* in this loop, head_ofs always points to the counter byte that precedes
 * the message so that we can take our reference there if we have to
 * stop before the end (ret=0). The reference is relative to the ring's
 * origin, while pos is relative to the ring's head.
 */
ret = 1;
- vp_ring_to_data(&v1, &v2, ring_area, ring_size, head_ofs, tail_ofs);
+ vp_ring_to_data(&v1, &v2, (char *)ring_area, ring_size, head_ofs, tail_ofs);
while (1) {
if (vp_size(v1, v2) <= 1) {
vp_skip(&v1, &v2, cnt + msg_len);
}
- vp_data_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs);
+ vp_data_to_ring(v1, v2, (char *)ring_area, ring_size, &head_ofs, &tail_ofs);
- HA_ATOMIC_INC(ring_area + head_ofs);
+
+ /* inc readers count */
+ do {
+ readers = _HA_ATOMIC_LOAD(ring_area + head_ofs);
+ } while ((readers > RING_MAX_READERS ||
+ !_HA_ATOMIC_CAS(ring_area + head_ofs, &readers, readers + 1)) && __ha_cpu_relax());
if (last_ofs_ptr)
*last_ofs_ptr = tail_ofs;
*ofs_ptr = head_ofs;
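
Taken together, the dispatch path now follows a simple discipline: release the count byte at the current head, walk forward through the messages, then pin the count byte at the new head before publishing it through *ofs_ptr. Below is a hedged sketch of that flow built on the adjust_readers() helper above; the parsing step is elided, and the real function additionally keeps the ring's read lock held across this window, which is omitted here.

    /* Illustrative reader step (assumed flow, not ring_dispatch_messages()):
     * only the refcounting on the per-position byte is shown.
     */
    static void reader_step(_Atomic uint8_t *area, size_t *ofs_ptr)
    {
        size_t head = *ofs_ptr;

        adjust_readers(area + head, -1);   /* drop our slot at the old head */

        /* ... the vp_* parsing loop would advance <head> here ... */

        adjust_readers(area + head, +1);   /* pin the new head */
        *ofs_ptr = head;                   /* and publish it */
    }

The CAS guard is what keeps both steps safe against a writer driving the byte above RING_MAX_READERS: an unconditional increment or decrement could otherwise land on a value that is not a reader count at all.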