From: Tomas Krizek Date: Fri, 7 Aug 2020 11:33:06 +0000 (+0200) Subject: daemon/http: store stream_id per request to support query pipelining X-Git-Tag: v5.2.0~15^2~50 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=b009eb25d20d4909f394565a0a661d7a0a12be05;p=thirdparty%2Fknot-resolver.git daemon/http: store stream_id per request to support query pipelining --- diff --git a/daemon/http.c b/daemon/http.c index d14555533..afba89a3f 100644 --- a/daemon/http.c +++ b/daemon/http.c @@ -143,9 +143,9 @@ static ssize_t send_response_callback(nghttp2_session *session, int32_t stream_i return send; } -int http_write(uv_write_t *req, uv_handle_t *handle, knot_pkt_t *pkt, uv_write_cb cb) +int http_write(uv_write_t *req, uv_handle_t *handle, int32_t stream_id, knot_pkt_t *pkt, uv_write_cb cb) { - if (!pkt || !handle || !handle->data) { + if (!pkt || !handle || !handle->data || stream_id < 0) { return kr_error(EINVAL); } @@ -176,7 +176,7 @@ int http_write(uv_write_t *req, uv_handle_t *handle, knot_pkt_t *pkt, uv_write_c MAKE_NV("content-length", 14, size, size_len) }; - int ret = nghttp2_submit_response(http_ctx->session, http_ctx->request_stream_id, hdrs, sizeof(hdrs)/sizeof(*hdrs), &data_prd); + int ret = nghttp2_submit_response(http_ctx->session, stream_id, hdrs, sizeof(hdrs)/sizeof(*hdrs), &data_prd); if (ret != 0) { kr_log_error("[%s] nghttp2_submit_response failed: %s (%d)\n", server_logstring, nghttp2_strerror(ret), ret); return kr_error(EIO); @@ -186,7 +186,7 @@ int http_write(uv_write_t *req, uv_handle_t *handle, knot_pkt_t *pkt, uv_write_c kr_log_error("[%s] nghttp2_session_send failed: %s (%d)\n", server_logstring, nghttp2_strerror(ret), ret); return kr_error(EIO); } - + /* The data is now accepted in gnutls internal buffers, the message can be treated as sent */ req->handle = (uv_stream_t *)handle; cb(req, 0); @@ -201,4 +201,4 @@ void http_free(struct http_ctx_t *ctx) } nghttp2_session_del(ctx->session); ctx->session = NULL; -} \ No newline at end of file +} diff --git a/daemon/http.h b/daemon/http.h index 3ab1a233c..9f957d64d 100644 --- a/daemon/http.h +++ b/daemon/http.h @@ -28,5 +28,5 @@ struct http_ctx_t { struct http_ctx_t* http_new(http_send_callback cb, void *user_ctx); ssize_t http_process_input_data(struct session *s, const uint8_t *buf, ssize_t nread); -int http_write(uv_write_t *req, uv_handle_t *handle, knot_pkt_t *pkt, uv_write_cb cb); -void http_free(struct http_ctx_t *ctx); \ No newline at end of file +int http_write(uv_write_t *req, uv_handle_t *handle, int32_t stream_id, knot_pkt_t *pkt, uv_write_cb cb); +void http_free(struct http_ctx_t *ctx); diff --git a/daemon/io.c b/daemon/io.c index 7154c2f43..c7aa8d534 100644 --- a/daemon/io.c +++ b/daemon/io.c @@ -359,12 +359,12 @@ static ssize_t tcp_send(const uint8_t *buffer, const size_t buffer_len, void *us // return kr_error(EIO); //} //memcpy(buffer_backup, buffer, buffer_len); - + uv_write_t *req = (uv_write_t *)calloc(1, sizeof(uv_write_t)); if (!req) { return kr_error(EIO); } - + const uv_buf_t uv_buffer = { //.base = buffer_backup, .base = buffer, diff --git a/daemon/lua/kres-gen.lua b/daemon/lua/kres-gen.lua index eef26bfd7..60f0212e7 100644 --- a/daemon/lua/kres-gen.lua +++ b/daemon/lua/kres-gen.lua @@ -171,6 +171,7 @@ struct kr_request { const knot_pkt_t *packet; struct kr_request_qsource_flags flags; size_t size; + int32_t stream_id; } qsource; struct { unsigned int rtt; diff --git a/daemon/worker.c b/daemon/worker.c index 47bb3b0e1..5ff65286b 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -298,6 +298,14 @@ static struct request_ctx *request_create(struct worker_ctx *worker, req->qsource.flags.tcp = session_get_handle(session)->type == UV_TCP; req->qsource.flags.tls = session_flags(session)->has_tls; req->qsource.flags.http = session_flags(session)->has_http; + req->qsource.stream_id = -1; + if (req->qsource.flags.http) { + struct http_ctx_t *http_ctx = session_http_get_server_ctx(session); + // TODO maybe assert? + if (http_ctx) { + req->qsource.stream_id = http_ctx->request_stream_id; + } + } /* We need to store a copy of peer address. */ memcpy(&ctx->source.addr.ip, peer, kr_sockaddr_len(peer)); req->qsource.addr = &ctx->source.addr.ip; @@ -612,7 +620,7 @@ static int qr_task_send(struct qr_task *task, struct session *session, if (session_flags(session)->has_http) { uv_write_t *write_req = (uv_write_t *)ioreq; write_req->data = task; - ret = http_write(write_req, handle, pkt, &on_write); + ret = http_write(write_req, handle, ctx->req.qsource.stream_id, pkt, &on_write); } else if (session_flags(session)->has_tls) { uv_write_t *write_req = (uv_write_t *)ioreq; write_req->data = task; diff --git a/lib/resolve.h b/lib/resolve.h index 3a2243069..db596a387 100644 --- a/lib/resolve.h +++ b/lib/resolve.h @@ -191,6 +191,7 @@ struct kr_request { const knot_pkt_t *packet; struct kr_request_qsource_flags flags; /**< See definition above. */ size_t size; /**< query packet size */ + int32_t stream_id; /**< HTTP/2 stream ID for DoH requests */ } qsource; struct { unsigned rtt; /**< Current upstream RTT */ @@ -248,7 +249,7 @@ int kr_resolve_begin(struct kr_request *request, struct kr_context *ctx, knot_pk * Consume input packet (may be either first query or answer to query originated from kr_resolve_produce()) * * @note If the I/O fails, provide an empty or NULL packet, this will make iterator recognize nameserver failure. - * + * * @param request request state (awaiting input) * @param src [in] packet source address * @param packet [in] input packet @@ -263,7 +264,7 @@ int kr_resolve_consume(struct kr_request *request, const struct sockaddr *src, k * If the CONSUME is returned then dst, type and packet will be filled with * appropriate values and caller is responsible to send them and receive answer. * If it returns any other state, then content of the variables is undefined. - * + * * @param request request state (in PRODUCE state) * @param dst [out] possible address of the next nameserver * @param type [out] possible used socket type (SOCK_STREAM, SOCK_DGRAM)