#include <common/compat.h>
#include <common/config.h>
#include <common/hathreads.h>
+#include <types/applet.h>
+#include <proto/cli.h>
#include <proto/ring.h>
+#include <proto/stream_interface.h>
/* Creates and returns a ring buffer of size <size> bytes. Returns NULL on
* allocation failure.
return sent;
}
+/* Tries to attach CLI handler <appctx> as a new reader on ring <ring>. This is
+ * meant to be used when registering a CLI function to dump a buffer, so it
+ * returns zero on success, or non-zero on failure with a message in the appctx
+ * CLI context.
+ */
+int ring_attach_cli(struct ring *ring, struct appctx *appctx)
+{
+ int users = ring->readers_count;
+
+ do {
+ if (users >= 1)
+ return cli_err(appctx,
+ "Sorry, too many watchers (255) on this ring buffer. "
+ "What could it have so interesting to attract so many watchers ?");
+
+ } while (!_HA_ATOMIC_CAS(&ring->readers_count, &users, users + 1));
+
+ appctx->ctx.cli.p0 = ring;
+ appctx->ctx.cli.p1 = 0; // start from the oldest event
+ return 0;
+}
+
+/* This function dumps all events from the ring whose pointer is in <p0> into
+ * the appctx's output buffer, and takes from <p1> the seek offset into the
+ * buffer's history (0 for oldest known event). It returns 0 if the output
+ * buffer is full and it needs to be called again, otherwise non-zero. It is
+ * meant to be used with cli_release_show_ring() to clean up.
+ */
+int cli_io_handler_show_ring(struct appctx *appctx)
+{
+ struct stream_interface *si = appctx->owner;
+ struct ring *ring = appctx->ctx.cli.p0;
+ struct buffer *buf = &ring->buf;
+ size_t ofs = (unsigned long)appctx->ctx.cli.p1;
+ uint64_t msg_len;
+ size_t len, cnt;
+ int ret;
+
+ if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
+ return 1;
+
+ HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
+
+ /* explanation for the initialization below: it would be better to do
+ * this in the parsing function but this would occasionally result in
+ * dropped events because we'd take a reference on the oldest message
+ * and keep it while being scheduled. Thus instead let's take it the
+ * first time we enter here so that we have a chance to pass many
+ * existing messages before grabbing a reference to a location.
+ */
+ if (unlikely(!ofs)) {
+ HA_ATOMIC_ADD(b_head(buf), 1);
+ ofs += ring->ofs;
+ }
+
+ /* we were already there, adjust the offset to be relative to
+ * the buffer's head and remove us from the counter.
+ */
+ ofs -= ring->ofs;
+ BUG_ON(ofs >= buf->size);
+ HA_ATOMIC_SUB(b_peek(buf, ofs), 1);
+
+ /* in this loop, ofs always points to the counter byte that precedes
+ * the message so that we can take our reference there if we have to
+ * stop before the end (ret=0).
+ */
+ ret = 1;
+ while (ofs + 1 < b_data(buf)) {
+ cnt = 1;
+ len = b_peek_varint(buf, ofs + cnt, &msg_len);
+ if (!len)
+ break;
+ cnt += len;
+ BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
+
+ if (unlikely(msg_len + 1 > b_size(&trash))) {
+ /* too large a message to ever fit, let's skip it */
+ ofs += cnt + msg_len;
+ continue;
+ }
+
+ chunk_reset(&trash);
+ len = b_getblk(buf, trash.area, msg_len, ofs + cnt);
+ trash.data += len;
+ trash.area[trash.data++] = '\n';
+
+ if (ci_putchk(si_ic(si), &trash) == -1) {
+ si_rx_room_blk(si);
+ ret = 0;
+ break;
+ }
+ ofs += cnt + msg_len;
+ }
+
+ HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
+ ofs += ring->ofs;
+ appctx->ctx.cli.p1 = (void *)ofs;
+ HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
+ return ret;
+}
+
+/* must be called after cli_io_handler_show_ring() above */
+void cli_io_release_show_ring(struct appctx *appctx)
+{
+ struct ring *ring = appctx->ctx.cli.p0;
+ size_t ofs = (unsigned long)appctx->ctx.cli.p1;
+
+ if (!ring)
+ return;
+
+ HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
+ ofs -= ring->ofs;
+ BUG_ON(ofs >= b_size(&ring->buf));
+ HA_ATOMIC_SUB(b_peek(&ring->buf, ofs), 1);
+ HA_ATOMIC_SUB(&ring->readers_count, 1);
+ HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
+}
+
+
/*
* Local variables:
* c-indent-level: 8