isc_sockaddr_t peer;
isc_sockaddr_t local;
- isc_nm_opaquecb doreset; /* reset extra callback, external */
- isc_nm_opaquecb dofree; /* free extra callback, external */
+ isc_nm_opaquecb_t doreset; /* reset extra callback, external */
+ isc_nm_opaquecb_t dofree; /* free extra callback, external */
void * opaque;
char extra[];
};
isc_refcount_t references;
/*%
- * TCPDNS socket is not pipelining.
+ * TCPDNS socket has been set not to pipeliine.
*/
atomic_bool sequential;
+
+ /*%
+ * TCPDNS socket has exceeded the maximum number of
+ * simultaneous requests per connecton, so will be temporarily
+ * restricted from pipelining.
+ */
+ atomic_bool overlimit;
+
/*%
* TCPDNS socket in sequential mode is currently processing a packet,
* we need to wait until it finishes.
*/
atomic_bool processing;
+ /*%
+ * A TCP socket has had isc_nm_pauseread() called.
+ */
+ atomic_bool readpaused;
+
/*%
* 'spare' handles for that can be reused to avoid allocations,
* for UDP.
/*%
* List of active handles.
- * ah_size - size of ah_frees and ah_handles
- * ah_cpos - current position in ah_frees;
- * ah_handles - array of *handles.
+ * ah - current position in 'ah_frees'; this represents the
+ * current number of active handles;
+ * ah_size - size of the 'ah_frees' and 'ah_handles' arrays
+ * ah_handles - array pointers to active handles
+ *
* Adding a handle
- * - if ah_cpos == ah_size, realloc
- * - x = ah_frees[ah_cpos]
- * - ah_frees[ah_cpos++] = 0;
+ * - if ah == ah_size, reallocate
+ * - x = ah_frees[ah]
+ * - ah_frees[ah++] = 0;
* - ah_handles[x] = handle
* - x must be stored with the handle!
* Removing a handle:
- * - ah_frees[--ah_cpos] = x
+ * - ah_frees[--ah] = x
* - ah_handles[x] = NULL;
*
- * XXXWPK for now this is locked with socket->lock, but we might want
- * to change it to something lockless
+ * XXXWPK for now this is locked with socket->lock, but we
+ * might want to change it to something lockless
*/
+ size_t ah;
size_t ah_size;
- size_t ah_cpos;
size_t *ah_frees;
isc_nmhandle_t **ah_handles;
size_t buf_len;
unsigned char *buf;
+ /*
+ * This function will be called with handle->sock
+ * as the argument whenever a handle's references drop
+ * to zero, after its reset callback has been called.
+ */
+ isc_nm_opaquecb_t closehandle_cb;
+
isc__nm_readcb_t rcb;
void *rcbarg;
};
}
if (sock->buf != NULL) {
- isc_mem_put(sock->mgr->mctx, sock->buf, sock->buf_size);
+ isc_mem_free(sock->mgr->mctx, sock->buf);
}
if (sock->quota != NULL) {
* accept destruction.
*/
LOCK(&sock->lock);
- active_handles += sock->ah_cpos;
+ active_handles += sock->ah;
if (sock->children != NULL) {
for (int i = 0; i < sock->nchildren; i++) {
LOCK(&sock->children[i].lock);
- active_handles += sock->children[i].ah_cpos;
+ active_handles += sock->children[i].ah;
UNLOCK(&sock->children[i].lock);
}
}
isc_mutex_init(&sock->lock);
isc_condition_init(&sock->cond);
isc_refcount_init(&sock->references, 1);
+
atomic_init(&sock->active, true);
+ atomic_init(&sock->sequential, false);
+ atomic_init(&sock->overlimit, false);
+ atomic_init(&sock->processing, false);
+ atomic_init(&sock->readpaused, false);
sock->magic = NMSOCK_MAGIC;
}
isc__networker_t *worker = NULL;
REQUIRE(VALID_NMSOCK(sock));
-
+ if (buf->base == NULL) {
+ /* Empty buffer: might happen in case of error. */
+ return;
+ }
worker = &sock->mgr->workers[sock->tid];
REQUIRE(worker->udprecvbuf_inuse);
REQUIRE(buf->base == worker->udprecvbuf);
- UNUSED(buf);
-
worker->udprecvbuf_inuse = false;
}
LOCK(&sock->lock);
/* We need to add this handle to the list of active handles */
- if (sock->ah_cpos == sock->ah_size) {
+ if (sock->ah == sock->ah_size) {
sock->ah_frees =
isc_mem_reallocate(sock->mgr->mctx, sock->ah_frees,
sock->ah_size * 2 *
sock->ah_size *= 2;
}
- pos = sock->ah_frees[sock->ah_cpos++];
+ pos = sock->ah_frees[sock->ah++];
INSIST(sock->ah_handles[pos] == NULL);
sock->ah_handles[pos] = handle;
handle->ah_pos = pos;
nmhandle_free(isc_nmsocket_t *sock, isc_nmhandle_t *handle) {
size_t extra = sock->extrahandlesize;
- if (handle->dofree) {
+ if (handle->dofree != NULL) {
handle->dofree(handle->opaque);
}
LOCK(&sock->lock);
INSIST(sock->ah_handles[handle->ah_pos] == handle);
INSIST(sock->ah_size > handle->ah_pos);
- INSIST(sock->ah_cpos > 0);
+ INSIST(sock->ah > 0);
sock->ah_handles[handle->ah_pos] = NULL;
- sock->ah_frees[--sock->ah_cpos] = handle->ah_pos;
+ sock->ah_frees[--sock->ah] = handle->ah_pos;
handle->ah_pos = 0;
if (atomic_load(&sock->active)) {
}
UNLOCK(&sock->lock);
+ /*
+ * Handle is closed. If the socket has a callback
+ * configured for that (e.g., to perform cleanup after
+ * request processing), call it now.
+ */
+ if (sock->closehandle_cb != NULL) {
+ sock->closehandle_cb(sock);
+ }
+
if (!reuse) {
nmhandle_free(sock, handle);
}
- if (sock->ah_cpos == 0 &&
+ if (sock->ah == 0 &&
!atomic_load(&sock->active) &&
!atomic_load(&sock->destroying))
{
void
isc_nmhandle_setdata(isc_nmhandle_t *handle, void *arg,
- isc_nm_opaquecb doreset, isc_nm_opaquecb dofree)
+ isc_nm_opaquecb_t doreset, isc_nm_opaquecb_t dofree)
{
REQUIRE(VALID_NMHANDLE(handle));
#include "netmgr-int.h"
+#define TCPDNS_CLIENTS_PER_CONN 23
+/*%<
+ *
+ * Maximum number of simultaneous handles in flight supported for a single
+ * connected TCPDNS socket. This value was chosen arbitrarily, and may be
+ * changed in the future.
+ */
+
static void
dnslisten_readcb(isc_nmhandle_t *handle, isc_region_t *region, void *arg);
+static void
+resume_processing(void *arg);
+
static inline size_t
dnslen(unsigned char* base) {
return ((base[0] << 8) + (base[1]));
if (sock->buf == NULL) {
/* We don't have the buffer at all */
size_t alloc_len = len < NM_REG_BUF ? NM_REG_BUF : NM_BIG_BUF;
- sock->buf = isc_mem_get(sock->mgr->mctx, alloc_len);
+ sock->buf = isc_mem_allocate(sock->mgr->mctx, alloc_len);
sock->buf_size = alloc_len;
} else {
/* We have the buffer but it's too small */
}
}
-
/*
* Accept callback for TCP-DNS connection
*/
isc_nmsocket_attach(handle->sock, &dnssock->outer);
dnssock->peer = handle->sock->peer;
dnssock->iface = handle->sock->iface;
+ dnssock->closehandle_cb = resume_processing;
isc_nm_read(handle, dnslisten_readcb, dnssock);
}
+static bool
+connection_limit(isc_nmsocket_t *sock) {
+ int ah;
+
+ REQUIRE(sock->type == isc_nm_tcpdnssocket && sock->outer != NULL);
+
+ if (atomic_load(&sock->sequential)) {
+ /*
+ * We're already non-pipelining, so there's
+ * no need to check per-connection limits.
+ */
+ return (false);
+ }
+
+ LOCK(&sock->lock);
+ ah = sock->ah;
+ UNLOCK(&sock->lock);
+
+ if (ah >= TCPDNS_CLIENTS_PER_CONN) {
+ atomic_store(&sock->overlimit, true);
+ isc_nm_pauseread(sock->outer);
+ return (true);
+ }
+
+ return (false);
+}
+
+/* Process all complete packets out of incoming buffer */
+static void
+processbuffer(isc_nmsocket_t *dnssock) {
+ REQUIRE(VALID_NMSOCK(dnssock));
+
+ /* While we have a complete packet in the buffer */
+ while (dnssock->buf_len > 2 &&
+ dnslen(dnssock->buf) <= dnssock->buf_len - 2 &&
+ !connection_limit(dnssock))
+ {
+ isc_nmhandle_t *dnshandle = NULL;
+ isc_region_t r2 = {
+ .base = dnssock->buf + 2,
+ .length = dnslen(dnssock->buf)
+ };
+ size_t len;
+
+ dnshandle = isc__nmhandle_get(dnssock, NULL, NULL);
+ atomic_store(&dnssock->processing, true);
+ dnssock->rcb.recv(dnshandle, &r2, dnssock->rcbarg);
+
+ /*
+ * If the recv callback wants to hold on to the
+ * handle, it needs to attach to it.
+ */
+ isc_nmhandle_unref(dnshandle);
+
+ len = dnslen(dnssock->buf) + 2;
+ dnssock->buf_len -= len;
+ if (len > 0) {
+ memmove(dnssock->buf, dnssock->buf + len,
+ dnssock->buf_len);
+ }
+
+ /* Check here to make sure we do the processing at least once */
+ if (atomic_load(&dnssock->processing)) {
+ return;
+ }
+ }
+}
+
/*
* We've got a read on our underlying socket, need to check if we have
* a complete DNS packet and, if so - call the callback
/*
* We have something in the buffer, we need to glue it.
*/
+ if (dnssock->buf_len > 0) {
+ if (dnssock->buf_len == 1) {
+ /* Make sure we have the length */
+ dnssock->buf[1] = base[0];
+ dnssock->buf_len = 2;
+ base++;
+ len--;
+ }
+
+ processbuffer(dnssock);
+ }
+
if (dnssock->buf_len > 0) {
size_t plen;
/* At this point we definitely have 2 bytes there. */
plen = ISC_MIN(len, (dnslen(dnssock->buf) + 2 -
dnssock->buf_len));
- if (plen > dnssock->buf_size) {
- alloc_dnsbuf(dnssock, plen);
+
+ if (dnssock->buf_len + plen > NM_BIG_BUF) {
+ /*
+ * XXX: continuing to read will overrun the
+ * socket buffer. We may need to force the
+ * connection to close so the client will have
+ * to open a new one.
+ */
+ return;
+ }
+
+ if (dnssock->buf_len + plen > dnssock->buf_size) {
+ alloc_dnsbuf(dnssock, dnssock->buf_len + plen);
}
memmove(dnssock->buf + dnssock->buf_len, base, plen);
len -= plen;
/* Do we have a complete packet in the buffer? */
- if (dnslen(dnssock->buf) == dnssock->buf_len - 2) {
+ if (dnslen(dnssock->buf) >= dnssock->buf_len - 2 &&
+ !connection_limit(dnssock))
+ {
isc_nmhandle_t *dnshandle = NULL;
isc_region_t r2 = {
.base = dnssock->buf + 2,
.length = dnslen(dnssock->buf)
};
+
dnshandle = isc__nmhandle_get(dnssock, NULL, &local);
atomic_store(&dnssock->processing, true);
dnssock->rcb.recv(dnshandle, &r2, dnssock->rcbarg);
* At this point we've processed whatever was previously in the
* socket buffer. If there are more messages to be found in what
* we've read, and if we're either pipelining or not processing
- * anything else, then we can process those messages now.
+ * anything else currently, then we can process those messages now.
*/
while (len >= 2 && dnslen(base) <= len - 2 &&
- !(atomic_load(&dnssock->sequential) &&
- atomic_load(&dnssock->processing)))
+ (!atomic_load(&dnssock->sequential) ||
+ !atomic_load(&dnssock->processing)) &&
+ !connection_limit(dnssock))
{
isc_nmhandle_t *dnshandle = NULL;
isc_region_t r2 = {
}
}
-/* Process all complete packets out of incoming buffer */
-static void
-processbuffer(isc_nmsocket_t *dnssock) {
- REQUIRE(VALID_NMSOCK(dnssock));
-
- /* While we have a complete packet in the buffer */
- while (dnssock->buf_len > 2 &&
- dnslen(dnssock->buf) <= dnssock->buf_len - 2)
- {
- isc_nmhandle_t *dnshandle = NULL;
- isc_region_t r2 = {
- .base = dnssock->buf + 2,
- .length = dnslen(dnssock->buf)
- };
- size_t len;
-
- dnshandle = isc__nmhandle_get(dnssock, NULL, NULL);
- atomic_store(&dnssock->processing, true);
- dnssock->rcb.recv(dnshandle, &r2, dnssock->rcbarg);
-
- /*
- * If the recv callback wants to hold on to the
- * handle, it needs to attach to it.
- */
- isc_nmhandle_unref(dnshandle);
-
- len = dnslen(dnssock->buf) + 2;
- dnssock->buf_len -= len;
- if (len > 0) {
- memmove(dnssock->buf, dnssock->buf + len,
- dnssock->buf_len);
- }
-
- /* Check here to make sure we do the processing at least once */
- if (atomic_load(&dnssock->processing)) {
- return;
- }
- }
-}
-
/*
* isc_nm_listentcpdns listens for connections and accepts
* them immediately, then calls the cb for each incoming DNS packet
/*
* We don't want pipelining on this connection. That means
- * that we can launch query processing only when the previous
- * one returned.
- *
- * The socket MUST be unpaused after the query is processed.
- * This is done by isc_nm_resumeread() in tcpdnssend_cb() below.
- *
- * XXX: The callback is not currently executed in failure cases!
+ * that we need to pause after reading each request, and
+ * resume only after the request has been processed. This
+ * is done in resume_processing(), which is the socket's
+ * closehandle_cb callback, called whenever a handle
+ * is released.
*/
isc_nm_pauseread(handle->sock->outer);
atomic_store(&handle->sock->sequential, true);
void *cbarg;
} tcpsend_t;
+static void
+resume_processing(void *arg) {
+ isc_nmsocket_t *sock = (isc_nmsocket_t *) arg;
+
+ REQUIRE(VALID_NMSOCK(sock));
+
+ if (sock->type != isc_nm_tcpdnssocket || sock->outer == NULL) {
+ return;
+ }
+
+ /*
+ * If we're in sequential mode or over the
+ * clients-per-connection limit, the sock can
+ * resume reading now.
+ */
+ if (atomic_load(&sock->overlimit) || atomic_load(&sock->sequential)) {
+ atomic_store(&sock->overlimit, false);
+ atomic_store(&sock->processing, false);
+ isc_nm_resumeread(sock->outer);
+ }
+}
+
static void
tcpdnssend_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
tcpsend_t *ts = (tcpsend_t *) cbarg;
isc_mem_put(ts->mctx, ts->region.base, ts->region.length);
/*
- * The response was sent, if we're in sequential mode resume
- * processing.
+ * The response was sent; if we're in sequential or overlimit
+ * mode, resume processing now.
*/
- if (atomic_load(&ts->orighandle->sock->sequential)) {
+ if (atomic_load(&ts->orighandle->sock->sequential) ||
+ atomic_load(&ts->orighandle->sock->overlimit))
+ {
atomic_store(&ts->orighandle->sock->processing, false);
+ atomic_store(&ts->orighandle->sock->overlimit, false);
processbuffer(ts->orighandle->sock);
isc_nm_resumeread(handle->sock);
}