#define INITIAL_DNS_MESSAGE_BUFFER_SIZE (512)
+/*
+ * The value should be small enough to not allow a server to open too
+ * many streams at once. It should not be too small either because
+ * the incoming data will be split into too many chunks with each of
+ * them processed asynchronously.
+ */
+#define INCOMING_DATA_CHUNK_SIZE (256)
+
+/*
+ * Often processing a chunk does not change the number of streams. In
+ * that case we can process more than once, but we still should have a
+ * hard limit on that.
+ */
+#define INCOMING_DATA_MAX_CHUNKS_AT_ONCE (4)
+
typedef struct isc_nm_http_response_status {
size_t code;
size_t content_length;
isc__nm_http_pending_callbacks_t pending_write_callbacks;
isc_buffer_t *pending_write_data;
+
+ /*
+ * The statistical values below are for usage on server-side
+ * only. They are meant to detect clients that are taking too many
+ * resources from the server.
+ */
+ uint64_t received; /* How many requests have been received. */
+ uint64_t submitted; /* How many responses were submitted to send */
+ uint64_t processed; /* How many responses were processed. */
};
typedef enum isc_http_error_responses {
void *cbarg;
isc_buffer_t *pending_write_data;
isc__nm_http_pending_callbacks_t pending_write_callbacks;
+ uint64_t submitted;
} isc_http_send_req_t;
#define HTTP_ENDPOINTS_MAGIC ISC_MAGIC('H', 'T', 'E', 'P')
http_send_outgoing(isc_nm_http_session_t *session, isc_nmhandle_t *httphandle,
isc_nm_cb_t cb, void *cbarg);
+static ssize_t
+http_process_input_data(isc_nm_http_session_t *session,
+ isc_buffer_t *input_data);
+
+static inline bool
+http_too_many_active_streams(isc_nm_http_session_t *session);
+
static void
http_do_bio(isc_nm_http_session_t *session, isc_nmhandle_t *send_httphandle,
isc_nm_cb_t send_cb, void *send_cbarg);
+static void
+http_do_bio_async(isc_nm_http_session_t *session);
+
static void
failed_httpstream_read_cb(isc_nmsocket_t *sock, isc_result_t result,
isc_nm_http_session_t *session);
isc_nmhandle_close(session->handle);
}
+ /*
+ * Free any unprocessed incoming data in order to not process
+ * it during indirect calls to http_do_bio() that might happen
+ * when calling the failed callbacks.
+ */
+ if (session->buf != NULL) {
+ isc_buffer_free(&session->buf);
+ }
+
if (session->client) {
client_call_failed_read_cb(ISC_R_UNEXPECTED, session);
} else {
ISC_LIST_UNLINK(session->sstreams, sock->h2, link);
session->nsstreams--;
+ if (sock->h2->request_received) {
+ session->submitted++;
+ }
/*
* By making a call to isc__nmsocket_prep_destroy(), we ensure that
return ISC_R_SUCCESS;
}
+static ssize_t
+http_process_input_data(isc_nm_http_session_t *session,
+ isc_buffer_t *input_data) {
+ ssize_t readlen = 0;
+ ssize_t processed = 0;
+ isc_region_t chunk = { 0 };
+ size_t before, after;
+ size_t i;
+
+ REQUIRE(VALID_HTTP2_SESSION(session));
+ REQUIRE(input_data != NULL);
+
+ if (!http_session_active(session)) {
+ return 0;
+ }
+
+ /*
+ * For clients that initiate request themselves just process
+ * everything.
+ */
+ if (session->client) {
+ isc_buffer_remainingregion(input_data, &chunk);
+ if (chunk.length == 0) {
+ return 0;
+ }
+
+ readlen = nghttp2_session_mem_recv(session->ngsession,
+ chunk.base, chunk.length);
+
+ if (readlen >= 0) {
+ isc_buffer_forward(input_data, readlen);
+ }
+
+ return readlen;
+ }
+
+ /*
+ * If no streams are created during processing, we might process
+ * more than one chunk at a time. Still we should not overdo that
+ * to avoid processing too much data at once as such behaviour is
+ * known for trashing the memory allocator at times.
+ */
+ for (before = after = session->nsstreams, i = 0;
+ after <= before && i < INCOMING_DATA_MAX_CHUNKS_AT_ONCE;
+ after = session->nsstreams, i++)
+ {
+ const uint64_t active_streams =
+ (session->received - session->processed);
+
+ /*
+ * If there are non completed send requests in flight -let's
+ * not process any incoming data, as it could lead to piling
+ * up too much send data in send buffers. With many clients
+ * connected it can lead to excessive memory consumption on
+ * the server instance.
+ */
+ if (session->sending > 0) {
+ break;
+ }
+
+ /*
+ * If we have reached the maximum number of streams used, we
+ * might stop processing for now, as nghttp2 will happily
+ * consume as much data as possible.
+ */
+ if (session->nsstreams >= session->max_concurrent_streams &&
+ active_streams > 0)
+ {
+ break;
+ }
+
+ if (http_too_many_active_streams(session)) {
+ break;
+ }
+
+ isc_buffer_remainingregion(input_data, &chunk);
+ if (chunk.length == 0) {
+ break;
+ }
+
+ chunk.length = ISC_MIN(chunk.length, INCOMING_DATA_CHUNK_SIZE);
+
+ readlen = nghttp2_session_mem_recv(session->ngsession,
+ chunk.base, chunk.length);
+
+ if (readlen >= 0) {
+ isc_buffer_forward(input_data, readlen);
+ processed += readlen;
+ } else {
+ isc_buffer_clear(input_data);
+ return readlen;
+ }
+ }
+
+ return processed;
+}
+
/*
* Read callback from TLS socket.
*/
isc_nm_http_session_t *session = (isc_nm_http_session_t *)data;
isc_nm_http_session_t *tmpsess = NULL;
ssize_t readlen;
+ isc_buffer_t input;
REQUIRE(VALID_HTTP2_SESSION(session));
goto done;
}
- readlen = nghttp2_session_mem_recv(session->ngsession, region->base,
- region->length);
+ isc_buffer_init(&input, region->base, region->length);
+ isc_buffer_add(&input, region->length);
+
+ readlen = http_process_input_data(session, &input);
if (readlen < 0) {
failed_read_cb(ISC_R_UNEXPECTED, session);
goto done;
isc_buffer_putmem(session->buf, region->base + readlen,
unread_size);
isc_nm_read_stop(session->handle);
+ http_do_bio_async(session);
+ } else {
+ /* We might have something to receive or send, do IO */
+ http_do_bio(session, NULL, NULL, NULL);
}
- /* We might have something to receive or send, do IO */
- http_do_bio(session, NULL, NULL, NULL);
-
done:
isc__nm_httpsession_detach(&tmpsess);
}
}
isc_buffer_free(&req->pending_write_data);
+ session->processed += req->submitted;
isc_mem_put(session->mctx, req, sizeof(*req));
session->sending--;
- http_do_bio(session, NULL, NULL, NULL);
- isc_nmhandle_detach(&transphandle);
- if (result != ISC_R_SUCCESS && session->sending == 0) {
+
+ if (result == ISC_R_SUCCESS) {
+ http_do_bio(session, NULL, NULL, NULL);
+ } else {
finish_http_session(session);
}
+ isc_nmhandle_detach(&transphandle);
+
isc__nm_httpsession_detach(&session);
}
*send = (isc_http_send_req_t){ .pending_write_data =
session->pending_write_data,
.cb = cb,
- .cbarg = cbarg };
+ .cbarg = cbarg,
+ .submitted = session->submitted };
+ session->submitted = 0;
session->pending_write_data = NULL;
move_pending_send_callbacks(session, send);
return false;
}
+static inline bool
+http_too_many_active_streams(isc_nm_http_session_t *session) {
+ const uint64_t active_streams = session->received - session->processed;
+ const uint64_t max_active_streams =
+ ISC_MIN(ISC_NETMGR_MAX_STREAM_CLIENTS_PER_CONN,
+ session->max_concurrent_streams);
+
+ if (session->client) {
+ return false;
+ }
+
+ /*
+ * Do not process incoming data if there are too many active DNS
+ * clients (streams) per connection.
+ */
+ if (active_streams >= max_active_streams) {
+ return true;
+ }
+
+ return false;
+}
+
static void
http_do_bio(isc_nm_http_session_t *session, isc_nmhandle_t *send_httphandle,
isc_nm_cb_t send_cb, void *send_cbarg) {
finish_http_session(session);
}
return;
- } else if (nghttp2_session_want_read(session->ngsession) == 0 &&
- nghttp2_session_want_write(session->ngsession) == 0 &&
- session->pending_write_data == NULL)
- {
- session->closing = true;
+ }
+
+ if (send_cb != NULL) {
+ INSIST(VALID_NMHANDLE(send_httphandle));
+ (void)http_send_outgoing(session, send_httphandle, send_cb,
+ send_cbarg);
+ return;
+ }
+
+ INSIST(send_httphandle == NULL);
+ INSIST(send_cb == NULL);
+ INSIST(send_cbarg == NULL);
+
+ if (session->pending_write_data != NULL && session->sending == 0) {
+ (void)http_send_outgoing(session, NULL, NULL, NULL);
return;
}
if (nghttp2_session_want_read(session->ngsession) != 0) {
if (!session->reading) {
- /* We have not yet started
- * reading from this handle */
+ /* We have not yet started reading from this handle */
isc_nm_read(session->handle, http_readcb, session);
session->reading = true;
} else if (session->buf != NULL) {
size_t remaining =
isc_buffer_remaininglength(session->buf);
- /* Leftover data in the
- * buffer, use it */
- size_t readlen = nghttp2_session_mem_recv(
- session->ngsession,
- isc_buffer_current(session->buf), remaining);
+ /* Leftover data in the buffer, use it */
+ size_t remaining_after = 0;
+ ssize_t readlen = 0;
+ isc_nm_http_session_t *tmpsess = NULL;
+
+ /*
+ * Let's ensure that HTTP/2 session and its associated
+ * data will not go "out of scope" too early.
+ */
+ isc__nm_httpsession_attach(session, &tmpsess);
+
+ readlen = http_process_input_data(session,
+ session->buf);
- if (readlen == remaining) {
+ remaining_after =
+ isc_buffer_remaininglength(session->buf);
+
+ if (readlen < 0) {
+ failed_read_cb(ISC_R_UNEXPECTED, session);
+ } else if ((size_t)readlen == remaining) {
isc_buffer_free(&session->buf);
+ http_do_bio(session, NULL, NULL, NULL);
+ } else if (remaining_after > 0 &&
+ remaining_after < remaining)
+ {
+ /*
+ * We have processed a part of the data, now
+ * let's delay processing of whatever is left
+ * here. We want it to be an async operation so
+ * that we will:
+ *
+ * a) let other things run;
+ * b) have finer grained control over how much
+ * data is processed at once, because nghttp2
+ * would happily consume as much data we pass to
+ * it and that could overwhelm the server.
+ */
+ http_do_bio_async(session);
} else {
- isc_buffer_forward(session->buf, readlen);
+ (void)http_send_outgoing(session, NULL, NULL,
+ NULL);
}
- http_do_bio(session, send_httphandle, send_cb,
- send_cbarg);
+ isc__nm_httpsession_detach(&tmpsess);
return;
} else {
- /* Resume reading, it's
- * idempotent, wait for more
+ /*
+ * Resume reading, it's idempotent, wait for more
*/
isc_nm_read(session->handle, http_readcb, session);
}
isc_nm_read_stop(session->handle);
}
- if (send_cb != NULL) {
- INSIST(VALID_NMHANDLE(send_httphandle));
- (void)http_send_outgoing(session, send_httphandle, send_cb,
- send_cbarg);
- } else {
- INSIST(send_httphandle == NULL);
- INSIST(send_cb == NULL);
- INSIST(send_cbarg == NULL);
- (void)http_send_outgoing(session, NULL, NULL, NULL);
+ /* we might have some data to send after processing */
+ (void)http_send_outgoing(session, NULL, NULL, NULL);
+
+ if (nghttp2_session_want_read(session->ngsession) == 0 &&
+ nghttp2_session_want_write(session->ngsession) == 0 &&
+ session->pending_write_data == NULL)
+ {
+ session->closing = true;
+ isc_nm_read_stop(session->handle);
+ if (session->sending == 0) {
+ finish_http_session(session);
+ }
}
return;
}
+static void
+http_do_bio_async_cb(void *arg) {
+ isc_nm_http_session_t *session = arg;
+
+ REQUIRE(VALID_HTTP2_SESSION(session));
+
+ if (session->handle != NULL &&
+ !isc__nmsocket_closing(session->handle->sock))
+ {
+ http_do_bio(session, NULL, NULL, NULL);
+ }
+
+ isc__nm_httpsession_detach(&session);
+}
+
+static void
+http_do_bio_async(isc_nm_http_session_t *session) {
+ isc_nm_http_session_t *tmpsess = NULL;
+
+ REQUIRE(VALID_HTTP2_SESSION(session));
+
+ if (session->handle == NULL ||
+ isc__nmsocket_closing(session->handle->sock))
+ {
+ return;
+ }
+ isc__nm_httpsession_attach(session, &tmpsess);
+ isc_async_run(session->handle->sock->worker->loop, http_do_bio_async_cb,
+ tmpsess);
+}
+
static isc_result_t
get_http_cstream(isc_nmsocket_t *sock, http_cstream_t **streamp) {
http_cstream_t *cstream = sock->h2->connect.cstream;
if (result != ISC_R_SUCCESS) {
data = NULL;
}
+ if (result == ISC_R_SUCCESS) {
+ socket->h2->request_received = true;
+ socket->h2->session->received++;
+ }
socket->h2->cb(handle, result, data, socket->h2->cbarg);
isc_nmhandle_detach(&handle);
}