vp_ring_to_room(&v1, &v2, ring_area, ring_size, (new_tail_ofs > 0 ? new_tail_ofs - 1 : ring_size - 1), tail_ofs);
- /* the list stops on a NULL */
- for (curr_cell = &cell; curr_cell; curr_cell = HA_ATOMIC_XCHG(&curr_cell->next, curr_cell)) {
- if (unlikely(tail_ofs == new_tail_ofs)) {
- /* report that this message was dropped.
- * Note: for now this must not happen!
+ if (likely(tail_ofs != new_tail_ofs)) {
+ /* the list stops on a NULL */
+ for (curr_cell = &cell; curr_cell; curr_cell = HA_ATOMIC_LOAD(&curr_cell->next)) {
+ maxlen = curr_cell->maxlen;
+ pfx = curr_cell->pfx;
+ npfx = curr_cell->npfx;
+ msg = curr_cell->msg;
+ nmsg = curr_cell->nmsg;
+
+ /* let's write the message size */
+ vp_put_varint(&v1, &v2, maxlen);
+
+ /* then write the messages */
+ msglen = 0;
+ for (i = 0; i < npfx; i++) {
+ size_t len = pfx[i].len;
+
+ if (len + msglen > maxlen)
+ len = maxlen - msglen;
+ if (len)
+ vp_putblk(&v1, &v2, pfx[i].ptr, len);
+ msglen += len;
+ }
+
+ for (i = 0; i < nmsg; i++) {
+ size_t len = msg[i].len;
+
+ if (len + msglen > maxlen)
+ len = maxlen - msglen;
+ if (len)
+ vp_putblk(&v1, &v2, msg[i].ptr, len);
+ msglen += len;
+ }
+
+ /* for all but the last message we need to write the
+ * readers count byte.
*/
- HA_ATOMIC_STORE(&curr_cell->to_send_self, 0);
- continue;
+ if (curr_cell->next)
+ vp_putchr(&v1, &v2, 0);
}
- maxlen = curr_cell->maxlen;
- pfx = curr_cell->pfx;
- npfx = curr_cell->npfx;
- msg = curr_cell->msg;
- nmsg = curr_cell->nmsg;
-
- /* let's write the message size */
- vp_put_varint(&v1, &v2, maxlen);
-
- /* then write the messages */
- msglen = 0;
- for (i = 0; i < npfx; i++) {
- size_t len = pfx[i].len;
-
- if (len + msglen > maxlen)
- len = maxlen - msglen;
- if (len)
- vp_putblk(&v1, &v2, pfx[i].ptr, len);
- msglen += len;
+ /* now release */
+ for (curr_cell = &cell; curr_cell; curr_cell = next_cell) {
+ next_cell = HA_ATOMIC_LOAD(&curr_cell->next);
+ HA_ATOMIC_STORE(&curr_cell->next, curr_cell);
}
- for (i = 0; i < nmsg; i++) {
- size_t len = msg[i].len;
-
- if (len + msglen > maxlen)
- len = maxlen - msglen;
- if (len)
- vp_putblk(&v1, &v2, msg[i].ptr, len);
- msglen += len;
+ /* unlock the message area */
+ HA_ATOMIC_STORE(lock_ptr, readers);
+ } else {
+ /* messages were dropped, notify about this and release them */
+ for (curr_cell = &cell; curr_cell; curr_cell = next_cell) {
+ next_cell = HA_ATOMIC_LOAD(&curr_cell->next);
+ HA_ATOMIC_STORE(&curr_cell->to_send_self, 0);
+ HA_ATOMIC_STORE(&curr_cell->next, curr_cell);
}
-
- /* for all but the last message we need to write the
- * readers count byte.
- */
- if (curr_cell->next)
- vp_putchr(&v1, &v2, 0);
}
/* we must not write the trailing read counter, it was already done,
- * plus we could ruin the one of the next writer. Let's just unlock
- * the front.
+ * plus we could ruin the one of the next writer. And the front was
+ * unlocked either at the top if the ring was full, or just above if it
+ * could be properly filled.
*/
- /* unlock the message area */
- if (new_tail_ofs != tail_ofs)
- HA_ATOMIC_STORE(lock_ptr, readers);
-
sent = cell.to_send_self;
/* notify potential readers */