]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: sink: now report the number of dropped events on output
authorWilly Tarreau <w@1wt.eu>
Tue, 27 Aug 2019 14:41:06 +0000 (16:41 +0200)
committerWilly Tarreau <w@1wt.eu>
Tue, 27 Aug 2019 15:14:19 +0000 (17:14 +0200)
The principle is that when emitting a message, if some dropped events
were logged, we first attempt to report this counter before going
further. This is done under an exclusive lock while all logs are
produced under a shared lock. This ensures that the dropped line is
accurately reported and doesn't accidently arrive after a later
event.

include/proto/sink.h
include/types/sink.h
src/sink.c

index 0d8dd7dfc59f51b782cb48dcf428fbf3756271e4..29475364d0db5dc94394cdf702a2952457a0d4a3 100644 (file)
@@ -29,7 +29,46 @@ extern struct list sink_list;
 
 struct sink *sink_find(const char *name);
 struct sink *sink_new_fd(const char *name, const char *desc, enum sink_fmt fmt, int fd);
-void sink_write(struct sink *sink, const struct ist msg[], size_t nmsg);
+ssize_t __sink_write(struct sink *sink, const struct ist msg[], size_t nmsg);
+int sink_announce_dropped(struct sink *sink);
+
+
+/* tries to send <nmsg> message parts (up to 8, ignored above) from message
+ * array <msg> to sink <sink>. Formating according to the sink's preference is
+ * done here. Lost messages are accounted for in the sink's counter. If there
+ * were lost messages, an attempt is first made to indicate it.
+ */
+static inline void sink_write(struct sink *sink, const struct ist msg[], size_t nmsg)
+{
+       ssize_t sent;
+
+       if (unlikely(sink->ctx.dropped > 0)) {
+               /* We need to take an exclusive lock so that other producers
+                * don't do the same thing at the same time and above all we
+                * want to be sure others have finished sending their messages
+                * so that the dropped event arrives exactly at the right
+                * position.
+                */
+               HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &sink->ctx.lock);
+               sent = sink_announce_dropped(sink);
+               HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &sink->ctx.lock);
+
+               if (!sent) {
+                       /* we failed, we don't try to send our log as if it
+                        * would pass by chance, we'd get disordered events.
+                        */
+                       goto fail;
+               }
+       }
+
+       HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &sink->ctx.lock);
+       sent = __sink_write(sink, msg, nmsg);
+       HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &sink->ctx.lock);
+
+ fail:
+       if (unlikely(sent <= 0))
+               HA_ATOMIC_ADD(&sink->ctx.dropped, 1);
+}
 
 #endif /* _PROTO_SINK_H */
 
index 5465ca63dd5fc9b6d366438b48dd389160683581..e36a2965394f4836725d5bf8a2011e69c7e93e1e 100644 (file)
@@ -60,9 +60,9 @@ struct sink {
        uint8_t syslog_minlvl;     // used by syslog & short formats
        uint32_t maxlen;           // max message length (truncated above)
        struct {
+               __decl_hathreads(HA_RWLOCK_T lock); // shared/excl for dropped
                struct ring *ring;    // used by ring buffer and STRM sender
                unsigned int dropped; // dropped events since last one.
-               __decl_hathreads(HA_RWLOCK_T lock); // used by some types
                int fd;               // fd num for FD type sink
        } ctx;
 };
index 5746c4bc2dea316f17d0d2fde9436403e34f1ec7..8d2ce91830039ffb06a621815022a2d02e02c33d 100644 (file)
@@ -145,14 +145,16 @@ struct sink *sink_new_buf(const char *name, const char *desc, enum sink_fmt fmt,
 
 /* tries to send <nmsg> message parts (up to 8, ignored above) from message
  * array <msg> to sink <sink>. Formating according to the sink's preference is
- * done here. Lost messages are accounted for in the sink's counter.
+ * done here. Lost messages are NOT accounted for. It is preferable to call
+ * sink_write() instead which will also try to emit the number of dropped
+ * messages when there are any. It returns >0 if it could write anything,
+ * <=0 otherwise.
  */
-void sink_write(struct sink *sink, const struct ist msg[], size_t nmsg)
+ssize_t __sink_write(struct sink *sink, const struct ist msg[], size_t nmsg)
 {
        char short_hdr[4];
        struct ist pfx[4];
        size_t npfx = 0;
-       size_t sent = 0;
 
        if (sink->fmt == SINK_FMT_SHORT) {
                short_hdr[0] = '<';
@@ -165,17 +167,36 @@ void sink_write(struct sink *sink, const struct ist msg[], size_t nmsg)
         }
 
        if (sink->type == SINK_TYPE_FD) {
-               sent = fd_write_frag_line(sink->ctx.fd, sink->maxlen, pfx, npfx, msg, nmsg, 1);
-               /* sent > 0 if the message was delivered */
+               return fd_write_frag_line(sink->ctx.fd, sink->maxlen, pfx, npfx, msg, nmsg, 1);
        }
        else if (sink->type == SINK_TYPE_BUFFER) {
-               sent = ring_write(sink->ctx.ring, sink->maxlen, pfx, npfx, msg, nmsg);
-               /* sent > 0 if the message was delivered */
+               return ring_write(sink->ctx.ring, sink->maxlen, pfx, npfx, msg, nmsg);
        }
+       return 0;
+}
 
-       /* account for errors now */
-       if (sent <= 0)
-               HA_ATOMIC_ADD(&sink->ctx.dropped, 1);
+/* Tries to emit a message indicating the number of dropped events. In case of
+ * success, the amount of drops is reduced by as much. It's supposed to be
+ * called under an exclusive lock on the sink to avoid multiple produces doing
+ * the same. On success, >0 is returned, otherwise <=0 on failure.
+ */
+int sink_announce_dropped(struct sink *sink)
+{
+       unsigned int dropped;
+       struct buffer msg;
+       struct ist msgvec[1];
+       char logbuf[64];
+
+       while (unlikely((dropped = sink->ctx.dropped) > 0)) {
+               chunk_init(&msg, logbuf, sizeof(logbuf));
+               chunk_printf(&msg, "%u event%s dropped", dropped, dropped > 1 ? "s" : "");
+               msgvec[0] = ist2(msg.area, msg.data);
+               if (__sink_write(sink, msgvec, 1) <= 0)
+                       return 0;
+               /* success! */
+               HA_ATOMIC_SUB(&sink->ctx.dropped, dropped);
+       }
+       return 1;
 }
 
 /* parse the "show events" command, returns 1 if a message is returned, otherwise zero */