]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
place a limit on pipelined queries that can be processed simultaneously
authorEvan Hunt <each@isc.org>
Fri, 8 Nov 2019 18:52:49 +0000 (10:52 -0800)
committerEvan Hunt <each@isc.org>
Mon, 18 Nov 2019 02:59:39 +0000 (18:59 -0800)
when the TCPDNS_CLIENTS_PER_CONN limit has been exceeded for a TCP
DNS connection, switch to sequential mode to ensure that memory cannot
be exhausted by too many simultaneous queries.

lib/isc/include/isc/netmgr.h
lib/isc/netmgr/netmgr-int.h
lib/isc/netmgr/netmgr.c
lib/isc/netmgr/tcp.c
lib/isc/netmgr/tcpdns.c
lib/ns/client.c

index e68bed759c700d8ea130879a01b9761491b0402d..1696f3cb8c5fd00c4783698fcd1f49ca4dafec71 100644 (file)
@@ -95,7 +95,7 @@ isc_nmhandle_getdata(isc_nmhandle_t *handle);
 void *
 isc_nmhandle_getextra(isc_nmhandle_t *handle);
 
-typedef void (*isc_nm_opaquecb)(void *arg);
+typedef void (*isc_nm_opaquecb_t)(void *arg);
 
 bool
 isc_nmhandle_is_stream(isc_nmhandle_t *handle);
@@ -109,7 +109,7 @@ isc_nmhandle_is_stream(isc_nmhandle_t *handle);
  */
 void
 isc_nmhandle_setdata(isc_nmhandle_t *handle, void *arg,
-                    isc_nm_opaquecb doreset, isc_nm_opaquecb dofree);
+                    isc_nm_opaquecb_t doreset, isc_nm_opaquecb_t dofree);
 
 isc_sockaddr_t
 isc_nmhandle_peeraddr(isc_nmhandle_t *handle);
@@ -273,7 +273,17 @@ isc_nm_tcpdns_sequential(isc_nmhandle_t *handle);
  * Disable pipelining on this connection. Each DNS packet
  * will be only processed after the previous completes.
  *
- * This cannot be reversed once set for a given connection
+ * The socket must be unpaused after the query is processed.
+ * This is done the response is sent, or if we're dropping the
+ * query, it will be done when a handle is fully dereferenced
+ * by calling the socket's closehandle_cb callback.
+ *
+ * Note: This can only be run while a message is being processed;
+ * if it is run before any messages are read, no messages will
+ * be read.
+ *
+ * Also note: once this has been set, it cannot be reversed for a
+ * given connection.
  */
 
 void
index b95be9873b0e97a4a73cd1889bdf5fcf57633fa6..c1edf1ca9baff376c13d7bb252ea5496561e40d6 100644 (file)
@@ -89,8 +89,8 @@ struct isc_nmhandle {
 
        isc_sockaddr_t          peer;
        isc_sockaddr_t          local;
-       isc_nm_opaquecb         doreset; /* reset extra callback, external */
-       isc_nm_opaquecb         dofree;  /* free extra callback, external */
+       isc_nm_opaquecb_t       doreset; /* reset extra callback, external */
+       isc_nm_opaquecb_t       dofree;  /* free extra callback, external */
        void *                  opaque;
        char                    extra[];
 };
