git.ipfire.org Git - thirdparty/bind9.git/commitdiff
netmgr: TCP improvements
authorWitold Kręcicki <wpk@culm.net>
Tue, 19 Nov 2019 10:56:00 +0000 (11:56 +0100)
committerEvan Hunt <each@isc.org>
Sat, 23 Nov 2019 00:46:31 +0000 (16:46 -0800)
- add timeout support for TCP and TCPDNS connections to protect against
  slowloris-style attacks. Currently, all timeouts are hard-coded.
- rework and simplify the TCPDNS state machine.

lib/isc/netmgr/netmgr-int.h
lib/isc/netmgr/netmgr.c
lib/isc/netmgr/tcp.c
lib/isc/netmgr/tcpdns.c

index 3961a6ff3b9bea8ddcbe615c9ca49681d6d4c1a4..037ffa145ff3f29cee0e6e5ae540f20718920269 100644 (file)
@@ -116,6 +116,7 @@ typedef enum isc__netievent_type {
        netievent_tcplisten,
        netievent_tcpstoplisten,
        netievent_tcpclose,
+       netievent_closecb,
 } isc__netievent_type;
 
 typedef struct isc__netievent_stop {
@@ -186,6 +187,7 @@ typedef isc__netievent__socket_t isc__netievent_tcpclose_t;
 typedef isc__netievent__socket_t isc__netievent_startread_t;
 typedef isc__netievent__socket_t isc__netievent_pauseread_t;
 typedef isc__netievent__socket_t isc__netievent_resumeread_t;
+typedef isc__netievent__socket_t isc__netievent_closecb_t;
 
 typedef struct isc__netievent__socket_req {
        isc__netievent_type     type;
@@ -268,6 +270,9 @@ struct isc_nmsocket {
        isc_nmsocket_t          *parent;
        isc_quota_t             *quota;
        bool                    overquota;
+       uv_timer_t              timer;
+       bool                    timer_initialized;
+       uint64_t                read_timeout;
 
        /*% outer socket is for 'wrapped' sockets - e.g. tcpdns in tcp */
        isc_nmsocket_t          *outer;
@@ -366,7 +371,7 @@ struct isc_nmsocket {
         * might want to change it to something lockless in the
         * future.
         */
-       size_t                  ah;
+       atomic_int_fast32_t     ah;
        size_t                  ah_size;
        size_t                  *ah_frees;
        isc_nmhandle_t          **ah_handles;
@@ -398,6 +403,8 @@ isc__nm_get_ievent(isc_nm_t *mgr, isc__netievent_type type);
 /*%<
  * Allocate an ievent and set the type.
  */
+void
+isc__nm_put_ievent(isc_nm_t *mgr, void *ievent);
 
 void
 isc__nm_enqueue_ievent(isc__networker_t *worker, isc__netievent_t *event);
@@ -471,6 +478,12 @@ isc__nmsocket_prep_destroy(isc_nmsocket_t *sock);
  * if there are no remaining references or active handles.
  */
 
+void
+isc__nm_async_closecb(isc__networker_t *worker, isc__netievent_t *ievent0);
+/*%<
+ * Issue a 'handle closed' callback on the socket.
+ */
+
 isc_result_t
 isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region,
                 isc_nm_cb_t cb, void *cbarg);
index ba7094baddb70f41b372431497fe9d7639b766e7..e77b7c96f1927e524d2219320ca5026ba108d70d 100644 (file)
@@ -450,12 +450,15 @@ async_cb(uv_async_t *handle) {
                case netievent_tcpclose:
                        isc__nm_async_tcpclose(worker, ievent);
                        break;
+               case netievent_closecb:
+                       isc__nm_async_closecb(worker, ievent);
+                       break;
                default:
                        INSIST(0);
                        ISC_UNREACHABLE();
                }
-               isc_mem_put(worker->mgr->mctx, ievent,
-                           sizeof(isc__netievent_storage_t));
+
+               isc__nm_put_ievent(worker->mgr, ievent);
        }
 }
 
@@ -471,6 +474,11 @@ isc__nm_get_ievent(isc_nm_t *mgr, isc__netievent_type type) {
        return (event);
 }
 
+void
+isc__nm_put_ievent(isc_nm_t *mgr, void *ievent) {
+       isc_mem_put(mgr->mctx, ievent, sizeof(isc__netievent_storage_t));
+}
+
 void
 isc__nm_enqueue_ievent(isc__networker_t *worker, isc__netievent_t *event) {
        isc_queue_enqueue(worker->ievents, (uintptr_t)event);
@@ -552,6 +560,11 @@ nmsocket_cleanup(isc_nmsocket_t *sock, bool dofree) {
                isc_quota_detach(&sock->quota);
        }
 
+       if (sock->timer_initialized) {
+               uv_close((uv_handle_t *)&sock->timer, NULL);
+               sock->timer_initialized = false;
+       }
+
        isc_astack_destroy(sock->inactivehandles);
 
        while ((uvreq = isc_astack_pop(sock->inactivereqs)) != NULL) {
@@ -570,7 +583,6 @@ nmsocket_cleanup(isc_nmsocket_t *sock, bool dofree) {
        } else {
                isc_nm_detach(&sock->mgr);
        }
-
 }
 
 static void
@@ -596,11 +608,11 @@ nmsocket_maybe_destroy(isc_nmsocket_t *sock) {
         * accept destruction.
         */
        LOCK(&sock->lock);
-       active_handles += sock->ah;
+       active_handles += atomic_load(&sock->ah);
        if (sock->children != NULL) {
                for (int i = 0; i < sock->nchildren; i++) {
                        LOCK(&sock->children[i].lock);
-                       active_handles += sock->children[i].ah;
+                       active_handles += atomic_load(&sock->children[i].ah);
                        UNLOCK(&sock->children[i].lock);
                }
        }
@@ -780,6 +792,7 @@ isc__nmhandle_get(isc_nmsocket_t *sock, isc_sockaddr_t *peer,
                  isc_sockaddr_t *local)
 {
        isc_nmhandle_t *handle = NULL;
+       size_t handlenum;
        int pos;
 
        REQUIRE(VALID_NMSOCK(sock));
@@ -812,7 +825,7 @@ isc__nmhandle_get(isc_nmsocket_t *sock, isc_sockaddr_t *peer,
 
        LOCK(&sock->lock);
        /* We need to add this handle to the list of active handles */
-       if (sock->ah == sock->ah_size) {
+       if ((size_t) atomic_load(&sock->ah) == sock->ah_size) {
                sock->ah_frees =
                        isc_mem_reallocate(sock->mgr->mctx, sock->ah_frees,
                                           sock->ah_size * 2 *
@@ -831,7 +844,9 @@ isc__nmhandle_get(isc_nmsocket_t *sock, isc_sockaddr_t *peer,
                sock->ah_size *= 2;
        }
 
-       pos = sock->ah_frees[sock->ah++];
+       handlenum = atomic_fetch_add(&sock->ah, 1);
+       pos = sock->ah_frees[handlenum];
+
        INSIST(sock->ah_handles[pos] == NULL);
        sock->ah_handles[pos] = handle;
        handle->ah_pos = pos;
@@ -875,62 +890,85 @@ nmhandle_free(isc_nmsocket_t *sock, isc_nmhandle_t *handle) {
        *handle = (isc_nmhandle_t) {
                .magic = 0
        };
+
        isc_mem_put(sock->mgr->mctx, handle, sizeof(isc_nmhandle_t) + extra);
 }
 
 void
 isc_nmhandle_unref(isc_nmhandle_t *handle) {
+       isc_nmsocket_t *sock = NULL;
+       size_t handlenum;
+       bool reuse = false;
        int refs;
 
        REQUIRE(VALID_NMHANDLE(handle));
 
        refs = isc_refcount_decrement(&handle->references);
        INSIST(refs > 0);
-       if (refs == 1) {
-               isc_nmsocket_t *sock = handle->sock;
-               bool reuse = false;
+       if (refs > 1) {
+               return;
+       }
 
-               handle->sock = NULL;
-               if (handle->doreset != NULL) {
-                       handle->doreset(handle->opaque);
-               }
+       sock = handle->sock;
+       handle->sock = NULL;
 
-               /*
-                * We do it all under lock to avoid races with socket
-                * destruction.
-                */
-               LOCK(&sock->lock);
-               INSIST(sock->ah_handles[handle->ah_pos] == handle);
-               INSIST(sock->ah_size > handle->ah_pos);
-               INSIST(sock->ah > 0);
-               sock->ah_handles[handle->ah_pos] = NULL;
-               sock->ah_frees[--sock->ah] = handle->ah_pos;
-               handle->ah_pos = 0;
-
-               if (atomic_load(&sock->active)) {
-                       reuse = isc_astack_trypush(sock->inactivehandles,
-                                                  handle);
-               }
-               UNLOCK(&sock->lock);
+       if (handle->doreset != NULL) {
+               handle->doreset(handle->opaque);
+       }
 
-               /*
-                * Handle is closed. If the socket has a callback
-                * configured for that (e.g., to perform cleanup after
-                * request processing), call it now.
-                */
-               if (sock->closehandle_cb != NULL) {
-                       sock->closehandle_cb(sock);
-               }
+       /*
+        * We do all of this under lock to avoid races with socket
+        * destruction.
+        */
+       LOCK(&sock->lock);
 
-               if (!reuse) {
-                       nmhandle_free(sock, handle);
-               }
+       INSIST(sock->ah_handles[handle->ah_pos] == handle);
+       INSIST(sock->ah_size > handle->ah_pos);
+       INSIST(atomic_load(&sock->ah) > 0);
 
-               if (sock->ah == 0 &&
-                   !atomic_load(&sock->active) &&
-                   !atomic_load(&sock->destroying))
-               {
-                       nmsocket_maybe_destroy(sock);
+       sock->ah_handles[handle->ah_pos] = NULL;
+       handlenum = atomic_fetch_sub(&sock->ah, 1) - 1;
+       sock->ah_frees[handlenum] = handle->ah_pos;
+       handle->ah_pos = 0;
+
+       if (atomic_load(&sock->active)) {
+               reuse = isc_astack_trypush(sock->inactivehandles,
+                                          handle);
+       }
+
+       UNLOCK(&sock->lock);
+
+       if (!reuse) {
+               nmhandle_free(sock, handle);
+       }
+
+       /*
+        * The handle is closed. If the socket has a callback configured
+        * for that (e.g., to perform cleanup after request processing),
+        * call it now.
+        */
+       if (sock->closehandle_cb != NULL) {
+               if (sock->tid == isc_nm_tid()) {
+                       sock->closehandle_cb(sock);
+
+                       /*
+                        * If we do this asynchronously then
+                        * the async event will clean it up.
+                        */
+                       if (sock->ah == 0 &&
+                           !atomic_load(&sock->active) &&
+                           !atomic_load(&sock->destroying))
+                       {
+                               nmsocket_maybe_destroy(sock);
+                       }
+               } else {
+
+                       isc__netievent_closecb_t * event =
+                               isc__nm_get_ievent(sock->mgr,
+                                                  netievent_closecb);
+                       isc_nmsocket_attach(sock, &event->sock);
+                       isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
+                                              (isc__netievent_t *) event);
                }
        }
 }
@@ -1055,6 +1093,21 @@ isc_nm_send(isc_nmhandle_t *handle, isc_region_t *region,
        }
 }
 
+void
+isc__nm_async_closecb(isc__networker_t *worker, isc__netievent_t *ievent0) {
+       isc__netievent_closecb_t *ievent =
+               (isc__netievent_closecb_t *) ievent0;
+
+       REQUIRE(VALID_NMSOCK(ievent->sock));
+       REQUIRE(ievent->sock->tid == isc_nm_tid());
+       REQUIRE(ievent->sock->closehandle_cb != NULL);
+
+       UNUSED(worker);
+
+       ievent->sock->closehandle_cb(ievent->sock);
+       isc_nmsocket_detach(&ievent->sock);
+}
+
 bool
 isc__nm_acquire_interlocked(isc_nm_t *mgr) {
        LOCK(&mgr->lock);
index 4b6c9ca9a4787401a90781260bedcc45aeba3017..493d82a5aa51fb60d253cd334dde930b58b6eb1b 100644 (file)
@@ -242,25 +242,52 @@ isc__nm_async_tcpstoplisten(isc__networker_t *worker,
        uv_close(&sock->uv_handle.handle, stoplistening_cb);
 }
 
+static void
+readtimeout_cb(uv_timer_t *handle) {
+       isc_nmsocket_t *sock = (isc_nmsocket_t *) handle->data;
+
+       REQUIRE(VALID_NMSOCK(sock));
+       REQUIRE(sock->tid == isc_nm_tid());
+
+       /*
+        * Socket is actively processing something, so restart the timer
+        * and return.
+        */
+       if (atomic_load(&sock->processing)) {
+               uv_timer_start(handle, readtimeout_cb, sock->read_timeout, 0);
+               return;
+       }
+
+       /*
+        * Timeout; stop reading and process whatever we have.
+        */
+       uv_read_stop(&sock->uv_handle.stream);
+       if (sock->quota) {
+               isc_quota_detach(&sock->quota);
+       }
+       sock->rcb.recv(sock->tcphandle, NULL, sock->rcbarg);
+}
+
 isc_result_t
 isc_nm_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
        isc_nmsocket_t *sock = NULL;
+       isc__netievent_startread_t *ievent = NULL;
 
        REQUIRE(VALID_NMHANDLE(handle));
        REQUIRE(VALID_NMSOCK(handle->sock));
 
        sock = handle->sock;
        sock->rcb.recv = cb;
-       sock->rcbarg = cbarg; /* That's obviously broken... */
+       sock->rcbarg = cbarg;
+
+       ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpstartread);
+       ievent->sock = sock;
+
        if (sock->tid == isc_nm_tid()) {
-               int r = uv_read_start(&sock->uv_handle.stream,
-                                     isc__nm_alloc_cb, read_cb);
-               INSIST(r == 0);
+               isc__nm_async_startread(&sock->mgr->workers[sock->tid],
+                                       (isc__netievent_t *) ievent);
+               isc__nm_put_ievent(sock->mgr, ievent);
        } else {
-               isc__netievent_startread_t *ievent =
-                       isc__nm_get_ievent(sock->mgr,
-                                          netievent_tcpstartread);
-               ievent->sock = sock;
                isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
                                       (isc__netievent_t *) ievent);
        }
@@ -275,12 +302,23 @@ isc__nm_async_startread(isc__networker_t *worker, isc__netievent_t *ievent0) {
        isc_nmsocket_t *sock = ievent->sock;
 
        REQUIRE(worker->id == isc_nm_tid());
+       if (sock->read_timeout != 0) {
+               if (!sock->timer_initialized) {
+                       uv_timer_init(&worker->loop, &sock->timer);
+                       sock->timer.data = sock;
+                       sock->timer_initialized = true;
+               }
+               uv_timer_start(&sock->timer, readtimeout_cb,
+                              sock->read_timeout, 0);
+       }
 
        uv_read_start(&sock->uv_handle.stream, isc__nm_alloc_cb, read_cb);
 }
 
 isc_result_t
 isc_nm_pauseread(isc_nmsocket_t *sock) {
+       isc__netievent_pauseread_t *ievent = NULL;
+
        REQUIRE(VALID_NMSOCK(sock));
 
        if (atomic_load(&sock->readpaused)) {
@@ -288,15 +326,14 @@ isc_nm_pauseread(isc_nmsocket_t *sock) {
        }
 
        atomic_store(&sock->readpaused, true);
+       ievent = isc__nm_get_ievent(sock->mgr, netievent_tcppauseread);
+       ievent->sock = sock;
 
        if (sock->tid == isc_nm_tid()) {
-               int r = uv_read_stop(&sock->uv_handle.stream);
-               INSIST(r == 0);
+               isc__nm_async_pauseread(&sock->mgr->workers[sock->tid],
+                                      (isc__netievent_t *) ievent);
+               isc__nm_put_ievent(sock->mgr, ievent);
        } else {
-               isc__netievent_pauseread_t *ievent =
-                       isc__nm_get_ievent(sock->mgr,
-                                          netievent_tcppauseread);
-               ievent->sock = sock;
                isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
                                       (isc__netievent_t *) ievent);
        }
@@ -309,15 +346,20 @@ isc__nm_async_pauseread(isc__networker_t *worker, isc__netievent_t *ievent0) {
        isc__netievent_pauseread_t *ievent =
                (isc__netievent_pauseread_t *) ievent0;
        isc_nmsocket_t *sock = ievent->sock;
-       REQUIRE(VALID_NMSOCK(sock));
 
+       REQUIRE(VALID_NMSOCK(sock));
        REQUIRE(worker->id == isc_nm_tid());
 
+       if (sock->timer_initialized) {
+               uv_timer_stop(&sock->timer);
+       }
        uv_read_stop(&sock->uv_handle.stream);
 }
 
 isc_result_t
 isc_nm_resumeread(isc_nmsocket_t *sock) {
+       isc__netievent_startread_t *ievent = NULL;
+
        REQUIRE(VALID_NMSOCK(sock));
        REQUIRE(sock->rcb.recv != NULL);
 
@@ -327,16 +369,14 @@ isc_nm_resumeread(isc_nmsocket_t *sock) {
 
        atomic_store(&sock->readpaused, false);
 
+       ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpstartread);
+       ievent->sock = sock;
+
        if (sock->tid == isc_nm_tid()) {
-               int r = uv_read_start(&sock->uv_handle.stream,
-                                     isc__nm_alloc_cb, read_cb);
-               INSIST(r == 0);
+               isc__nm_async_startread(&sock->mgr->workers[sock->tid],
+                                       (isc__netievent_t *) ievent);
+               isc__nm_put_ievent(sock->mgr, ievent);
        } else {
-               /* It's the same as startread */
-               isc__netievent_startread_t *ievent =
-                       isc__nm_get_ievent(sock->mgr,
-                                          netievent_tcpstartread);
-               ievent->sock = sock;
                isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
                                       (isc__netievent_t *) ievent);
        }
@@ -359,6 +399,11 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
 
                INSIST(sock->rcb.recv != NULL);
                sock->rcb.recv(sock->tcphandle, &region, sock->rcbarg);
+               if (sock->timer_initialized && sock->read_timeout != 0) {
+                       /* The timer will be updated */
+                       uv_timer_start(&sock->timer, readtimeout_cb,
+                                      sock->read_timeout, 0);
+               }
                isc__nm_free_uvbuf(sock, buf);
                return;
        }
@@ -440,6 +485,7 @@ accept_connection(isc_nmsocket_t *ssock) {
        handle = isc__nmhandle_get(csock, NULL, &local);
 
        INSIST(ssock->rcb.accept != NULL);
+       csock->read_timeout = 1000;
        ssock->rcb.accept(handle, ISC_R_SUCCESS, ssock->rcbarg);
        isc_nmsocket_detach(&csock);
 
@@ -568,6 +614,16 @@ tcp_close_cb(uv_handle_t *uvhandle) {
        isc__nmsocket_prep_destroy(sock);
 }
 
+static void
+timer_close_cb(uv_handle_t *uvhandle) {
+       isc_nmsocket_t *sock = uvhandle->data;
+
+       REQUIRE(VALID_NMSOCK(sock));
+
+       isc_nmsocket_detach(&sock->server);
+       uv_close(&sock->uv_handle.handle, tcp_close_cb);
+}
+
 static void
 tcp_close_direct(isc_nmsocket_t *sock) {
        REQUIRE(VALID_NMSOCK(sock));
@@ -587,9 +643,13 @@ tcp_close_direct(isc_nmsocket_t *sock) {
                        }
                }
        }
-
-       isc_nmsocket_detach(&sock->server);
-       uv_close(&sock->uv_handle.handle, tcp_close_cb);
+       if (sock->timer_initialized) {
+               uv_close((uv_handle_t *)&sock->timer, timer_close_cb);
+               sock->timer_initialized = false;
+       } else {
+               isc_nmsocket_detach(&sock->server);
+               uv_close(&sock->uv_handle.handle, tcp_close_cb);
+       }
 }
 
 void
index 8e86a39474da48bdae23c27ae5c165c140053e56..a06d5f7b0f61b4ae51f16fa14c6cb02050b739f6 100644 (file)
@@ -47,8 +47,16 @@ dnslen(unsigned char* base) {
        return ((base[0] << 8) + (base[1]));
 }
 
+/*
+ * Regular TCP buffer, should suffice in most cases.
+ */
 #define NM_REG_BUF 4096
-#define NM_BIG_BUF 65536
+/*
+ * Two full DNS packets with lengths.
+ * netmgr receives 64k at most so there's no risk
+ * of overrun.
+ */
+#define NM_BIG_BUF (65535+2)*2
 static inline void
 alloc_dnsbuf(isc_nmsocket_t *sock, size_t len) {
        REQUIRE(len <= NM_BIG_BUF);
@@ -66,6 +74,23 @@ alloc_dnsbuf(isc_nmsocket_t *sock, size_t len) {
        }
 }
 
+static void
+timer_close_cb(uv_handle_t *handle) {
+       isc_nmsocket_t *sock = (isc_nmsocket_t *) handle->data;
+       INSIST(VALID_NMSOCK(sock));
+       sock->timer_initialized = false;
+       atomic_store(&sock->closed, true);
+       isc_nmsocket_detach(&sock);
+}
+
+static void
+dnstcp_readtimeout(uv_timer_t *timer) {
+       isc_nmsocket_t *sock = (isc_nmsocket_t *) timer->data;
+       REQUIRE(VALID_NMSOCK(sock));
+       isc_nmsocket_detach(&sock->outer);
+       uv_close((uv_handle_t*) &sock->timer, timer_close_cb);
+}
+
 /*
  * Accept callback for TCP-DNS connection
  */
@@ -94,77 +119,71 @@ dnslisten_acceptcb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
        isc_nmsocket_attach(handle->sock, &dnssock->outer);
        dnssock->peer = handle->sock->peer;
        dnssock->iface = handle->sock->iface;
+       dnssock->read_timeout = 5000;
+       dnssock->tid = isc_nm_tid();
        dnssock->closehandle_cb = resume_processing;
 
+       uv_timer_init(&dnssock->mgr->workers[isc_nm_tid()].loop,
+                     &dnssock->timer);
+       dnssock->timer.data = dnssock;
+       dnssock->timer_initialized = true;
+       uv_timer_start(&dnssock->timer, dnstcp_readtimeout,
+                      dnssock->read_timeout, 0);
+
        isc_nm_read(handle, dnslisten_readcb, dnssock);
 }
 
-static bool
-connection_limit(isc_nmsocket_t *sock) {
-       int ah;
-
-       REQUIRE(sock->type == isc_nm_tcpdnssocket && sock->outer != NULL);
-
-       if (atomic_load(&sock->sequential)) {
-               /*
-                * We're already non-pipelining, so there's
-                * no need to check per-connection limits.
-                */
-               return (false);
-       }
+/*
+ * Process a single packet from the incoming buffer.
+ *
+ * Return ISC_R_SUCCESS and attach 'handlep' to a handle if something
+ * was processed; return ISC_R_NOMORE if there isn't a full message
+ * to be processed.
+ *
+ * The caller will need to unreference the handle.
+ */
+static isc_result_t
+processbuffer(isc_nmsocket_t *dnssock, isc_nmhandle_t **handlep) {
+       size_t len;
 
-       LOCK(&sock->lock);
-       ah = sock->ah;
-       UNLOCK(&sock->lock);
+       REQUIRE(VALID_NMSOCK(dnssock));
+       REQUIRE(handlep != NULL && *handlep == NULL);
 
-       if (ah >= TCPDNS_CLIENTS_PER_CONN) {
-               atomic_store(&sock->overlimit, true);
-               isc_nm_pauseread(sock->outer);
-               return (true);
+       /*
+        * If we don't even have the length yet, we can't do
+        * anything.
+        */
+       if (dnssock->buf_len < 2) {
+               return (ISC_R_NOMORE);
        }
 
-       return (false);
-}
-
-/* Process all complete packets out of incoming buffer */
-static void
-processbuffer(isc_nmsocket_t *dnssock) {
-       REQUIRE(VALID_NMSOCK(dnssock));
-
-       /* While we have a complete packet in the buffer */
-       while (dnssock->buf_len > 2 &&
-              dnslen(dnssock->buf) <= dnssock->buf_len - 2 &&
-              !connection_limit(dnssock))
-       {
+       /*
+        * Process the first packet from the buffer, leaving
+        * the rest (if any) for later.
+        */
+       len = dnslen(dnssock->buf);
+       if (len <= dnssock->buf_len - 2) {
                isc_nmhandle_t *dnshandle = NULL;
                isc_region_t r2 = {
                        .base = dnssock->buf + 2,
-                       .length = dnslen(dnssock->buf)
+                       .length = len
                };
-               size_t len;
 
                dnshandle = isc__nmhandle_get(dnssock, NULL, NULL);
-               atomic_store(&dnssock->processing, true);
                dnssock->rcb.recv(dnshandle, &r2, dnssock->rcbarg);
 
-               /*
-                * If the recv callback wants to hold on to the
-                * handle, it needs to attach to it.
-                */
-               isc_nmhandle_unref(dnshandle);
-
-               len = dnslen(dnssock->buf) + 2;
+               len += 2;
                dnssock->buf_len -= len;
                if (len > 0) {
                        memmove(dnssock->buf, dnssock->buf + len,
                                dnssock->buf_len);
                }
 
-               /* Check here to make sure we do the processing at least once */
-               if (atomic_load(&dnssock->processing)) {
-                       return;
-               }
+               *handlep = dnshandle;
+               return (ISC_R_SUCCESS);
        }
+
+       return (ISC_R_NOMORE);
 }
 
 /*
@@ -174,8 +193,8 @@ processbuffer(isc_nmsocket_t *dnssock) {
 static void
 dnslisten_readcb(isc_nmhandle_t *handle, isc_region_t *region, void *arg) {
        isc_nmsocket_t *dnssock = (isc_nmsocket_t *) arg;
-       isc_sockaddr_t local;
        unsigned char *base = NULL;
+       bool done = false;
        size_t len;
 
        REQUIRE(VALID_NMSOCK(dnssock));
@@ -183,133 +202,63 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_region_t *region, void *arg) {
 
        if (region == NULL) {
                /* Connection closed */
-               atomic_store(&dnssock->closed, true);
-               isc_nmsocket_detach(&dnssock->outer);
-               isc_nmsocket_detach(&dnssock);
+               isc__nm_tcpdns_close(dnssock);
                return;
        }
 
-       local = isc_nmhandle_localaddr(handle);
-
        base = region->base;
        len = region->length;
 
-       /*
-        * We have something in the buffer, we need to glue it.
-        */
-       if (dnssock->buf_len > 0) {
-               if (dnssock->buf_len == 1) {
-                       /* Make sure we have the length */
-                       dnssock->buf[1] = base[0];
-                       dnssock->buf_len = 2;
-                       base++;
-                       len--;
-               }
-
-               processbuffer(dnssock);
+       if (dnssock->buf_len + len > dnssock->buf_size) {
+               alloc_dnsbuf(dnssock, dnssock->buf_len + len);
        }
+       memmove(dnssock->buf + dnssock->buf_len, base, len);
+       dnssock->buf_len += len;
 
-       if (dnssock->buf_len > 0) {
-               size_t plen;
-
-               if (dnssock->buf_len == 1) {
-                       /* Make sure we have the length */
-                       dnssock->buf[1] = base[0];
-                       dnssock->buf_len = 2;
-                       base++;
-                       len--;
-               }
-
-               /* At this point we definitely have 2 bytes there. */
-               plen = ISC_MIN(len, (dnslen(dnssock->buf) + 2 -
-                                    dnssock->buf_len));
+       do {
+               isc_result_t result;
+               isc_nmhandle_t *dnshandle = NULL;
 
-               if (dnssock->buf_len + plen > NM_BIG_BUF) {
+               result = processbuffer(dnssock, &dnshandle);
+               if (result != ISC_R_SUCCESS) {
                        /*
-                        * XXX: continuing to read will overrun the
-                        * socket buffer. We may need to force the
-                        * connection to close so the client will have
-                        * to open a new one.
+                        * There wasn't anything in the buffer to process.
                         */
                        return;
                }
 
-               if (dnssock->buf_len + plen > dnssock->buf_size) {
-                       alloc_dnsbuf(dnssock, dnssock->buf_len + plen);
-               }
-
-               memmove(dnssock->buf + dnssock->buf_len, base, plen);
-               dnssock->buf_len += plen;
-               base += plen;
-               len -= plen;
-
-               /* Do we have a complete packet in the buffer? */
-               if (dnslen(dnssock->buf) >= dnssock->buf_len - 2 &&
-                   !connection_limit(dnssock))
-               {
-                       isc_nmhandle_t *dnshandle = NULL;
-                       isc_region_t r2 = {
-                               .base = dnssock->buf + 2,
-                               .length = dnslen(dnssock->buf)
-                       };
-
-                       dnshandle = isc__nmhandle_get(dnssock, NULL, &local);
-                       atomic_store(&dnssock->processing, true);
-                       dnssock->rcb.recv(dnshandle, &r2, dnssock->rcbarg);
-                       dnssock->buf_len = 0;
+               /*
+                * We have a packet: stop timeout timers
+                */
+               atomic_store(&dnssock->outer->processing, true);
+               uv_timer_stop(&dnssock->timer);
 
+               if (dnssock->sequential) {
                        /*
-                        * If the recv callback wants to hold on to the
-                        * handle, it needs to attach to it.
+                        * We're in sequential mode and we processed
+                        * one packet, so we're done until the next read
+                        * completes.
                         */
-                       isc_nmhandle_unref(dnshandle);
+                       isc_nm_pauseread(dnssock->outer);
+                       done = true;
+               } else {
+                       /*
+                        * We're pipelining, so we now resume processing
+                        * packets until the clients-per-connection limit
+                        * is reached (as determined by the number of
+                        * active handles on the socket). When the limit
+                        * is reached, pause reading.
+                        */
+                       if (atomic_load(&dnssock->ah) >=
+                           TCPDNS_CLIENTS_PER_CONN)
+                       {
+                               isc_nm_pauseread(dnssock->outer);
+                               done = true;
+                       }
                }
-       }
-
-       /*
-        * At this point we've processed whatever was previously in the
-        * socket buffer. If there are more messages to be found in what
-        * we've read, and if we're either pipelining or not processing
-        * anything else currently, then we can process those messages now.
-        */
-       while (len >= 2 && dnslen(base) <= len - 2 &&
-              (!atomic_load(&dnssock->sequential) ||
-               !atomic_load(&dnssock->processing)) &&
-              !connection_limit(dnssock))
-       {
-               isc_nmhandle_t *dnshandle = NULL;
-               isc_region_t r2 = {
-                       .base = base + 2,
-                       .length = dnslen(base)
-               };
 
-               len -= dnslen(base) + 2;
-               base += dnslen(base) + 2;
-
-               dnshandle = isc__nmhandle_get(dnssock, NULL, &local);
-               atomic_store(&dnssock->processing, true);
-               dnssock->rcb.recv(dnshandle, &r2, dnssock->rcbarg);
-
-               /*
-                * If the recv callback wants to hold on to the
-                * handle, it needs to attach to it.
-                */
                isc_nmhandle_unref(dnshandle);
-       }
-
-       /*
-        * We have less than a full message remaining; it can be
-        * stored in the socket buffer for next time.
-        */
-       if (len > 0) {
-               if (len > dnssock->buf_size) {
-                       alloc_dnsbuf(dnssock, len);
-               }
-
-               INSIST(len <= dnssock->buf_size);
-               memmove(dnssock->buf, base, len);
-               dnssock->buf_len = len;
-       }
+       } while (!done);
 }
 
 /*
@@ -394,23 +343,64 @@ typedef struct tcpsend {
 static void
 resume_processing(void *arg) {
        isc_nmsocket_t *sock = (isc_nmsocket_t *) arg;
+       isc_result_t result;
 
        REQUIRE(VALID_NMSOCK(sock));
+       REQUIRE(sock->tid == isc_nm_tid());
 
        if (sock->type != isc_nm_tcpdnssocket || sock->outer == NULL) {
                return;
        }
 
+       if (atomic_load(&sock->ah) == 0) {
+               /* Nothing is active; sockets can timeout now */
+               atomic_store(&sock->outer->processing, false);
+               uv_timer_start(&sock->timer, dnstcp_readtimeout,
+                              sock->read_timeout, 0);
+       }
+
        /*
-        * If we're in sequential mode or over the
-        * clients-per-connection limit, the sock can
-        * resume reading now.
+        * For sequential sockets: Process what's in the buffer, or
+        * if there aren't any messages buffered, resume reading.
         */
-       if (atomic_load(&sock->overlimit) || atomic_load(&sock->sequential)) {
-               atomic_store(&sock->overlimit, false);
-               atomic_store(&sock->processing, false);
-               isc_nm_resumeread(sock->outer);
+       if (sock->sequential) {
+               isc_nmhandle_t *handle = NULL;
+
+               result = processbuffer(sock, &handle);
+               if (result == ISC_R_SUCCESS) {
+                       atomic_store(&sock->outer->processing, true);
+                       uv_timer_stop(&sock->timer);
+                       isc_nmhandle_unref(handle);
+               } else if (sock->outer != NULL) {
+                       isc_nm_resumeread(sock->outer);
+               }
+
+               return;
        }
+
+       /*
+        * For pipelined sockets: If we're under the clients-per-connection
+        * limit, resume processing until we reach the limit again.
+        */
+       do {
+               isc_nmhandle_t *dnshandle = NULL;
+
+               result = processbuffer(sock, &dnshandle);
+               if (result != ISC_R_SUCCESS) {
+                       /*
+                        * Nothing in the buffer; resume reading.
+                        */
+                       if (sock->outer != NULL) {
+                               isc_nm_resumeread(sock->outer);
+                       }
+
+                       break;
+               }
+
+               uv_timer_stop(&sock->timer);
+               atomic_store(&sock->outer->processing, true);
+               isc_nmhandle_unref(dnshandle);
+       } while (atomic_load(&sock->ah) < TCPDNS_CLIENTS_PER_CONN);
 }
 
 static void
@@ -422,19 +412,6 @@ tcpdnssend_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
        ts->cb(ts->orighandle, result, ts->cbarg);
        isc_mem_put(ts->mctx, ts->region.base, ts->region.length);
 
-       /*
-        * The response was sent; if we're in sequential or overlimit
-        * mode, resume processing now.
-        */
-       if (atomic_load(&ts->orighandle->sock->sequential) ||
-           atomic_load(&ts->orighandle->sock->overlimit))
-       {
-               atomic_store(&ts->orighandle->sock->processing, false);
-               atomic_store(&ts->orighandle->sock->overlimit, false);
-               processbuffer(ts->orighandle->sock);
-               isc_nm_resumeread(handle->sock);
-       }
-
        isc_nmhandle_unref(ts->orighandle);
        isc_mem_putanddetach(&ts->mctx, ts, sizeof(*ts));
 }
@@ -483,12 +460,11 @@ isc__nm_tcpdns_send(isc_nmhandle_t *handle, isc_region_t *region,
        return (isc__nm_tcp_send(t->handle, &t->region, tcpdnssend_cb, t));
 }
 
+
 void
 isc__nm_tcpdns_close(isc_nmsocket_t *sock) {
        if (sock->outer != NULL) {
                isc_nmsocket_detach(&sock->outer);
        }
-
-       atomic_store(&sock->closed, true);
-       isc__nmsocket_prep_destroy(sock);
+       uv_close((uv_handle_t*) &sock->timer, timer_close_cb);
 }