ha_free(&sink);
}
+/* Helper function to create new high-level ring buffer (as in ring section from
+ * the config)
+ *
+ * Returns NULL on failure
+ */
+static struct sink *sink_new_ringbuf(const char *id, const char *description,
+ const char *file, int linenum, char **err_msg)
+{
+ struct sink *sink;
+ struct proxy *p = NULL; // forward_px
+
+ /* allocate new proxy to handle forwards */
+ p = calloc(1, sizeof(*p));
+ if (!p) {
+ memprintf(err_msg, "out of memory");
+ goto err;
+ }
+
+ init_new_proxy(p);
+ sink_setup_proxy(p);
+ p->id = strdup(id);
+ p->conf.args.file = p->conf.file = strdup(file);
+ p->conf.args.line = p->conf.line = linenum;
+
+ sink = sink_new_buf(id, description, LOG_FORMAT_RAW, BUFSIZE);
+ if (!sink || sink->type != SINK_TYPE_BUFFER) {
+ memprintf(err_msg, "unable to create a new sink buffer for ring '%s'", id);
+ goto err;
+ }
+
+ sink->forward_px = p;
+
+ /* link sink forward_target to proxy */
+ p->parent = sink;
+ sink->forward_px = p;
+
+ return sink;
+
+ err:
+ free_proxy(p);
+ return NULL;
+}
+
/*
* Parse "ring" section and create corresponding sink buffer.
*
int cfg_parse_ring(const char *file, int linenum, char **args, int kwm)
{
int err_code = 0;
+ char *err_msg = NULL;
const char *inv;
- size_t size = BUFSIZE;
- struct proxy *p;
if (strcmp(args[0], "ring") == 0) { /* new ring section */
if (!*args[1]) {
goto err;
}
- cfg_sink = sink_new_buf(args[1], args[1], LOG_FORMAT_RAW, size);
- if (!cfg_sink || cfg_sink->type != SINK_TYPE_BUFFER) {
- ha_alert("parsing [%s:%d] : unable to create a new sink buffer for ring '%s'.\n", file, linenum, args[1]);
+ cfg_sink = sink_new_ringbuf(args[1], args[1], file, linenum, &err_msg);
+ if (!cfg_sink) {
+ ha_alert("parsing [%s:%d] : %s.\n", file, linenum, err_msg);
+ ha_free(&err_msg);
err_code |= ERR_ALERT | ERR_FATAL;
goto err;
}
+
/* set maxlen value to 0 for now, we rely on this in postparsing
* to know if it was explicitly set using the "maxlen" parameter
*/
cfg_sink->maxlen = 0;
-
- /* allocate new proxy to handle forwards */
- p = calloc(1, sizeof *p);
- if (!p) {
- ha_alert("parsing [%s:%d] : out of memory.\n", file, linenum);
- err_code |= ERR_ALERT | ERR_FATAL;
- goto err;
- }
-
- init_new_proxy(p);
- sink_setup_proxy(p);
- p->parent = cfg_sink;
- p->id = strdup(args[1]);
- p->conf.args.file = p->conf.file = strdup(file);
- p->conf.args.line = p->conf.line = linenum;
- cfg_sink->forward_px = p;
}
else if (strcmp(args[0], "size") == 0) {
+ size_t size;
+
if (!cfg_sink || (cfg_sink->type != SINK_TYPE_BUFFER)) {
ha_alert("parsing [%s:%d] : 'size' directive not usable with this type of sink.\n", file, linenum);
err_code |= ERR_ALERT | ERR_FATAL;
*/
struct sink *sink_new_from_logsrv(struct logsrv *logsrv)
{
- struct proxy *p = NULL;
struct sink *sink = NULL;
struct server *srv = NULL;
struct sink_forward_target *sft = NULL;
+ char *err_msg = NULL;
- /* allocate new proxy to handle
- * forward to a stream server
- */
- p = calloc(1, sizeof *p);
- if (!p) {
+ /* prepare description for the sink */
+ chunk_reset(&trash);
+ chunk_printf(&trash, "created from logserver declared into '%s' at line %d", logsrv->conf.file, logsrv->conf.line);
+
+ /* allocate a new sink buffer */
+ sink = sink_new_ringbuf(logsrv->ring_name, trash.area, logsrv->conf.file, logsrv->conf.line, &err_msg);
+ if (!sink) {
+ ha_alert("%s.\n", err_msg);
+ ha_free(&err_msg);
goto error;
- }
+ }
- init_new_proxy(p);
- sink_setup_proxy(p);
- p->id = strdup(logsrv->ring_name);
- p->conf.args.file = p->conf.file = strdup(logsrv->conf.file);
- p->conf.args.line = p->conf.line = logsrv->conf.line;
+ /* disable sink->maxlen, we already have logsrv->maxlen */
+ sink->maxlen = ~0;
+
+ /* set ring format from logsrv format */
+ sink->fmt = logsrv->format;
- /* Set default connect and server timeout */
- p->timeout.connect = MS_TO_TICKS(1000);
- p->timeout.server = MS_TO_TICKS(5000);
+ /* Set default connect and server timeout for sink forward proxy */
+ sink->forward_px->timeout.connect = MS_TO_TICKS(1000);
+ sink->forward_px->timeout.server = MS_TO_TICKS(5000);
/* allocate a new server to forward messages
* from ring buffer
*/
- srv = new_server(p);
+ srv = new_server(sink->forward_px);
if (!srv)
goto error;
if (srv_init_per_thr(srv) == -1)
goto error;
- /* the servers are linked backwards
- * first into proxy
- */
- srv->next = p->srv;
- p->srv = srv;
-
/* allocate sink_forward_target descriptor */
sft = calloc(1, sizeof(*sft));
if (!sft)
sft->ofs = ~0;
HA_SPIN_INIT(&sft->lock);
- /* prepare description for the sink */
- chunk_reset(&trash);
- chunk_printf(&trash, "created from logserver declared into '%s' at line %d", logsrv->conf.file, logsrv->conf.line);
-
- /* allocate a new sink buffer */
- sink = sink_new_buf(logsrv->ring_name, trash.area, logsrv->format, BUFSIZE);
- if (!sink || sink->type != SINK_TYPE_BUFFER) {
- goto error;
- }
-
- /* link sink_forward_target to proxy */
- sink->forward_px = p;
- p->parent = sink;
+ /* link srv with sink forward proxy: the servers are linked
+ * backwards first into proxy
+ */
+ srv->next = sink->forward_px->srv;
+ sink->forward_px->srv = srv;
/* insert into sink_forward_targets
* list into sink
/* should never fail since there is
* only one reader
*/
- goto error;
+ goto error_final;
}
/* initialize sink buffer forwarding */
if (!sink_init_forward(sink))
- goto error;
+ goto error_final;
/* reset familyt of logsrv to consider the ring buffer target */
logsrv->addr.ss_family = AF_UNSPEC;
return sink;
-error:
- if (srv)
- srv_drop(srv);
-
- if (p) {
- if (p->id)
- free(p->id);
- if (p->conf.file)
- free(p->conf.file);
-
- free(p);
- }
+ error:
+ srv_drop(srv);
if (sft)
free(sft);
- if (sink) {
- ring_free(sink->ctx.ring);
-
- LIST_DELETE(&sink->sink_list);
- free(sink->name);
- free(sink->desc);
- free(sink);
- }
+ error_final:
+ sink_free(sink);
return NULL;
}