@@ -312,15 +312,28 @@ struct isc_nmsocket {
        isc_refcount_t          references;
 
        /*%
-        * TCPDNS socket is not pipelining.
+        * TCPDNS socket has been set not to pipeliine.
         */
        atomic_bool             sequential;
+
+       /*%
+        * TCPDNS socket has exceeded the maximum number of
+        * simultaneous requests per connecton, so will be temporarily
+        * restricted from pipelining.
+        */
+       atomic_bool             overlimit;
+
        /*%
         * TCPDNS socket in sequential mode is currently processing a packet,
         * we need to wait until it finishes.
         */
        atomic_bool             processing;
 
+       /*%
+        * A TCP socket has had isc_nm_pauseread() called.
+        */
+       atomic_bool             readpaused;
+
        /*%
         * 'spare' handles for that can be reused to avoid allocations,
         * for UDP.
@@ -334,24 +347,26 @@ struct isc_nmsocket {
 
        /*%
         * List of active handles.
-        * ah_size - size of ah_frees and ah_handles
-        * ah_cpos - current position in ah_frees;
-        * ah_handles - array of *handles.
+        * ah - current position in 'ah_frees'; this represents the
+        *      current number of active handles;
+        * ah_size - size of the 'ah_frees' and 'ah_handles' arrays
+        * ah_handles - array pointers to active handles
+        *
         * Adding a handle
-        *  - if ah_cpos == ah_size, realloc
-        *  - x = ah_frees[ah_cpos]
-        *  - ah_frees[ah_cpos++] = 0;
+        *  - if ah == ah_size, reallocate
+        *  - x = ah_frees[ah]
+        *  - ah_frees[ah++] = 0;
         *  - ah_handles[x] = handle
         *  - x must be stored with the handle!
         * Removing a handle:
-        *  - ah_frees[--ah_cpos] = x
+        *  - ah_frees[--ah] = x
         *  - ah_handles[x] = NULL;
         *
-        * XXXWPK for now this is locked with socket->lock, but we might want
-        * to change it to something lockless
+        * XXXWPK for now this is locked with socket->lock, but we
+        * might want to change it to something lockless
         */
+       size_t                  ah;
        size_t                  ah_size;
-       size_t                  ah_cpos;
        size_t                  *ah_frees;
        isc_nmhandle_t          **ah_handles;
 
@@ -360,6 +375,13 @@ struct isc_nmsocket {
        size_t                  buf_len;
        unsigned char           *buf;
 
+       /*
+        * This function will be called with handle->sock
+        * as the argument whenever a handle's references drop
+        * to zero, after its reset callback has been called.
+        */
+       isc_nm_opaquecb_t       closehandle_cb;
+
        isc__nm_readcb_t        rcb;
        void                    *rcbarg;
 };
index 962bc1af033f9dbd80cc763f233e3cb12912955c..da8e73d81e42bc76e2ecb6eadff25ae202eb8cbc 100644 (file)
@@ -529,7 +529,7 @@ nmsocket_cleanup(isc_nmsocket_t *sock, bool dofree) {
        }
 
        if (sock->buf != NULL) {
-               isc_mem_put(sock->mgr->mctx, sock->buf, sock->buf_size);
+               isc_mem_free(sock->mgr->mctx, sock->buf);
        }
 
        if (sock->quota != NULL) {
@@ -580,11 +580,11 @@ nmsocket_maybe_destroy(isc_nmsocket_t *sock) {
         * accept destruction.
         */
        LOCK(&sock->lock);
-       active_handles += sock->ah_cpos;
+       active_handles += sock->ah;
        if (sock->children != NULL) {
                for (int i = 0; i < sock->nchildren; i++) {
                        LOCK(&sock->children[i].lock);
-                       active_handles += sock->children[i].ah_cpos;
+                       active_handles += sock->children[i].ah;
                        UNLOCK(&sock->children[i].lock);
                }
        }
@@ -701,7 +701,12 @@ isc__nmsocket_init(isc_nmsocket_t *sock, isc_nm_t *mgr,
        isc_mutex_init(&sock->lock);
        isc_condition_init(&sock->cond);
        isc_refcount_init(&sock->references, 1);
+
        atomic_init(&sock->active, true);
+       atomic_init(&sock->sequential, false);
+       atomic_init(&sock->overlimit, false);
+       atomic_init(&sock->processing, false);
+       atomic_init(&sock->readpaused, false);
 
        sock->magic = NMSOCK_MAGIC;
 }
@@ -729,14 +734,15 @@ isc__nm_free_uvbuf(isc_nmsocket_t *sock, const uv_buf_t *buf) {
        isc__networker_t *worker = NULL;
 
        REQUIRE(VALID_NMSOCK(sock));
-
+       if (buf->base == NULL) {
+               /* Empty buffer: might happen in case of error. */
+               return;
+       }
        worker = &sock->mgr->workers[sock->tid];
 
        REQUIRE(worker->udprecvbuf_inuse);
        REQUIRE(buf->base == worker->udprecvbuf);
 
-       UNUSED(buf);
-
        worker->udprecvbuf_inuse = false;
 }
 
@@ -791,7 +797,7 @@ isc__nmhandle_get(isc_nmsocket_t *sock, isc_sockaddr_t *peer,
 
        LOCK(&sock->lock);
        /* We need to add this handle to the list of active handles */
-       if (sock->ah_cpos == sock->ah_size) {
+       if (sock->ah == sock->ah_size) {
                sock->ah_frees =
                        isc_mem_reallocate(sock->mgr->mctx, sock->ah_frees,
                                           sock->ah_size * 2 *
@@ -810,7 +816,7 @@ isc__nmhandle_get(isc_nmsocket_t *sock, isc_sockaddr_t *peer,
                sock->ah_size *= 2;
        }
 
-       pos = sock->ah_frees[sock->ah_cpos++];
+       pos = sock->ah_frees[sock->ah++];
        INSIST(sock->ah_handles[pos] == NULL);
        sock->ah_handles[pos] = handle;
        handle->ah_pos = pos;
@@ -847,7 +853,7 @@ static void
 nmhandle_free(isc_nmsocket_t *sock, isc_nmhandle_t *handle) {
        size_t extra = sock->extrahandlesize;
 
-       if (handle->dofree) {
+       if (handle->dofree != NULL) {
                handle->dofree(handle->opaque);
        }
 
@@ -881,9 +887,9 @@ isc_nmhandle_unref(isc_nmhandle_t *handle) {
                LOCK(&sock->lock);
                INSIST(sock->ah_handles[handle->ah_pos] == handle);
                INSIST(sock->ah_size > handle->ah_pos);
-               INSIST(sock->ah_cpos > 0);
+               INSIST(sock->ah > 0);
                sock->ah_handles[handle->ah_pos] = NULL;
-               sock->ah_frees[--sock->ah_cpos] = handle->ah_pos;
+               sock->ah_frees[--sock->ah] = handle->ah_pos;
                handle->ah_pos = 0;
 
                if (atomic_load(&sock->active)) {
@@ -892,11 +898,20 @@ isc_nmhandle_unref(isc_nmhandle_t *handle) {
                }
                UNLOCK(&sock->lock);
 
+               /*
+                * Handle is closed. If the socket has a callback
+                * configured for that (e.g., to perform cleanup after
+                * request processing), call it now.
+                */
+               if (sock->closehandle_cb != NULL) {
+                       sock->closehandle_cb(sock);
+               }
+
                if (!reuse) {
                        nmhandle_free(sock, handle);
                }
 
-               if (sock->ah_cpos == 0 &&
+               if (sock->ah == 0 &&
                    !atomic_load(&sock->active) &&
                    !atomic_load(&sock->destroying))
                {
@@ -914,7 +929,7 @@ isc_nmhandle_getdata(isc_nmhandle_t *handle) {
 
 void
 isc_nmhandle_setdata(isc_nmhandle_t *handle, void *arg,
-                    isc_nm_opaquecb doreset, isc_nm_opaquecb dofree)
+                    isc_nm_opaquecb_t doreset, isc_nm_opaquecb_t dofree)
 {
        REQUIRE(VALID_NMHANDLE(handle));
 
index f0aabd28f8eeb90ec2676243bac9be2e4fc8f484..59861b604f5ca62337656c2412b38096983fba43 100644 (file)
@@ -280,6 +280,12 @@ isc_result_t
 isc_nm_pauseread(isc_nmsocket_t *sock) {
        REQUIRE(VALID_NMSOCK(sock));
 
+       if (atomic_load(&sock->readpaused)) {
+               return (ISC_R_SUCCESS);
+       }
+
+       atomic_store(&sock->readpaused, true);
+
        if (sock->tid == isc_nm_tid()) {
                int r = uv_read_stop(&sock->uv_handle.stream);
                INSIST(r == 0);
@@ -312,6 +318,12 @@ isc_nm_resumeread(isc_nmsocket_t *sock) {
        REQUIRE(VALID_NMSOCK(sock));
        REQUIRE(sock->rcb.recv != NULL);
 
+       if (!atomic_load(&sock->readpaused)) {
+               return (ISC_R_SUCCESS);
+       }
+
+       atomic_store(&sock->readpaused, false);
+
        if (sock->tid == isc_nm_tid()) {
                int r = uv_read_start(&sock->uv_handle.stream,
                                      isc__nm_alloc_cb, read_cb);
index 0c38ecf49474172a81d5c48c1546ab9b2b9a9dcf..8e86a39474da48bdae23c27ae5c165c140053e56 100644 (file)
 
 #include "netmgr-int.h"
 
+#define TCPDNS_CLIENTS_PER_CONN 23
+/*%<
+ *
+ * Maximum number of simultaneous handles in flight supported for a single
+ * connected TCPDNS socket. This value was chosen arbitrarily, and may be
+ * changed in the future.
+ */
+
 static void
 dnslisten_readcb(isc_nmhandle_t *handle, isc_region_t *region, void *arg);
 
+static void
+resume_processing(void *arg);
+
 static inline size_t
 dnslen(unsigned char* base) {
        return ((base[0] << 8) + (base[1]));
@@ -45,7 +56,7 @@ alloc_dnsbuf(isc_nmsocket_t *sock, size_t len) {
        if (sock->buf == NULL) {
                /* We don't have the buffer at all */
                size_t alloc_len = len < NM_REG_BUF ? NM_REG_BUF : NM_BIG_BUF;
-               sock->buf = isc_mem_get(sock->mgr->mctx, alloc_len);
+               sock->buf = isc_mem_allocate(sock->mgr->mctx, alloc_len);
                sock->buf_size = alloc_len;
        } else {
                /* We have the buffer but it's too small */
@@ -55,7 +66,6 @@ alloc_dnsbuf(isc_nmsocket_t *sock, size_t len) {
        }
 }
 
-
 /*
  * Accept callback for TCP-DNS connection
  */
@@ -84,10 +94,79 @@ dnslisten_acceptcb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
        isc_nmsocket_attach(handle->sock, &dnssock->outer);
        dnssock->peer = handle->sock->peer;
        dnssock->iface = handle->sock->iface;
+       dnssock->closehandle_cb = resume_processing;
 
        isc_nm_read(handle, dnslisten_readcb, dnssock);
 }
 
+static bool
+connection_limit(isc_nmsocket_t *sock) {
+       int ah;
+
+       REQUIRE(sock->type == isc_nm_tcpdnssocket && sock->outer != NULL);
+
+       if (atomic_load(&sock->sequential)) {
+               /*
+                * We're already non-pipelining, so there's
+                * no need to check per-connection limits.
+                */
+               return (false);
+       }
+
+       LOCK(&sock->lock);
+       ah = sock->ah;
+       UNLOCK(&sock->lock);
+
+       if (ah >= TCPDNS_CLIENTS_PER_CONN) {
+               atomic_store(&sock->overlimit, true);
+               isc_nm_pauseread(sock->outer);
+               return (true);
+       }
+
+       return (false);
+}
+
+/* Process all complete packets out of incoming buffer */
+static void
+processbuffer(isc_nmsocket_t *dnssock) {
+       REQUIRE(VALID_NMSOCK(dnssock));
+
+       /* While we have a complete packet in the buffer */
+       while (dnssock->buf_len > 2 &&
+              dnslen(dnssock->buf) <= dnssock->buf_len - 2 &&
+              !connection_limit(dnssock))
+       {
+               isc_nmhandle_t *dnshandle = NULL;
+               isc_region_t r2 = {
+                       .base = dnssock->buf + 2,
+                       .length = dnslen(dnssock->buf)
+               };
+               size_t len;
+
+               dnshandle = isc__nmhandle_get(dnssock, NULL, NULL);
+               atomic_store(&dnssock->processing, true);
+               dnssock->rcb.recv(dnshandle, &r2, dnssock->rcbarg);
+
+               /*
+                * If the recv callback wants to hold on to the
+                * handle, it needs to attach to it.
+                */
+               isc_nmhandle_unref(dnshandle);
+
+               len = dnslen(dnssock->buf) + 2;
+               dnssock->buf_len -= len;
+               if (len > 0) {
+                       memmove(dnssock->buf, dnssock->buf + len,
+                               dnssock->buf_len);
+               }
+
+               /* Check here to make sure we do the processing at least once */
+               if (atomic_load(&dnssock->processing)) {
+                       return;
+               }
+       }
+}
+
 /*
  * We've got a read on our underlying socket, need to check if we have
  * a complete DNS packet and, if so - call the callback
@@ -118,6 +197,18 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_region_t *region, void *arg) {
        /*
         * We have something in the buffer, we need to glue it.
         */
+       if (dnssock->buf_len > 0) {
+               if (dnssock->buf_len == 1) {
+                       /* Make sure we have the length */
+                       dnssock->buf[1] = base[0];
+                       dnssock->buf_len = 2;
+                       base++;
+                       len--;
+               }
+
+               processbuffer(dnssock);
+       }
+
        if (dnssock->buf_len > 0) {
                size_t plen;
 
@@ -132,8 +223,19 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_region_t *region, void *arg) {
                /* At this point we definitely have 2 bytes there. */
                plen = ISC_MIN(len, (dnslen(dnssock->buf) + 2 -
                                     dnssock->buf_len));
-               if (plen > dnssock->buf_size) {
-                       alloc_dnsbuf(dnssock, plen);
+
+               if (dnssock->buf_len + plen > NM_BIG_BUF) {
+                       /*
+                        * XXX: continuing to read will overrun the
+                        * socket buffer. We may need to force the
+                        * connection to close so the client will have
+                        * to open a new one.
+                        */
+                       return;
+               }
+
+               if (dnssock->buf_len + plen > dnssock->buf_size) {
+                       alloc_dnsbuf(dnssock, dnssock->buf_len + plen);
                }
 
                memmove(dnssock->buf + dnssock->buf_len, base, plen);
@@ -142,12 +244,15 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_region_t *region, void *arg) {
                len -= plen;
 
                /* Do we have a complete packet in the buffer? */
-               if (dnslen(dnssock->buf) == dnssock->buf_len - 2) {
+               if (dnslen(dnssock->buf) >= dnssock->buf_len - 2 &&
+                   !connection_limit(dnssock))
+               {
                        isc_nmhandle_t *dnshandle = NULL;
                        isc_region_t r2 = {
                                .base = dnssock->buf + 2,
                                .length = dnslen(dnssock->buf)
                        };
+
                        dnshandle = isc__nmhandle_get(dnssock, NULL, &local);
                        atomic_store(&dnssock->processing, true);
                        dnssock->rcb.recv(dnshandle, &r2, dnssock->rcbarg);
@@ -165,11 +270,12 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_region_t *region, void *arg) {
         * At this point we've processed whatever was previously in the
         * socket buffer. If there are more messages to be found in what
         * we've read, and if we're either pipelining or not processing
-        * anything else, then we can process those messages now.
+        * anything else currently, then we can process those messages now.
         */
        while (len >= 2 && dnslen(base) <= len - 2 &&
-              !(atomic_load(&dnssock->sequential) &&
-                atomic_load(&dnssock->processing)))
+              (!atomic_load(&dnssock->sequential) ||
+               !atomic_load(&dnssock->processing)) &&
+              !connection_limit(dnssock))
        {
                isc_nmhandle_t *dnshandle = NULL;
                isc_region_t r2 = {
@@ -206,46 +312,6 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_region_t *region, void *arg) {
        }
 }
 
-/* Process all complete packets out of incoming buffer */
-static void
-processbuffer(isc_nmsocket_t *dnssock) {
-       REQUIRE(VALID_NMSOCK(dnssock));
-
-       /* While we have a complete packet in the buffer */
-       while (dnssock->buf_len > 2 &&
-              dnslen(dnssock->buf) <= dnssock->buf_len - 2)
-       {
-               isc_nmhandle_t *dnshandle = NULL;
-               isc_region_t r2 = {
-                       .base = dnssock->buf + 2,
-                       .length = dnslen(dnssock->buf)
-               };
-               size_t len;
-
-               dnshandle = isc__nmhandle_get(dnssock, NULL, NULL);
-               atomic_store(&dnssock->processing, true);
-               dnssock->rcb.recv(dnshandle, &r2, dnssock->rcbarg);
-
-               /*
-                * If the recv callback wants to hold on to the
-                * handle, it needs to attach to it.
-                */
-               isc_nmhandle_unref(dnshandle);
-
-               len = dnslen(dnssock->buf) + 2;
-               dnssock->buf_len -= len;
-               if (len > 0) {
-                       memmove(dnssock->buf, dnssock->buf + len,
-                               dnssock->buf_len);
-               }
-
-               /* Check here to make sure we do the processing at least once */
-               if (atomic_load(&dnssock->processing)) {
-                       return;
-               }
-       }
-}
-
 /*
  * isc_nm_listentcpdns listens for connections and accepts
  * them immediately, then calls the cb for each incoming DNS packet
@@ -306,13 +372,11 @@ isc_nm_tcpdns_sequential(isc_nmhandle_t *handle) {
 
        /*
         * We don't want pipelining on this connection. That means
-        * that we can launch query processing only when the previous
-        * one returned.
-        *
-        * The socket MUST be unpaused after the query is processed.
-        * This is done by isc_nm_resumeread() in tcpdnssend_cb() below.
-        *
-        * XXX: The callback is not currently executed in failure cases!
+        * that we need to pause after reading each request, and
+        * resume only after the request has been processed. This
+        * is done in resume_processing(), which is the socket's
+        * closehandle_cb callback, called whenever a handle
+        * is released.
         */
        isc_nm_pauseread(handle->sock->outer);
        atomic_store(&handle->sock->sequential, true);
@@ -327,6 +391,28 @@ typedef struct tcpsend {
        void                    *cbarg;
 } tcpsend_t;
 
+static void
+resume_processing(void *arg) {
+       isc_nmsocket_t *sock = (isc_nmsocket_t *) arg;
+
+       REQUIRE(VALID_NMSOCK(sock));
+
+       if (sock->type != isc_nm_tcpdnssocket || sock->outer == NULL) {
+               return;
+       }
+
+       /*
+        * If we're in sequential mode or over the
+        * clients-per-connection limit, the sock can
+        * resume reading now.
+        */
+       if (atomic_load(&sock->overlimit) || atomic_load(&sock->sequential)) {
+               atomic_store(&sock->overlimit, false);
+               atomic_store(&sock->processing, false);
+               isc_nm_resumeread(sock->outer);
+       }
+}
+
 static void
 tcpdnssend_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
        tcpsend_t *ts = (tcpsend_t *) cbarg;
@@ -337,11 +423,14 @@ tcpdnssend_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
        isc_mem_put(ts->mctx, ts->region.base, ts->region.length);
 
        /*
-        * The response was sent, if we're in sequential mode resume
-        * processing.
+        * The response was sent; if we're in sequential or overlimit
+        * mode, resume processing now.
         */
-       if (atomic_load(&ts->orighandle->sock->sequential)) {
+       if (atomic_load(&ts->orighandle->sock->sequential) ||
+           atomic_load(&ts->orighandle->sock->overlimit))
+       {
                atomic_store(&ts->orighandle->sock->processing, false);
+               atomic_store(&ts->orighandle->sock->overlimit, false);
                processbuffer(ts->orighandle->sock);
                isc_nm_resumeread(handle->sock);
        }
index e360ba1422c477445ed85621ea391feebf917370..189172f7e33df465599a42a1fb553c057dc74e08 100644 (file)
@@ -2473,7 +2473,6 @@ ns_clientmgr_destroy(ns_clientmgr_t **managerp) {
 
        MTRACE("destroy");
 
-       /* XXXWPK TODO we need to pause netmgr here */
        /*
         * Check for success because we may already be task-exclusive
         * at this point.  Only if we succeed at obtaining an exclusive