]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: ring: add server statement to forward messages from a ring
authorEmeric Brun <ebrun@haproxy.com>
Thu, 28 May 2020 09:13:15 +0000 (11:13 +0200)
committerWilly Tarreau <w@1wt.eu>
Sun, 31 May 2020 08:46:13 +0000 (10:46 +0200)
This patch adds new statement "server" into ring section, and the
related "timeout connect" and "timeout server".

server <name> <address> [param*]
  Used to configure a syslog tcp server to forward messages from ring buffer.
  This supports for all "server" parameters found in 5.2 paragraph.
  Some of these parameters are irrelevant for "ring" sections.

timeout connect <timeout>
  Set the maximum time to wait for a connection attempt to a server to succeed.

  Arguments :
    <timeout> is the timeout value specified in milliseconds by default, but
              can be in any other unit if the number is suffixed by the unit,
              as explained at the top of this document.

timeout server <timeout>
  Set the maximum time for pending data staying into output buffer.

  Arguments :
    <timeout> is the timeout value specified in milliseconds by default, but
              can be in any other unit if the number is suffixed by the unit,
              as explained at the top of this document.

  Example:
    global
        log ring@myring local7

    ring myring
        description "My local buffer"
        format rfc3164
        maxlen 1200
        size 32764
        timeout connect 5s
        timeout server 10s
        server mysyslogsrv 127.0.0.1:6514

doc/configuration.txt
include/common/hathreads.h
include/types/applet.h
include/types/sink.h
src/sink.c

index 8a67f4d12139386ff004aea952d39178a2ef50b7..15f9668581cae2f5078715b9204cd0f5866d8a2c 100644 (file)
@@ -2596,10 +2596,38 @@ maxlen <length>
   including formatted header. If an event message is longer than
   <length>, it will be truncated to this length.
 
+server <name> <address> [param*]
+  Used to configure a syslog tcp server to forward messages from ring buffer.
+  This supports for all "server" parameters found in 5.2 paragraph. Some of
+  these parameters are irrelevant for "ring" sections. Important point: there
+  is little reason to add more than one server to a ring, because all servers
+  will receive the exact same copy of the ring contents, and as such the ring
+  will progress at the speed of the slowest server. If one server does not
+  respond, it will prevent old messages from being purged and may block new
+  messages from being inserted into the ring. The proper way to send messages
+  to multiple servers is to use one distinct ring per log server, not to
+  attach multiple servers to the same ring.
+
 size <size>
   This is the optional size in bytes for the ring-buffer. Default value is
   set to BUFSIZE.
 
+timeout connect <timeout>
+  Set the maximum time to wait for a connection attempt to a server to succeed.
+
+  Arguments :
+    <timeout> is the timeout value specified in milliseconds by default, but
+              can be in any other unit if the number is suffixed by the unit,
+              as explained at the top of this document.
+
+timeout server <timeout>
+  Set the maximum time for pending data staying into output buffer.
+
+  Arguments :
+    <timeout> is the timeout value specified in milliseconds by default, but
+              can be in any other unit if the number is suffixed by the unit,
+              as explained at the top of this document.
+
   Example:
     global
         log ring@myring local7
@@ -2609,6 +2637,9 @@ size <size>
         format rfc3164
         maxlen 1200
         size 32764
+        timeout connect 5s
+        timeout server 10s
+        server mysyslogsrv 127.0.0.1:6514
 
 
 4. Proxies
index ae1009a9f9ab84ba360a40795e0cb100a2f8cb87..45ec1d2e06f0d7af6bb1ee887bce4b2fd5a95a1e 100644 (file)
@@ -610,6 +610,7 @@ enum lock_label {
        PROTO_LOCK,
        CKCH_LOCK,
        SNI_LOCK,
+       SFT_LOCK, /* sink forward target */
        OTHER_LOCK,
        LOCK_LABELS
 };
@@ -727,6 +728,7 @@ static inline const char *lock_label(enum lock_label label)
        case PROTO_LOCK:           return "PROTO";
        case CKCH_LOCK:            return "CKCH";
        case SNI_LOCK:             return "SNI";
+       case SFT_LOCK:             return "SFT";
        case OTHER_LOCK:           return "OTHER";
        case LOCK_LABELS:          break; /* keep compiler happy */
        };
index 013c5023d177a92dee1b7269eec569bbad999323..a4e22e336ba60c3435557909317fcd2fa4707508 100644 (file)
@@ -178,6 +178,10 @@ struct appctx {
                        struct ckch_store *new_ckchs;
                        struct ckch_inst *next_ckchi;
                } ssl;
