From: Frantisek Tobias Date: Tue, 29 Jul 2025 07:23:59 +0000 (+0200) Subject: daemon/quic: support for multiple streams: collect finished queries into wire_buf... X-Git-Tag: v6.2.0~2^2~54 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=08d1927b3de90d1e26fa41dfdba4a46dc6a004f9;p=thirdparty%2Fknot-resolver.git daemon/quic: support for multiple streams: collect finished queries into wire_buf a proceed to the next layer --- diff --git a/daemon/quic.c b/daemon/quic.c index 03b2df5ab..b10820d5a 100644 --- a/daemon/quic.c +++ b/daemon/quic.c @@ -5,18 +5,19 @@ #include #include #include -#include #include #include #include #include "lib/defines.h" #include "lib/log.h" #include "lib/resolve-impl.h" +// #include "mempattern.h" #include "session2.h" #include "network.h" #include "lib/resolve.h" // #include "libknot/quic/quic.h" #include "libdnssec/random.h" +#include #include #include #include @@ -162,10 +163,14 @@ kr_quic_conn_t *kr_quic_table_add(ngtcp2_conn *ngconn, const ngtcp2_cid *cid, if (conn == NULL) return NULL; + /* FIXME magic numbers */ + + conn->conn = ngconn; conn->quic_table = table; conn->stream_inprocess = -1; conn->qlog_fd = -1; + wire_buf_init(&conn->unwrap_buf, 1200); conn->next_expiry = UINT64_MAX; if (!heap_insert(table->expiry_heap, (heap_val_t *)conn)) { @@ -788,6 +793,8 @@ static int pl_quic_sess_init(struct session2 *session, void *sess_data, void *pa quic->conn_count = 0; } + // TODO set setings? + return 0; } @@ -1087,7 +1094,7 @@ finish: /** */ static int handle_packet(struct pl_quic_sess_data *quic, - struct protolayer_iter_ctx *pkt_ctx, + struct protolayer_iter_ctx *ctx, const uint8_t *pkt, size_t pktlen, ngtcp2_cid *dcid, struct kr_quic_conn **out_conn) { @@ -1104,11 +1111,12 @@ static int handle_packet(struct pl_quic_sess_data *quic, uint32_t supported_quic[1] = { NGTCP2_PROTO_VER_V1 }; if (ret == NGTCP2_ERR_VERSION_NEGOTIATION) { // FIXME: This will be broken by trimming the pkt below + // this will need to be sent out immediatelly ngtcp2_pkt_write_version_negotiation( - wire_buf_free_space(pkt_ctx->payload.wire_buf), - wire_buf_free_space_length(pkt_ctx->payload.wire_buf), + wire_buf_free_space(ctx->payload.wire_buf), + wire_buf_free_space_length(ctx->payload.wire_buf), random(), - // FIXME: Maybe switch + // WARNING Maybe switch decoded_cids.scid, decoded_cids.scidlen, decoded_cids.dcid, @@ -1133,13 +1141,13 @@ static int handle_packet(struct pl_quic_sess_data *quic, qconn = kr_quic_table_lookup(dcid, quic->conn_table); if (!qconn) { - if ((ret = quic_init_server_conn(quic->conn_table, pkt_ctx, + if ((ret = quic_init_server_conn(quic->conn_table, ctx, UINT64_MAX - 1, &scid, dcid, decoded_cids, pkt, pktlen, &qconn)) != kr_ok()) { return ret; } - // if ((ret = wire_buf_trim(pkt_ctx->payload.wire_buf, pktlen)) != 0) { + // if ((ret = wire_buf_trim(ctx->payload.wire_buf, pktlen)) != 0) { // kr_log_error(DOQ, "wirebuf failed to trim: %s (%d)\n", // kr_strerror(ret), ret); // return kr_error(ret); @@ -1165,8 +1173,10 @@ static int handle_packet(struct pl_quic_sess_data *quic, ret = ngtcp2_conn_read_pkt(qconn->conn, path, &pi, - wire_buf_data(pkt_ctx->payload.wire_buf), - wire_buf_data_length(pkt_ctx->payload.wire_buf), + pkt, + pktlen, + // wire_buf_data(ctx->payload.wire_buf), + // wire_buf_data_length(ctx->payload.wire_buf), now); if (ret == NGTCP2_ERR_DRAINING) { // doq received CONNECTION_CLOSE from the counterpart @@ -1193,17 +1203,17 @@ static int handle_packet(struct pl_quic_sess_data *quic, ngtcp2_conn_handle_expiry(qconn->conn, now); - if (wire_buf_trim(pkt_ctx->payload.wire_buf, wire_buf_data_length(pkt_ctx->payload.wire_buf))) { + if (wire_buf_trim(ctx->payload.wire_buf, pktlen)) { kr_log_error(DOQ, "Failed to trim wire_buf\n"); return ret; } - if (kr_fails_assert(wire_buf_data_length(pkt_ctx->payload.wire_buf) == 0)) { + if (kr_fails_assert(wire_buf_data_length(ctx->payload.wire_buf) == 0)) { kr_log_error(DOQ, "read pkt should consume the entire packet\n"); return -1; /* TODO errcode */ } - // pkt_ctx->comm->target = &dcid; + // ctx->comm->target = &dcid; // kr_quic_send(quic->conn_table, qconn, quic, ctx, QUIC_MAX_SEND_PER_RECV, 0); return kr_ok(); @@ -1211,78 +1221,141 @@ static int handle_packet(struct pl_quic_sess_data *quic, void __attribute__ ((noinline)) empty_call(void) { } +static int collect_queries(struct kr_quic_conn *qconn) +{ + + // if (0/* use dns_dgram (iovec input)*/) { + // // Perhaps make permanent for each conn; + // struct iovec *iov = mm_calloc(&ctx->pool, + // qconn->streams_pending, sizeof(struct iovec)); + // kr_require(iov); + // struct protolayer_payload payload = protolayer_payload_iovec(iov, + // 0/* increase upon fill */, false); + // + // int64_t stream_id; + // struct kr_quic_stream *stream; + // while (qconn != NULL && (stream = kr_quic_stream_get_process(qconn, &stream_id)) != NULL) { + // assert(stream->inbufs != NULL); + // assert(stream->inbufs->n_inbufs > 0); + // + // uint16_t msg_size = knot_wire_read_u16(wire_buf_data(&stream->pers_inbuf)); + // payload.iovec.iov[payload.iovec.cnt].iov_base = + // wire_buf_data(&stream->pers_inbuf) + sizeof(uint16_t); + // + // payload.iovec.iov[payload.iovec.cnt].iov_len = msg_size; + // ++payload.iovec.cnt; + // } + // + // struct protolayer_iter_ctx *ctx_head = queue_head(quic->unwrap_queue); + // if (!kr_fails_assert(ctx == ctx_head)) + // queue_pop(quic->unwrap_queue); + // + // if (payload.iovec.cnt > 0) { + // ctx->payload = payload; + // return protolayer_continue(ctx); + // } + // + // mm_free(&ctx->pool, iov); + // return protolayer_break(ctx, kr_error(ENODATA)); + + kr_require(wire_buf_data_length(&qconn->unwrap_buf) == 0); + size_t free_space = wire_buf_free_space_length(&qconn->unwrap_buf); + + int64_t stream_id; + struct kr_quic_stream *stream; + while (qconn != NULL + && (stream = kr_quic_stream_get_process(qconn, + &stream_id)) != NULL) { + + size_t to_write = wire_buf_data_length(&stream->pers_inbuf); + kr_assert(to_write > 0); + if (to_write > free_space) { + kr_log_error(DOQ, "unwrap buf is not big enough\n"); + return kr_error(ENOMEM); + } + + memcpy(wire_buf_free_space(&qconn->unwrap_buf), + wire_buf_data(&stream->pers_inbuf), to_write); + wire_buf_consume(&qconn->unwrap_buf, to_write); + wire_buf_trim(&stream->pers_inbuf, to_write); + /* TODO inspect assembly diff compared to repeated + * call of wire_buf_free_space_length + * should make no difference though*/ + free_space -= to_write; + } + + return wire_buf_data(&qconn->unwrap_buf) > 0 + ? kr_ok() : kr_error(ENODATA); +} + static enum protolayer_iter_cb_result pl_quic_unwrap(void *sess_data, void *iter_data, struct protolayer_iter_ctx *ctx) { - int ret; - struct pl_quic_sess_data *quic = sess_data; + int rv = kr_error(ENOSPC); kr_quic_conn_t *qconn = NULL; + struct pl_quic_sess_data *quic = sess_data; queue_push(quic->unwrap_queue, ctx); while (protolayer_queue_has_payload(&quic->unwrap_queue)) { - struct protolayer_iter_ctx *pkt_ctx = queue_head(quic->unwrap_queue); - - queue_pop(quic->unwrap_queue); - - kr_assert(pkt_ctx->payload.type == PROTOLAYER_PAYLOAD_WIRE_BUF); - size_t pktlen = wire_buf_data_length(pkt_ctx->payload.wire_buf); - const uint8_t *pkt = wire_buf_data(pkt_ctx->payload.wire_buf); + kr_assert(ctx->payload.type == PROTOLAYER_PAYLOAD_WIRE_BUF); ngtcp2_cid dcid = { 0 }; - ret = handle_packet(quic, - /** FIXME */pkt_ctx, - pkt, pktlen, &dcid, &qconn); + int ret = handle_packet(quic, + ctx, + wire_buf_data(ctx->payload.wire_buf), + wire_buf_data_length(ctx->payload.wire_buf), + &dcid, &qconn); + if (ret != kr_ok()) { - protolayer_break(ctx, ret); + goto fail; } /* TODO Verify this doesn't leak */ - pkt_ctx->comm->target = mm_calloc(&ctx->pool, sizeof(ngtcp2_cid), 1); - /* TODO log failed allocation "iteration ctx ran out of memory" */ - kr_require(pkt_ctx->comm->target); - memcpy(pkt_ctx->comm->target, &dcid, sizeof(ngtcp2_cid)); - - if (qconn->stream_inprocess >= 0) { - // This branch is only accessed once a stream has - // finished receiving a query (stream_inprocess received FIN) - // TODO: protolayer_continue with the query in the first finished stream - empty_call(); - struct kr_quic_stream *stream = kr_quic_conn_get_stream( - qconn, qconn->stream_inprocess, true - ); - - if (stream == NULL) { - return KNOT_ENOENT; - } + ctx->comm->target = malloc(sizeof(ngtcp2_cid)); + /* TODO log failed allocation "iterctx ran out of memory" */ + kr_require(ctx->comm->target); + memcpy(ctx->comm->target, &dcid, sizeof(ngtcp2_cid)); + + if (qconn->stream_inprocess == -1) { + /* This will produce a reposponse and pass it towards + * lower layers (UDP or anything between the current + * layer and udp). */ + kr_quic_send(quic->conn_table, qconn, quic, + ctx, QUIC_MAX_SEND_PER_RECV, 0); + rv = kr_ok(); + goto fail; + } - pkt_ctx->payload.wire_buf = &stream->pers_inbuf; - kr_log_info(DOQ, "Proceeding protolayer_continue in quic\n"); - return protolayer_continue(pkt_ctx); + if (kr_fails_assert(queue_len(quic->unwrap_queue) == 1)) { + ret = kr_error(EINVAL); + goto fail; } + } - // ctx->comm->target = pkt_ctx->comm->target; + struct protolayer_iter_ctx *ctx_head = queue_head(quic->unwrap_queue); + if (!kr_fails_assert(ctx == ctx_head)) + queue_pop(quic->unwrap_queue); - if (qconn->flags & KR_QUIC_CONN_HANDSHAKE_DONE) { - // && qconn->flags & ) { - // kr_log_info(DOQ, "Proceeding to next layer\n"); - kr_quic_send(quic->conn_table, qconn, quic, ctx, QUIC_MAX_SEND_PER_RECV, 0); - // return protolayer_continue(pkt_ctx); - } else { - // proceed with nodata handshake process - kr_quic_send(quic->conn_table, qconn, quic, ctx, QUIC_MAX_SEND_PER_RECV, 0); - } + if ((rv = collect_queries(qconn)) == kr_ok()) { + ctx->payload = protolayer_payload_wire_buf(&qconn->unwrap_buf, + false); + return protolayer_continue(ctx); } - // return protolayer_break(ctx, 0); - return protolayer_continue(ctx); + return protolayer_break(ctx, rv); + +fail: + ctx_head = queue_head(quic->unwrap_queue); + if (!kr_fails_assert(ctx == ctx_head)) + queue_pop(quic->unwrap_queue); + return protolayer_break(ctx, rv); } /* TODO perhaps also move to quic_stream */ static int send_stream(struct protolayer_iter_ctx *ctx, kr_quic_conn_t *qconn, int64_t stream_id, - uint8_t *data, size_t len, bool fin, ngtcp2_ssize *sent, - ) + uint8_t *data, size_t len, bool fin, ngtcp2_ssize *sent) { assert(stream_id >= 0 || (data == NULL && len == 0)); @@ -1291,18 +1364,21 @@ static int send_stream(struct protolayer_iter_ctx *ctx, kr_log_info(DOQ, "Openning bidirectional stream no: %zu\n", stream_id); - int ret = ngtcp2_conn_open_bidi_stream(qconn->conn, &opened, NULL); + int ret = ngtcp2_conn_open_bidi_stream(qconn->conn, + &opened, NULL); if (ret != kr_ok()) { /** This should not happen */ kr_log_info(DOQ, "remote endpoint isn't ready for streams: %s (%d)\n", ngtcp2_strerror(ret), ret); return ret; } - assert((bool)(opened == stream_id) == kr_quic_stream_exists(qconn, stream_id)); + assert((bool)(opened == stream_id) + == kr_quic_stream_exists(qconn, stream_id)); } - uint32_t fl = ((stream_id >= 0 && fin) ? NGTCP2_WRITE_STREAM_FLAG_FIN : - NGTCP2_WRITE_STREAM_FLAG_NONE); + uint32_t fl = ((stream_id >= 0 && fin) ? NGTCP2_WRITE_STREAM_FLAG_FIN + : NGTCP2_WRITE_STREAM_FLAG_NONE); + ngtcp2_vec vec = { .base = data, .len = len }; ngtcp2_pkt_info pi = { 0 }; @@ -1310,111 +1386,122 @@ static int send_stream(struct protolayer_iter_ctx *ctx, ngtcp2_conn_info info = { 0 }; ngtcp2_conn_get_conn_info(qconn->conn, &info); - int nwrite = 0; - nwrite = ngtcp2_conn_writev_stream(qconn->conn, path, &pi, - wire_buf_free_space(ctx->payload.wire_buf), - wire_buf_free_space_length(ctx->payload.wire_buf), - sent, fl, stream_id, &vec, - (stream_id >= 0 ? 1 : 0), quic_timestamp()); - - /* TODO: - * This packet may contain frames other than STREAM frame. The - * packet might not contain STREAM frame if other frames - * occupy the packet. In that case, *pdatalen would - * be -1 if pdatalen is not NULL. - */ - // TODO: abstract error printing, likely shared across mane ngtcp2_ calls - if (nwrite < 0) { - switch (nwrite) { - case NGTCP2_ERR_NOMEM: - kr_log_error(DOQ, "write failed: %s (%d)", - ngtcp2_strerror(nwrite), nwrite); - // TODO terminal - - case NGTCP2_ERR_STREAM_NOT_FOUND: - kr_log_error(DOQ, "write stream failed to find: %s (%d)\n", - ngtcp2_strerror(nwrite), nwrite); - // TODO terminal - - case NGTCP2_ERR_STREAM_SHUT_WR: - kr_log_error(DOQ, "local write endpoint is shut or stream is beeing reset: %s (%d)\n", - ngtcp2_strerror(nwrite), nwrite); - // TODO attempt later once (if reset) - - case NGTCP2_ERR_PKT_NUM_EXHAUSTED: - kr_log_error(DOQ, "no more pkt numbers available: %s (%d)\n", - ngtcp2_strerror(nwrite), nwrite); - // TODO terminal or reset pktn - - case NGTCP2_ERR_CALLBACK_FAILURE: - kr_log_error(DOQ, "user callback failed: %s (%d)\n", - ngtcp2_strerror(nwrite), nwrite); - // TODO attempt later - - case NGTCP2_ERR_INVALID_ARGUMENT: - kr_log_error(DOQ, "The total length of stream data is too large: %s (%d)\n", - ngtcp2_strerror(nwrite), nwrite); - // TODO attempt differently - - case NGTCP2_ERR_STREAM_DATA_BLOCKED: - kr_log_error(DOQ, "stream is blocked due to flow controll: %s (%d)\n", - ngtcp2_strerror(nwrite), nwrite); - // TODO attempt later - - /* only when NGTCP2_WRITE_STREAM_FLAG_MORE (currently not used) */ - case NGTCP2_ERR_WRITE_MORE: - kr_log_error(DOQ, "should not happen: %s (%d)\n", - ngtcp2_strerror(nwrite), nwrite); - kr_require(false); - default: - kr_log_error(DOQ, "unknown error in writev_stream: %s (%d)\n", - ngtcp2_strerror(nwrite), nwrite); - kr_require(false); - } - } else if (*sent >= 0) { /** FIXME */ } + struct wire_buf *wb = mm_alloc(&ctx->pool, sizeof(struct wire_buf)); + kr_require(wb); - if (nwrite == 0) { - return nwrite; - } + wire_buf_init(wb, 1200/* TODO perhaps maxudp payload */); + struct protolayer_payload pl = protolayer_payload_wire_buf(wb, false); - if (len) { - /* FIXME this is horrible */ - if (wire_buf_trim(ctx->payload.wire_buf, len) != kr_ok()) { - kr_log_error(DOQ, "Wrire buf failed to trim: %s (%d)\n", + do { + nwrite = ngtcp2_conn_writev_stream(qconn->conn, path, &pi, + wire_buf_free_space(pl.wire_buf), + wire_buf_free_space_length(pl.wire_buf), + sent, fl, stream_id, &vec, + (stream_id >= 0 ? 1 : 0), quic_timestamp()); + + /* TODO: + * This packet may contain frames other than STREAM frame. The + * packet might not contain STREAM frame if other frames + * occupy the packet. In that case, *pdatalen would + * be -1 if pdatalen is not NULL. + */ + // TODO: abstract error printing, likely shared across mane ngtcp2_ calls + if (nwrite < 0) { + mm_free(&ctx->pool, wb); + switch (nwrite) { + case NGTCP2_ERR_NOMEM: + kr_log_error(DOQ, "write failed: %s (%d)", + ngtcp2_strerror(nwrite), nwrite); + // TODO terminal + + case NGTCP2_ERR_STREAM_NOT_FOUND: + kr_log_error(DOQ, "write stream failed to find: %s (%d)\n", + ngtcp2_strerror(nwrite), nwrite); + // TODO terminal + + case NGTCP2_ERR_STREAM_SHUT_WR: + kr_log_error(DOQ, "local write endpoint is shut or stream is beeing reset: %s (%d)\n", + ngtcp2_strerror(nwrite), nwrite); + return kr_ok(); + // TODO attempt later once (if reset) + + case NGTCP2_ERR_PKT_NUM_EXHAUSTED: + kr_log_error(DOQ, "no more pkt numbers available: %s (%d)\n", + ngtcp2_strerror(nwrite), nwrite); + // TODO terminal or reset pktn + + case NGTCP2_ERR_CALLBACK_FAILURE: + kr_log_error(DOQ, "user callback failed: %s (%d)\n", + ngtcp2_strerror(nwrite), nwrite); + // TODO attempt later + + case NGTCP2_ERR_INVALID_ARGUMENT: + kr_log_error(DOQ, "The total length of stream data is too large: %s (%d)\n", + ngtcp2_strerror(nwrite), nwrite); + // TODO attempt differently + + case NGTCP2_ERR_STREAM_DATA_BLOCKED: + kr_log_error(DOQ, "stream is blocked due to flow controll: %s (%d)\n", + ngtcp2_strerror(nwrite), nwrite); + // TODO attempt later + + /* only when NGTCP2_WRITE_STREAM_FLAG_MORE (currently not used) */ + case NGTCP2_ERR_WRITE_MORE: + kr_log_error(DOQ, "should not happen: %s (%d)\n", + ngtcp2_strerror(nwrite), nwrite); + kr_require(false); + default: + kr_log_error(DOQ, "unknown error in writev_stream: %s (%d)\n", + ngtcp2_strerror(nwrite), nwrite); + kr_require(false); + } + + } else if (*sent >= 0) { + /* TODO this data has to be kept untill ack arives */ + vec.len -= *sent; + } + + if (wire_buf_consume(pl.wire_buf, nwrite) != kr_ok()) { + kr_log_error(DOQ, "Wire_buf failed to consume: %s (%d)\n", ngtcp2_strerror(nwrite), nwrite); - return -1; + mm_free(&ctx->pool, wb); + return nwrite; } - } - if (wire_buf_consume(ctx->payload.wire_buf, nwrite) != kr_ok()) { - kr_log_error(DOQ, "Wire_buf failed to consume: %s (%d)\n", - ngtcp2_strerror(nwrite), nwrite); + } while (nwrite && vec.len > 0); + + ctx->payload = pl; + + /* called from wrap, return the written amount and continue */ + if (len) { return nwrite; } /* case HS has not finished, we have to switch to wrap direction * without proceeding to the resolve layer */ - if (nwrite || *sent) { - // written++; + if (nwrite || *sent) { + /* session2_wrap requires nonempty payload + * thats why we call it here, instead + * off calling wrap_after from unwrap and filling + * the buffer via this function called from wrap */ int wrap_ret = session2_wrap_after(ctx->session, PROTOLAYER_TYPE_QUIC, - // int wrap_ret = session2_wrap(ctx->session, - ctx->payload, ctx->comm, - // NULL,/*req*/ ctx->finished_cb, ctx->finished_cb_baton); - return 1; + return nwrite; } else { kr_require(nwrite || *sent); } - return 0; + /* make sure return is negative + * though the code shouldn't currently get here */ + return nwrite <= 0 ? nwrite : -1; } // maybe rename kr_quic_respond? @@ -1422,7 +1509,8 @@ int kr_quic_send(kr_quic_table_t *quic_table /* FIXME maybe unused */, kr_quic_conn_t *conn, /* kr_quic_reply_t *reply */void *sess_data, struct protolayer_iter_ctx *ctx, - unsigned max_msgs, kr_quic_send_flag_t flags) + unsigned max_msgs, + kr_quic_send_flag_t flags) { pl_quic_sess_data_t *quic = (pl_quic_sess_data_t *)sess_data; @@ -1430,6 +1518,7 @@ int kr_quic_send(kr_quic_table_t *quic_table /* FIXME maybe unused */, return kr_error(EINVAL); } else if ((conn->flags & KR_QUIC_CONN_BLOCKED) && !(flags & KR_QUIC_SEND_IGNORE_BLOCKED)) { return kr_error(EINVAL); + // TODO // } else if (reply->handle_ret > 0) { // return send_special(quic_table, reply, conn); } else if (conn == NULL) { @@ -1445,43 +1534,53 @@ int kr_quic_send(kr_quic_table_t *quic_table /* FIXME maybe unused */, unsigned sent_msgs = 0, stream_msgs = 0, ignore_last = ((flags & KR_QUIC_SEND_IGNORE_LASTBYTE) ? 1 : 0); int ret = 1; - /* KnotDNS stores data to be written into a replay in the unsent_obuf - * since this will be called from pl_quic_wrap, and I'll have the - * payload in the form of a iovec anyway, I can just change the - * conditionals a bit and call send_Stream with the payload iovec */ + /* This is really wasteful, we always have payload for one stream. */ for (int64_t si = 0; si < conn->streams_count && sent_msgs < max_msgs; /* NO INCREMENT */) { int64_t stream_id = 4 * (conn->first_stream_id + si); ngtcp2_ssize sent = 0; size_t uf = conn->streams[si].unsent_offset; kr_quic_obuf_t *uo = conn->streams[si].unsent_obuf; - - kr_require(ctx->payload.type == PROTOLAYER_PAYLOAD_WIRE_BUF); - if (uo == NULL) { si++; // continue; } // bool fin = (((node_t *)uo->node.next)->next == NULL) && ignore_last == 0; - // size_t len = 5534u; /* size of the following buffer */ - // uint8_t *data = /* alloc buffer */ NULL; - // ret = quic_package_payload(quic_table, ctx->payload, sess_data, - // NULL, stream_id, data, len, fin, &sent); - - // ret = send_stream(ctx, conn, stream_id, - // (uint8_t *)uo->buf + uf, uo->len - uf - ignore_last, - // 0/* FIXME `fin` probably for client side "request sent" */, - // &sent); - - ret = send_stream(ctx, conn, stream_id, - wire_buf_data(ctx->payload.wire_buf), - wire_buf_data_length(ctx->payload.wire_buf), - // ctx->req->answer->wire, - // ctx->req->answer->size, - 0/* FIXME `fin` probably for client side "request sent" */, - &sent); - + // ret = send_stream(ctx, conn, stream_id, (uint8_t *)uo->buf + uf, + // uo->len - uf - ignore_last, fin, &sent); + + if (ctx->payload.type == PROTOLAYER_PAYLOAD_WIRE_BUF) { + ret = send_stream(ctx, conn, stream_id, + wire_buf_data(ctx->payload.wire_buf), + wire_buf_data_length(ctx->payload.wire_buf), + // ctx->req->answer->wire, + // ctx->req->answer->size, + 0/* FIXME `fin` probably for client side "request sent" */, + &sent); + + } else if (ctx->payload.type == PROTOLAYER_PAYLOAD_IOVEC) { + kr_require(ctx->payload.iovec.cnt == 2); + ret = send_stream(ctx, conn, stream_id, + ctx->payload.iovec.iov[1].iov_base, + ctx->payload.iovec.iov[1].iov_len, + // ctx->req->answer->wire, + // ctx->req->answer->size, + 0/* fixme `fin` probably for client side "request sent" */, + &sent); + + } else if (ctx->payload.type == PROTOLAYER_PAYLOAD_BUFFER) { + ret = send_stream(ctx, conn, stream_id, + ctx->payload.buffer.buf, + ctx->payload.buffer.len, + // ctx->req->answer->wire, + // ctx->req->answer->size, + 0/* fixme `fin` probably for client side "request sent" */, + &sent); + } else { + kr_assert(false && "Invalid payload type\n"); + return kr_error(EINVAL); + } if (ret < 0) { return ret; @@ -1492,11 +1591,9 @@ int kr_quic_send(kr_quic_table_t *quic_table /* FIXME maybe unused */, if (sent > 0 && ignore_last > 0) { sent++; } - - if (sent > 0) { - // TODO - // kr_quic_stream_mark_sent(conn, stream_id, sent); - } + // if (sent > 0) { + // kr_quic_stream_mark_sent(conn, stream_id, sent); + // } if (stream_msgs >= max_msgs / conn->streams_count) { stream_msgs = 0; @@ -1513,6 +1610,9 @@ int kr_quic_send(kr_quic_table_t *quic_table /* FIXME maybe unused */, return ret; } +/* For now we assume any iovec payload we get + * will just be a single (the second iovec, first one holds size) + * giant buffer. FIXME if proper iovec support ever comes. */ static enum protolayer_iter_cb_result pl_quic_wrap( void *sess_data, void *iter_data, struct protolayer_iter_ctx *ctx) @@ -1521,9 +1621,11 @@ static enum protolayer_iter_cb_result pl_quic_wrap( queue_push(quic->wrap_queue, ctx); ngtcp2_cid *dcid = ctx->comm_storage.target; + kr_log_info(DOQ, "Quic wrap prototype: %s\n", + protolayer_payload_name(ctx->payload.type)); + while (protolayer_queue_has_payload(&quic->wrap_queue)) { struct protolayer_iter_ctx *data = queue_head(quic->wrap_queue); - kr_log_info(DOQ, "queue_len: %zu\n", queue_len(quic->wrap_queue)); queue_pop(quic->wrap_queue); if (!data || !data->comm || !data->comm->target) { @@ -1535,34 +1637,27 @@ static enum protolayer_iter_cb_result pl_quic_wrap( kr_quic_conn_t *conn = kr_quic_table_lookup(dcid, quic->conn_table); if (!conn) { kr_log_warning(DOQ, "Missing associated connection\n"); - int ret = kr_quic_send(quic->conn_table, - conn, sess_data, ctx, 1, 0); + /* There is nothing we can do */ return protolayer_break(ctx, EINVAL /* TODO */); - // return -1; // TODO } + /* Here we will actually have payload to be sent out + * TODO assert that requirement? */ kr_quic_send(quic->conn_table, conn, sess_data, - data, + ctx, QUIC_MAX_SEND_PER_RECV, 0 /* no flags */); kr_log_info(DOQ, "About to continue from quic_wrap: %s\n", protolayer_payload_name(data->payload.type)); - return protolayer_continue(data); - // data->async_mode = true; - // protolayer_async(); - // return protolayer_continue(data); - // kr_log_info(DOQ, "protolayer_continue returned %d\n", ret); - // return protolayer_async(); + return protolayer_continue(ctx); } - /* We had nothing to send TODO error*/ + /* We had nothing to send TODO error */ return protolayer_break(ctx, kr_ok()); - // return protolayer_continue(ctx); - // return protolayer_break(ctx, PROTOLAYER_RET_NORMAL); } static enum protolayer_event_cb_result pl_quic_event_unwrap( @@ -1589,6 +1684,8 @@ static void pl_quic_request_init(struct session2 *session, req->qsource.comm_flags.quic = true; struct pl_quic_sess_data *quic = sess_data; quic->req = req; + + // req->qsource.stream_id = session->comm_storage.target; } __attribute__((constructor)) diff --git a/daemon/quic.h b/daemon/quic.h index 52f87c256..60480a02d 100644 --- a/daemon/quic.h +++ b/daemon/quic.h @@ -14,6 +14,7 @@ #include #include #include +#include "lib/generic/queue.h" #include "lib/log.h" #include "session2.h" #include "network.h" @@ -32,6 +33,14 @@ #include +// those are equivalent to contrib/ucw/lists.h just must not be included. +// typedef struct kr_quic_ucw_node { +// struct kr_quic_ucw_node *next, *prev; +// } kr_quic_ucw_node_t; +// typedef struct kr_quic_ucw_list { +// kr_quic_ucw_node_t head, tail; +// } kr_quic_ucw_list_t; + #define MAX_QUIC_FRAME_SIZE 65536 @@ -41,6 +50,8 @@ typedef enum { VERIFIED, // RTT-1 } quic_state_t; +typedef queue_t(struct kr_quic_obuf *) quic_out_q; + /** RFC 9250 4.3. DoQ Error Codes */ typedef enum { /*! No error. This is used when the connection or stream needs to be @@ -138,7 +149,8 @@ typedef struct kr_quic_table { } kr_quic_table_t; typedef struct kr_quic_obuf { - /*ucw_*/node_t node; + // struct kr_quic_ucw_list *node; + struct node node; size_t len; // struct wire_buf buf;? char buf[]; @@ -157,52 +169,26 @@ typedef struct kr_tcp_inbufs_upd_res { struct iovec inbufs[]; } kr_tcp_inbufs_udp_res_t; -/** - * Inbufs are useless for us since we always receive the entire pkt - * as a wire_buf. That means ngtcp2_conn_read_pkt should happily - * consume it and we can trim pkt_len off the wire_buf. - * - * In output we can also manage with just a single buffer, - * though this buffer has to remain over N pkts (protolayer cascades). - * We can store it in the connection. - * - * The reason we only need one output buffer thought we deal with two "types" of - * output data is because they share their outcome/what we do with them. - * - * First the output buffer has to store all data we sent out, only forgetting - * it once the data is explicitly acked bu the remote endpoint. We clear - * this acked data on each pkt read. If there is something left (something - * hasn't been acked) we behave the same as if the output buffer was empty. - * We just append the new response and send it all. - * - * Developer NOTE - * I might actually need inbuf queue. It might be possible - * that I'll receive more than one kr_quic_stream_recv_data - * callbacks for a single pkt. Then I could not handle with - * a single simple buffer. TODO: investigate - * - * I actually need a permanent buffer for inbuf. There is no guarantee that - * the DNS query will arrive in a single sream pkt. And I'd lose that - * bit then since I'd attempt to send it. damn, I really have to think - * about this more, the handshake pkt do behave differently to stream ones - * - * WARNING Turns out ngtcp2 buffers the unacked packets internally, - * so there is no reason for us to store them - * - */ struct kr_quic_stream { - struct iovec inbuf; + // struct iovec inbuf; struct wire_buf pers_inbuf; // struct kr_tcp_inbufs_upd_res *inbufs; size_t firstib_consumed; - // holds pointers to head and tail of knot_quic_obuf_t - /*ucw_*/queue_t(uint8_t *) outbufs; + /* ucw */struct list outbufs; + // /* ucw */struct kr_quic_ucw_list outbufs; + // /*ucw_*/queue_t(struct kr_quic_obuf) outbufs; // /*ucw_*/list_t outbufs; + + /* FIXME Properly implement everywhere + * kr_quic_stream_ack_data uses this to check the + * stream is really finished, without proper handling + * no stream will ever be deleted */ size_t obufs_size; - struct wire_buf *outbuf; - kr_quic_obuf_t *unsent_obuf; + // struct wire_buf *outbuf; + struct kr_quic_obuf *unsent_obuf; + // kr_quic_obuf_t *unsent_obuf; size_t first_offset; size_t unsent_offset; }; @@ -223,15 +209,17 @@ typedef struct kr_quic_conn { // crypto callbacks ngtcp2_crypto_conn_ref crypto_ref; - // QUIC stream abstraction - // TODO sentinel for streams? + // QUIC stream abstraction + // TODO sentinel for streams? struct kr_quic_stream *streams; - // number of allocated streams structures + // number of allocated streams structures int16_t streams_count; - // index of first stream that has complete incomming data to be processed (aka inbuf_fin) + // index of first stream that has complete incomming data to be processed (aka inbuf_fin) int16_t stream_inprocess; - // stream_id/4 of first allocated stream + // stream_id/4 of first allocated stream int64_t first_stream_id; + // count of streams with finished queries pending a resolution + uint16_t streams_pending; ngtcp2_ccerr last_error; kr_quic_conn_flag_t flags; @@ -250,6 +238,8 @@ typedef struct kr_quic_conn { // FIXME: could this be removed? struct kr_quic_table *quic_table; + struct wire_buf unwrap_buf + } kr_quic_conn_t; typedef struct pl_quic_sess_data { @@ -263,6 +253,7 @@ typedef struct pl_quic_sess_data { protolayer_iter_ctx_queue_t resend_queue; kr_quic_table_t *conn_table; + uint64_t first_stream_id; struct kr_request *req; // quic_state_t state;