/* Submit data to wirebuffer. */
knot_wire_write_u16(ctx->buf, len);
+ ctx->submitted_stream = stream_id;
ctx->submitted += ctx->buf_pos;
ctx->buf += ctx->buf_pos;
ctx->buf_pos = 0;
return NGHTTP2_ERR_CALLBACK_FAILURE;
if ((frame->hd.flags & NGHTTP2_FLAG_END_STREAM) && ctx->incomplete_stream == stream_id) {
+ ctx->streaming = false;
+
if (ctx->current_method == HTTP_METHOD_GET || ctx->current_method == HTTP_METHOD_HEAD) {
if (process_uri_path(ctx, ctx->uri_path, stream_id) < 0) {
/* End processing - don't submit to wirebuffer. */
* Process inbound HTTP/2 data and return number of bytes read into session wire buffer.
*
* This function may trigger outgoing HTTP/2 data, such as stream resets, window updates etc.
+ *
+ * Returns 1 if stream has not ended yet, 0 if the stream has ended, or
+ * a negative value on error.
*/
-ssize_t http_process_input_data(struct session *session, const uint8_t *buf,
- ssize_t nread)
+int http_process_input_data(struct session *session, const uint8_t *buf,
+ ssize_t nread, ssize_t *out_submitted)
{
struct http_ctx *ctx = session_http_get_server_ctx(session);
ssize_t ret = 0;
* query will be ignored). This may also be problematic in other
* cases. */
ctx->submitted = 0;
+ ctx->streaming = true;
ctx->buf = session_wirebuf_get_free_start(session);
ctx->buf_pos = 0;
ctx->buf_size = session_wirebuf_get_free_size(session);
return kr_error(EIO);
}
- return ctx->submitted;
+ *out_submitted = ctx->submitted;
+ return ctx->streaming;
+}
+
+int http_send_bad_request(struct session *session)
+{
+ struct http_ctx *ctx = session_http_get_server_ctx(session);
+ if (ctx->submitted_stream >= 0)
+ return http_send_response(ctx, ctx->submitted_stream, NULL, HTTP_STATUS_BAD_REQUEST);
+
+ return 0;
}
/*
queue_http_stream streams; /* Streams present in the wire buffer. */
trie_t *stream_write_data; /* Dictionary of stream data that needs to be freed after write. */
int32_t incomplete_stream;
+ int32_t submitted_stream; /* Stream whose data has been submitted to the wire buffer. */
ssize_t submitted;
http_method_t current_method;
char *uri_path;
uint8_t *buf; /* Part of the wire_buf that belongs to current HTTP/2 stream. */
ssize_t buf_pos;
ssize_t buf_size;
+ bool streaming; /* True: not all data in the stream has been received yet. */
};
#if ENABLE_DOH2
struct http_ctx* http_new(struct session *session, http_send_callback send_cb);
-ssize_t http_process_input_data(struct session *session, const uint8_t *buf, ssize_t nread);
+int http_process_input_data(struct session *session, const uint8_t *buf,
+ ssize_t nread, ssize_t *out_submitted);
+int http_send_bad_request(struct session *session);
int http_write(uv_write_t *req, uv_handle_t *handle, knot_pkt_t* pkt, int32_t stream_id,
uv_write_cb on_write);
void http_free(struct http_ctx *ctx);
data_len = consumed;
}
#if ENABLE_DOH2
+ int streaming = 1;
if (session_flags(s)->has_http) {
- consumed = http_process_input_data(s, data, data_len);
- if (consumed < 0) {
+ streaming = http_process_input_data(s, data, data_len, &consumed);
+ if (streaming < 0) {
if (kr_log_is_debug(IO, NULL)) {
char *peer_str = kr_straddr(src_addr);
kr_log_debug(IO, "=> connection to '%s': "
}
worker_end_tcp(s);
return;
- } else if (consumed == 0) {
+ }
+ if (consumed == 0) {
return;
}
data = session_wirebuf_get_free_start(s);
}
session_wirebuf_compress(s);
mp_flush(the_worker->pkt_pool.ctx);
+#if ENABLE_DOH2
+ if (session_flags(s)->has_http && streaming == 0 && ret == 0) {
+ ret = http_send_bad_request(s);
+ if (ret < 0) {
+ /* An error has occurred, close the session. */
+ worker_end_tcp(s);
+ }
+ }
+#endif
}
#if ENABLE_DOH2