From: Witold KrÄ™cicki Date: Tue, 19 Nov 2019 10:56:00 +0000 (+0100) Subject: netmgr: TCP improvements X-Git-Tag: v9.15.7~75^2~8 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=b7a72b1667fa3e73de217b00372ee6437bcd7916;p=thirdparty%2Fbind9.git netmgr: TCP improvements - add timeout support for TCP and TCPDNS connections to protect against slowloris style attacks. currently, all timeouts are hard-coded. - rework and simplify the TCPDNS state machine. --- diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index 3961a6ff3b9..037ffa145ff 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -116,6 +116,7 @@ typedef enum isc__netievent_type { netievent_tcplisten, netievent_tcpstoplisten, netievent_tcpclose, + netievent_closecb, } isc__netievent_type; typedef struct isc__netievent_stop { @@ -186,6 +187,7 @@ typedef isc__netievent__socket_t isc__netievent_tcpclose_t; typedef isc__netievent__socket_t isc__netievent_startread_t; typedef isc__netievent__socket_t isc__netievent_pauseread_t; typedef isc__netievent__socket_t isc__netievent_resumeread_t; +typedef isc__netievent__socket_t isc__netievent_closecb_t; typedef struct isc__netievent__socket_req { isc__netievent_type type; @@ -268,6 +270,9 @@ struct isc_nmsocket { isc_nmsocket_t *parent; isc_quota_t *quota; bool overquota; + uv_timer_t timer; + bool timer_initialized; + uint64_t read_timeout; /*% outer socket is for 'wrapped' sockets - e.g. tcpdns in tcp */ isc_nmsocket_t *outer; @@ -366,7 +371,7 @@ struct isc_nmsocket { * might want to change it to something lockless in the * future. */ - size_t ah; + atomic_int_fast32_t ah; size_t ah_size; size_t *ah_frees; isc_nmhandle_t **ah_handles; @@ -398,6 +403,8 @@ isc__nm_get_ievent(isc_nm_t *mgr, isc__netievent_type type); /*%< * Allocate an ievent and set the type. */ +void +isc__nm_put_ievent(isc_nm_t *mgr, void *ievent); void isc__nm_enqueue_ievent(isc__networker_t *worker, isc__netievent_t *event); @@ -471,6 +478,12 @@ isc__nmsocket_prep_destroy(isc_nmsocket_t *sock); * if there are no remaining references or active handles. */ +void +isc__nm_async_closecb(isc__networker_t *worker, isc__netievent_t *ievent0); +/*%< + * Issue a 'handle closed' callback on the socket. + */ + isc_result_t isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb, void *cbarg); diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index ba7094baddb..e77b7c96f19 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -450,12 +450,15 @@ async_cb(uv_async_t *handle) { case netievent_tcpclose: isc__nm_async_tcpclose(worker, ievent); break; + case netievent_closecb: + isc__nm_async_closecb(worker, ievent); + break; default: INSIST(0); ISC_UNREACHABLE(); } - isc_mem_put(worker->mgr->mctx, ievent, - sizeof(isc__netievent_storage_t)); + + isc__nm_put_ievent(worker->mgr, ievent); } } @@ -471,6 +474,11 @@ isc__nm_get_ievent(isc_nm_t *mgr, isc__netievent_type type) { return (event); } +void +isc__nm_put_ievent(isc_nm_t *mgr, void *ievent) { + isc_mem_put(mgr->mctx, ievent, sizeof(isc__netievent_storage_t)); +} + void isc__nm_enqueue_ievent(isc__networker_t *worker, isc__netievent_t *event) { isc_queue_enqueue(worker->ievents, (uintptr_t)event); @@ -552,6 +560,11 @@ nmsocket_cleanup(isc_nmsocket_t *sock, bool dofree) { isc_quota_detach(&sock->quota); } + if (sock->timer_initialized) { + uv_close((uv_handle_t *)&sock->timer, NULL); + sock->timer_initialized = false; + } + isc_astack_destroy(sock->inactivehandles); while ((uvreq = isc_astack_pop(sock->inactivereqs)) != NULL) { @@ -570,7 +583,6 @@ nmsocket_cleanup(isc_nmsocket_t *sock, bool dofree) { } else { isc_nm_detach(&sock->mgr); } - } static void @@ -596,11 +608,11 @@ nmsocket_maybe_destroy(isc_nmsocket_t *sock) { * accept destruction. */ LOCK(&sock->lock); - active_handles += sock->ah; + active_handles += atomic_load(&sock->ah); if (sock->children != NULL) { for (int i = 0; i < sock->nchildren; i++) { LOCK(&sock->children[i].lock); - active_handles += sock->children[i].ah; + active_handles += atomic_load(&sock->children[i].ah); UNLOCK(&sock->children[i].lock); } } @@ -780,6 +792,7 @@ isc__nmhandle_get(isc_nmsocket_t *sock, isc_sockaddr_t *peer, isc_sockaddr_t *local) { isc_nmhandle_t *handle = NULL; + size_t handlenum; int pos; REQUIRE(VALID_NMSOCK(sock)); @@ -812,7 +825,7 @@ isc__nmhandle_get(isc_nmsocket_t *sock, isc_sockaddr_t *peer, LOCK(&sock->lock); /* We need to add this handle to the list of active handles */ - if (sock->ah == sock->ah_size) { + if ((size_t) atomic_load(&sock->ah) == sock->ah_size) { sock->ah_frees = isc_mem_reallocate(sock->mgr->mctx, sock->ah_frees, sock->ah_size * 2 * @@ -831,7 +844,9 @@ isc__nmhandle_get(isc_nmsocket_t *sock, isc_sockaddr_t *peer, sock->ah_size *= 2; } - pos = sock->ah_frees[sock->ah++]; + handlenum = atomic_fetch_add(&sock->ah, 1); + pos = sock->ah_frees[handlenum]; + INSIST(sock->ah_handles[pos] == NULL); sock->ah_handles[pos] = handle; handle->ah_pos = pos; @@ -875,62 +890,85 @@ nmhandle_free(isc_nmsocket_t *sock, isc_nmhandle_t *handle) { *handle = (isc_nmhandle_t) { .magic = 0 }; + isc_mem_put(sock->mgr->mctx, handle, sizeof(isc_nmhandle_t) + extra); } void isc_nmhandle_unref(isc_nmhandle_t *handle) { + isc_nmsocket_t *sock = NULL; + size_t handlenum; + bool reuse = false; int refs; REQUIRE(VALID_NMHANDLE(handle)); refs = isc_refcount_decrement(&handle->references); INSIST(refs > 0); - if (refs == 1) { - isc_nmsocket_t *sock = handle->sock; - bool reuse = false; + if (refs > 1) { + return; + } - handle->sock = NULL; - if (handle->doreset != NULL) { - handle->doreset(handle->opaque); - } + sock = handle->sock; + handle->sock = NULL; - /* - * We do it all under lock to avoid races with socket - * destruction. - */ - LOCK(&sock->lock); - INSIST(sock->ah_handles[handle->ah_pos] == handle); - INSIST(sock->ah_size > handle->ah_pos); - INSIST(sock->ah > 0); - sock->ah_handles[handle->ah_pos] = NULL; - sock->ah_frees[--sock->ah] = handle->ah_pos; - handle->ah_pos = 0; - - if (atomic_load(&sock->active)) { - reuse = isc_astack_trypush(sock->inactivehandles, - handle); - } - UNLOCK(&sock->lock); + if (handle->doreset != NULL) { + handle->doreset(handle->opaque); + } - /* - * Handle is closed. If the socket has a callback - * configured for that (e.g., to perform cleanup after - * request processing), call it now. - */ - if (sock->closehandle_cb != NULL) { - sock->closehandle_cb(sock); - } + /* + * We do all of this under lock to avoid races with socket + * destruction. + */ + LOCK(&sock->lock); - if (!reuse) { - nmhandle_free(sock, handle); - } + INSIST(sock->ah_handles[handle->ah_pos] == handle); + INSIST(sock->ah_size > handle->ah_pos); + INSIST(atomic_load(&sock->ah) > 0); - if (sock->ah == 0 && - !atomic_load(&sock->active) && - !atomic_load(&sock->destroying)) - { - nmsocket_maybe_destroy(sock); + sock->ah_handles[handle->ah_pos] = NULL; + handlenum = atomic_fetch_sub(&sock->ah, 1) - 1; + sock->ah_frees[handlenum] = handle->ah_pos; + handle->ah_pos = 0; + + if (atomic_load(&sock->active)) { + reuse = isc_astack_trypush(sock->inactivehandles, + handle); + } + + UNLOCK(&sock->lock); + + if (!reuse) { + nmhandle_free(sock, handle); + } + + /* + * The handle is closed. If the socket has a callback configured + * for that (e.g., to perform cleanup after request processing), + * call it now. + */ + if (sock->closehandle_cb != NULL) { + if (sock->tid == isc_nm_tid()) { + sock->closehandle_cb(sock); + + /* + * If we do this asynchronously then + * the async event will clean it up. + */ + if (sock->ah == 0 && + !atomic_load(&sock->active) && + !atomic_load(&sock->destroying)) + { + nmsocket_maybe_destroy(sock); + } + } else { + + isc__netievent_closecb_t * event = + isc__nm_get_ievent(sock->mgr, + netievent_closecb); + isc_nmsocket_attach(sock, &event->sock); + isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], + (isc__netievent_t *) event); } } } @@ -1055,6 +1093,21 @@ isc_nm_send(isc_nmhandle_t *handle, isc_region_t *region, } } +void +isc__nm_async_closecb(isc__networker_t *worker, isc__netievent_t *ievent0) { + isc__netievent_closecb_t *ievent = + (isc__netievent_closecb_t *) ievent0; + + REQUIRE(VALID_NMSOCK(ievent->sock)); + REQUIRE(ievent->sock->tid == isc_nm_tid()); + REQUIRE(ievent->sock->closehandle_cb != NULL); + + UNUSED(worker); + + ievent->sock->closehandle_cb(ievent->sock); + isc_nmsocket_detach(&ievent->sock); +} + bool isc__nm_acquire_interlocked(isc_nm_t *mgr) { LOCK(&mgr->lock); diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c index 4b6c9ca9a47..493d82a5aa5 100644 --- a/lib/isc/netmgr/tcp.c +++ b/lib/isc/netmgr/tcp.c @@ -242,25 +242,52 @@ isc__nm_async_tcpstoplisten(isc__networker_t *worker, uv_close(&sock->uv_handle.handle, stoplistening_cb); } +static void +readtimeout_cb(uv_timer_t *handle) { + isc_nmsocket_t *sock = (isc_nmsocket_t *) handle->data; + + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->tid == isc_nm_tid()); + + /* + * Socket is actively processing something, so restart the timer + * and return. + */ + if (atomic_load(&sock->processing)) { + uv_timer_start(handle, readtimeout_cb, sock->read_timeout, 0); + return; + } + + /* + * Timeout; stop reading and process whatever we have. + */ + uv_read_stop(&sock->uv_handle.stream); + if (sock->quota) { + isc_quota_detach(&sock->quota); + } + sock->rcb.recv(sock->tcphandle, NULL, sock->rcbarg); +} + isc_result_t isc_nm_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) { isc_nmsocket_t *sock = NULL; + isc__netievent_startread_t *ievent = NULL; REQUIRE(VALID_NMHANDLE(handle)); REQUIRE(VALID_NMSOCK(handle->sock)); sock = handle->sock; sock->rcb.recv = cb; - sock->rcbarg = cbarg; /* That's obviously broken... */ + sock->rcbarg = cbarg; + + ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpstartread); + ievent->sock = sock; + if (sock->tid == isc_nm_tid()) { - int r = uv_read_start(&sock->uv_handle.stream, - isc__nm_alloc_cb, read_cb); - INSIST(r == 0); + isc__nm_async_startread(&sock->mgr->workers[sock->tid], + (isc__netievent_t *) ievent); + isc__nm_put_ievent(sock->mgr, ievent); } else { - isc__netievent_startread_t *ievent = - isc__nm_get_ievent(sock->mgr, - netievent_tcpstartread); - ievent->sock = sock; isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], (isc__netievent_t *) ievent); } @@ -275,12 +302,23 @@ isc__nm_async_startread(isc__networker_t *worker, isc__netievent_t *ievent0) { isc_nmsocket_t *sock = ievent->sock; REQUIRE(worker->id == isc_nm_tid()); + if (sock->read_timeout != 0) { + if (!sock->timer_initialized) { + uv_timer_init(&worker->loop, &sock->timer); + sock->timer.data = sock; + sock->timer_initialized = true; + } + uv_timer_start(&sock->timer, readtimeout_cb, + sock->read_timeout, 0); + } uv_read_start(&sock->uv_handle.stream, isc__nm_alloc_cb, read_cb); } isc_result_t isc_nm_pauseread(isc_nmsocket_t *sock) { + isc__netievent_pauseread_t *ievent = NULL; + REQUIRE(VALID_NMSOCK(sock)); if (atomic_load(&sock->readpaused)) { @@ -288,15 +326,14 @@ isc_nm_pauseread(isc_nmsocket_t *sock) { } atomic_store(&sock->readpaused, true); + ievent = isc__nm_get_ievent(sock->mgr, netievent_tcppauseread); + ievent->sock = sock; if (sock->tid == isc_nm_tid()) { - int r = uv_read_stop(&sock->uv_handle.stream); - INSIST(r == 0); + isc__nm_async_pauseread(&sock->mgr->workers[sock->tid], + (isc__netievent_t *) ievent); + isc__nm_put_ievent(sock->mgr, ievent); } else { - isc__netievent_pauseread_t *ievent = - isc__nm_get_ievent(sock->mgr, - netievent_tcppauseread); - ievent->sock = sock; isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], (isc__netievent_t *) ievent); } @@ -309,15 +346,20 @@ isc__nm_async_pauseread(isc__networker_t *worker, isc__netievent_t *ievent0) { isc__netievent_pauseread_t *ievent = (isc__netievent_pauseread_t *) ievent0; isc_nmsocket_t *sock = ievent->sock; - REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(VALID_NMSOCK(sock)); REQUIRE(worker->id == isc_nm_tid()); + if (sock->timer_initialized) { + uv_timer_stop(&sock->timer); + } uv_read_stop(&sock->uv_handle.stream); } isc_result_t isc_nm_resumeread(isc_nmsocket_t *sock) { + isc__netievent_startread_t *ievent = NULL; + REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->rcb.recv != NULL); @@ -327,16 +369,14 @@ isc_nm_resumeread(isc_nmsocket_t *sock) { atomic_store(&sock->readpaused, false); + ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpstartread); + ievent->sock = sock; + if (sock->tid == isc_nm_tid()) { - int r = uv_read_start(&sock->uv_handle.stream, - isc__nm_alloc_cb, read_cb); - INSIST(r == 0); + isc__nm_async_startread(&sock->mgr->workers[sock->tid], + (isc__netievent_t *) ievent); + isc__nm_put_ievent(sock->mgr, ievent); } else { - /* It's the same as startread */ - isc__netievent_startread_t *ievent = - isc__nm_get_ievent(sock->mgr, - netievent_tcpstartread); - ievent->sock = sock; isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], (isc__netievent_t *) ievent); } @@ -359,6 +399,11 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { INSIST(sock->rcb.recv != NULL); sock->rcb.recv(sock->tcphandle, ®ion, sock->rcbarg); + if (sock->timer_initialized && sock->read_timeout != 0) { + /* The timer will be updated */ + uv_timer_start(&sock->timer, readtimeout_cb, + sock->read_timeout, 0); + } isc__nm_free_uvbuf(sock, buf); return; } @@ -440,6 +485,7 @@ accept_connection(isc_nmsocket_t *ssock) { handle = isc__nmhandle_get(csock, NULL, &local); INSIST(ssock->rcb.accept != NULL); + csock->read_timeout = 1000; ssock->rcb.accept(handle, ISC_R_SUCCESS, ssock->rcbarg); isc_nmsocket_detach(&csock); @@ -568,6 +614,16 @@ tcp_close_cb(uv_handle_t *uvhandle) { isc__nmsocket_prep_destroy(sock); } +static void +timer_close_cb(uv_handle_t *uvhandle) { + isc_nmsocket_t *sock = uvhandle->data; + + REQUIRE(VALID_NMSOCK(sock)); + + isc_nmsocket_detach(&sock->server); + uv_close(&sock->uv_handle.handle, tcp_close_cb); +} + static void tcp_close_direct(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); @@ -587,9 +643,13 @@ tcp_close_direct(isc_nmsocket_t *sock) { } } } - - isc_nmsocket_detach(&sock->server); - uv_close(&sock->uv_handle.handle, tcp_close_cb); + if (sock->timer_initialized) { + uv_close((uv_handle_t *)&sock->timer, timer_close_cb); + sock->timer_initialized = false; + } else { + isc_nmsocket_detach(&sock->server); + uv_close(&sock->uv_handle.handle, tcp_close_cb); + } } void diff --git a/lib/isc/netmgr/tcpdns.c b/lib/isc/netmgr/tcpdns.c index 8e86a39474d..a06d5f7b0f6 100644 --- a/lib/isc/netmgr/tcpdns.c +++ b/lib/isc/netmgr/tcpdns.c @@ -47,8 +47,16 @@ dnslen(unsigned char* base) { return ((base[0] << 8) + (base[1])); } +/* + * Regular TCP buffer, should suffice in most cases. + */ #define NM_REG_BUF 4096 -#define NM_BIG_BUF 65536 +/* + * Two full DNS packets with lengths. + * netmgr receives 64k at most so there's no risk + * of overrun. + */ +#define NM_BIG_BUF (65535+2)*2 static inline void alloc_dnsbuf(isc_nmsocket_t *sock, size_t len) { REQUIRE(len <= NM_BIG_BUF); @@ -66,6 +74,23 @@ alloc_dnsbuf(isc_nmsocket_t *sock, size_t len) { } } +static void +timer_close_cb(uv_handle_t *handle) { + isc_nmsocket_t *sock = (isc_nmsocket_t *) handle->data; + INSIST(VALID_NMSOCK(sock)); + sock->timer_initialized = false; + atomic_store(&sock->closed, true); + isc_nmsocket_detach(&sock); +} + +static void +dnstcp_readtimeout(uv_timer_t *timer) { + isc_nmsocket_t *sock = (isc_nmsocket_t *) timer->data; + REQUIRE(VALID_NMSOCK(sock)); + isc_nmsocket_detach(&sock->outer); + uv_close((uv_handle_t*) &sock->timer, timer_close_cb); +} + /* * Accept callback for TCP-DNS connection */ @@ -94,77 +119,71 @@ dnslisten_acceptcb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) { isc_nmsocket_attach(handle->sock, &dnssock->outer); dnssock->peer = handle->sock->peer; dnssock->iface = handle->sock->iface; + dnssock->read_timeout = 5000; + dnssock->tid = isc_nm_tid(); dnssock->closehandle_cb = resume_processing; + uv_timer_init(&dnssock->mgr->workers[isc_nm_tid()].loop, + &dnssock->timer); + dnssock->timer.data = dnssock; + dnssock->timer_initialized = true; + uv_timer_start(&dnssock->timer, dnstcp_readtimeout, + dnssock->read_timeout, 0); + isc_nm_read(handle, dnslisten_readcb, dnssock); } -static bool -connection_limit(isc_nmsocket_t *sock) { - int ah; - - REQUIRE(sock->type == isc_nm_tcpdnssocket && sock->outer != NULL); - - if (atomic_load(&sock->sequential)) { - /* - * We're already non-pipelining, so there's - * no need to check per-connection limits. - */ - return (false); - } +/* + * Process a single packet from the incoming buffer. + * + * Return ISC_R_SUCCESS and attach 'handlep' to a handle if something + * was processed; return ISC_R_NOMORE if there isn't a full message + * to be processed. + * + * The caller will need to unreference the handle. + */ +static isc_result_t +processbuffer(isc_nmsocket_t *dnssock, isc_nmhandle_t **handlep) { + size_t len; - LOCK(&sock->lock); - ah = sock->ah; - UNLOCK(&sock->lock); + REQUIRE(VALID_NMSOCK(dnssock)); + REQUIRE(handlep != NULL && *handlep == NULL); - if (ah >= TCPDNS_CLIENTS_PER_CONN) { - atomic_store(&sock->overlimit, true); - isc_nm_pauseread(sock->outer); - return (true); + /* + * If we don't even have the length yet, we can't do + * anything. + */ + if (dnssock->buf_len < 2) { + return (ISC_R_NOMORE); } - return (false); -} - -/* Process all complete packets out of incoming buffer */ -static void -processbuffer(isc_nmsocket_t *dnssock) { - REQUIRE(VALID_NMSOCK(dnssock)); - - /* While we have a complete packet in the buffer */ - while (dnssock->buf_len > 2 && - dnslen(dnssock->buf) <= dnssock->buf_len - 2 && - !connection_limit(dnssock)) - { + /* + * Process the first packet from the buffer, leaving + * the rest (if any) for later. + */ + len = dnslen(dnssock->buf); + if (len <= dnssock->buf_len - 2) { isc_nmhandle_t *dnshandle = NULL; isc_region_t r2 = { .base = dnssock->buf + 2, - .length = dnslen(dnssock->buf) + .length = len }; - size_t len; dnshandle = isc__nmhandle_get(dnssock, NULL, NULL); - atomic_store(&dnssock->processing, true); dnssock->rcb.recv(dnshandle, &r2, dnssock->rcbarg); - /* - * If the recv callback wants to hold on to the - * handle, it needs to attach to it. - */ - isc_nmhandle_unref(dnshandle); - - len = dnslen(dnssock->buf) + 2; + len += 2; dnssock->buf_len -= len; if (len > 0) { memmove(dnssock->buf, dnssock->buf + len, dnssock->buf_len); } - /* Check here to make sure we do the processing at least once */ - if (atomic_load(&dnssock->processing)) { - return; - } + *handlep = dnshandle; + return (ISC_R_SUCCESS); } + + return (ISC_R_NOMORE); } /* @@ -174,8 +193,8 @@ processbuffer(isc_nmsocket_t *dnssock) { static void dnslisten_readcb(isc_nmhandle_t *handle, isc_region_t *region, void *arg) { isc_nmsocket_t *dnssock = (isc_nmsocket_t *) arg; - isc_sockaddr_t local; unsigned char *base = NULL; + bool done = false; size_t len; REQUIRE(VALID_NMSOCK(dnssock)); @@ -183,133 +202,63 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_region_t *region, void *arg) { if (region == NULL) { /* Connection closed */ - atomic_store(&dnssock->closed, true); - isc_nmsocket_detach(&dnssock->outer); - isc_nmsocket_detach(&dnssock); + isc__nm_tcpdns_close(dnssock); return; } - local = isc_nmhandle_localaddr(handle); - base = region->base; len = region->length; - /* - * We have something in the buffer, we need to glue it. - */ - if (dnssock->buf_len > 0) { - if (dnssock->buf_len == 1) { - /* Make sure we have the length */ - dnssock->buf[1] = base[0]; - dnssock->buf_len = 2; - base++; - len--; - } - - processbuffer(dnssock); + if (dnssock->buf_len + len > dnssock->buf_size) { + alloc_dnsbuf(dnssock, dnssock->buf_len + len); } + memmove(dnssock->buf + dnssock->buf_len, base, len); + dnssock->buf_len += len; - if (dnssock->buf_len > 0) { - size_t plen; - - if (dnssock->buf_len == 1) { - /* Make sure we have the length */ - dnssock->buf[1] = base[0]; - dnssock->buf_len = 2; - base++; - len--; - } - - /* At this point we definitely have 2 bytes there. */ - plen = ISC_MIN(len, (dnslen(dnssock->buf) + 2 - - dnssock->buf_len)); + do { + isc_result_t result; + isc_nmhandle_t *dnshandle = NULL; - if (dnssock->buf_len + plen > NM_BIG_BUF) { + result = processbuffer(dnssock, &dnshandle); + if (result != ISC_R_SUCCESS) { /* - * XXX: continuing to read will overrun the - * socket buffer. We may need to force the - * connection to close so the client will have - * to open a new one. + * There wasn't anything in the buffer to process. */ return; } - if (dnssock->buf_len + plen > dnssock->buf_size) { - alloc_dnsbuf(dnssock, dnssock->buf_len + plen); - } - - memmove(dnssock->buf + dnssock->buf_len, base, plen); - dnssock->buf_len += plen; - base += plen; - len -= plen; - - /* Do we have a complete packet in the buffer? */ - if (dnslen(dnssock->buf) >= dnssock->buf_len - 2 && - !connection_limit(dnssock)) - { - isc_nmhandle_t *dnshandle = NULL; - isc_region_t r2 = { - .base = dnssock->buf + 2, - .length = dnslen(dnssock->buf) - }; - - dnshandle = isc__nmhandle_get(dnssock, NULL, &local); - atomic_store(&dnssock->processing, true); - dnssock->rcb.recv(dnshandle, &r2, dnssock->rcbarg); - dnssock->buf_len = 0; + /* + * We have a packet: stop timeout timers + */ + atomic_store(&dnssock->outer->processing, true); + uv_timer_stop(&dnssock->timer); + if (dnssock->sequential) { /* - * If the recv callback wants to hold on to the - * handle, it needs to attach to it. + * We're in sequential mode and we processed + * one packet, so we're done until the next read + * completes. */ - isc_nmhandle_unref(dnshandle); + isc_nm_pauseread(dnssock->outer); + done = true; + } else { + /* + * We're pipelining, so we now resume processing + * packets until the clients-per-connection limit + * is reached (as determined by the number of + * active handles on the socket). When the limit + * is reached, pause reading. + */ + if (atomic_load(&dnssock->ah) >= + TCPDNS_CLIENTS_PER_CONN) + { + isc_nm_pauseread(dnssock->outer); + done = true; + } } - } - - /* - * At this point we've processed whatever was previously in the - * socket buffer. If there are more messages to be found in what - * we've read, and if we're either pipelining or not processing - * anything else currently, then we can process those messages now. - */ - while (len >= 2 && dnslen(base) <= len - 2 && - (!atomic_load(&dnssock->sequential) || - !atomic_load(&dnssock->processing)) && - !connection_limit(dnssock)) - { - isc_nmhandle_t *dnshandle = NULL; - isc_region_t r2 = { - .base = base + 2, - .length = dnslen(base) - }; - len -= dnslen(base) + 2; - base += dnslen(base) + 2; - - dnshandle = isc__nmhandle_get(dnssock, NULL, &local); - atomic_store(&dnssock->processing, true); - dnssock->rcb.recv(dnshandle, &r2, dnssock->rcbarg); - - /* - * If the recv callback wants to hold on to the - * handle, it needs to attach to it. - */ isc_nmhandle_unref(dnshandle); - } - - /* - * We have less than a full message remaining; it can be - * stored in the socket buffer for next time. - */ - if (len > 0) { - if (len > dnssock->buf_size) { - alloc_dnsbuf(dnssock, len); - } - - INSIST(len <= dnssock->buf_size); - memmove(dnssock->buf, base, len); - dnssock->buf_len = len; - } + } while (!done); } /* @@ -394,23 +343,64 @@ typedef struct tcpsend { static void resume_processing(void *arg) { isc_nmsocket_t *sock = (isc_nmsocket_t *) arg; + isc_result_t result; REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->tid == isc_nm_tid()); if (sock->type != isc_nm_tcpdnssocket || sock->outer == NULL) { return; } + if (atomic_load(&sock->ah) == 0) { + /* Nothing is active; sockets can timeout now */ + atomic_store(&sock->outer->processing, false); + uv_timer_start(&sock->timer, dnstcp_readtimeout, + sock->read_timeout, 0); + } + /* - * If we're in sequential mode or over the - * clients-per-connection limit, the sock can - * resume reading now. + * For sequential sockets: Process what's in the buffer, or + * if there aren't any messages buffered, resume reading. */ - if (atomic_load(&sock->overlimit) || atomic_load(&sock->sequential)) { - atomic_store(&sock->overlimit, false); - atomic_store(&sock->processing, false); - isc_nm_resumeread(sock->outer); + if (sock->sequential) { + isc_nmhandle_t *handle = NULL; + + result = processbuffer(sock, &handle); + if (result == ISC_R_SUCCESS) { + atomic_store(&sock->outer->processing, true); + uv_timer_stop(&sock->timer); + isc_nmhandle_unref(handle); + } else if (sock->outer != NULL) { + isc_nm_resumeread(sock->outer); + } + + return; } + + /* + * For pipelined sockets: If we're under the clients-per-connection + * limit, resume processing until we reach the limit again. + */ + do { + isc_nmhandle_t *dnshandle = NULL; + + result = processbuffer(sock, &dnshandle); + if (result != ISC_R_SUCCESS) { + /* + * Nothing in the buffer; resume reading. + */ + if (sock->outer != NULL) { + isc_nm_resumeread(sock->outer); + } + + break; + } + + uv_timer_stop(&sock->timer); + atomic_store(&sock->outer->processing, true); + isc_nmhandle_unref(dnshandle); + } while (atomic_load(&sock->ah) < TCPDNS_CLIENTS_PER_CONN); } static void @@ -422,19 +412,6 @@ tcpdnssend_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) { ts->cb(ts->orighandle, result, ts->cbarg); isc_mem_put(ts->mctx, ts->region.base, ts->region.length); - /* - * The response was sent; if we're in sequential or overlimit - * mode, resume processing now. - */ - if (atomic_load(&ts->orighandle->sock->sequential) || - atomic_load(&ts->orighandle->sock->overlimit)) - { - atomic_store(&ts->orighandle->sock->processing, false); - atomic_store(&ts->orighandle->sock->overlimit, false); - processbuffer(ts->orighandle->sock); - isc_nm_resumeread(handle->sock); - } - isc_nmhandle_unref(ts->orighandle); isc_mem_putanddetach(&ts->mctx, ts, sizeof(*ts)); } @@ -483,12 +460,11 @@ isc__nm_tcpdns_send(isc_nmhandle_t *handle, isc_region_t *region, return (isc__nm_tcp_send(t->handle, &t->region, tcpdnssend_cb, t)); } + void isc__nm_tcpdns_close(isc_nmsocket_t *sock) { if (sock->outer != NULL) { isc_nmsocket_detach(&sock->outer); } - - atomic_store(&sock->closed, true); - isc__nmsocket_prep_destroy(sock); + uv_close((uv_handle_t*) &sock->timer, timer_close_cb); }