From 975564784fee089d6d03d8e85e93f2bff5d7253b Mon Sep 17 00:00:00 2001 From: Emeric Brun Date: Sat, 30 May 2020 01:42:45 +0200 Subject: [PATCH] MEDIUM: ring: add new srv statement to support octet counting forward log-proto The "log-proto" specifies the protocol used to forward event messages to a server configured in a ring section. Possible values are "legacy" and "octet-count" corresponding respectively to "Non-transparent-framing" and "Octet counting" in rfc6587. "legacy" is the default. Notes: a separated io_handler was created to avoid per messages test and to prepare code to set different log protocols such as request- response based ones. --- doc/configuration.txt | 11 ++- include/types/server.h | 7 ++ src/server.c | 14 ++++ src/sink.c | 157 ++++++++++++++++++++++++++++++++++++++++- 4 files changed, 186 insertions(+), 3 deletions(-) diff --git a/doc/configuration.txt b/doc/configuration.txt index 15f9668581..0a6086c2c4 100644 --- a/doc/configuration.txt +++ b/doc/configuration.txt @@ -2606,7 +2606,8 @@ server
[param*] respond, it will prevent old messages from being purged and may block new messages from being inserted into the ring. The proper way to send messages to multiple servers is to use one distinct ring per log server, not to - attach multiple servers to the same ring. + attach multiple servers to the same ring. Note that specific server directive + "log-proto" is used to set the protocol used to send messages. size This is the optional size in bytes for the ring-buffer. Default value is @@ -2639,7 +2640,7 @@ timeout server size 32764 timeout connect 5s timeout server 10s - server mysyslogsrv 127.0.0.1:6514 + server mysyslogsrv 127.0.0.1:6514 log-proto octet-count 4. Proxies @@ -13080,6 +13081,12 @@ downinter global "spread-checks" keyword. This makes sense for instance when a lot of backends use the same servers. +log-proto + The "log-proto" specifies the protocol used to forward event messages to + a server configured in a ring section. Possible values are "legacy" + and "octet-count" corresponding respectively to "Non-transparent-framing" + and "Octet counting" in rfc6587. "legacy" is the default. + maxconn The "maxconn" parameter specifies the maximal number of concurrent connections that will be sent to this server. If the number of incoming diff --git a/include/types/server.h b/include/types/server.h index 80119a3832..7b1ae5f9cc 100644 --- a/include/types/server.h +++ b/include/types/server.h @@ -177,6 +177,12 @@ enum srv_initaddr { #define SRV_SSL_O_EARLY_DATA 0x400 /* Allow using early data */ #endif +/* log servers ring's protocols options */ +enum srv_log_proto { + SRV_LOG_PROTO_LEGACY, // messages on TCP separated by LF + SRV_LOG_PROTO_OCTET_COUNTING, // TCP frames: MSGLEN SP MSG +}; + /* The server names dictionary */ extern struct dict server_name_dict; @@ -291,6 +297,7 @@ struct server { char *hostname; /* server hostname */ struct sockaddr_storage init_addr; /* plain IP address specified on the init-addr line */ unsigned int init_addr_methods; /* initial address setting, 3-bit per method, ends at 0, enough to store 10 entries */ + enum srv_log_proto log_proto; /* used proto to emmit messages on server lines from ring section */ #ifdef USE_OPENSSL char *sni_expr; /* Temporary variable to store a sample expression for SNI */ diff --git a/src/server.c b/src/server.c index e4044cd7a6..e710b4810c 100644 --- a/src/server.c +++ b/src/server.c @@ -2275,6 +2275,20 @@ int parse_server(const char *file, int linenum, char **args, struct proxy *curpr newsrv->uweight = newsrv->iweight = w; cur_arg += 2; } + else if (!strcmp(args[cur_arg], "log-proto")) { + if (!strcmp(args[cur_arg + 1], "legacy")) + newsrv->log_proto = SRV_LOG_PROTO_LEGACY; + else if (!strcmp(args[cur_arg + 1], "octet-count")) + newsrv->log_proto = SRV_LOG_PROTO_OCTET_COUNTING; + else { + ha_alert("parsing [%s:%d]: '%s' expects one of 'legacy' or " + "'octet-count' but got '%s'\n", + file, linenum, args[cur_arg], args[cur_arg + 1]); + err_code |= ERR_ALERT | ERR_FATAL; + goto out; + } + cur_arg += 2; + } else if (!strcmp(args[cur_arg], "minconn")) { newsrv->minconn = atol(args[cur_arg + 1]); cur_arg += 2; diff --git a/src/sink.c b/src/sink.c index e7a0c02000..9eca4814b7 100644 --- a/src/sink.c +++ b/src/sink.c @@ -470,6 +470,150 @@ close: si_ic(si)->flags |= CF_READ_NULL; } +/* + * IO Handler to handle message push to syslog tcp server + * using octet counting frames + */ +static void sink_forward_oc_io_handler(struct appctx *appctx) +{ + struct stream_interface *si = appctx->owner; + struct stream *s = si_strm(si); + struct sink *sink = strm_fe(s)->parent; + struct sink_forward_target *sft = appctx->ctx.sft.ptr; + struct ring *ring = sink->ctx.ring; + struct buffer *buf = &ring->buf; + uint64_t msg_len; + size_t len, cnt, ofs; + int ret = 0; + char *p; + + /* if stopping was requested, close immediatly */ + if (unlikely(stopping)) + goto close; + + /* for rex because it seems reset to timeout + * and we don't want expire on this case + * with a syslog server + */ + si_oc(si)->rex = TICK_ETERNITY; + /* rto should not change but it seems the case */ + si_oc(si)->rto = TICK_ETERNITY; + + /* an error was detected */ + if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW))) + goto close; + + /* con closed by server side */ + if ((si_oc(si)->flags & CF_SHUTW)) + goto close; + + /* if the connection is not established, inform the stream that we want + * to be notified whenever the connection completes. + */ + if (si_opposite(si)->state < SI_ST_EST) { + si_cant_get(si); + si_rx_conn_blk(si); + si_rx_endp_more(si); + return; + } + + HA_SPIN_LOCK(SFT_LOCK, &sft->lock); + if (appctx != sft->appctx) { + HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock); + goto close; + } + ofs = sft->ofs; + + HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock); + LIST_DEL_INIT(&appctx->wait_entry); + HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock); + + HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock); + + /* explanation for the initialization below: it would be better to do + * this in the parsing function but this would occasionally result in + * dropped events because we'd take a reference on the oldest message + * and keep it while being scheduled. Thus instead let's take it the + * first time we enter here so that we have a chance to pass many + * existing messages before grabbing a reference to a location. This + * value cannot be produced after initialization. + */ + if (unlikely(ofs == ~0)) { + ofs = 0; + + HA_ATOMIC_ADD(b_peek(buf, ofs), 1); + ofs += ring->ofs; + } + + /* we were already there, adjust the offset to be relative to + * the buffer's head and remove us from the counter. + */ + ofs -= ring->ofs; + BUG_ON(ofs >= buf->size); + HA_ATOMIC_SUB(b_peek(buf, ofs), 1); + + /* in this loop, ofs always points to the counter byte that precedes + * the message so that we can take our reference there if we have to + * stop before the end (ret=0). + */ + if (si_opposite(si)->state == SI_ST_EST) { + ret = 1; + while (ofs + 1 < b_data(buf)) { + cnt = 1; + len = b_peek_varint(buf, ofs + cnt, &msg_len); + if (!len) + break; + cnt += len; + BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf)); + + chunk_reset(&trash); + p = ulltoa(msg_len, trash.area, b_size(&trash)); + if (p) { + trash.data = (p - trash.area) + 1; + *p = ' '; + } + + if (!p || (trash.data + msg_len > b_size(&trash))) { + /* too large a message to ever fit, let's skip it */ + ofs += cnt + msg_len; + continue; + } + + trash.data += b_getblk(buf, p + 1, msg_len, ofs + cnt); + + if (ci_putchk(si_ic(si), &trash) == -1) { + si_rx_room_blk(si); + ret = 0; + break; + } + ofs += cnt + msg_len; + } + + HA_ATOMIC_ADD(b_peek(buf, ofs), 1); + ofs += ring->ofs; + sft->ofs = ofs; + } + HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock); + + if (ret) { + /* let's be woken up once new data arrive */ + HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock); + LIST_ADDQ(&ring->waiters, &appctx->wait_entry); + HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock); + si_rx_endp_done(si); + } + HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock); + + /* always drain data from server */ + co_skip(si_oc(si), si_oc(si)->output); + return; + +close: + si_shutw(si); + si_shutr(si); + si_ic(si)->flags |= CF_READ_NULL; +} + void __sink_forward_session_deinit(struct sink_forward_target *sft) { struct stream_interface *si; @@ -520,6 +664,13 @@ static struct applet sink_forward_applet = { .release = sink_forward_session_release, }; +static struct applet sink_forward_oc_applet = { + .obj_type = OBJ_TYPE_APPLET, + .name = "", /* used for logging */ + .fct = sink_forward_oc_io_handler, + .release = sink_forward_session_release, +}; + /* * Create a new peer session in assigned state (connect will start automatically) */ @@ -529,8 +680,12 @@ static struct appctx *sink_forward_session_create(struct sink *sink, struct sink struct appctx *appctx; struct session *sess; struct stream *s; + struct applet *applet = &sink_forward_applet; + + if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING) + applet = &sink_forward_oc_applet; - appctx = appctx_new(&sink_forward_applet, tid_bit); + appctx = appctx_new(applet, tid_bit); if (!appctx) goto out_close; -- 2.39.5