+               struct {
+                       void *ptr;
+               } sft; /* sink forward target */
+
                /* NOTE: please add regular applet contexts (ie: not
                 * CLI-specific ones) above, before "cli".
                 */
index ef11096793ce759f340f4e6ba6a5acc8279be5b2..029c20dfe813ef3e7aeb500e41510f1a63183095 100644 (file)
@@ -49,6 +49,14 @@ enum sink_fmt {
        SINK_FMT_RFC5424,   // extended syslog
 };
 
+struct sink_forward_target {
+       struct server *srv;    // used server
+       struct appctx *appctx; // appctx of current session
+       size_t ofs;            // ring buffer reader offset
+       __decl_hathreads(HA_SPINLOCK_T lock); // lock to protect current struct
+       struct sink_forward_target *next;
+};
+
 /* describes the configuration and current state of an event sink */
 struct sink {
        struct list sink_list;     // position in the sink list
@@ -57,6 +65,10 @@ struct sink {
        enum sink_fmt fmt;         // format expected by the sink
        enum sink_type type;       // type of storage
        uint32_t maxlen;           // max message length (truncated above)
+       struct proxy* forward_px;  // proxy used to forward
+       struct sink_forward_target *sft; // sink forward targets
+       struct task *forward_task; // task to handle forward targets conns
+       struct sig_handler *forward_sighandler; /* signal handler */
        struct {
                __decl_hathreads(HA_RWLOCK_T lock); // shared/excl for dropped
                struct ring *ring;    // used by ring buffer and STRM sender
index 50f035263a9dadd62a395105ae02e72943e202fa..e7a0c020000f668ab23ad4f31b0d9121b78056ec 100644 (file)
@@ -27,6 +27,7 @@
 #include <proto/cli.h>
 #include <proto/log.h>
 #include <proto/ring.h>
+#include <proto/signal.h>
 #include <proto/sink.h>
 #include <proto/stream_interface.h>
 
@@ -57,7 +58,7 @@ static struct sink *__sink_new(const char *name, const char *desc, enum sink_fmt
        if (sink)
                goto end;
 
-       sink = malloc(sizeof(*sink));
+       sink = calloc(1, sizeof(*sink));
        if (!sink)
                goto end;
 
@@ -316,6 +317,319 @@ static int cli_parse_show_events(char **args, char *payload, struct appctx *appc
        return ring_attach_cli(sink->ctx.ring, appctx);
 }
 
+/* Pre-configures a ring proxy to emmit connections */
+void sink_setup_proxy(struct proxy *px)
+{
+       px->last_change = now.tv_sec;
+       px->cap = PR_CAP_FE | PR_CAP_BE;
+       px->maxconn = 0;
+       px->conn_retries = 1;
+       px->timeout.server = TICK_ETERNITY;
+       px->timeout.client = TICK_ETERNITY;
+       px->timeout.connect = TICK_ETERNITY;
+       px->accept = NULL;
+       px->options2 |= PR_O2_INDEPSTR | PR_O2_SMARTCON | PR_O2_SMARTACC;
+       px->bind_proc = 0; /* will be filled by users */
+}
+
+/*
+ * IO Handler to handle message push to syslog tcp server
+ */
+static void sink_forward_io_handler(struct appctx *appctx)
+{
+       struct stream_interface *si = appctx->owner;
+       struct stream *s = si_strm(si);
+       struct sink *sink = strm_fe(s)->parent;
+       struct sink_forward_target *sft = appctx->ctx.sft.ptr;
+       struct ring *ring = sink->ctx.ring;
+       struct buffer *buf = &ring->buf;
+       uint64_t msg_len;
+       size_t len, cnt, ofs;
+       int ret = 0;
+
+       /* if stopping was requested, close immediatly */
+       if (unlikely(stopping))
+               goto close;
+
+       /* for rex because it seems reset to timeout
+        * and we don't want expire on this case
+        * with a syslog server
+        */
+       si_oc(si)->rex = TICK_ETERNITY;
+       /* rto should not change but it seems the case */
+       si_oc(si)->rto = TICK_ETERNITY;
+
+       /* an error was detected */
+       if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
+               goto close;
+
+       /* con closed by server side */
+       if ((si_oc(si)->flags & CF_SHUTW))
+               goto close;
+
+       /* if the connection is not established, inform the stream that we want
+        * to be notified whenever the connection completes.
+        */
+       if (si_opposite(si)->state < SI_ST_EST) {
+               si_cant_get(si);
+               si_rx_conn_blk(si);
+               si_rx_endp_more(si);
+               return;
+       }
+
+       HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
+       if (appctx != sft->appctx) {
+               HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
+               goto close;
+       }
+       ofs = sft->ofs;
+
+       HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
+       LIST_DEL_INIT(&appctx->wait_entry);
+       HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
+
+       HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
+
+       /* explanation for the initialization below: it would be better to do
+        * this in the parsing function but this would occasionally result in
+        * dropped events because we'd take a reference on the oldest message
+        * and keep it while being scheduled. Thus instead let's take it the
+        * first time we enter here so that we have a chance to pass many
+        * existing messages before grabbing a reference to a location. This
+        * value cannot be produced after initialization.
+        */
+       if (unlikely(ofs == ~0)) {
+               ofs = 0;
+
+               HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
+               ofs += ring->ofs;
+       }
+
+       /* we were already there, adjust the offset to be relative to
+        * the buffer's head and remove us from the counter.
+        */
+       ofs -= ring->ofs;
+       BUG_ON(ofs >= buf->size);
+       HA_ATOMIC_SUB(b_peek(buf, ofs), 1);
+
+       /* in this loop, ofs always points to the counter byte that precedes
+        * the message so that we can take our reference there if we have to
+        * stop before the end (ret=0).
+        */
+       if (si_opposite(si)->state == SI_ST_EST) {
+               ret = 1;
+               while (ofs + 1 < b_data(buf)) {
+                       cnt = 1;
+                       len = b_peek_varint(buf, ofs + cnt, &msg_len);
+                       if (!len)
+                               break;
+                       cnt += len;
+                       BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
+
+                       if (unlikely(msg_len + 1 > b_size(&trash))) {
+                               /* too large a message to ever fit, let's skip it */
+                               ofs += cnt + msg_len;
+                               continue;
+                       }
+
+                       chunk_reset(&trash);
+                       len = b_getblk(buf, trash.area, msg_len, ofs + cnt);
+                       trash.data += len;
+                       trash.area[trash.data++] = '\n';
+
+                       if (ci_putchk(si_ic(si), &trash) == -1) {
+                               si_rx_room_blk(si);
+                               ret = 0;
+                               break;
+                       }
+                       ofs += cnt + msg_len;
+               }
+
+               HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
+               ofs += ring->ofs;
+               sft->ofs = ofs;
+       }
+       HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
+
+       if (ret) {
+               /* let's be woken up once new data arrive */
+               HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
+               LIST_ADDQ(&ring->waiters, &appctx->wait_entry);
+               HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
+               si_rx_endp_done(si);
+       }
+       HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
+
+       /* always drain data from server */
+       co_skip(si_oc(si), si_oc(si)->output);
+       return;
+
+close:
+       si_shutw(si);
+       si_shutr(si);
+       si_ic(si)->flags |= CF_READ_NULL;
+}
+
+void __sink_forward_session_deinit(struct sink_forward_target *sft)
+{
+       struct stream_interface *si;
+       struct stream *s;
+       struct sink *sink;
+
+       if (!sft->appctx)
+               return;
+
+       si = sft->appctx->owner;
+       if (!si)
+               return;
+
+       s = si_strm(si);
+       if (!s)
+               return;
+
+       sink = strm_fe(s)->parent;
+       if (!sink)
+               return;
+
+       HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock);
+       LIST_DEL_INIT(&sft->appctx->wait_entry);
+       HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock);
+
+       sft->appctx = NULL;
+       task_wakeup(sink->forward_task, TASK_WOKEN_MSG);
+}
+
+
+static void sink_forward_session_release(struct appctx *appctx)
+{
+       struct sink_forward_target *sft = appctx->ctx.peers.ptr;
+
+       if (!sft)
+               return;
+
+       HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
+       if (sft->appctx == appctx)
+               __sink_forward_session_deinit(sft);
+       HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
+}
+
+static struct applet sink_forward_applet = {
+       .obj_type = OBJ_TYPE_APPLET,
+       .name = "<SINKFWD>", /* used for logging */
+       .fct = sink_forward_io_handler,
+       .release = sink_forward_session_release,
+};
+
+/*
+ * Create a new peer session in assigned state (connect will start automatically)
+ */
+static struct appctx *sink_forward_session_create(struct sink *sink, struct sink_forward_target *sft)
+{
+       struct proxy *p = sink->forward_px;
+       struct appctx *appctx;
+       struct session *sess;
+       struct stream *s;
+
+       appctx = appctx_new(&sink_forward_applet, tid_bit);
+       if (!appctx)
+               goto out_close;
+
+       appctx->ctx.sft.ptr = (void *)sft;
+
+       sess = session_new(p, NULL, &appctx->obj_type);
+       if (!sess) {
+               ha_alert("out of memory in peer_session_create().\n");
+               goto out_free_appctx;
+       }
+
+       if ((s = stream_new(sess, &appctx->obj_type)) == NULL) {
+               ha_alert("Failed to initialize stream in peer_session_create().\n");
+               goto out_free_sess;
+       }
+
+
+       s->target = &sft->srv->obj_type;
+       if (!sockaddr_alloc(&s->target_addr))
+               goto out_free_strm;
+       *s->target_addr = sft->srv->addr;
+       s->flags = SF_ASSIGNED|SF_ADDR_SET;
+       s->si[1].flags |= SI_FL_NOLINGER;
+
+       s->do_log = NULL;
+       s->uniq_id = 0;
+
+       s->res.flags |= CF_READ_DONTWAIT;
+       /* for rto and rex to eternity to not expire on idle recv:
+        * We are using a syslog server.
+        */
+       s->res.rto = TICK_ETERNITY;
+       s->res.rex = TICK_ETERNITY;
+       sft->appctx = appctx;
+       task_wakeup(s->task, TASK_WOKEN_INIT);
+       return appctx;
+
+       /* Error unrolling */
+ out_free_strm:
+       LIST_DEL(&s->list);
+       pool_free(pool_head_stream, s);
+ out_free_sess:
+       session_free(sess);
+ out_free_appctx:
+       appctx_free(appctx);
+ out_close:
+       return NULL;
+}
+
+/*
+ * Task to handle connctions to forward servers
+ */
+static struct task *process_sink_forward(struct task * task, void *context, unsigned short state)
+{
+       struct sink *sink = (struct sink *)context;
+       struct sink_forward_target *sft = sink->sft;
+
+       task->expire = TICK_ETERNITY;
+
+       if (!stopping) {
+               while (sft) {
+                       HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
+                       /* if appctx is NULL, start a new session */
+                       if (!sft->appctx)
+                               sft->appctx = sink_forward_session_create(sink, sft);
+                       HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
+                       sft = sft->next;
+               }
+       }
+       else {
+               while (sft) {
+                       HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
+                       /* awake applet to perform a clean close */
+                       if (sft->appctx)
+                               appctx_wakeup(sft->appctx);
+                       HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
+                       sft = sft->next;
+               }
+       }
+
+       return task;
+}
+/*
+ * Init task to manage connctions to forward servers
+ *
+ * returns 0 in case of error.
+ */
+int sink_init_forward(struct sink *sink)
+{
+       sink->forward_task = task_new(MAX_THREADS_MASK);
+       if (!sink->forward_task)
+               return 0;
+
+       sink->forward_task->process = process_sink_forward;
+       sink->forward_task->context = (void *)sink;
+       sink->forward_sighandler = signal_register_task(0, sink->forward_task, 0);
+       task_wakeup(sink->forward_task, TASK_WOKEN_INIT);
+       return 1;
+}
 /*
  * Parse "ring" section and create corresponding sink buffer.
  *
@@ -327,6 +641,7 @@ int cfg_parse_ring(const char *file, int linenum, char **args, int kwm)
        int err_code = 0;
        const char *inv;
        size_t size = BUFSIZE;
+       struct proxy *p;
 
        if (strcmp(args[0], "ring") == 0) { /* new peers section */
                if (!*args[1]) {
@@ -354,6 +669,22 @@ int cfg_parse_ring(const char *file, int linenum, char **args, int kwm)
                        err_code |= ERR_ALERT | ERR_FATAL;
                        goto err;
                }
+
+               /* allocate new proxy to handle forwards */
+               p = calloc(1, sizeof *p);
+               if (!p) {
+                       ha_alert("parsing [%s:%d] : out of memory.\n", file, linenum);
+                       err_code |= ERR_ALERT | ERR_FATAL;
+                       goto err;
+               }
+
+               init_new_proxy(p);
+               sink_setup_proxy(p);
+               p->parent = cfg_sink;
+               p->id = strdup(args[1]);
+               p->conf.args.file = p->conf.file = strdup(file);
+               p->conf.args.line = p->conf.line = linenum;
+               cfg_sink->forward_px = p;
        }
        else if (strcmp(args[0], "size") == 0) {
                size = atol(args[1]);
@@ -370,6 +701,52 @@ int cfg_parse_ring(const char *file, int linenum, char **args, int kwm)
                        goto err;
                }
        }
