#include <proto/cli.h>
#include <proto/log.h>
#include <proto/ring.h>
+#include <proto/signal.h>
#include <proto/sink.h>
#include <proto/stream_interface.h>
if (sink)
goto end;
- sink = malloc(sizeof(*sink));
+ sink = calloc(1, sizeof(*sink));
if (!sink)
goto end;
return ring_attach_cli(sink->ctx.ring, appctx);
}
+/* Pre-configures a ring proxy to emmit connections */
+void sink_setup_proxy(struct proxy *px)
+{
+ px->last_change = now.tv_sec;
+ px->cap = PR_CAP_FE | PR_CAP_BE;
+ px->maxconn = 0;
+ px->conn_retries = 1;
+ px->timeout.server = TICK_ETERNITY;
+ px->timeout.client = TICK_ETERNITY;
+ px->timeout.connect = TICK_ETERNITY;
+ px->accept = NULL;
+ px->options2 |= PR_O2_INDEPSTR | PR_O2_SMARTCON | PR_O2_SMARTACC;
+ px->bind_proc = 0; /* will be filled by users */
+}
+
+/*
+ * IO Handler to handle message push to syslog tcp server
+ */
+static void sink_forward_io_handler(struct appctx *appctx)
+{
+ struct stream_interface *si = appctx->owner;
+ struct stream *s = si_strm(si);
+ struct sink *sink = strm_fe(s)->parent;
+ struct sink_forward_target *sft = appctx->ctx.sft.ptr;
+ struct ring *ring = sink->ctx.ring;
+ struct buffer *buf = &ring->buf;
+ uint64_t msg_len;
+ size_t len, cnt, ofs;
+ int ret = 0;
+
+ /* if stopping was requested, close immediatly */
+ if (unlikely(stopping))
+ goto close;
+
+ /* for rex because it seems reset to timeout
+ * and we don't want expire on this case
+ * with a syslog server
+ */
+ si_oc(si)->rex = TICK_ETERNITY;
+ /* rto should not change but it seems the case */
+ si_oc(si)->rto = TICK_ETERNITY;
+
+ /* an error was detected */
+ if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
+ goto close;
+
+ /* con closed by server side */
+ if ((si_oc(si)->flags & CF_SHUTW))
+ goto close;
+
+ /* if the connection is not established, inform the stream that we want
+ * to be notified whenever the connection completes.
+ */
+ if (si_opposite(si)->state < SI_ST_EST) {
+ si_cant_get(si);
+ si_rx_conn_blk(si);
+ si_rx_endp_more(si);
+ return;
+ }
+
+ HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
+ if (appctx != sft->appctx) {
+ HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
+ goto close;
+ }
+ ofs = sft->ofs;
+
+ HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
+ LIST_DEL_INIT(&appctx->wait_entry);
+ HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
+
+ HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
+
+ /* explanation for the initialization below: it would be better to do
+ * this in the parsing function but this would occasionally result in
+ * dropped events because we'd take a reference on the oldest message
+ * and keep it while being scheduled. Thus instead let's take it the
+ * first time we enter here so that we have a chance to pass many
+ * existing messages before grabbing a reference to a location. This
+ * value cannot be produced after initialization.
+ */
+ if (unlikely(ofs == ~0)) {
+ ofs = 0;
+
+ HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
+ ofs += ring->ofs;
+ }
+
+ /* we were already there, adjust the offset to be relative to
+ * the buffer's head and remove us from the counter.
+ */
+ ofs -= ring->ofs;
+ BUG_ON(ofs >= buf->size);
+ HA_ATOMIC_SUB(b_peek(buf, ofs), 1);
+
+ /* in this loop, ofs always points to the counter byte that precedes
+ * the message so that we can take our reference there if we have to
+ * stop before the end (ret=0).
+ */
+ if (si_opposite(si)->state == SI_ST_EST) {
+ ret = 1;
+ while (ofs + 1 < b_data(buf)) {
+ cnt = 1;
+ len = b_peek_varint(buf, ofs + cnt, &msg_len);
+ if (!len)
+ break;
+ cnt += len;
+ BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
+
+ if (unlikely(msg_len + 1 > b_size(&trash))) {
+ /* too large a message to ever fit, let's skip it */
+ ofs += cnt + msg_len;
+ continue;
+ }
+
+ chunk_reset(&trash);
+ len = b_getblk(buf, trash.area, msg_len, ofs + cnt);
+ trash.data += len;
+ trash.area[trash.data++] = '\n';
+
+ if (ci_putchk(si_ic(si), &trash) == -1) {
+ si_rx_room_blk(si);
+ ret = 0;
+ break;
+ }
+ ofs += cnt + msg_len;
+ }
+
+ HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
+ ofs += ring->ofs;
+ sft->ofs = ofs;
+ }
+ HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
+
+ if (ret) {
+ /* let's be woken up once new data arrive */
+ HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
+ LIST_ADDQ(&ring->waiters, &appctx->wait_entry);
+ HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
+ si_rx_endp_done(si);
+ }
+ HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
+
+ /* always drain data from server */
+ co_skip(si_oc(si), si_oc(si)->output);
+ return;
+
+close:
+ si_shutw(si);
+ si_shutr(si);
+ si_ic(si)->flags |= CF_READ_NULL;
+}
+
+void __sink_forward_session_deinit(struct sink_forward_target *sft)
+{
+ struct stream_interface *si;
+ struct stream *s;
+ struct sink *sink;
+
+ if (!sft->appctx)
+ return;
+
+ si = sft->appctx->owner;
+ if (!si)
+ return;
+
+ s = si_strm(si);
+ if (!s)
+ return;
+
+ sink = strm_fe(s)->parent;
+ if (!sink)
+ return;
+
+ HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock);
+ LIST_DEL_INIT(&sft->appctx->wait_entry);
+ HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock);
+
+ sft->appctx = NULL;
+ task_wakeup(sink->forward_task, TASK_WOKEN_MSG);
+}
+
+
+static void sink_forward_session_release(struct appctx *appctx)
+{
+ struct sink_forward_target *sft = appctx->ctx.peers.ptr;
+
+ if (!sft)
+ return;
+
+ HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
+ if (sft->appctx == appctx)
+ __sink_forward_session_deinit(sft);
+ HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
+}
+
+static struct applet sink_forward_applet = {
+ .obj_type = OBJ_TYPE_APPLET,
+ .name = "<SINKFWD>", /* used for logging */
+ .fct = sink_forward_io_handler,
+ .release = sink_forward_session_release,
+};
+
+/*
+ * Create a new peer session in assigned state (connect will start automatically)
+ */
+static struct appctx *sink_forward_session_create(struct sink *sink, struct sink_forward_target *sft)
+{
+ struct proxy *p = sink->forward_px;
+ struct appctx *appctx;
+ struct session *sess;
+ struct stream *s;
+
+ appctx = appctx_new(&sink_forward_applet, tid_bit);
+ if (!appctx)
+ goto out_close;
+
+ appctx->ctx.sft.ptr = (void *)sft;
+
+ sess = session_new(p, NULL, &appctx->obj_type);
+ if (!sess) {
+ ha_alert("out of memory in peer_session_create().\n");
+ goto out_free_appctx;
+ }
+
+ if ((s = stream_new(sess, &appctx->obj_type)) == NULL) {
+ ha_alert("Failed to initialize stream in peer_session_create().\n");
+ goto out_free_sess;
+ }
+
+
+ s->target = &sft->srv->obj_type;
+ if (!sockaddr_alloc(&s->target_addr))
+ goto out_free_strm;
+ *s->target_addr = sft->srv->addr;
+ s->flags = SF_ASSIGNED|SF_ADDR_SET;
+ s->si[1].flags |= SI_FL_NOLINGER;
+
+ s->do_log = NULL;
+ s->uniq_id = 0;
+
+ s->res.flags |= CF_READ_DONTWAIT;
+ /* for rto and rex to eternity to not expire on idle recv:
+ * We are using a syslog server.
+ */
+ s->res.rto = TICK_ETERNITY;
+ s->res.rex = TICK_ETERNITY;
+ sft->appctx = appctx;
+ task_wakeup(s->task, TASK_WOKEN_INIT);
+ return appctx;
+
+ /* Error unrolling */
+ out_free_strm:
+ LIST_DEL(&s->list);
+ pool_free(pool_head_stream, s);
+ out_free_sess:
+ session_free(sess);
+ out_free_appctx:
+ appctx_free(appctx);
+ out_close:
+ return NULL;
+}
+
+/*
+ * Task to handle connctions to forward servers
+ */
+static struct task *process_sink_forward(struct task * task, void *context, unsigned short state)
+{
+ struct sink *sink = (struct sink *)context;
+ struct sink_forward_target *sft = sink->sft;
+
+ task->expire = TICK_ETERNITY;
+
+ if (!stopping) {
+ while (sft) {
+ HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
+ /* if appctx is NULL, start a new session */
+ if (!sft->appctx)
+ sft->appctx = sink_forward_session_create(sink, sft);
+ HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
+ sft = sft->next;
+ }
+ }
+ else {
+ while (sft) {
+ HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
+ /* awake applet to perform a clean close */
+ if (sft->appctx)
+ appctx_wakeup(sft->appctx);
+ HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
+ sft = sft->next;
+ }
+ }
+
+ return task;
+}
+/*
+ * Init task to manage connctions to forward servers
+ *
+ * returns 0 in case of error.
+ */
+int sink_init_forward(struct sink *sink)
+{
+ sink->forward_task = task_new(MAX_THREADS_MASK);
+ if (!sink->forward_task)
+ return 0;
+
+ sink->forward_task->process = process_sink_forward;
+ sink->forward_task->context = (void *)sink;
+ sink->forward_sighandler = signal_register_task(0, sink->forward_task, 0);
+ task_wakeup(sink->forward_task, TASK_WOKEN_INIT);
+ return 1;
+}
/*
* Parse "ring" section and create corresponding sink buffer.
*
int err_code = 0;
const char *inv;
size_t size = BUFSIZE;
+ struct proxy *p;
if (strcmp(args[0], "ring") == 0) { /* new peers section */
if (!*args[1]) {
err_code |= ERR_ALERT | ERR_FATAL;
goto err;
}
+
+ /* allocate new proxy to handle forwards */
+ p = calloc(1, sizeof *p);
+ if (!p) {
+ ha_alert("parsing [%s:%d] : out of memory.\n", file, linenum);
+ err_code |= ERR_ALERT | ERR_FATAL;
+ goto err;
+ }
+
+ init_new_proxy(p);
+ sink_setup_proxy(p);
+ p->parent = cfg_sink;
+ p->id = strdup(args[1]);
+ p->conf.args.file = p->conf.file = strdup(file);
+ p->conf.args.line = p->conf.line = linenum;
+ cfg_sink->forward_px = p;
}
else if (strcmp(args[0], "size") == 0) {
size = atol(args[1]);
goto err;
}
}
+ else if (strcmp(args[0],"server") == 0) {
+ err_code |= parse_server(file, linenum, args, cfg_sink->forward_px, NULL, 1, 0);
+ }
+ else if (strcmp(args[0],"timeout") == 0) {
+ if (!cfg_sink || !cfg_sink->forward_px) {
+ ha_alert("parsing [%s:%d] : unable to set timeout '%s'.\n", file, linenum, args[1]);
+ err_code |= ERR_ALERT | ERR_FATAL;
+ goto err;
+ }
+
+ if (strcmp(args[1], "connect") == 0 ||
+ strcmp(args[1], "server") == 0) {
+ const char *res;
+ unsigned int tout;
+
+ if (!*args[2]) {
+ ha_alert("parsing [%s:%d] : '%s %s' expects <time> as argument.\n",
+ file, linenum, args[0], args[1]);
+ err_code |= ERR_ALERT | ERR_FATAL;
+ goto err;
+ }
+ res = parse_time_err(args[2], &tout, TIME_UNIT_MS);
+ if (res == PARSE_TIME_OVER) {
+ ha_alert("parsing [%s:%d]: timer overflow in argument <%s> to <%s %s>, maximum value is 2147483647 ms (~24.8 days).\n",
+ file, linenum, args[2], args[0], args[1]);
+ err_code |= ERR_ALERT | ERR_FATAL;
+ goto err;
+ }
+ else if (res == PARSE_TIME_UNDER) {
+ ha_alert("parsing [%s:%d]: timer underflow in argument <%s> to <%s %s>, minimum non-null value is 1 ms.\n",
+ file, linenum, args[2], args[0], args[1]);
+ err_code |= ERR_ALERT | ERR_FATAL;
+ goto err;
+ }
+ else if (res) {
+ ha_alert("parsing [%s:%d]: unexpected character '%c' in argument to <%s %s>.\n",
+ file, linenum, *res, args[0], args[1]);
+ err_code |= ERR_ALERT | ERR_FATAL;
+ goto err;
+ }
+ if (args[1][2] == 'c')
+ cfg_sink->forward_px->timeout.connect = tout;
+ else
+ cfg_sink->forward_px->timeout.server = tout;
+ }
+ }
else if (strcmp(args[0],"format") == 0) {
if (!cfg_sink) {
ha_alert("parsing [%s:%d] : unable to set format '%s'.\n", file, linenum, args[1]);
int cfg_post_parse_ring()
{
int err_code = 0;
+ struct server *srv;
if (cfg_sink && (cfg_sink->type == SINK_TYPE_BUFFER)) {
if (cfg_sink->maxlen > b_size(&cfg_sink->ctx.ring->buf)) {
cfg_sink->maxlen = b_size(&cfg_sink->ctx.ring->buf);
err_code |= ERR_ALERT;
}
- }
+ /* prepare forward server descriptors */
+ if (cfg_sink->forward_px) {
+ srv = cfg_sink->forward_px->srv;
+ while (srv) {
+ struct sink_forward_target *sft;
+ /* init ssl if needed */
+ if (srv->use_ssl == 1 && xprt_get(XPRT_SSL) && xprt_get(XPRT_SSL)->prepare_srv) {
+ if (xprt_get(XPRT_SSL)->prepare_srv(srv)) {
+ ha_alert("unable to prepare SSL for server '%s' in ring '%s'.\n", srv->id, cfg_sink->name);
+ err_code |= ERR_ALERT | ERR_FATAL;
+ }
+ }
+
+ /* allocate sink_forward_target descriptor */
+ sft = calloc(1, sizeof(*sft));
+ if (!sft) {
+ ha_alert("memory allocation error initializing server '%s' in ring '%s'.\n",srv->id, cfg_sink->name);
+ err_code |= ERR_ALERT | ERR_FATAL;
+ break;
+ }
+ sft->srv = srv;
+ sft->appctx = NULL;
+ sft->ofs = ~0; /* init ring offset */
+ sft->next = cfg_sink->sft;
+ HA_SPIN_INIT(&sft->lock);
+
+ /* mark server attached to the ring */
+ if (!ring_attach(cfg_sink->ctx.ring)) {
+ ha_alert("server '%s' sets too many watchers > 255 on ring '%s'.\n", srv->id, cfg_sink->name);
+ err_code |= ERR_ALERT | ERR_FATAL;
+ }
+ cfg_sink->sft = sft;
+ srv = srv->next;
+ }
+ sink_init_forward(cfg_sink);
+ }
+ }
cfg_sink = NULL;
return err_code;