isc__nmsocket_timer_stop(sock);
isc__nm_stop_reading(sock);
+ sock->reading = false;
if (sock->recv_cb != NULL) {
isc__nm_uvreq_t *req = isc__nm_get_read_req(sock, NULL);
goto failure;
}
+ sock->reading = true;
+
if (!sock->manual_read_timer) {
isc__nmsocket_timer_start(sock);
}
return;
failure:
- sock->reading = true;
isc__nm_tcp_failed_read_cb(sock, result, true);
}
isc__nmsocket_timer_stop(sock);
isc__nm_stop_reading(sock);
+ sock->reading = false;
return;
}
isc__nm_readcb(sock, req, ISC_R_SUCCESS, false);
- /* The readcb could have paused the reading */
- if (sock->reading && !sock->manual_read_timer) {
+ if (!sock->client && sock->reading) {
+ /*
+ * Stop reading if we have accumulated enough bytes in the send
+ * queue; this means that the TCP client is not reading back the
+ * data we are sending to it, and there's no reason to continue
+ * processing more incoming DNS messages, if the client is not
+ * reading back the responses.
+ */
+ size_t write_queue_size =
+ uv_stream_get_write_queue_size(&sock->uv_handle.stream);
+
+ if (write_queue_size >= ISC_NETMGR_TCP_SENDBUF_SIZE) {
+ isc__nmsocket_log(
+ sock, ISC_LOG_DEBUG(3),
+ "throttling TCP connection, the other side is "
+ "not reading the data (%zu)",
+ write_queue_size);
+ isc__nm_stop_reading(sock);
+ }
+ } else if (uv_is_active(&sock->uv_handle.handle) &&
+ !sock->manual_read_timer)
+ {
+ /* The readcb could have paused the reading */
/* The timer will be updated */
isc__nmsocket_timer_restart(sock);
}
tcp_send(handle, region, cb, cbarg, true);
}
+static void
+tcp_maybe_restart_reading(isc_nmsocket_t *sock) {
+ if (!sock->client && sock->reading &&
+ !uv_is_active(&sock->uv_handle.handle))
+ {
+ /*
+ * We only get here for a non-client socket that still has
+ * sock->reading set while its libuv handle is not active.
+ * NOTE(review): presumably this is the state left behind when
+ * reading was paused because the send queue grew too large;
+ * confirm against the read path.
+ *
+ * Restart reading if we have less data in the send queue than
+ * the send buffer size (ISC_NETMGR_TCP_SENDBUF_SIZE); this
+ * means that the TCP client has started reading some data
+ * again. Restarting as soon as we go under the limit, instead
+ * of waiting until all data has been flushed, allows faster
+ * recovery (in case there was congestion and now there
+ * isn't).
+ *
+ * If the queue is still at or above the limit, nothing is
+ * done here and the socket is left as it is.
+ */
+ size_t write_queue_size =
+ uv_stream_get_write_queue_size(&sock->uv_handle.stream);
+ if (write_queue_size < ISC_NETMGR_TCP_SENDBUF_SIZE) {
+ isc__nmsocket_log(
+ sock, ISC_LOG_DEBUG(3),
+ "resuming TCP connection, the other side "
+ "is reading the data again (%zu)",
+ write_queue_size);
+ isc__nm_start_reading(sock);
+ }
+ }
+}
+
static void
tcp_send_cb(uv_write_t *req, int status) {
isc__nm_uvreq_t *uvreq = (isc__nm_uvreq_t *)req->data;
isc__nm_incstats(sock, STATID_SENDFAIL);
isc__nm_failed_send_cb(sock, uvreq, isc_uverr2result(status),
false);
+ if (!sock->client && sock->reading) {
+ isc__nm_start_reading(sock);
+ isc__nmsocket_reset(sock);
+ }
return;
}
isc__nm_sendcb(sock, uvreq, ISC_R_SUCCESS, false);
+ tcp_maybe_restart_reading(sock);
}
static isc_result_t
if (r == (int)(bufs[0].len)) {
/* Wrote everything */
isc__nm_sendcb(sock, req, ISC_R_SUCCESS, true);
+ tcp_maybe_restart_reading(sock);
return (ISC_R_SUCCESS);
} else if (r > 0) {
bufs[0].base += (size_t)r;
if (r == (int)(bufs[0].len + bufs[1].len)) {
/* Wrote everything */
isc__nm_sendcb(sock, req, ISC_R_SUCCESS, true);
+ tcp_maybe_restart_reading(sock);
return (ISC_R_SUCCESS);
} else if (r == 1) {
/* Partial write of DNSMSG length */
/* 2. close the socket + destroy the socket in callback */
isc__nmsocket_clearcb(sock);
isc__nm_stop_reading(sock);
+ sock->reading = false;
uv_close(&sock->uv_handle.handle, tcp_close_cb);
/* 1. close the timer */
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_tcpsocket);
REQUIRE(sock->tid == isc_tid());
- REQUIRE(!sock->reading);
+ REQUIRE(!uv_is_active(&sock->uv_handle.handle));
sock->manual_read_timer = manual;
}