ring_area = (uint8_t *)ring->storage->area;
ring_size = ring->storage->size;
- tail_ofs = ring_tail(ring);
- head_ofs = HA_ATOMIC_LOAD(&ring->storage->head);
-
/* explanation for the initialization below: it would be better to do
* this in the parsing function but this would occasionally result in
* dropped events because we'd take a reference on the oldest message
* and keep it while being scheduled. Thus instead let's take it the
* first time we enter here so that we have a chance to pass many
* existing messages before grabbing a reference to a location. This
- * value cannot be produced after initialization.
+ * value cannot be produced after initialization. The first offset
+ * needs to be taken under isolation as it must not move while we're
+ * trying to catch it.
*/
if (unlikely(*ofs_ptr == ~0)) {
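+ /* ~0 is the "uninitialized" marker mentioned above; since no valid
+  * offset may ever equal it, it can only mean a first call.
+  */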
+ thread_isolate();
+
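+ /* all other threads are paused at a safe point here, so neither
+  * head nor tail can move while we pick our starting offset.
+  */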
+ head_ofs = HA_ATOMIC_LOAD(&ring->storage->head);
+ tail_ofs = ring_tail(ring);
+
if (flags & RING_WF_SEEK_NEW) {
/* going to the end means looking at tail-1 */
head_ofs = tail_ofs + ring_size - 1;
if (head_ofs >= ring_size)
	head_ofs -= ring_size;
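+ /* i.e. (tail_ofs - 1) modulo ring_size, computed without
+  * underflowing when tail_ofs is zero.
+  */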
}
- /* make ctx->ofs relative to the beginning of the buffer now */
- *ofs_ptr = head_ofs;
-
- /* and reserve our slot here (inc readers count) */
+ /* reserve our slot here (inc readers count) */
do {
readers = _HA_ATOMIC_LOAD(ring_area + head_ofs);
} while ((readers > RING_MAX_READERS ||
!_HA_ATOMIC_CAS(ring_area + head_ofs, &readers, readers + 1)) && __ha_cpu_relax());
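+ /* the byte at head_ofs counts the readers attached to this
+  * position: we spin while it reports an abnormal value (above
+  * RING_MAX_READERS), then atomically increment it to register
+  * ourselves.
+  */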
+
+ thread_release();
+
+ /* store this precious offset in our context, and we're done */
+ *ofs_ptr = head_ofs;
}
/* we have the guarantee we can restart from our own head */
head_ofs = *ofs_ptr;
BUG_ON(head_ofs >= ring_size);
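+ /* this guarantee is provided by the reader count we took above:
+  * our position stays valid until we release it further down.
+  */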
+ /* the tail will continue to move, but the value we snapshot here
+  * remains safe to use.
+  */
+ tail_ofs = ring_tail(ring);
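+ /* note: ring_tail() is assumed to mask off the writer's lock bit
+  * from the raw tail offset; messages arriving past this snapshot
+  * will be picked up by a later call.
+  */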
+
/* we keep track of where we were and we don't release it before
* we've protected the next place.
*/
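+ /* i.e. hand-over-hand: take a reference on the new position before
+  * dropping the one on the previous position.
+  */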