#define MAKE_STATIC_NV(K, V) \
MAKE_NV(K, sizeof(K) - 1, V, sizeof(V) - 1)
-#define HTTP_MAX_CONCURRENT_STREAMS 1
+/* Use same maximum as for tcp_pipeline_max. */
+#define HTTP_MAX_CONCURRENT_STREAMS UINT16_MAX
#define MAX_DECIMAL_LENGTH(VT) (CHAR_BIT * sizeof(VT) / 3) + 3
if (!strcasecmp(":path", (const char *)name)) {
char *beg = strstr((const char *)value, key);
if (beg) {
+ // TODO check we're not interefing with incomplete stream
beg += sizeof(key) - 1;
char *end = strchrnul(beg, '&');
ctx->wire_len = kr_base64url_decode((uint8_t*)beg, end - beg, ctx->wire + sizeof(uint16_t), ctx->wire_len - sizeof(uint16_t));
- ctx->request_stream_id = frame->hd.stream_id;
+ queue_push(ctx->streams, frame->hd.stream_id);
}
}
return 0;
}
-static int query_recv_callback(nghttp2_session *session, uint8_t flags, int32_t stream_id, const uint8_t *data, size_t len, void *user_data)
+/* This method is called for data received via POST. */
+static int data_chunk_recv_callback(nghttp2_session *session, uint8_t flags, int32_t stream_id, const uint8_t *data, size_t len, void *user_data)
{
struct http_ctx_t *ctx = (struct http_ctx_t *)user_data;
- memcpy(ctx->wire + sizeof(uint16_t), data, len);
- ctx->wire_len = len;
- ctx->request_stream_id = stream_id;
+
+ if (ctx->incomplete_stream && queue_len(ctx->streams) > 0 && queue_tail(ctx->streams) != stream_id) {
+ /* If the received DATA chunk is from a different stream
+ * than the one being currently handled, ignore it and refuse
+ * the stream. */
+ kr_log_verbose("[doh2] resetting http stream due to incomplete data\n");
+ nghttp2_submit_rst_stream(session, NGHTTP2_FLAG_NONE, stream_id, NGHTTP2_REFUSED_STREAM);
+ return 0;
+ }
+
+ // TODO is there enough space in the wire buffer?
+ if (!ctx->incomplete_stream) {
+ ctx->incomplete_stream = true;
+ queue_push(ctx->streams, stream_id);
+
+ ctx->wire += sizeof(uint16_t);
+ ctx->wire_len = 0;
+ }
+ memcpy(ctx->wire + ctx->wire_len, data, len);
+ ctx->wire_len += len;
+
+ return 0;
+}
+
+static int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame, void *user_data)
+{
+ struct http_ctx_t *ctx = (struct http_ctx_t *)user_data;
+
+ if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM && ctx->incomplete_stream) {
+ ctx->incomplete_stream = false;
+
+ knot_wire_write_u16(ctx->wire - sizeof(uint16_t), ctx->wire_len); // TODO wire_len can be overflow when negative int32_t
+ ctx->submitted += ctx->wire_len + sizeof(uint16_t);
+ ctx->wire += ctx->wire_len;
+ }
+
return 0;
}
nghttp2_session_callbacks_new(&callbacks);
nghttp2_session_callbacks_set_send_callback(callbacks, send_callback);
nghttp2_session_callbacks_set_on_header_callback(callbacks, header_callback);
- nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks, query_recv_callback);
+ nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks, data_chunk_recv_callback);
+ nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, on_frame_recv_callback);
struct http_ctx_t *ctx = calloc(1UL, sizeof(struct http_ctx_t));
ctx->send_cb = cb;
ctx->user_ctx = user_ctx;
- ctx->request_stream_id = -1;
+ queue_init(ctx->streams);
+ ctx->incomplete_stream = false;
+ ctx->submitted = 0;
nghttp2_session_server_new(&ctx->session, callbacks, ctx);
nghttp2_session_callbacks_del(callbacks);
return kr_error(ENOSYS);
}
- http_p->wire = session_wirebuf_get_free_start(s);
- http_p->wire_len = session_wirebuf_get_free_size(s);
+ http_p->submitted = 0;
+ http_p->wire_start_idx = session_wirebuf_get_free_start(s);
+ http_p->wire = http_p->wire_start_idx;
+ // http_p->wire_len = session_wirebuf_get_free_size(s); // TODO initialize this for GET
ssize_t ret = 0;
if ((ret = nghttp2_session_mem_recv(http_p->session, in_buf, in_buf_len)) < 0) {
kr_log_error("[%s] nghttp2_session_mem_recv failed: %s (%zd)\n", server_logstring, nghttp2_strerror(ret), ret);
return kr_error(EIO);
}
- ssize_t submitted = 0;
- if (http_p->request_stream_id >= 0) {
- knot_wire_write_u16(http_p->wire, http_p->wire_len);
- submitted = http_p->wire_len + sizeof(uint16_t);
- }
-
- return submitted;
+ return http_p->submitted;
}
static ssize_t send_response_callback(nghttp2_session *session, int32_t stream_id, uint8_t *buf, size_t length, uint32_t *data_flags, nghttp2_data_source *source, void *user_data)
{
struct http_data_buffer *buffer = (struct http_data_buffer *)source->ptr;
- struct http_ctx_t *ctx = (struct http_ctx_t *)user_data; //TODO remove maybe
size_t send = MIN(buffer->end - buffer->data, length);
memcpy(buf, buffer->data, send);
buffer->data += send;
//*data_flags |= (buffer->data == buffer->end) ? NGHTTP2_DATA_FLAG_EOF : 0;
if (buffer->data == buffer->end) {
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
- ctx->request_stream_id = -1;
}
return send;
}
if (ctx == NULL || ctx->session == NULL) {
return;
}
+ queue_deinit(ctx->streams);
nghttp2_session_del(ctx->session);
ctx->session = NULL;
}