]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
daemon/http: support multiple subsequent streams in decoded tls data
authorTomas Krizek <tomas.krizek@nic.cz>
Fri, 7 Aug 2020 11:54:46 +0000 (13:54 +0200)
committerTomas Krizek <tomas.krizek@nic.cz>
Tue, 13 Oct 2020 10:55:22 +0000 (12:55 +0200)
daemon/http.c
daemon/http.h
daemon/worker.c

index afba89a3fd5d450f3bb7f5cab33ce4c2fc2dd3a0..891a44e647ac75087470e8e5489ec5512a6601fc 100644 (file)
@@ -28,7 +28,8 @@
 #define MAKE_STATIC_NV(K, V) \
        MAKE_NV(K, sizeof(K) - 1, V, sizeof(V) - 1)
 
-#define HTTP_MAX_CONCURRENT_STREAMS 1
+/* Use same maximum as for tcp_pipeline_max. */
+#define HTTP_MAX_CONCURRENT_STREAMS UINT16_MAX
 
 #define MAX_DECIMAL_LENGTH(VT) (CHAR_BIT * sizeof(VT) / 3) + 3
 
@@ -54,21 +55,56 @@ static int header_callback(nghttp2_session *session, const nghttp2_frame *frame,
        if (!strcasecmp(":path", (const char *)name)) {
                char *beg = strstr((const char *)value, key);
                if (beg) {
+                       // TODO check we're not interefing with incomplete stream
                        beg += sizeof(key) - 1;
                        char *end = strchrnul(beg, '&');
                        ctx->wire_len = kr_base64url_decode((uint8_t*)beg, end - beg, ctx->wire + sizeof(uint16_t), ctx->wire_len - sizeof(uint16_t));
-                       ctx->request_stream_id = frame->hd.stream_id;
+                       queue_push(ctx->streams, frame->hd.stream_id);
                }
        }
        return 0;
 }
 
-static int query_recv_callback(nghttp2_session *session, uint8_t flags, int32_t stream_id, const uint8_t *data, size_t len, void *user_data)
+/* This method is called for data received via POST. */
+static int data_chunk_recv_callback(nghttp2_session *session, uint8_t flags, int32_t stream_id, const uint8_t *data, size_t len, void *user_data)
 {
        struct http_ctx_t *ctx = (struct http_ctx_t *)user_data;
-       memcpy(ctx->wire + sizeof(uint16_t), data, len);
-       ctx->wire_len = len;
-       ctx->request_stream_id = stream_id;
+
+       if (ctx->incomplete_stream && queue_len(ctx->streams) > 0 && queue_tail(ctx->streams) != stream_id) {
+               /* If the received DATA chunk is from a different stream
+                * than the one being currently handled, ignore it and refuse
+                * the stream. */
+               kr_log_verbose("[doh2] resetting http stream due to incomplete data\n");
+               nghttp2_submit_rst_stream(session, NGHTTP2_FLAG_NONE, stream_id, NGHTTP2_REFUSED_STREAM);
+               return 0;
+       }
+
+       // TODO is there enough space in the wire buffer?
+       if (!ctx->incomplete_stream) {
+               ctx->incomplete_stream = true;
+               queue_push(ctx->streams, stream_id);
+
+               ctx->wire += sizeof(uint16_t);
+               ctx->wire_len = 0;
+       }
+       memcpy(ctx->wire + ctx->wire_len, data, len);
+       ctx->wire_len += len;
+
+       return 0;
+}
+
+static int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame, void *user_data)
+{
+       struct http_ctx_t *ctx = (struct http_ctx_t *)user_data;
+
+       if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM && ctx->incomplete_stream) {
+               ctx->incomplete_stream = false;
+
+               knot_wire_write_u16(ctx->wire - sizeof(uint16_t), ctx->wire_len);  // TODO wire_len can be overflow when negative  int32_t
+               ctx->submitted += ctx->wire_len + sizeof(uint16_t);
+               ctx->wire += ctx->wire_len;
+       }
+
        return 0;
 }
 
@@ -80,12 +116,15 @@ struct http_ctx_t* http_new(http_send_callback cb, void *user_ctx)
        nghttp2_session_callbacks_new(&callbacks);
        nghttp2_session_callbacks_set_send_callback(callbacks, send_callback);
        nghttp2_session_callbacks_set_on_header_callback(callbacks, header_callback);
-       nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks, query_recv_callback);
+       nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks, data_chunk_recv_callback);
+       nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, on_frame_recv_callback);
 
        struct http_ctx_t *ctx = calloc(1UL, sizeof(struct http_ctx_t));
        ctx->send_cb = cb;
        ctx->user_ctx = user_ctx;
