]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: ring: implement a wait mode for watchers
authorWilly Tarreau <w@1wt.eu>
Fri, 30 Aug 2019 09:17:01 +0000 (11:17 +0200)
committerWilly Tarreau <w@1wt.eu>
Fri, 30 Aug 2019 09:58:58 +0000 (11:58 +0200)
Now it is possible for a reader to subscribe and wait for new events
sent to a ring buffer. When new events are written to a ring buffer,
the applets that are subscribed are woken up to display new events.
For now we only support this with the CLI applet called by "show events"
since the I/O handler is indeed a CLI I/O handler. But it's not
complicated to add other mechanisms to consume events and forward them
to external log servers for example. The wait mode is enabled by adding
"-w" after "show events <sink>". An extra "-n" was added to directly
seek to new events only.

doc/management.txt
include/types/ring.h
src/ring.c
src/sink.c

index 6709cee310e5214ddcad9b5fd2aee1e1688375b0..9973747ac12b59bd78dd4aa9dd1ebdd864790bf2 100644 (file)
@@ -1964,10 +1964,16 @@ show errors [<iid>|<proxy>] [request|response]
     is the slash ('/') in header name "header/bizarre", which is not a valid
     HTTP character for a header name.
 
-show events [<sink>]
+show events [<sink>] [-w] [-n]
   With no option, this lists all known event sinks and their types. With an
   option, it will dump all available events in the designated sink if it is of
-  type buffer.
+  type buffer. If option "-w" is passed after the sink name, then once the end
+  of the buffer is reached, the command will wait for new events and display
+  them. It is possible to stop the operation by entering any input (which will
+  be discarded) or by closing the session. Finally, option "-n" is used to
+  directly seek to the end of the buffer, which is often convenient when
+  combined with "-w" to only report new events. For convenience, "-wn" or "-nw"
+  may be used to enable both options at once.
 
 show fd [<fd>]
   Dump the list of either all open file descriptors or just the one number <fd>
index 3d69b22525f84ca2fe9872405b24ade4a980b748..86c507ff82e5532b4286e776c6bd32cc4c61415c 100644 (file)
@@ -96,6 +96,7 @@
 struct ring {
        struct buffer buf;   // storage area
        size_t ofs;          // absolute offset in history of the buffer's head
+       struct list waiters; // list of waiters, for now, CLI "show event"
        __decl_hathreads(HA_RWLOCK_T lock);
        int readers_count;
 };
index a38148cf1d7173c452ee3ee54af78df0e8207296..f0886b086662d98ef15b00ed4f143a8cb26b3ffe 100644 (file)
@@ -48,6 +48,7 @@ struct ring *ring_new(size_t size)
                goto fail;
 
        HA_RWLOCK_INIT(&ring->lock);
+       LIST_INIT(&ring->waiters);
        ring->readers_count = 0;
        ring->ofs = 0;
        ring->buf = b_make(area, size, 0, 0);
