respond, it will prevent old messages from being purged and may block new
messages from being inserted into the ring. The proper way to send messages
to multiple servers is to use one distinct ring per log server, not to
- attach multiple servers to the same ring.
+ attach multiple servers to the same ring. Note that the server-specific
+ directive "log-proto" selects the protocol used to send messages to each
+ server.
size <size>
This is the optional size in bytes for the ring-buffer. Default value is
size 32764
timeout connect 5s
timeout server 10s
- server mysyslogsrv 127.0.0.1:6514
+ server mysyslogsrv 127.0.0.1:6514 log-proto octet-count
4. Proxies
global "spread-checks" keyword. This makes sense for instance when a lot
of backends use the same servers.
+log-proto <logproto>
+ The "log-proto" parameter specifies the protocol used to forward event
+ messages to a server configured in a ring section. Possible values are
+ "legacy" and "octet-count", corresponding respectively to
+ "Non-transparent-framing" and "Octet counting" as described in rfc6587.
+ "legacy" is the default.
+
maxconn <maxconn>
The "maxconn" parameter specifies the maximal number of concurrent
connections that will be sent to this server. If the number of incoming
si_ic(si)->flags |= CF_READ_NULL;
}
+/*
+ * IO handler pushing messages from the ring to a syslog TCP server
+ * using octet counting framing (RFC 6587)
+ */
+static void sink_forward_oc_io_handler(struct appctx *appctx)
+{
+ struct stream_interface *si = appctx->owner;
+ struct stream *s = si_strm(si);
+ struct sink *sink = strm_fe(s)->parent;
+ struct sink_forward_target *sft = appctx->ctx.sft.ptr;
+ struct ring *ring = sink->ctx.ring;
+ struct buffer *buf = &ring->buf;
+ uint64_t msg_len;
+ size_t len, cnt, ofs;
+ int ret = 0;
+ char *p;
+
+ /* if stopping was requested, close immediately */
+ if (unlikely(stopping))
+ goto close;
+
+ /* rex appears to be reset to the configured timeout; since a syslog
+ * server is not expected to answer, never let this side expire
+ */
+ si_oc(si)->rex = TICK_ETERNITY;
+ /* rto should not change either, but it appears it does, so disable it too */
+ si_oc(si)->rto = TICK_ETERNITY;
+
+ /* an error was detected */
+ if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
+ goto close;
+
+ /* connection closed by the server side */
+ if ((si_oc(si)->flags & CF_SHUTW))
+ goto close;
+
+ /* if the connection is not established, inform the stream that we want
+ * to be notified whenever the connection completes.
+ */
+ if (si_opposite(si)->state < SI_ST_EST) {
+ si_cant_get(si);
+ si_rx_conn_blk(si);
+ si_rx_endp_more(si);
+ return;
+ }
+
+ HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
+ if (appctx != sft->appctx) {
+ HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
+ goto close;
+ }
+ ofs = sft->ofs;
+
+ HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
+ LIST_DEL_INIT(&appctx->wait_entry);
+ HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
+
+ HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
+
+ /* explanation for the initialization below: it would be better to do
+ * this in the parsing function but this would occasionally result in
+ * dropped events because we'd take a reference on the oldest message
+ * and keep it while being scheduled. Thus instead let's take it the
+ * first time we enter here so that we have a chance to pass many
+ * existing messages before grabbing a reference to a location. This
+ * ~0 value cannot be produced anymore once the initialization is done,
+ * so it reliably marks the first pass.
+ */
+ if (unlikely(ofs == ~0)) {
+ ofs = 0;
+
+ HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
+ ofs += ring->ofs;
+ }
+
+ /* we were already there, adjust the offset to be relative to
+ * the buffer's head and remove us from the counter.
+ */
+ ofs -= ring->ofs;
+ BUG_ON(ofs >= buf->size);
+ HA_ATOMIC_SUB(b_peek(buf, ofs), 1);
+
+ /* in this loop, ofs always points to the counter byte that precedes
+ * the message so that we can take our reference there if we have to
+ * stop before the end (ret=0).
+ */
+ if (si_opposite(si)->state == SI_ST_EST) {
+ ret = 1;
+ while (ofs + 1 < b_data(buf)) {
+ cnt = 1;
+ len = b_peek_varint(buf, ofs + cnt, &msg_len);
+ if (!len)
+ break;
+ cnt += len;
+ BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
+
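+ /* build the frame header: the message length encoded in ASCII
+ * decimal followed by a space (RFC 6587 octet counting); the
+ * message body is copied right behind it below
+ */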
+ chunk_reset(&trash);
+ p = ulltoa(msg_len, trash.area, b_size(&trash));
+ if (p) {
+ trash.data = (p - trash.area) + 1;
+ *p = ' ';
+ }
+
+ if (!p || (trash.data + msg_len > b_size(&trash))) {
+ /* too large a message to ever fit, let's skip it */
+ ofs += cnt + msg_len;
+ continue;
+ }
+
+ trash.data += b_getblk(buf, p + 1, msg_len, ofs + cnt);
+
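+ /* the trash chunk now holds the complete frame; push it to the
+ * output channel, or stop and wait for room if it does not fit
+ */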
+ if (ci_putchk(si_ic(si), &trash) == -1) {
+ si_rx_room_blk(si);
+ ret = 0;
+ break;
+ }
+ ofs += cnt + msg_len;
+ }
+
+ HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
+ ofs += ring->ofs;
+ sft->ofs = ofs;
+ }
+ HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
+
+ if (ret) {
+ /* let's be woken up once new data arrive */
+ HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
+ LIST_ADDQ(&ring->waiters, &appctx->wait_entry);
+ HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
+ si_rx_endp_done(si);
+ }
+ HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
+
+ /* always drain data from server */
+ co_skip(si_oc(si), si_oc(si)->output);
+ return;
+
+close:
+ si_shutw(si);
+ si_shutr(si);
+ si_ic(si)->flags |= CF_READ_NULL;
+}
+
void __sink_forward_session_deinit(struct sink_forward_target *sft)
{
struct stream_interface *si;
.release = sink_forward_session_release,
};
+static struct applet sink_forward_oc_applet = {
+ .obj_type = OBJ_TYPE_APPLET,
+ .name = "<SINKFWDOC>", /* used for logging */
+ .fct = sink_forward_oc_io_handler,
+ .release = sink_forward_session_release,
+};
+
/*
* Create a new peer session in assigned state (connect will start automatically)
*/
struct appctx *appctx;
struct session *sess;
struct stream *s;
+ struct applet *applet = &sink_forward_applet;
+
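+ /* use the octet-counting io handler when "log-proto octet-count"
+ * was configured on this server, otherwise keep the legacy one
+ */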
+ if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING)
+ applet = &sink_forward_oc_applet;
- appctx = appctx_new(&sink_forward_applet, tid_bit);
+ appctx = appctx_new(applet, tid_bit);
if (!appctx)
goto out_close;