From: Tomas Krizek Date: Fri, 7 Aug 2020 11:54:46 +0000 (+0200) Subject: daemon/http: support multiple subsequent streams in decoded tls data X-Git-Tag: v5.2.0~15^2~49 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=93eedc7e9f9cfdd814037d3b477f99dfe1f05191;p=thirdparty%2Fknot-resolver.git daemon/http: support multiple subsequent streams in decoded tls data --- diff --git a/daemon/http.c b/daemon/http.c index afba89a3f..891a44e64 100644 --- a/daemon/http.c +++ b/daemon/http.c @@ -28,7 +28,8 @@ #define MAKE_STATIC_NV(K, V) \ MAKE_NV(K, sizeof(K) - 1, V, sizeof(V) - 1) -#define HTTP_MAX_CONCURRENT_STREAMS 1 +/* Use same maximum as for tcp_pipeline_max. */ +#define HTTP_MAX_CONCURRENT_STREAMS UINT16_MAX #define MAX_DECIMAL_LENGTH(VT) (CHAR_BIT * sizeof(VT) / 3) + 3 @@ -54,21 +55,56 @@ static int header_callback(nghttp2_session *session, const nghttp2_frame *frame, if (!strcasecmp(":path", (const char *)name)) { char *beg = strstr((const char *)value, key); if (beg) { + // TODO check we're not interefing with incomplete stream beg += sizeof(key) - 1; char *end = strchrnul(beg, '&'); ctx->wire_len = kr_base64url_decode((uint8_t*)beg, end - beg, ctx->wire + sizeof(uint16_t), ctx->wire_len - sizeof(uint16_t)); - ctx->request_stream_id = frame->hd.stream_id; + queue_push(ctx->streams, frame->hd.stream_id); } } return 0; } -static int query_recv_callback(nghttp2_session *session, uint8_t flags, int32_t stream_id, const uint8_t *data, size_t len, void *user_data) +/* This method is called for data received via POST. */ +static int data_chunk_recv_callback(nghttp2_session *session, uint8_t flags, int32_t stream_id, const uint8_t *data, size_t len, void *user_data) { struct http_ctx_t *ctx = (struct http_ctx_t *)user_data; - memcpy(ctx->wire + sizeof(uint16_t), data, len); - ctx->wire_len = len; - ctx->request_stream_id = stream_id; + + if (ctx->incomplete_stream && queue_len(ctx->streams) > 0 && queue_tail(ctx->streams) != stream_id) { + /* If the received DATA chunk is from a different stream + * than the one being currently handled, ignore it and refuse + * the stream. */ + kr_log_verbose("[doh2] resetting http stream due to incomplete data\n"); + nghttp2_submit_rst_stream(session, NGHTTP2_FLAG_NONE, stream_id, NGHTTP2_REFUSED_STREAM); + return 0; + } + + // TODO is there enough space in the wire buffer? + if (!ctx->incomplete_stream) { + ctx->incomplete_stream = true; + queue_push(ctx->streams, stream_id); + + ctx->wire += sizeof(uint16_t); + ctx->wire_len = 0; + } + memcpy(ctx->wire + ctx->wire_len, data, len); + ctx->wire_len += len; + + return 0; +} + +static int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame, void *user_data) +{ + struct http_ctx_t *ctx = (struct http_ctx_t *)user_data; + + if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM && ctx->incomplete_stream) { + ctx->incomplete_stream = false; + + knot_wire_write_u16(ctx->wire - sizeof(uint16_t), ctx->wire_len); // TODO wire_len can be overflow when negative int32_t + ctx->submitted += ctx->wire_len + sizeof(uint16_t); + ctx->wire += ctx->wire_len; + } + return 0; } @@ -80,12 +116,15 @@ struct http_ctx_t* http_new(http_send_callback cb, void *user_ctx) nghttp2_session_callbacks_new(&callbacks); nghttp2_session_callbacks_set_send_callback(callbacks, send_callback); nghttp2_session_callbacks_set_on_header_callback(callbacks, header_callback); - nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks, query_recv_callback); + nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks, data_chunk_recv_callback); + nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, on_frame_recv_callback); struct http_ctx_t *ctx = calloc(1UL, sizeof(struct http_ctx_t)); ctx->send_cb = cb; ctx->user_ctx = user_ctx; - ctx->request_stream_id = -1; + queue_init(ctx->streams); + ctx->incomplete_stream = false; + ctx->submitted = 0; nghttp2_session_server_new(&ctx->session, callbacks, ctx); nghttp2_session_callbacks_del(callbacks); @@ -106,8 +145,10 @@ ssize_t http_process_input_data(struct session *s, const uint8_t *in_buf, ssize_ return kr_error(ENOSYS); } - http_p->wire = session_wirebuf_get_free_start(s); - http_p->wire_len = session_wirebuf_get_free_size(s); + http_p->submitted = 0; + http_p->wire_start_idx = session_wirebuf_get_free_start(s); + http_p->wire = http_p->wire_start_idx; + // http_p->wire_len = session_wirebuf_get_free_size(s); // TODO initialize this for GET ssize_t ret = 0; if ((ret = nghttp2_session_mem_recv(http_p->session, in_buf, in_buf_len)) < 0) { kr_log_error("[%s] nghttp2_session_mem_recv failed: %s (%zd)\n", server_logstring, nghttp2_strerror(ret), ret); @@ -119,26 +160,18 @@ ssize_t http_process_input_data(struct session *s, const uint8_t *in_buf, ssize_ return kr_error(EIO); } - ssize_t submitted = 0; - if (http_p->request_stream_id >= 0) { - knot_wire_write_u16(http_p->wire, http_p->wire_len); - submitted = http_p->wire_len + sizeof(uint16_t); - } - - return submitted; + return http_p->submitted; } static ssize_t send_response_callback(nghttp2_session *session, int32_t stream_id, uint8_t *buf, size_t length, uint32_t *data_flags, nghttp2_data_source *source, void *user_data) { struct http_data_buffer *buffer = (struct http_data_buffer *)source->ptr; - struct http_ctx_t *ctx = (struct http_ctx_t *)user_data; //TODO remove maybe size_t send = MIN(buffer->end - buffer->data, length); memcpy(buf, buffer->data, send); buffer->data += send; //*data_flags |= (buffer->data == buffer->end) ? NGHTTP2_DATA_FLAG_EOF : 0; if (buffer->data == buffer->end) { *data_flags |= NGHTTP2_DATA_FLAG_EOF; - ctx->request_stream_id = -1; } return send; } @@ -199,6 +232,7 @@ void http_free(struct http_ctx_t *ctx) if (ctx == NULL || ctx->session == NULL) { return; } + queue_deinit(ctx->streams); nghttp2_session_del(ctx->session); ctx->session = NULL; } diff --git a/daemon/http.h b/daemon/http.h index 9f957d64d..767c3d889 100644 --- a/daemon/http.h +++ b/daemon/http.h @@ -12,17 +12,24 @@ #include #include +#include "lib/generic/queue.h" + /** Transport session (opaque). */ struct session; typedef ssize_t(*http_send_callback)(const uint8_t *buffer, const size_t buffer_len, void *user_ctx); +typedef queue_t(int32_t) queue_int32_t; + struct http_ctx_t { struct nghttp2_session *session; http_send_callback send_cb; void *user_ctx; - int32_t request_stream_id; + queue_int32_t streams; /* List of stream IDs of read HTTP/2 frames. */ + bool incomplete_stream; + ssize_t submitted; uint8_t *wire; + uint8_t *wire_start_idx; int32_t wire_len; }; diff --git a/daemon/worker.c b/daemon/worker.c index 5ff65286b..5a54e4947 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -302,8 +302,9 @@ static struct request_ctx *request_create(struct worker_ctx *worker, if (req->qsource.flags.http) { struct http_ctx_t *http_ctx = session_http_get_server_ctx(session); // TODO maybe assert? - if (http_ctx) { - req->qsource.stream_id = http_ctx->request_stream_id; + if (http_ctx && queue_len(http_ctx->streams) > 0) { + req->qsource.stream_id = queue_head(http_ctx->streams); + queue_pop(http_ctx->streams); } } /* We need to store a copy of peer address. */