netievent_tcplisten,
netievent_tcpstoplisten,
netievent_tcpclose,
+ netievent_closecb,
} isc__netievent_type;
typedef struct isc__netievent_stop {
typedef isc__netievent__socket_t isc__netievent_startread_t;
typedef isc__netievent__socket_t isc__netievent_pauseread_t;
typedef isc__netievent__socket_t isc__netievent_resumeread_t;
+typedef isc__netievent__socket_t isc__netievent_closecb_t;
typedef struct isc__netievent__socket_req {
isc__netievent_type type;
isc_nmsocket_t *parent;
isc_quota_t *quota;
bool overquota;
+ uv_timer_t timer;
+ bool timer_initialized;
+ uint64_t read_timeout;
/*% outer socket is for 'wrapped' sockets - e.g. tcpdns in tcp */
isc_nmsocket_t *outer;
* might want to change it to something lockless in the
* future.
*/
- size_t ah;
+ atomic_int_fast32_t ah;
size_t ah_size;
size_t *ah_frees;
isc_nmhandle_t **ah_handles;
/*%<
* Allocate an ievent and set the type.
*/
+void
+isc__nm_put_ievent(isc_nm_t *mgr, void *ievent);
void
isc__nm_enqueue_ievent(isc__networker_t *worker, isc__netievent_t *event);
* if there are no remaining references or active handles.
*/
+void
+isc__nm_async_closecb(isc__networker_t *worker, isc__netievent_t *ievent0);
+/*%<
+ * Issue a 'handle closed' callback on the socket.
+ */
+
isc_result_t
isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region,
isc_nm_cb_t cb, void *cbarg);
case netievent_tcpclose:
isc__nm_async_tcpclose(worker, ievent);
break;
+ case netievent_closecb:
+ isc__nm_async_closecb(worker, ievent);
+ break;
default:
INSIST(0);
ISC_UNREACHABLE();
}
- isc_mem_put(worker->mgr->mctx, ievent,
- sizeof(isc__netievent_storage_t));
+
+ isc__nm_put_ievent(worker->mgr, ievent);
}
}
return (event);
}
+/*%<
+ * Free an ievent previously obtained from isc__nm_get_ievent().
+ * All ievent variants share the fixed-size isc__netievent_storage_t
+ * union, so one put size covers every event type.
+ */
+void
+isc__nm_put_ievent(isc_nm_t *mgr, void *ievent) {
+ isc_mem_put(mgr->mctx, ievent, sizeof(isc__netievent_storage_t));
+}
+
void
isc__nm_enqueue_ievent(isc__networker_t *worker, isc__netievent_t *event) {
isc_queue_enqueue(worker->ievents, (uintptr_t)event);
isc_quota_detach(&sock->quota);
}
+ if (sock->timer_initialized) {
+ uv_close((uv_handle_t *)&sock->timer, NULL);
+ sock->timer_initialized = false;
+ }
+
isc_astack_destroy(sock->inactivehandles);
while ((uvreq = isc_astack_pop(sock->inactivereqs)) != NULL) {
} else {
isc_nm_detach(&sock->mgr);
}
-
}
static void
* accept destruction.
*/
LOCK(&sock->lock);
- active_handles += sock->ah;
+ active_handles += atomic_load(&sock->ah);
if (sock->children != NULL) {
for (int i = 0; i < sock->nchildren; i++) {
LOCK(&sock->children[i].lock);
- active_handles += sock->children[i].ah;
+ active_handles += atomic_load(&sock->children[i].ah);
UNLOCK(&sock->children[i].lock);
}
}
isc_sockaddr_t *local)
{
isc_nmhandle_t *handle = NULL;
+ size_t handlenum;
int pos;
REQUIRE(VALID_NMSOCK(sock));
LOCK(&sock->lock);
/* We need to add this handle to the list of active handles */
- if (sock->ah == sock->ah_size) {
+ if ((size_t) atomic_load(&sock->ah) == sock->ah_size) {
sock->ah_frees =
isc_mem_reallocate(sock->mgr->mctx, sock->ah_frees,
sock->ah_size * 2 *
sock->ah_size *= 2;
}
- pos = sock->ah_frees[sock->ah++];
+ handlenum = atomic_fetch_add(&sock->ah, 1);
+ pos = sock->ah_frees[handlenum];
+
INSIST(sock->ah_handles[pos] == NULL);
sock->ah_handles[pos] = handle;
handle->ah_pos = pos;
*handle = (isc_nmhandle_t) {
.magic = 0
};
+
isc_mem_put(sock->mgr->mctx, handle, sizeof(isc_nmhandle_t) + extra);
}
+/*
+ * Drop one reference to 'handle'.  On the last reference: reset the
+ * handle, remove it from the socket's active-handle table, recycle or
+ * free it, and then run the socket's closehandle callback -- directly
+ * when we are on the socket's own thread, otherwise via a
+ * netievent_closecb event queued to that thread.
+ */
void
isc_nmhandle_unref(isc_nmhandle_t *handle) {
+ isc_nmsocket_t *sock = NULL;
+ size_t handlenum;
+ bool reuse = false;
int refs;
REQUIRE(VALID_NMHANDLE(handle));
refs = isc_refcount_decrement(&handle->references);
INSIST(refs > 0);
- if (refs == 1) {
- isc_nmsocket_t *sock = handle->sock;
- bool reuse = false;
+ if (refs > 1) {
+ return;
+ }
- handle->sock = NULL;
- if (handle->doreset != NULL) {
- handle->doreset(handle->opaque);
- }
+ sock = handle->sock;
+ handle->sock = NULL;
- /*
- * We do it all under lock to avoid races with socket
- * destruction.
- */
- LOCK(&sock->lock);
- INSIST(sock->ah_handles[handle->ah_pos] == handle);
- INSIST(sock->ah_size > handle->ah_pos);
- INSIST(sock->ah > 0);
- sock->ah_handles[handle->ah_pos] = NULL;
- sock->ah_frees[--sock->ah] = handle->ah_pos;
- handle->ah_pos = 0;
-
- if (atomic_load(&sock->active)) {
- reuse = isc_astack_trypush(sock->inactivehandles,
- handle);
- }
- UNLOCK(&sock->lock);
+ if (handle->doreset != NULL) {
+ handle->doreset(handle->opaque);
+ }
- /*
- * Handle is closed. If the socket has a callback
- * configured for that (e.g., to perform cleanup after
- * request processing), call it now.
- */
- if (sock->closehandle_cb != NULL) {
- sock->closehandle_cb(sock);
- }
+ /*
+ * We do all of this under lock to avoid races with socket
+ * destruction.
+ */
+ LOCK(&sock->lock);
- if (!reuse) {
- nmhandle_free(sock, handle);
- }
+ INSIST(sock->ah_handles[handle->ah_pos] == handle);
+ INSIST(sock->ah_size > handle->ah_pos);
+ INSIST(atomic_load(&sock->ah) > 0);
- if (sock->ah == 0 &&
- !atomic_load(&sock->active) &&
- !atomic_load(&sock->destroying))
- {
- nmsocket_maybe_destroy(sock);
+ sock->ah_handles[handle->ah_pos] = NULL;
+ /* atomic_fetch_sub() returns the old count; old - 1 is the new
+ * count, which is also the free-slot index to reuse. */
+ handlenum = atomic_fetch_sub(&sock->ah, 1) - 1;
+ sock->ah_frees[handlenum] = handle->ah_pos;
+ handle->ah_pos = 0;
+
+ if (atomic_load(&sock->active)) {
+ reuse = isc_astack_trypush(sock->inactivehandles,
+ handle);
+ }
+
+ UNLOCK(&sock->lock);
+
+ if (!reuse) {
+ nmhandle_free(sock, handle);
+ }
+
+ /*
+ * The handle is closed. If the socket has a callback configured
+ * for that (e.g., to perform cleanup after request processing),
+ * call it now.
+ */
+ if (sock->closehandle_cb != NULL) {
+ if (sock->tid == isc_nm_tid()) {
+ sock->closehandle_cb(sock);
+
+ /*
+ * If we do this asynchronously then
+ * the async event will clean it up.
+ */
+ /* NOTE(review): direct read of atomic 'ah' here while
+ * other sites use atomic_load() -- consider making
+ * this consistent. */
+ if (sock->ah == 0 &&
+ !atomic_load(&sock->active) &&
+ !atomic_load(&sock->destroying))
+ {
+ nmsocket_maybe_destroy(sock);
+ }
+ } else {
+
+ isc__netievent_closecb_t * event =
+ isc__nm_get_ievent(sock->mgr,
+ netievent_closecb);
+ /* Keep the socket alive until the event runs;
+ * isc__nm_async_closecb() detaches. */
+ isc_nmsocket_attach(sock, &event->sock);
+ isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
+ (isc__netievent_t *) event);
}
}
}
}
}
+/*
+ * Handler for a netievent_closecb event: run the socket's
+ * closehandle callback on the socket's own worker thread.  Queued
+ * from isc_nmhandle_unref() when the last handle reference was
+ * dropped on a different thread; the detach here releases the
+ * reference taken when the event was enqueued.
+ */
+void
+isc__nm_async_closecb(isc__networker_t *worker, isc__netievent_t *ievent0) {
+ isc__netievent_closecb_t *ievent =
+ (isc__netievent_closecb_t *) ievent0;
+
+ REQUIRE(VALID_NMSOCK(ievent->sock));
+ REQUIRE(ievent->sock->tid == isc_nm_tid());
+ REQUIRE(ievent->sock->closehandle_cb != NULL);
+
+ UNUSED(worker);
+
+ ievent->sock->closehandle_cb(ievent->sock);
+ isc_nmsocket_detach(&ievent->sock);
+}
+
bool
isc__nm_acquire_interlocked(isc_nm_t *mgr) {
LOCK(&mgr->lock);
uv_close(&sock->uv_handle.handle, stoplistening_cb);
}
+/*
+ * uv_timer callback: fires when no data has arrived within
+ * sock->read_timeout milliseconds.  If the socket is busy the timer
+ * is simply rearmed; otherwise reading stops and the recv callback
+ * is invoked with a NULL region to signal the timeout.
+ */
+static void
+readtimeout_cb(uv_timer_t *handle) {
+ isc_nmsocket_t *sock = (isc_nmsocket_t *) handle->data;
+
+ REQUIRE(VALID_NMSOCK(sock));
+ REQUIRE(sock->tid == isc_nm_tid());
+
+ /*
+ * Socket is actively processing something, so restart the timer
+ * and return.
+ */
+ if (atomic_load(&sock->processing)) {
+ uv_timer_start(handle, readtimeout_cb, sock->read_timeout, 0);
+ return;
+ }
+
+ /*
+ * Timeout; stop reading and process whatever we have.
+ */
+ uv_read_stop(&sock->uv_handle.stream);
+ if (sock->quota) {
+ isc_quota_detach(&sock->quota);
+ }
+ /* NULL region tells the reader this is a timeout, not data. */
+ sock->rcb.recv(sock->tcphandle, NULL, sock->rcbarg);
+}
+
isc_result_t
isc_nm_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
isc_nmsocket_t *sock = NULL;
+ isc__netievent_startread_t *ievent = NULL;
REQUIRE(VALID_NMHANDLE(handle));
REQUIRE(VALID_NMSOCK(handle->sock));
sock = handle->sock;
sock->rcb.recv = cb;
- sock->rcbarg = cbarg; /* That's obviously broken... */
+ sock->rcbarg = cbarg;
+
+ ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpstartread);
+ ievent->sock = sock;
+
if (sock->tid == isc_nm_tid()) {
- int r = uv_read_start(&sock->uv_handle.stream,
- isc__nm_alloc_cb, read_cb);
- INSIST(r == 0);
+ isc__nm_async_startread(&sock->mgr->workers[sock->tid],
+ (isc__netievent_t *) ievent);
+ isc__nm_put_ievent(sock->mgr, ievent);
} else {
- isc__netievent_startread_t *ievent =
- isc__nm_get_ievent(sock->mgr,
- netievent_tcpstartread);
- ievent->sock = sock;
isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
(isc__netievent_t *) ievent);
}
isc_nmsocket_t *sock = ievent->sock;
REQUIRE(worker->id == isc_nm_tid());
+ if (sock->read_timeout != 0) {
+ if (!sock->timer_initialized) {
+ uv_timer_init(&worker->loop, &sock->timer);
+ sock->timer.data = sock;
+ sock->timer_initialized = true;
+ }
+ uv_timer_start(&sock->timer, readtimeout_cb,
+ sock->read_timeout, 0);
+ }
uv_read_start(&sock->uv_handle.stream, isc__nm_alloc_cb, read_cb);
}
isc_result_t
isc_nm_pauseread(isc_nmsocket_t *sock) {
+ isc__netievent_pauseread_t *ievent = NULL;
+
REQUIRE(VALID_NMSOCK(sock));
if (atomic_load(&sock->readpaused)) {
}
atomic_store(&sock->readpaused, true);
+ ievent = isc__nm_get_ievent(sock->mgr, netievent_tcppauseread);
+ ievent->sock = sock;
if (sock->tid == isc_nm_tid()) {
- int r = uv_read_stop(&sock->uv_handle.stream);
- INSIST(r == 0);
+ isc__nm_async_pauseread(&sock->mgr->workers[sock->tid],
+ (isc__netievent_t *) ievent);
+ isc__nm_put_ievent(sock->mgr, ievent);
} else {
- isc__netievent_pauseread_t *ievent =
- isc__nm_get_ievent(sock->mgr,
- netievent_tcppauseread);
- ievent->sock = sock;
isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
(isc__netievent_t *) ievent);
}
isc__netievent_pauseread_t *ievent =
(isc__netievent_pauseread_t *) ievent0;
isc_nmsocket_t *sock = ievent->sock;
- REQUIRE(VALID_NMSOCK(sock));
+ REQUIRE(VALID_NMSOCK(sock));
REQUIRE(worker->id == isc_nm_tid());
+ if (sock->timer_initialized) {
+ uv_timer_stop(&sock->timer);
+ }
uv_read_stop(&sock->uv_handle.stream);
}
isc_result_t
isc_nm_resumeread(isc_nmsocket_t *sock) {
+ isc__netievent_startread_t *ievent = NULL;
+
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->rcb.recv != NULL);
atomic_store(&sock->readpaused, false);
+ ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpstartread);
+ ievent->sock = sock;
+
if (sock->tid == isc_nm_tid()) {
- int r = uv_read_start(&sock->uv_handle.stream,
- isc__nm_alloc_cb, read_cb);
- INSIST(r == 0);
+ isc__nm_async_startread(&sock->mgr->workers[sock->tid],
+ (isc__netievent_t *) ievent);
+ isc__nm_put_ievent(sock->mgr, ievent);
} else {
- /* It's the same as startread */
- isc__netievent_startread_t *ievent =
- isc__nm_get_ievent(sock->mgr,
- netievent_tcpstartread);
- ievent->sock = sock;
isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
(isc__netievent_t *) ievent);
}
INSIST(sock->rcb.recv != NULL);
sock->rcb.recv(sock->tcphandle, &region, sock->rcbarg);
+ if (sock->timer_initialized && sock->read_timeout != 0) {
+ /* The timer will be updated */
+ uv_timer_start(&sock->timer, readtimeout_cb,
+ sock->read_timeout, 0);
+ }
isc__nm_free_uvbuf(sock, buf);
return;
}
handle = isc__nmhandle_get(csock, NULL, &local);
INSIST(ssock->rcb.accept != NULL);
+ csock->read_timeout = 1000;
ssock->rcb.accept(handle, ISC_R_SUCCESS, ssock->rcbarg);
isc_nmsocket_detach(&csock);
isc__nmsocket_prep_destroy(sock);
}
+/*
+ * Close callback for the read-timeout timer: once libuv has fully
+ * closed the timer handle, continue tearing down the TCP socket
+ * (detach from the server and close the socket handle).  The socket
+ * close is deferred to here because a uv handle must not be freed
+ * before its close callback has run.
+ */
+static void
+timer_close_cb(uv_handle_t *uvhandle) {
+ isc_nmsocket_t *sock = uvhandle->data;
+
+ REQUIRE(VALID_NMSOCK(sock));
+
+ isc_nmsocket_detach(&sock->server);
+ uv_close(&sock->uv_handle.handle, tcp_close_cb);
+}
+
static void
tcp_close_direct(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
}
}
}
-
- isc_nmsocket_detach(&sock->server);
- uv_close(&sock->uv_handle.handle, tcp_close_cb);
+ if (sock->timer_initialized) {
+ uv_close((uv_handle_t *)&sock->timer, timer_close_cb);
+ sock->timer_initialized = false;
+ } else {
+ isc_nmsocket_detach(&sock->server);
+ uv_close(&sock->uv_handle.handle, tcp_close_cb);
+ }
}
void
return ((base[0] << 8) + (base[1]));
}
+/*
+ * Regular TCP buffer, should suffice in most cases.
+ */
#define NM_REG_BUF 4096
-#define NM_BIG_BUF 65536
+/*
+ * Two full DNS packets with length prefixes.
+ * netmgr receives 64k at most so there's no risk
+ * of overrun.  Fully parenthesized so the macro expands
+ * safely inside larger expressions (e.g. division/modulo).
+ */
+#define NM_BIG_BUF ((65535 + 2) * 2)
static inline void
alloc_dnsbuf(isc_nmsocket_t *sock, size_t len) {
REQUIRE(len <= NM_BIG_BUF);
}
}
+/*
+ * Close callback for the TCP-DNS read-timeout timer: the timer
+ * handle is now fully closed, so mark it uninitialized, mark the
+ * socket closed, and drop the socket reference held by the timer.
+ */
+static void
+timer_close_cb(uv_handle_t *handle) {
+ isc_nmsocket_t *sock = (isc_nmsocket_t *) handle->data;
+ INSIST(VALID_NMSOCK(sock));
+ sock->timer_initialized = false;
+ atomic_store(&sock->closed, true);
+ isc_nmsocket_detach(&sock);
+}
+
+/*
+ * Read-timeout callback for a TCP-DNS socket: the client was idle
+ * too long, so detach the wrapped (outer) TCP socket and close the
+ * timer; timer_close_cb() completes the socket teardown.
+ */
+static void
+dnstcp_readtimeout(uv_timer_t *timer) {
+ isc_nmsocket_t *sock = (isc_nmsocket_t *) timer->data;
+ REQUIRE(VALID_NMSOCK(sock));
+ isc_nmsocket_detach(&sock->outer);
+ uv_close((uv_handle_t*) &sock->timer, timer_close_cb);
+}
+
/*
* Accept callback for TCP-DNS connection
*/
isc_nmsocket_attach(handle->sock, &dnssock->outer);
dnssock->peer = handle->sock->peer;
dnssock->iface = handle->sock->iface;
+ dnssock->read_timeout = 5000;
+ dnssock->tid = isc_nm_tid();
dnssock->closehandle_cb = resume_processing;
+ uv_timer_init(&dnssock->mgr->workers[isc_nm_tid()].loop,
+ &dnssock->timer);
+ dnssock->timer.data = dnssock;
+ dnssock->timer_initialized = true;
+ uv_timer_start(&dnssock->timer, dnstcp_readtimeout,
+ dnssock->read_timeout, 0);
+
isc_nm_read(handle, dnslisten_readcb, dnssock);
}
-static bool
-connection_limit(isc_nmsocket_t *sock) {
- int ah;
-
- REQUIRE(sock->type == isc_nm_tcpdnssocket && sock->outer != NULL);
-
- if (atomic_load(&sock->sequential)) {
- /*
- * We're already non-pipelining, so there's
- * no need to check per-connection limits.
- */
- return (false);
- }
+/*
+ * Process a single packet from the incoming buffer.
+ *
+ * Return ISC_R_SUCCESS and attach a handle to 'handlep' if something
+ * was processed; return ISC_R_NOMORE if there isn't a full message
+ * to be processed.
+ *
+ * The caller will need to unreference the handle.
+ */
+static isc_result_t
+processbuffer(isc_nmsocket_t *dnssock, isc_nmhandle_t **handlep) {
+ size_t len;
- LOCK(&sock->lock);
- ah = sock->ah;
- UNLOCK(&sock->lock);
+ REQUIRE(VALID_NMSOCK(dnssock));
+ REQUIRE(handlep != NULL && *handlep == NULL);
- if (ah >= TCPDNS_CLIENTS_PER_CONN) {
- atomic_store(&sock->overlimit, true);
- isc_nm_pauseread(sock->outer);
- return (true);
+ /*
+ * If we don't even have the length yet, we can't do
+ * anything.
+ */
+ if (dnssock->buf_len < 2) {
+ return (ISC_R_NOMORE);
}
- return (false);
-}
-
-/* Process all complete packets out of incoming buffer */
-static void
-processbuffer(isc_nmsocket_t *dnssock) {
- REQUIRE(VALID_NMSOCK(dnssock));
-
- /* While we have a complete packet in the buffer */
- while (dnssock->buf_len > 2 &&
- dnslen(dnssock->buf) <= dnssock->buf_len - 2 &&
- !connection_limit(dnssock))
- {
+ /*
+ * Process the first packet from the buffer, leaving
+ * the rest (if any) for later.
+ */
+ len = dnslen(dnssock->buf);
+ if (len <= dnssock->buf_len - 2) {
isc_nmhandle_t *dnshandle = NULL;
isc_region_t r2 = {
.base = dnssock->buf + 2,
- .length = dnslen(dnssock->buf)
+ .length = len
};
- size_t len;
dnshandle = isc__nmhandle_get(dnssock, NULL, NULL);
- atomic_store(&dnssock->processing, true);
dnssock->rcb.recv(dnshandle, &r2, dnssock->rcbarg);
- /*
- * If the recv callback wants to hold on to the
- * handle, it needs to attach to it.
- */
- isc_nmhandle_unref(dnshandle);
-
- len = dnslen(dnssock->buf) + 2;
+ /* Consume the two-byte length prefix along with the payload. */
+ len += 2;
dnssock->buf_len -= len;
if (len > 0) {
memmove(dnssock->buf, dnssock->buf + len,
dnssock->buf_len);
}
- /* Check here to make sure we do the processing at least once */
- if (atomic_load(&dnssock->processing)) {
- return;
- }
+ *handlep = dnshandle;
+ return (ISC_R_SUCCESS);
}
+
+ return (ISC_R_NOMORE);
}
/*
+/*
+ * Read callback for the outer TCP socket wrapped by a TCP-DNS
+ * socket: append the newly-read bytes to the assembly buffer, then
+ * hand complete DNS messages to the recv callback -- one message in
+ * sequential mode, or until the clients-per-connection limit is
+ * reached when pipelining.  A NULL region means the connection was
+ * closed.
+ */
static void
dnslisten_readcb(isc_nmhandle_t *handle, isc_region_t *region, void *arg) {
isc_nmsocket_t *dnssock = (isc_nmsocket_t *) arg;
- isc_sockaddr_t local;
unsigned char *base = NULL;
+ bool done = false;
size_t len;
REQUIRE(VALID_NMSOCK(dnssock));
if (region == NULL) {
/* Connection closed */
- atomic_store(&dnssock->closed, true);
- isc_nmsocket_detach(&dnssock->outer);
- isc_nmsocket_detach(&dnssock);
+ isc__nm_tcpdns_close(dnssock);
return;
}
- local = isc_nmhandle_localaddr(handle);
-
base = region->base;
len = region->length;
- /*
- * We have something in the buffer, we need to glue it.
- */
- if (dnssock->buf_len > 0) {
- if (dnssock->buf_len == 1) {
- /* Make sure we have the length */
- dnssock->buf[1] = base[0];
- dnssock->buf_len = 2;
- base++;
- len--;
- }
-
- processbuffer(dnssock);
+ /* Grow the assembly buffer if this read won't fit. */
+ if (dnssock->buf_len + len > dnssock->buf_size) {
+ alloc_dnsbuf(dnssock, dnssock->buf_len + len);
}
+ memmove(dnssock->buf + dnssock->buf_len, base, len);
+ dnssock->buf_len += len;
- if (dnssock->buf_len > 0) {
- size_t plen;
-
- if (dnssock->buf_len == 1) {
- /* Make sure we have the length */
- dnssock->buf[1] = base[0];
- dnssock->buf_len = 2;
- base++;
- len--;
- }
-
- /* At this point we definitely have 2 bytes there. */
- plen = ISC_MIN(len, (dnslen(dnssock->buf) + 2 -
- dnssock->buf_len));
+ do {
+ isc_result_t result;
+ isc_nmhandle_t *dnshandle = NULL;
- if (dnssock->buf_len + plen > NM_BIG_BUF) {
+ result = processbuffer(dnssock, &dnshandle);
+ if (result != ISC_R_SUCCESS) {
/*
- * XXX: continuing to read will overrun the
- * socket buffer. We may need to force the
- * connection to close so the client will have
- * to open a new one.
+ * There wasn't anything in the buffer to process.
*/
return;
}
- if (dnssock->buf_len + plen > dnssock->buf_size) {
- alloc_dnsbuf(dnssock, dnssock->buf_len + plen);
- }
-
- memmove(dnssock->buf + dnssock->buf_len, base, plen);
- dnssock->buf_len += plen;
- base += plen;
- len -= plen;
-
- /* Do we have a complete packet in the buffer? */
- if (dnslen(dnssock->buf) >= dnssock->buf_len - 2 &&
- !connection_limit(dnssock))
- {
- isc_nmhandle_t *dnshandle = NULL;
- isc_region_t r2 = {
- .base = dnssock->buf + 2,
- .length = dnslen(dnssock->buf)
- };
-
- dnshandle = isc__nmhandle_get(dnssock, NULL, &local);
- atomic_store(&dnssock->processing, true);
- dnssock->rcb.recv(dnshandle, &r2, dnssock->rcbarg);
- dnssock->buf_len = 0;
+ /*
+ * We have a packet: stop timeout timers
+ */
+ atomic_store(&dnssock->outer->processing, true);
+ uv_timer_stop(&dnssock->timer);
+ if (dnssock->sequential) {
/*
- * If the recv callback wants to hold on to the
- * handle, it needs to attach to it.
+ * We're in sequential mode and we processed
+ * one packet, so we're done until the next read
+ * completes.
*/
- isc_nmhandle_unref(dnshandle);
+ isc_nm_pauseread(dnssock->outer);
+ done = true;
+ } else {
+ /*
+ * We're pipelining, so we now resume processing
+ * packets until the clients-per-connection limit
+ * is reached (as determined by the number of
+ * active handles on the socket). When the limit
+ * is reached, pause reading.
+ */
+ if (atomic_load(&dnssock->ah) >=
+ TCPDNS_CLIENTS_PER_CONN)
+ {
+ isc_nm_pauseread(dnssock->outer);
+ done = true;
+ }
}
- }
-
- /*
- * At this point we've processed whatever was previously in the
- * socket buffer. If there are more messages to be found in what
- * we've read, and if we're either pipelining or not processing
- * anything else currently, then we can process those messages now.
- */
- while (len >= 2 && dnslen(base) <= len - 2 &&
- (!atomic_load(&dnssock->sequential) ||
- !atomic_load(&dnssock->processing)) &&
- !connection_limit(dnssock))
- {
- isc_nmhandle_t *dnshandle = NULL;
- isc_region_t r2 = {
- .base = base + 2,
- .length = dnslen(base)
- };
- len -= dnslen(base) + 2;
- base += dnslen(base) + 2;
-
- dnshandle = isc__nmhandle_get(dnssock, NULL, &local);
- atomic_store(&dnssock->processing, true);
- dnssock->rcb.recv(dnshandle, &r2, dnssock->rcbarg);
-
- /*
- * If the recv callback wants to hold on to the
- * handle, it needs to attach to it.
- */
isc_nmhandle_unref(dnshandle);
- }
-
- /*
- * We have less than a full message remaining; it can be
- * stored in the socket buffer for next time.
- */
- if (len > 0) {
- if (len > dnssock->buf_size) {
- alloc_dnsbuf(dnssock, len);
- }
-
- INSIST(len <= dnssock->buf_size);
- memmove(dnssock->buf, base, len);
- dnssock->buf_len = len;
- }
+ } while (!done);
}
/*
+/*
+ * Called (as closehandle_cb) when a TCP-DNS request handle is
+ * released: restart the idle timer if nothing is active, then either
+ * process the next buffered message (sequential mode) or drain
+ * buffered messages up to the clients-per-connection limit
+ * (pipelined mode), resuming the outer socket's read when the
+ * buffer is empty.
+ */
static void
resume_processing(void *arg) {
isc_nmsocket_t *sock = (isc_nmsocket_t *) arg;
+ isc_result_t result;
REQUIRE(VALID_NMSOCK(sock));
+ REQUIRE(sock->tid == isc_nm_tid());
if (sock->type != isc_nm_tcpdnssocket || sock->outer == NULL) {
return;
}
+ if (atomic_load(&sock->ah) == 0) {
+ /* Nothing is active; sockets can timeout now */
+ atomic_store(&sock->outer->processing, false);
+ uv_timer_start(&sock->timer, dnstcp_readtimeout,
+ sock->read_timeout, 0);
+ }
+
/*
- * If we're in sequential mode or over the
- * clients-per-connection limit, the sock can
- * resume reading now.
+ * For sequential sockets: Process what's in the buffer, or
+ * if there aren't any messages buffered, resume reading.
*/
- if (atomic_load(&sock->overlimit) || atomic_load(&sock->sequential)) {
- atomic_store(&sock->overlimit, false);
- atomic_store(&sock->processing, false);
- isc_nm_resumeread(sock->outer);
+ if (sock->sequential) {
+ isc_nmhandle_t *handle = NULL;
+
+ result = processbuffer(sock, &handle);
+ if (result == ISC_R_SUCCESS) {
+ atomic_store(&sock->outer->processing, true);
+ uv_timer_stop(&sock->timer);
+ isc_nmhandle_unref(handle);
+ /* NOTE(review): 'outer' is re-checked below because
+ * the recv callback may have detached it. */
+ } else if (sock->outer != NULL) {
+ isc_nm_resumeread(sock->outer);
+ }
+
+ return;
}
+
+ /*
+ * For pipelined sockets: If we're under the clients-per-connection
+ * limit, resume processing until we reach the limit again.
+ */
+ do {
+ isc_nmhandle_t *dnshandle = NULL;
+
+ result = processbuffer(sock, &dnshandle);
+ if (result != ISC_R_SUCCESS) {
+ /*
+ * Nothing in the buffer; resume reading.
+ */
+ if (sock->outer != NULL) {
+ isc_nm_resumeread(sock->outer);
+ }
+
+ break;
+ }
+
+ uv_timer_stop(&sock->timer);
+ atomic_store(&sock->outer->processing, true);
+ isc_nmhandle_unref(dnshandle);
+ } while (atomic_load(&sock->ah) < TCPDNS_CLIENTS_PER_CONN);
}
static void
ts->cb(ts->orighandle, result, ts->cbarg);
isc_mem_put(ts->mctx, ts->region.base, ts->region.length);
- /*
- * The response was sent; if we're in sequential or overlimit
- * mode, resume processing now.
- */
- if (atomic_load(&ts->orighandle->sock->sequential) ||
- atomic_load(&ts->orighandle->sock->overlimit))
- {
- atomic_store(&ts->orighandle->sock->processing, false);
- atomic_store(&ts->orighandle->sock->overlimit, false);
- processbuffer(ts->orighandle->sock);
- isc_nm_resumeread(handle->sock);
- }
-
isc_nmhandle_unref(ts->orighandle);
isc_mem_putanddetach(&ts->mctx, ts, sizeof(*ts));
}
return (isc__nm_tcp_send(t->handle, &t->region, tcpdnssend_cb, t));
}
+
+/*
+ * Close a TCP-DNS socket: detach the wrapped (outer) TCP socket and
+ * close the idle timer; timer_close_cb() marks the socket closed and
+ * drops the final reference.
+ *
+ * NOTE(review): sock->timer is closed unconditionally here -- confirm
+ * that every tcpdns socket has its timer initialized at accept time
+ * before this can be reached.
+ */
void
isc__nm_tcpdns_close(isc_nmsocket_t *sock) {
if (sock->outer != NULL) {
isc_nmsocket_detach(&sock->outer);
}
-
- atomic_store(&sock->closed, true);
- isc__nmsocket_prep_destroy(sock);
+ uv_close((uv_handle_t*) &sock->timer, timer_close_cb);
}