@@ -113,6 +114,7 @@ void ring_free(struct ring *ring)
 ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg)
 {
        struct buffer *buf = &ring->buf;
+       struct appctx *appctx;
        size_t totlen = 0;
        size_t lenlen;
        size_t dellen;
@@ -187,6 +189,11 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
 
        *b_tail(buf) = 0; buf->data++;; // new read counter
        sent = lenlen + totlen + 1;
+
+       /* notify potential readers */
+       list_for_each_entry(appctx, &ring->waiters, ctx.cli.l0)
+               appctx_wakeup(appctx);
+
  done_buf:
        HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
        return sent;
@@ -216,9 +223,12 @@ int ring_attach_cli(struct ring *ring, struct appctx *appctx)
 
 /* This function dumps all events from the ring whose pointer is in <p0> into
  * the appctx's output buffer, and takes from <o0> the seek offset into the
- * buffer's history (0 for oldest known event). It returns 0 if the output
- * buffer is full and it needs to be called again, otherwise non-zero. It is
- * meant to be used with cli_release_show_ring() to clean up.
+ * buffer's history (0 for oldest known event). It looks at <i0> for boolean
+ * options: bit0 means it must wait for new data or any key to be pressed. Bit1
+ * means it must seek directly to the end to wait for new contents. It returns
+ * 0 if the output buffer or events are missing is full and it needs to be
+ * called again, otherwise non-zero. It is meant to be used with
+ * cli_release_show_ring() to clean up.
  */
 int cli_io_handler_show_ring(struct appctx *appctx)
 {
@@ -235,6 +245,8 @@ int cli_io_handler_show_ring(struct appctx *appctx)
 
        HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
 
+       LIST_DEL_INIT(&appctx->ctx.cli.l0);
+
        /* explanation for the initialization below: it would be better to do
         * this in the parsing function but this would occasionally result in
         * dropped events because we'd take a reference on the oldest message
@@ -244,8 +256,14 @@ int cli_io_handler_show_ring(struct appctx *appctx)
         * value cannot be produced after initialization.
         */
        if (unlikely(ofs == ~0)) {
-               HA_ATOMIC_ADD(b_head(buf), 1);
-               ofs = ring->ofs;
+               ofs = 0;
+
+               /* going to the end means looking at tail-1 */
+               if (appctx->ctx.cli.i0 & 2)
+                       ofs += b_data(buf) - 1;
+
+               HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
+               ofs += ring->ofs;
        }
 
        /* we were already there, adjust the offset to be relative to
@@ -291,6 +309,20 @@ int cli_io_handler_show_ring(struct appctx *appctx)
        ofs += ring->ofs;
        appctx->ctx.cli.o0 = ofs;
        HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
+
+       if (ret && (appctx->ctx.cli.i0 & 1)) {
+               /* we've drained everything and are configured to wait for more
+                * data or an event (keypress, close)
+                */
+               if (!si_oc(si)->output && !(si_oc(si)->flags & CF_SHUTW)) {
+                       /* let's be woken up once new data arrive */
+                       LIST_ADDQ(&ring->waiters, &appctx->ctx.cli.l0);
+                       si_rx_endp_done(si);
+                       ret = 0;
+               }
+               /* always drain all the request */
+               co_skip(si_oc(si), si_oc(si)->output);
+       }
        return ret;
 }
 
@@ -308,6 +340,7 @@ void cli_io_release_show_ring(struct appctx *appctx)
                /* reader was still attached */
                ofs -= ring->ofs;
                BUG_ON(ofs >= b_size(&ring->buf));
+               LIST_DEL_INIT(&appctx->ctx.cli.l0);
                HA_ATOMIC_SUB(b_peek(&ring->buf, ofs), 1);
        }
        HA_ATOMIC_SUB(&ring->readers_count, 1);
index 8d2ce91830039ffb06a621815022a2d02e02c33d..ed49062d8e6bfbfa02d22198951a62654f2721d0 100644 (file)
@@ -203,12 +203,13 @@ int sink_announce_dropped(struct sink *sink)
 static int cli_parse_show_events(char **args, char *payload, struct appctx *appctx, void *private)
 {
        struct sink *sink;
+       int arg;
 
        args++; // make args[1] the 1st arg
 
        if (!*args[1]) {
                /* no arg => report the list of supported sink */
-               chunk_printf(&trash, "Supported events sinks:\n");
+               chunk_printf(&trash, "Supported events sinks are listed below. Add -w(wait), -n(new). Any key to stop\n");
                list_for_each_entry(sink, &sink_list, sink_list) {
                        chunk_appendf(&trash, "    %-10s : type=%s, %u dropped, %s\n",
                                      sink->name,
@@ -232,6 +233,16 @@ static int cli_parse_show_events(char **args, char *payload, struct appctx *appc
        if (sink->type != SINK_TYPE_BUFFER)
                return cli_msg(appctx, LOG_NOTICE, "Nothing to report for this sink");
 
+       for (arg = 2; *args[arg]; arg++) {
+               if (strcmp(args[arg], "-w") == 0)
+                       appctx->ctx.cli.i0 |= 1; // wait mode
+               else if (strcmp(args[arg], "-n") == 0)
+                       appctx->ctx.cli.i0 |= 2; // seek to new
+               else if (strcmp(args[arg], "-nw") == 0 || strcmp(args[arg], "-wn") == 0)
+                       appctx->ctx.cli.i0 |= 3; // seek to new + wait
+               else
+                       return cli_err(appctx, "unknown option");
+       }
        return ring_attach_cli(sink->ctx.ring, appctx);
 }