struct sink_forward_target *sft = appctx->svcctx;
struct sink *sink = sft->sink;
struct ring *ring = sink->ctx.ring;
- struct buffer *buf = &ring->buf;
- uint64_t msg_len;
- size_t len, cnt, ofs, last_ofs;
- ssize_t copied;
+ size_t ofs, last_ofs;
int ret = 0;
if (unlikely(se_fl_test(appctx->sedesc, (SE_FL_EOS|SE_FL_ERROR)))) {
LIST_DEL_INIT(&appctx->wait_entry);
HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock);
- HA_RWLOCK_RDLOCK(RING_LOCK, &ring->lock);
-
- /* explanation for the initialization below: it would be better to do
- * this in the parsing function but this would occasionally result in
- * dropped events because we'd take a reference on the oldest message
- * and keep it while being scheduled. Thus instead let's take it the
- * first time we enter here so that we have a chance to pass many
- * existing messages before grabbing a reference to a location. This
- * value cannot be produced after initialization.
- */
- if (unlikely(sft->ofs == ~0)) {
- sft->ofs = b_peek_ofs(buf, 0);
- HA_ATOMIC_INC(b_orig(buf) + sft->ofs);
- }
-
- /* we were already there, adjust the offset to be relative to
- * the buffer's head and remove us from the counter.
- */
- ofs = sft->ofs - b_head_ofs(buf);
- if (sft->ofs < b_head_ofs(buf))
- ofs += b_size(buf);
- BUG_ON(ofs >= buf->size);
- HA_ATOMIC_DEC(b_peek(buf, ofs));
-
- /* in this loop, ofs always points to the counter byte that precedes
- * the message so that we can take our reference there if we have to
- * stop before the end (ret=0).
- */
- ret = 1;
- while (ofs + 1 < b_data(buf)) {
- cnt = 1;
- len = b_peek_varint(buf, ofs + cnt, &msg_len);
- if (!len)
- break;
- cnt += len;
- BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
-
- copied = applet_append_line(appctx, buf, ofs + cnt, msg_len);
- if (copied == -2) {
- /* too large a message to ever fit, let's skip it */
- ofs += cnt + msg_len;
- continue;
- }
- else if (copied == -1) {
- /* output full */
- ret = 0;
- break;
- }
- ofs += cnt + msg_len;
- }
-
- HA_ATOMIC_INC(b_peek(buf, ofs));
- last_ofs = b_tail_ofs(buf);
- sft->ofs = b_peek_ofs(buf, ofs);
-
- HA_RWLOCK_RDUNLOCK(RING_LOCK, &ring->lock);
+ ret = ring_dispatch_messages(ring, appctx, &sft->ofs, &last_ofs, 0, applet_append_line);
if (ret) {
/* let's be woken up once new data arrive */
HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);
LIST_APPEND(&ring->waiters, &appctx->wait_entry);
- ofs = b_tail_ofs(buf);
+ ofs = b_tail_ofs(&ring->buf);
HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock);
if (ofs != last_ofs) {
/* more data was added into the ring between the