src/http_client.o src/listener.o src/dns.o src/vars.o src/debug.o \
src/tcp_rules.o src/sink.o src/h1_htx.o src/task.o src/mjson.o \
src/h2.o src/filters.o src/server_state.o src/payload.o \
- src/fcgi-app.o src/map.o src/htx.o src/h1.o src/pool.o \
+ src/fcgi-app.o src/map.o src/htx.o src/h1.o src/pool.o src/dns_ring.o \
src/cfgparse-global.o src/trace.o src/tcp_sample.o src/http_ext.o \
src/flt_http_comp.o src/mux_pt.o src/flt_trace.o src/mqtt.o \
src/acl.o src/sock.o src/mworker.o src/tcp_act.o src/ring.o \
#include <haproxy/connection-t.h>
#include <haproxy/buf-t.h>
#include <haproxy/dgram-t.h>
+#include <haproxy/dns_ring-t.h>
#include <haproxy/obj_type-t.h>
-#include <haproxy/ring-t.h>
#include <haproxy/stats-t.h>
#include <haproxy/task-t.h>
#include <haproxy/thread.h>
*/
struct dns_stream_server {
struct server *srv;
- struct ring *ring_req;
+ struct dns_ring *ring_req;
int max_slots;
int maxconn;
int idle_conns;
struct dns_dgram_server {
struct dgram_conn conn; /* transport layer */
- struct ring *ring_req;
+ struct dns_ring *ring_req;
size_t ofs_req; // ring buffer reader offset
};
struct task *task_exp;
struct eb_root query_ids; /* tree to quickly lookup/retrieve query ids currently in use */
size_t ofs; // ring buffer reader offset
- struct ring ring;
+ struct dns_ring ring;
struct {
uint16_t len;
uint16_t offset;
--- /dev/null
+/*
+ * include/haproxy/dns_ring-t.h
+ * This file provides definitions for ring buffers used for disposable data.
+ * This is a fork of ring-t.h for DNS usages.
+ *
+ * Copyright (C) 2000-2019 Willy Tarreau - w@1wt.eu
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation, version 2.1
+ * exclusively.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef _HAPROXY_DNS_RING_T_H
+#define _HAPROXY_DNS_RING_T_H
+
+#include <haproxy/api-t.h>
+#include <haproxy/buf-t.h>
+#include <haproxy/thread.h>
+
+/* The code below handles circular buffers with single-producer and multiple
+ * readers (up to 255). The buffer storage area must remain always allocated.
+ * It's made of series of payload blocks followed by a readers count (RC).
+ * There is always a readers count at the beginning of the buffer as well. Each
+ * payload block is composed of a varint-encoded size (VI) followed by the
+ * actual payload (PL).
+ *
+ * The readers count is encoded on a single byte. It indicates how many readers
+ * are still waiting at this position. The writer writes after the buffer's
+ * tail, which initially starts just past the first readers count. Then it
+ * knows by reading this count that it must wake up the readers to indicate
+ * data availability. When a reader reads the payload block, it increments the
+ * next readers count and decrements the current one. The area between the
+ * initial readers count and the next one is protected from overwriting for as
+ * long as the initial count is non-null. As such these readers count are
+ * effective barriers against data recycling.
+ *
+ * Only the writer is allowed to update the buffer's tail/head. This ensures
+ * that events can remain as long as possible so that late readers can get the
+ * maximum history available. It also helps dealing with multi-thread accesses
+ * using a simple RW lock during the buffer head's manipulation. The writer
+ * will have to delete some old records starting at the head until the new
+ * message can fit or a non-null readers count is encountered. If a message
+ * cannot fit due to insufficient room, the message is lost and the drop
+ * counted must be incremented.
+ *
+ * Like any buffer, this buffer naturally wraps at the end and continues at the
+ * beginning. The creation process consists in immediately adding a null
+ * readers count byte into the buffer. The write process consists in always
+ * writing a payload block followed by a new readers count. The delete process
+ * consists in removing a null readers count and payload block. As such, there
+ * is always at least one readers count byte in the buffer available at the
+ * head for new readers to attach to, and one before the tail, both of which
+ * may be the same when the buffer doesn't contain any event. It is thus safe
+ * for any reader to simply keep the absolute offset of the last visited
+ * position and to restart from there. The write will update the buffer's
+ * absolute offset when deleting entries. All this also has the benefit of
+ * allowing a buffer to be hot-resized without losing its contents.
+ *
+ * Thus we have this :
+ * - init of empty buffer:
+ * head-, ,-tail
+ * [ RC | xxxxxxxxxxxxxxxxxxxxxxxxxx ]
+ *
+ * - reader attached:
+ * head-, ,-tail
+ * [ RC | xxxxxxxxxxxxxxxxxxxxxxxxxx ]
+ * ^- +1
+ *
+ * - append of one event:
+ * appended
+ * head-, <----------> ,-tail
+ * [ RC | VI | PL | RC | xxxxxxxxxxx ]
+ *
+ * - reader advancing:
+ * head-, ,-tail
+ * [ RC | VI | PL | RC | xxxxxxxxxxx ]
+ * ^- -1 ^- +1
+ *
+ * - writer removing older message:
+ * head-, ,-tail
+ * [ xxxxxxxxxxxx | RC | xxxxxxxxxxx ]
+ * <---------->
+ * removed
+ */
+
+struct dns_ring {
+ struct buffer buf; // storage area
+ struct list waiters; // list of waiters, for now, CLI "show event"
+ __decl_thread(HA_RWLOCK_T lock);
+ int readers_count;
+};
+
+#endif /* _HAPROXY_DNS_RING_T_H */
+
+/*
+ * Local variables:
+ * c-indent-level: 8
+ * c-basic-offset: 8
+ * End:
+ */
--- /dev/null
+/*
+ * include/haproxy/dns_ring.h
+ * Exported functions for ring buffers used for disposable data.
+ * This is a fork of ring.h for DNS usage.
+ *
+ * Copyright (C) 2000-2019 Willy Tarreau - w@1wt.eu
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation, version 2.1
+ * exclusively.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef _HAPROXY_DNS_RING_H
+#define _HAPROXY_DNS_RING_H
+
+#include <stdlib.h>
+#include <import/ist.h>
+#include <haproxy/dns_ring-t.h>
+
+struct appctx;
+
+struct dns_ring *dns_ring_new(size_t size);
+void dns_ring_init(struct dns_ring *ring, void* area, size_t size);
+void dns_ring_free(struct dns_ring *ring);
+ssize_t dns_ring_write(struct dns_ring *ring, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg);
+int dns_ring_attach(struct dns_ring *ring);
+void dns_ring_detach_appctx(struct dns_ring *ring, struct appctx *appctx, size_t ofs);
+
+#endif /* _HAPROXY_DNS_RING_H */
+
+/*
+ * Local variables:
+ * c-indent-level: 8
+ * c-basic-offset: 8
+ * End:
+ */
#include <haproxy/cli.h>
#include <haproxy/dgram.h>
#include <haproxy/dns.h>
+#include <haproxy/dns_ring.h>
#include <haproxy/errors.h>
#include <haproxy/fd.h>
#include <haproxy/log.h>
-#include <haproxy/ring.h>
#include <haproxy/sc_strm.h>
#include <haproxy/stconn.h>
#include <haproxy/stream.h>
struct ist myist;
myist = ist2(buf, len);
- ret = ring_write(ns->dgram->ring_req, DNS_TCP_MSG_MAX_SIZE, NULL, 0, &myist, 1);
+ ret = dns_ring_write(ns->dgram->ring_req, DNS_TCP_MSG_MAX_SIZE, NULL, 0, &myist, 1);
if (!ret) {
ns->counters->snd_error++;
HA_SPIN_UNLOCK(DNS_LOCK, &dgram->lock);
struct ist myist;
myist = ist2(buf, len);
- ret = ring_write(ns->stream->ring_req, DNS_TCP_MSG_MAX_SIZE, NULL, 0, &myist, 1);
+ ret = dns_ring_write(ns->stream->ring_req, DNS_TCP_MSG_MAX_SIZE, NULL, 0, &myist, 1);
if (!ret) {
ns->counters->snd_error++;
return -1;
{
int fd;
struct dns_nameserver *ns;
- struct ring *ring;
+ struct dns_ring *ring;
struct buffer *buf;
uint64_t msg_len;
size_t len, cnt, ofs;
ns->dgram = dgram;
dgram->ofs_req = ~0; /* init ring offset */
- dgram->ring_req = ring_new(2*DNS_TCP_MSG_RING_MAX_SIZE);
+ dgram->ring_req = dns_ring_new(2*DNS_TCP_MSG_RING_MAX_SIZE);
if (!dgram->ring_req) {
ha_alert("memory allocation error initializing the ring for nameserver.\n");
goto out;
}
/* attach the task as reader */
- if (!ring_attach(dgram->ring_req)) {
+ if (!dns_ring_attach(dgram->ring_req)) {
/* mark server attached to the ring */
ha_alert("nameserver sets too many watchers > 255 on ring. This is a bug and should not happen.\n");
goto out;
}
return 0;
out:
- ring_free(dgram->ring_req);
+ dns_ring_free(dgram->ring_req);
free(dgram);
{
struct stconn *sc = appctx_sc(appctx);
struct dns_session *ds = appctx->svcctx;
- struct ring *ring = &ds->ring;
+ struct dns_ring *ring = &ds->ring;
struct buffer *buf = &ring->buf;
uint64_t msg_len;
int available_room;
if (!ds)
return;
- /* We do not call ring_appctx_detach here
+ /* We do not call dns_ring_appctx_detach here
* because we want to keep readers counters
* to retry a conn with a different appctx.
*/
if (!ds->tx_ring_area)
goto error;
- ring_init(&ds->ring, ds->tx_ring_area, DNS_TCP_MSG_RING_MAX_SIZE);
+ dns_ring_init(&ds->ring, ds->tx_ring_area, DNS_TCP_MSG_RING_MAX_SIZE);
/* never fail because it is the first watcher attached to the ring */
- DISGUISE(ring_attach(&ds->ring));
+ DISGUISE(dns_ring_attach(&ds->ring));
if ((ds->task_exp = task_new_here()) == NULL)
goto error;
{
struct dns_nameserver *ns = (struct dns_nameserver *)context;
struct dns_stream_server *dss = ns->stream;
- struct ring *ring = dss->ring_req;
+ struct dns_ring *ring = dss->ring_req;
struct buffer *buf = &ring->buf;
uint64_t msg_len;
size_t len, cnt, ofs;
if (!LIST_ISEMPTY(&dss->free_sess)) {
ds = LIST_NEXT(&dss->free_sess, struct dns_session *, list);
- if (ring_write(&ds->ring, DNS_TCP_MSG_MAX_SIZE, NULL, 0, &myist, 1) > 0) {
+ if (dns_ring_write(&ds->ring, DNS_TCP_MSG_MAX_SIZE, NULL, 0, &myist, 1) > 0) {
ds->nb_queries++;
if (ds->nb_queries >= DNS_STREAM_MAX_PIPELINED_REQ)
LIST_DEL_INIT(&ds->list);
if (!LIST_ISEMPTY(&dss->idle_sess)) {
ds = LIST_NEXT(&dss->idle_sess, struct dns_session *, list);
- /* ring is empty so this ring_write should never fail */
- ring_write(&ds->ring, DNS_TCP_MSG_MAX_SIZE, NULL, 0, &myist, 1);
+ /* ring is empty so this dns_ring_write should never fail */
+ dns_ring_write(&ds->ring, DNS_TCP_MSG_MAX_SIZE, NULL, 0, &myist, 1);
ds->nb_queries++;
LIST_DEL_INIT(&ds->list);
/* allocate a new session */
ads = dns_session_new(dss);
if (ads) {
- /* ring is empty so this ring_write should never fail */
- ring_write(&ads->ring, DNS_TCP_MSG_MAX_SIZE, NULL, 0, &myist, 1);
+ /* ring is empty so this dns_ring_write should never fail */
+ dns_ring_write(&ads->ring, DNS_TCP_MSG_MAX_SIZE, NULL, 0, &myist, 1);
ads->nb_queries++;
LIST_INSERT(&dss->free_sess, &ads->list);
}
dss->maxconn = srv->maxconn;
dss->ofs_req = ~0; /* init ring offset */
- dss->ring_req = ring_new(2*DNS_TCP_MSG_RING_MAX_SIZE);
+ dss->ring_req = dns_ring_new(2*DNS_TCP_MSG_RING_MAX_SIZE);
if (!dss->ring_req) {
ha_alert("memory allocation error initializing the ring for dns tcp server '%s'.\n", srv->id);
goto out;
dss->task_req->context = ns;
/* attach the task as reader */
- if (!ring_attach(dss->ring_req)) {
+ if (!dns_ring_attach(dss->ring_req)) {
/* mark server attached to the ring */
ha_alert("server '%s': too many watchers for ring. this should never happen.\n", srv->id);
goto out;
if (dss && dss->task_req)
task_destroy(dss->task_req);
if (dss && dss->ring_req)
- ring_free(dss->ring_req);
+ dns_ring_free(dss->ring_req);
free(dss);
return -1;
--- /dev/null
+/*
+ * Ring buffer management
+ * This is a fork of ring.c for DNS usage.
+ *
+ * Copyright (C) 2000-2019 Willy Tarreau - w@1wt.eu
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation, version 2.1
+ * exclusively.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <stdlib.h>
+#include <haproxy/api.h>
+#include <haproxy/applet.h>
+#include <haproxy/buf.h>
+#include <haproxy/cli.h>
+#include <haproxy/dns_ring.h>
+#include <haproxy/sc_strm.h>
+#include <haproxy/stconn.h>
+#include <haproxy/thread.h>
+
+/* Initialize a pre-allocated ring with the buffer area
+ * of size */
+void dns_ring_init(struct dns_ring *ring, void *area, size_t size)
+{
+ HA_RWLOCK_INIT(&ring->lock);
+ LIST_INIT(&ring->waiters);
+ ring->readers_count = 0;
+ ring->buf = b_make(area, size, 0, 0);
+ /* write the initial RC byte */
+ b_putchr(&ring->buf, 0);
+}
+
+/* Creates and returns a ring buffer of size <size> bytes. Returns NULL on
+ * allocation failure.
+ */
+struct dns_ring *dns_ring_new(size_t size)
+{
+ struct dns_ring *ring = NULL;
+ void *area = NULL;
+
+ if (size < 2)
+ goto fail;
+
+ ring = malloc(sizeof(*ring));
+ if (!ring)
+ goto fail;
+
+ area = malloc(size);
+ if (!area)
+ goto fail;
+
+ dns_ring_init(ring, area, size);
+ return ring;
+ fail:
+ free(area);
+ free(ring);
+ return NULL;
+}
+
+/* destroys and frees ring <ring> */
+void dns_ring_free(struct dns_ring *ring)
+{
+ if (!ring)
+ return;
+
+ free(ring->buf.area);
+ free(ring);
+}
+
+/* Tries to send <npfx> parts from <prefix> followed by <nmsg> parts from <msg>
+ * to ring <ring>. The message is sent atomically. It may be truncated to
+ * <maxlen> bytes if <maxlen> is non-null. There is no distinction between the
+ * two lists, it's just a convenience to help the caller prepend some prefixes
+ * when necessary. It takes the ring's write lock to make sure no other thread
+ * will touch the buffer during the update. Returns the number of bytes sent,
+ * or <=0 on failure.
+ */
+ssize_t dns_ring_write(struct dns_ring *ring, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg)
+{
+ struct buffer *buf = &ring->buf;
+ struct appctx *appctx;
+ size_t totlen = 0;
+ size_t lenlen;
+ uint64_t dellen;
+ int dellenlen;
+ ssize_t sent = 0;
+ int i;
+
+ /* we have to find some room to add our message (the buffer is
+ * never empty and at least contains the previous counter) and
+ * to update both the buffer contents and heads at the same
+ * time (it's doable using atomic ops but not worth the
+ * trouble, let's just lock). For this we first need to know
+ * the total message's length. We cannot measure it while
+ * copying due to the varint encoding of the length.
+ */
+ for (i = 0; i < npfx; i++)
+ totlen += pfx[i].len;
+ for (i = 0; i < nmsg; i++)
+ totlen += msg[i].len;
+
+ if (totlen > maxlen)
+ totlen = maxlen;
+
+ lenlen = varint_bytes(totlen);
+
+ HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);
+ if (lenlen + totlen + 1 + 1 > b_size(buf))
+ goto done_buf;
+
+ while (b_room(buf) < lenlen + totlen + 1) {
+ /* we need to delete the oldest message (from the end),
+ * and we have to stop if there's a reader stuck there.
+ * Unless there's corruption in the buffer it's guaranteed
+ * that we have enough data to find 1 counter byte, a
+ * varint-encoded length (1 byte min) and the message
+ * payload (0 bytes min).
+ */
+ if (*b_head(buf))
+ goto done_buf;
+ dellenlen = b_peek_varint(buf, 1, &dellen);
+ if (!dellenlen)
+ goto done_buf;
+ BUG_ON(b_data(buf) < 1 + dellenlen + dellen);
+
+ b_del(buf, 1 + dellenlen + dellen);
+ }
+
+ /* OK now we do have room */
+ __b_put_varint(buf, totlen);
+
+ totlen = 0;
+ for (i = 0; i < npfx; i++) {
+ size_t len = pfx[i].len;
+
+ if (len + totlen > maxlen)
+ len = maxlen - totlen;
+ if (len)
+ __b_putblk(buf, pfx[i].ptr, len);
+ totlen += len;
+ }
+
+ for (i = 0; i < nmsg; i++) {
+ size_t len = msg[i].len;
+
+ if (len + totlen > maxlen)
+ len = maxlen - totlen;
+ if (len)
+ __b_putblk(buf, msg[i].ptr, len);
+ totlen += len;
+ }
+
+ *b_tail(buf) = 0; buf->data++; // new read counter
+ sent = lenlen + totlen + 1;
+
+ /* notify potential readers */
+ list_for_each_entry(appctx, &ring->waiters, wait_entry)
+ appctx_wakeup(appctx);
+
+ done_buf:
+ HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock);
+ return sent;
+}
+
+/* Tries to attach appctx <appctx> as a new reader on ring <ring>. This is
+ * meant to be used by low level appctx code such as CLI or ring forwarding.
+ * For higher level functions, please see the relevant parts in appctx or CLI.
+ * It returns non-zero on success or zero on failure if too many users are
+ * already attached. On success, the caller MUST call dns_ring_detach_appctx()
+ * to detach itself, even if it was never woken up.
+ */
+int dns_ring_attach(struct dns_ring *ring)
+{
+ int users = ring->readers_count;
+
+ do {
+ if (users >= 255)
+ return 0;
+ } while (!_HA_ATOMIC_CAS(&ring->readers_count, &users, users + 1));
+ return 1;
+}
+
+/* detach an appctx from a ring. The appctx is expected to be waiting at offset
+ * <ofs> relative to the beginning of the storage, or ~0 if not waiting yet.
+ * Nothing is done if <ring> is NULL.
+ */
+void dns_ring_detach_appctx(struct dns_ring *ring, struct appctx *appctx, size_t ofs)
+{
+ if (!ring)
+ return;
+
+ HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);
+ if (ofs != ~0) {
+ /* reader was still attached */
+ if (ofs < b_head_ofs(&ring->buf))
+ ofs += b_size(&ring->buf) - b_head_ofs(&ring->buf);
+ else
+ ofs -= b_head_ofs(&ring->buf);
+
+ BUG_ON(ofs >= b_size(&ring->buf));
+ LIST_DEL_INIT(&appctx->wait_entry);
+ HA_ATOMIC_DEC(b_peek(&ring->buf, ofs));
+ }
+ HA_ATOMIC_DEC(&ring->readers_count);
+ HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock);
+}
+
+/*
+ * Local variables:
+ * c-indent-level: 8
+ * c-basic-offset: 8
+ * End:
+ */
#include <haproxy/check.h>
#include <haproxy/cli.h>
#include <haproxy/dns.h>
+#include <haproxy/dns_ring.h>
#include <haproxy/errors.h>
#include <haproxy/fd.h>
#include <haproxy/http_rules.h>
#include <haproxy/protocol.h>
#include <haproxy/proxy.h>
#include <haproxy/resolvers.h>
-#include <haproxy/ring.h>
#include <haproxy/sample.h>
#include <haproxy/sc_strm.h>
#include <haproxy/server.h>
fd_delete(ns->dgram->conn.t.sock.fd);
close(ns->dgram->conn.t.sock.fd);
}
- ring_free(ns->dgram->ring_req);
+ dns_ring_free(ns->dgram->ring_req);
free(ns->dgram);
}
if (ns->stream) {
- ring_free(ns->stream->ring_req);
+ dns_ring_free(ns->stream->ring_req);
task_destroy(ns->stream->task_req);
task_destroy(ns->stream->task_rsp);
free(ns->stream);