struct ring *ring = sink->ctx.ring;
struct buffer *buf = &ring->buf;
uint64_t msg_len;
- size_t len, cnt, ofs;
+ size_t len, cnt, ofs, last_ofs;
int ret = 0;
/* if stopping was requested, close immediately */
HA_ATOMIC_INC(b_peek(buf, ofs));
ofs += ring->ofs;
sft->ofs = ofs;
+ last_ofs = ring->ofs;
}
HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
/* let's be woken up once new data arrive */
HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
LIST_APPEND(&ring->waiters, &appctx->wait_entry);
+ ofs = ring->ofs;
HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
- applet_have_no_more_data(appctx);
+ if (ofs != last_ofs) {
+ /* more data was added into the ring between the
+ * unlock and the lock, and the writer might not
+ * have seen us. We need to reschedule a read.
+ */
+ applet_have_more_data(appctx);
+ } else
+ applet_have_no_more_data(appctx);
}
HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);