]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
daemon/quic: store dcid and stream_id of the latest query in comm->target
authorFrantisek Tobias <frantisek.tobias@nic.cz>
Thu, 31 Jul 2025 10:53:22 +0000 (12:53 +0200)
committerFrantisek Tobias <frantisek.tobias@nic.cz>
Wed, 7 Jan 2026 13:38:01 +0000 (14:38 +0100)
daemon/quic.c
daemon/quic.h

index b10820d5abc518b2da495f6e90c597e8a94a1672..3e0c6dcdb660c94a0731fa313931b9e455057fd7 100644 (file)
@@ -221,7 +221,6 @@ static int kr_recv_stream_data_cb(ngtcp2_conn *conn, uint32_t flags,
 
        struct kr_quic_conn *qconn = (struct kr_quic_conn *)user_data;
        assert(ctx->conn == conn);
-       kr_log_info(DOQ, "recved stream data: %s\n", data);
 
        int ret = kr_quic_stream_recv_data(qconn, stream_id, data, datalen,
                        (flags & NGTCP2_STREAM_DATA_FLAG_FIN));
@@ -229,25 +228,6 @@ static int kr_recv_stream_data_cb(ngtcp2_conn *conn, uint32_t flags,
        return ret == KNOT_EOK ? 0 : NGTCP2_ERR_CALLBACK_FAILURE;
 }
 
-
-// TODO Will likely be removed once the proper buffer scheme for
-// pl is figured out
-uint64_t buffer_alloc_size(uint64_t buffer_len)
-{
-       if (buffer_len == 0) {
-               return 0;
-       }
-       buffer_len -= 1;
-       buffer_len |= 0x3f; // the result will be at least 64
-       buffer_len |= (buffer_len >> 1);
-       buffer_len |= (buffer_len >> 2);
-       buffer_len |= (buffer_len >> 4);
-       buffer_len |= (buffer_len >> 8);
-       buffer_len |= (buffer_len >> 16);
-       buffer_len |= (buffer_len >> 32);
-       return buffer_len + 1;
-}
-
 void kr_quic_table_rem2(kr_quic_cid_t **pcid, kr_quic_table_t *table)
 {
        kr_quic_cid_t *cid = *pcid;
@@ -549,8 +529,8 @@ static int conn_new_handler(ngtcp2_conn **pconn, const ngtcp2_path *path,
        // ignore for now; allow any
        params.max_idle_timeout = 0;
 
-       params.stateless_reset_token_present = 1;
-       params.active_connection_id_limit = 8;
+       // params.stateless_reset_token_present = 1;
+       // params.active_connection_id_limit = 8;
 
        if (odcid != NULL) {
                params.original_dcid = *odcid;
@@ -1221,7 +1201,8 @@ static int handle_packet(struct pl_quic_sess_data *quic,
 
 void __attribute__ ((noinline)) empty_call(void) { }
 
-static int collect_queries(struct kr_quic_conn *qconn)
+static int collect_queries(struct protolayer_iter_ctx *ctx,
+               struct kr_quic_conn *qconn, struct quic_target *target)
 {
 
        // if (0/* use dns_dgram (iovec input)*/) {
@@ -1260,6 +1241,7 @@ static int collect_queries(struct kr_quic_conn *qconn)
 
        kr_require(wire_buf_data_length(&qconn->unwrap_buf) == 0);
        size_t free_space = wire_buf_free_space_length(&qconn->unwrap_buf);
+       uint16_t queries_agregated = 0;
 
        int64_t stream_id;
        struct kr_quic_stream *stream;
@@ -1268,6 +1250,17 @@ static int collect_queries(struct kr_quic_conn *qconn)
                                &stream_id)) != NULL) {
 
                size_t to_write = wire_buf_data_length(&stream->pers_inbuf);
+               ctx->payload = protolayer_payload_wire_buf(&stream->pers_inbuf,
+                               false);
+
+               target->stream_id = stream_id;
+               ctx->comm->target = target;
+               // session2_unwrap_after(ctx->session, PROTOLAYER_TYPE_QUIC,
+               //              protolayer_payload_wire_buf(&stream->pers_inbuf, false),
+               //              ctx->comm,
+               //              ctx->finished_cb,
+               //              ctx->finished_cb_baton);
+
                kr_assert(to_write > 0);
                if (to_write > free_space) {
                        kr_log_error(DOQ, "unwrap buf is not big enough\n");
@@ -1278,14 +1271,11 @@ static int collect_queries(struct kr_quic_conn *qconn)
                        wire_buf_data(&stream->pers_inbuf), to_write);
                wire_buf_consume(&qconn->unwrap_buf, to_write);
                wire_buf_trim(&stream->pers_inbuf, to_write);
-               /* TODO inspect assembly diff compared to repeated
-                * call of wire_buf_free_space_length
-                * should make no difference though*/
                free_space -= to_write;
+               ++queries_agregated;
        }
 
-       return wire_buf_data(&qconn->unwrap_buf) > 0
-               ? kr_ok() : kr_error(ENODATA);
+       return queries_agregated;
 }
 
 static enum protolayer_iter_cb_result pl_quic_unwrap(void *sess_data,
@@ -1296,6 +1286,10 @@ static enum protolayer_iter_cb_result pl_quic_unwrap(void *sess_data,
        struct pl_quic_sess_data *quic = sess_data;
 
        queue_push(quic->unwrap_queue, ctx);
+       /* TODO Verify this doesn't leak */
+       struct quic_target *target = malloc(sizeof(struct quic_target));
+       /* TODO log failed allocation "iterctx ran out of memory" */
+       kr_require(target);
 
        while (protolayer_queue_has_payload(&quic->unwrap_queue)) {
                kr_assert(ctx->payload.type == PROTOLAYER_PAYLOAD_WIRE_BUF);
@@ -1311,11 +1305,7 @@ static enum protolayer_iter_cb_result pl_quic_unwrap(void *sess_data,
                        goto fail;
                }
 
-               /* TODO Verify this doesn't leak */
-               ctx->comm->target = malloc(sizeof(ngtcp2_cid));
-               /* TODO log failed allocation "iterctx ran out of memory" */
-               kr_require(ctx->comm->target);
-               memcpy(ctx->comm->target, &dcid, sizeof(ngtcp2_cid));
+               memcpy(&target->dcid, &dcid, sizeof(ngtcp2_cid));
 
                if (qconn->stream_inprocess == -1) {
                        /* This will produce a reposponse and pass it towards
@@ -1337,18 +1327,21 @@ static enum protolayer_iter_cb_result pl_quic_unwrap(void *sess_data,
        if (!kr_fails_assert(ctx == ctx_head))
                queue_pop(quic->unwrap_queue);
 
-       if ((rv = collect_queries(qconn)) == kr_ok()) {
+       // rv = collect_queries(ctx, qconn, target);
+       if ((rv = collect_queries(ctx, qconn, target)) > 0) {
                ctx->payload = protolayer_payload_wire_buf(&qconn->unwrap_buf,
                                false);
                return protolayer_continue(ctx);
        }
 
+       // return protolayer_continue(ctx);
        return protolayer_break(ctx, rv);
 
 fail:
        ctx_head = queue_head(quic->unwrap_queue);
        if (!kr_fails_assert(ctx == ctx_head))
                queue_pop(quic->unwrap_queue);
+       free(target); // We asume the folowing protolayers are synchronous
        return protolayer_break(ctx, rv);
 }
 
@@ -1367,13 +1360,11 @@ static int send_stream(struct protolayer_iter_ctx *ctx,
                int ret = ngtcp2_conn_open_bidi_stream(qconn->conn,
                                &opened, NULL);
                if (ret != kr_ok()) {
-                       /** This should not happen */
-                       kr_log_info(DOQ, "remote endpoint isn't ready for streams: %s (%d)\n",
+                       kr_log_warning(DOQ, "remote endpoint isn't ready for streams: %s (%d)\n",
                                        ngtcp2_strerror(ret), ret);
                        return ret;
                }
-               assert((bool)(opened == stream_id)
-                               == kr_quic_stream_exists(qconn, stream_id));
+               kr_require((bool)(opened == stream_id) == kr_quic_stream_exists(qconn, stream_id));
        }
 
        uint32_t fl = ((stream_id >= 0 && fin) ? NGTCP2_WRITE_STREAM_FLAG_FIN
@@ -1499,7 +1490,7 @@ static int send_stream(struct protolayer_iter_ctx *ctx,
                kr_require(nwrite || *sent);
        }
 
-       /* make sure return is negative 
+       /* make sure return is negative
         * though the code shouldn't currently get here */
        return nwrite <= 0 ? nwrite : -1;
 }
@@ -1534,7 +1525,6 @@ int kr_quic_send(kr_quic_table_t *quic_table /* FIXME maybe unused */,
        unsigned sent_msgs = 0, stream_msgs = 0, ignore_last = ((flags & KR_QUIC_SEND_IGNORE_LASTBYTE) ? 1 : 0);
        int ret = 1;
 
-       /* This is really wasteful, we always have payload for one stream. */
        for (int64_t si = 0; si < conn->streams_count && sent_msgs < max_msgs; /* NO INCREMENT */) {
                int64_t stream_id = 4 * (conn->first_stream_id + si);
 
@@ -1543,44 +1533,12 @@ int kr_quic_send(kr_quic_table_t *quic_table /* FIXME maybe unused */,
                kr_quic_obuf_t *uo = conn->streams[si].unsent_obuf;
                if (uo == NULL) {
                        si++;
-                       // continue;
+                       continue;
                }
 
-               // bool fin = (((node_t *)uo->node.next)->next == NULL) && ignore_last == 0;
-               // ret = send_stream(ctx, conn, stream_id, (uint8_t *)uo->buf + uf,
-               //                uo->len - uf - ignore_last, fin, &sent);
-
-               if (ctx->payload.type == PROTOLAYER_PAYLOAD_WIRE_BUF) {
-                       ret = send_stream(ctx, conn, stream_id,
-                                       wire_buf_data(ctx->payload.wire_buf),
-                                       wire_buf_data_length(ctx->payload.wire_buf),
-                                       // ctx->req->answer->wire,
-                                       // ctx->req->answer->size,
-                                       0/* FIXME `fin` probably for client side "request sent" */,
-                                       &sent);
-
-               } else if (ctx->payload.type == PROTOLAYER_PAYLOAD_IOVEC) {
-                       kr_require(ctx->payload.iovec.cnt == 2);
-                       ret = send_stream(ctx, conn, stream_id,
-                                       ctx->payload.iovec.iov[1].iov_base,
-                                       ctx->payload.iovec.iov[1].iov_len,
-                                       // ctx->req->answer->wire,
-                                       // ctx->req->answer->size,
-                                       0/* fixme `fin` probably for client side "request sent" */,
-                                       &sent);
-
-               } else if (ctx->payload.type == PROTOLAYER_PAYLOAD_BUFFER) {
-                       ret = send_stream(ctx, conn, stream_id,
-                                       ctx->payload.buffer.buf,
-                                       ctx->payload.buffer.len,
-                                       // ctx->req->answer->wire,
-                                       // ctx->req->answer->size,
-                                       0/* fixme `fin` probably for client side "request sent" */,
-                                       &sent);
-               } else {
-                       kr_assert(false && "Invalid payload type\n");
-                       return kr_error(EINVAL);
-               }
+               bool fin = (((node_t *)uo->node.next)->next == NULL) && ignore_last == 0;
+               ret = send_stream(ctx, conn, stream_id, uo->buf + uf,
+                                 uo->len - uf - ignore_last, fin, &sent);
 
                if (ret < 0) {
                        return ret;
@@ -1591,9 +1549,9 @@ int kr_quic_send(kr_quic_table_t *quic_table /* FIXME maybe unused */,
                if (sent > 0 && ignore_last > 0) {
                        sent++;
                }
-               // if (sent > 0) {
-               //      kr_quic_stream_mark_sent(conn, stream_id, sent);
-               // }
+               if (sent > 0) {
+                       kr_quic_stream_mark_sent(conn, stream_id, sent);
+               }
 
                if (stream_msgs >= max_msgs / conn->streams_count) {
                        stream_msgs = 0;
@@ -1607,7 +1565,7 @@ int kr_quic_send(kr_quic_table_t *quic_table /* FIXME maybe unused */,
        }
 
        ngtcp2_conn_update_pkt_tx_time(conn->conn, quic_timestamp());
-       return ret;
+       return sent_msgs;
 }
 
 /* For now we assume any iovec payload we get
@@ -1617,9 +1575,12 @@ static enum protolayer_iter_cb_result pl_quic_wrap(
                void *sess_data, void *iter_data,
                struct protolayer_iter_ctx *ctx)
 {
+       int rv;
        pl_quic_sess_data_t *quic = sess_data;
        queue_push(quic->wrap_queue, ctx);
-       ngtcp2_cid *dcid = ctx->comm_storage.target;
+       struct quic_target *target = ctx->comm_storage.target;
+       ngtcp2_cid *dcid = &target->dcid;
+       uint64_t stream_id = target->stream_id;
 
        kr_log_info(DOQ, "Quic wrap prototype: %s\n",
                        protolayer_payload_name(ctx->payload.type));
@@ -1641,9 +1602,19 @@ static enum protolayer_iter_cb_result pl_quic_wrap(
                        return protolayer_break(ctx, EINVAL /* TODO */);
                }
 
+
+               knot_pkt_t *ans = kr_request_ensure_answer(ctx->req);
+               kr_require(ans != NULL);
+
+               kr_require(data->payload.type == PROTOLAYER_PAYLOAD_IOVEC);
+               kr_quic_stream_add_data(conn, stream_id,
+                               &data->payload);
+                               // data->payload.iovec.iov[i].iov_base,
+                               // data->payload.iovec.iov[i].iov_len);
+
                /* Here we will actually have payload to be sent out
-                * TODO assert that requirement? */
-               kr_quic_send(quic->conn_table,
+                * TODO assert that? */
+               rv = kr_quic_send(quic->conn_table,
                                conn,
                                sess_data,
                                ctx,
@@ -1653,6 +1624,9 @@ static enum protolayer_iter_cb_result pl_quic_wrap(
                kr_log_info(DOQ, "About to continue from quic_wrap: %s\n",
                                protolayer_payload_name(data->payload.type));
 
+               if (rv == 0)
+                       break;
+
                return protolayer_continue(ctx);
        }
 
@@ -1682,8 +1656,8 @@ static void pl_quic_request_init(struct session2 *session,
 {
        kr_log_warning(DOQ, "IN request init\n");
        req->qsource.comm_flags.quic = true;
-       struct pl_quic_sess_data *quic = sess_data;
-       quic->req = req;
+       // struct pl_quic_sess_data *quic = sess_data;
+       // quic->req = req;
 
        // req->qsource.stream_id = session->comm_storage.target;
 }
index 60480a02daf4006e556c1b8e5736ca234c15471e..a0f3f4f3d7d903e82578b494490081bc8d897307 100644 (file)
@@ -111,6 +111,13 @@ struct pl_quic_state {
        /* struct ortt_ NOTE: Or some other data */ ;
 };
 
+/* here the dcid and per query stream_id
+ * gets stored in quic_unwrap and read in quic_wrap */
+struct quic_target {
+       ngtcp2_cid dcid;
+       uint64_t stream_id;
+};
+
 struct kr_quic_conn;
 typedef struct kr_quic_cid {
        uint8_t cid_placeholder[32];
@@ -153,7 +160,7 @@ typedef struct kr_quic_obuf {
        struct node node;
        size_t len;
        // struct wire_buf buf;?
-       char buf[];
+       uint8_t buf[];
 } kr_quic_obuf_t;
 
 typedef enum {