-       ctx->request_stream_id = -1;
+       queue_init(ctx->streams);
+       ctx->incomplete_stream = false;
+       ctx->submitted = 0;
 
        nghttp2_session_server_new(&ctx->session, callbacks, ctx);
        nghttp2_session_callbacks_del(callbacks);
@@ -106,8 +145,10 @@ ssize_t http_process_input_data(struct session *s, const uint8_t *in_buf, ssize_
                return kr_error(ENOSYS);
        }
 
-       http_p->wire = session_wirebuf_get_free_start(s);
-       http_p->wire_len = session_wirebuf_get_free_size(s);
+       http_p->submitted = 0;
+       http_p->wire_start_idx = session_wirebuf_get_free_start(s);
+       http_p->wire = http_p->wire_start_idx;
+       // http_p->wire_len = session_wirebuf_get_free_size(s);  // TODO initialize this for GET
        ssize_t ret = 0;
        if ((ret = nghttp2_session_mem_recv(http_p->session, in_buf, in_buf_len)) < 0) {
                kr_log_error("[%s] nghttp2_session_mem_recv failed: %s (%zd)\n", server_logstring, nghttp2_strerror(ret), ret);
@@ -119,26 +160,18 @@ ssize_t http_process_input_data(struct session *s, const uint8_t *in_buf, ssize_
                return kr_error(EIO);
        }
 
-       ssize_t submitted = 0;
-       if (http_p->request_stream_id >= 0) {
-               knot_wire_write_u16(http_p->wire, http_p->wire_len);
-               submitted = http_p->wire_len + sizeof(uint16_t);
-       }
-
-       return submitted;
+       return http_p->submitted;
 }
 
 static ssize_t send_response_callback(nghttp2_session *session, int32_t stream_id, uint8_t *buf, size_t length, uint32_t *data_flags, nghttp2_data_source *source, void *user_data)
 {
        struct http_data_buffer *buffer = (struct http_data_buffer *)source->ptr;
-       struct http_ctx_t *ctx = (struct http_ctx_t *)user_data; //TODO remove maybe
        size_t send = MIN(buffer->end - buffer->data, length);
        memcpy(buf, buffer->data, send);
        buffer->data += send;
        //*data_flags |= (buffer->data == buffer->end) ? NGHTTP2_DATA_FLAG_EOF : 0;
        if (buffer->data == buffer->end) {
                *data_flags |= NGHTTP2_DATA_FLAG_EOF;
-               ctx->request_stream_id = -1;
        }
        return send;
 }
@@ -199,6 +232,7 @@ void http_free(struct http_ctx_t *ctx)
        if (ctx == NULL || ctx->session == NULL) {
                return;
        }
+       queue_deinit(ctx->streams);
        nghttp2_session_del(ctx->session);
        ctx->session = NULL;
 }
index 9f957d64d52d78b575336158efad53a508b04709..767c3d889dc3046fb78aa72303efed99124744ad 100644 (file)
 #include <nghttp2/nghttp2.h>
 #include <libknot/packet/pkt.h>
 
+#include "lib/generic/queue.h"
+
 /** Transport session (opaque). */
 struct session;
 
 typedef ssize_t(*http_send_callback)(const uint8_t *buffer, const size_t buffer_len, void *user_ctx);
 
+typedef queue_t(int32_t) queue_int32_t;
+
 struct http_ctx_t {
        struct nghttp2_session *session;
        http_send_callback send_cb;
        void *user_ctx;
-       int32_t request_stream_id;
+       queue_int32_t streams;  /* List of stream IDs of read HTTP/2 frames. */
+       bool incomplete_stream;
+       ssize_t submitted;
        uint8_t *wire;
+       uint8_t *wire_start_idx;
        int32_t wire_len;
 };
 
index 5ff65286b33c7970682feed2341091e823f41fec..5a54e4947bacf2574239be926fcff6df4f665e35 100644 (file)
@@ -302,8 +302,9 @@ static struct request_ctx *request_create(struct worker_ctx *worker,
                if (req->qsource.flags.http) {
                        struct http_ctx_t *http_ctx = session_http_get_server_ctx(session);
                        // TODO maybe assert?
-                       if (http_ctx) {
-                               req->qsource.stream_id = http_ctx->request_stream_id;
+                       if (http_ctx && queue_len(http_ctx->streams) > 0) {
+                               req->qsource.stream_id = queue_head(http_ctx->streams);
+                               queue_pop(http_ctx->streams);
                        }
                }
                /* We need to store a copy of peer address. */