+       else if (strcmp(args[0],"server") == 0) {
+               err_code |= parse_server(file, linenum, args, cfg_sink->forward_px, NULL, 1, 0);
+       }
+       else if (strcmp(args[0],"timeout") == 0) {
+               if (!cfg_sink || !cfg_sink->forward_px) {
+                       ha_alert("parsing [%s:%d] : unable to set timeout '%s'.\n", file, linenum, args[1]);
+                       err_code |= ERR_ALERT | ERR_FATAL;
+                       goto err;
+               }
+
+                if (strcmp(args[1], "connect") == 0 ||
+                   strcmp(args[1], "server") == 0) {
+                       const char *res;
+                       unsigned int tout;
+
+                       if (!*args[2]) {
+                               ha_alert("parsing [%s:%d] : '%s %s' expects <time> as argument.\n",
+                                        file, linenum, args[0], args[1]);
+                               err_code |= ERR_ALERT | ERR_FATAL;
+                               goto err;
+                       }
+                       res = parse_time_err(args[2], &tout, TIME_UNIT_MS);
+                       if (res == PARSE_TIME_OVER) {
+                               ha_alert("parsing [%s:%d]: timer overflow in argument <%s> to <%s %s>, maximum value is 2147483647 ms (~24.8 days).\n",
+                                        file, linenum, args[2], args[0], args[1]);
+                               err_code |= ERR_ALERT | ERR_FATAL;
+                               goto err;
+                       }
+                       else if (res == PARSE_TIME_UNDER) {
+                               ha_alert("parsing [%s:%d]: timer underflow in argument <%s> to <%s %s>, minimum non-null value is 1 ms.\n",
+                                        file, linenum, args[2], args[0], args[1]);
+                               err_code |= ERR_ALERT | ERR_FATAL;
+                               goto err;
+                       }
+                       else if (res) {
+                               ha_alert("parsing [%s:%d]: unexpected character '%c' in argument to <%s %s>.\n",
+                                        file, linenum, *res, args[0], args[1]);
+                               err_code |= ERR_ALERT | ERR_FATAL;
+                               goto err;
+                       }
+                        if (args[1][2] == 'c')
+                                cfg_sink->forward_px->timeout.connect = tout;
+                        else
+                                cfg_sink->forward_px->timeout.server = tout;
+               }
+       }
        else if (strcmp(args[0],"format") == 0) {
                if (!cfg_sink) {
                        ha_alert("parsing [%s:%d] : unable to set format '%s'.\n", file, linenum, args[1]);
@@ -456,6 +833,7 @@ err:
 int cfg_post_parse_ring()
 {
        int err_code = 0;
+       struct server *srv;
 
        if (cfg_sink && (cfg_sink->type == SINK_TYPE_BUFFER)) {
                if (cfg_sink->maxlen > b_size(&cfg_sink->ctx.ring->buf)) {
@@ -464,8 +842,44 @@ int cfg_post_parse_ring()
                        cfg_sink->maxlen = b_size(&cfg_sink->ctx.ring->buf);
                        err_code |= ERR_ALERT;
                }
-       }
 
+               /* prepare forward server descriptors */
+               if (cfg_sink->forward_px) {
+                       srv = cfg_sink->forward_px->srv;
+                       while (srv) {
+                               struct sink_forward_target *sft;
+                               /* init ssl if needed */
+                               if (srv->use_ssl == 1 && xprt_get(XPRT_SSL) && xprt_get(XPRT_SSL)->prepare_srv) {
+                                       if (xprt_get(XPRT_SSL)->prepare_srv(srv)) {
+                                               ha_alert("unable to prepare SSL for server '%s' in ring '%s'.\n", srv->id, cfg_sink->name);
+                                               err_code |= ERR_ALERT | ERR_FATAL;
+                                       }
+                               }
+
+                               /* allocate sink_forward_target descriptor */
+                               sft = calloc(1, sizeof(*sft));
+                               if (!sft) {
+                                       ha_alert("memory allocation error initializing server '%s' in ring '%s'.\n",srv->id, cfg_sink->name);
+                                       err_code |= ERR_ALERT | ERR_FATAL;
+                                       break;
+                               }
+                               sft->srv = srv;
+                               sft->appctx = NULL;
+                               sft->ofs = ~0; /* init ring offset */
+                               sft->next = cfg_sink->sft;
+                               HA_SPIN_INIT(&sft->lock);
+
+                               /* mark server attached to the ring */
+                               if (!ring_attach(cfg_sink->ctx.ring)) {
+                                       ha_alert("server '%s' sets too many watchers > 255 on ring '%s'.\n", srv->id, cfg_sink->name);
+                                       err_code |= ERR_ALERT | ERR_FATAL;
+                               }
+                               cfg_sink->sft = sft;
+                               srv = srv->next;
+                       }
+                       sink_init_forward(cfg_sink);
+               }
+       }
        cfg_sink = NULL;
 
        return err_code;