]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
daemon/quic: support for multiple streams: collect finished queries into wire_buf...
authorFrantisek Tobias <frantisek.tobias@nic.cz>
Tue, 29 Jul 2025 07:23:59 +0000 (09:23 +0200)
committerFrantisek Tobias <frantisek.tobias@nic.cz>
Wed, 7 Jan 2026 13:38:01 +0000 (14:38 +0100)
daemon/quic.c
daemon/quic.h

index 03b2df5abca1f354e5611640d47a2aed867ed58f..b10820d5abc518b2da495f6e90c597e8a94a1672 100644 (file)
@@ -5,18 +5,19 @@
 #include <asm-generic/errno-base.h>
 #include <asm-generic/errno.h>
 #include <bits/types/struct_iovec.h>
-#include <errno.h>
 #include <gnutls/x509.h>
 #include <gnutls/gnutls.h>
 #include <gnutls/crypto.h>
 #include "lib/defines.h"
 #include "lib/log.h"
 #include "lib/resolve-impl.h"
+// #include "mempattern.h"
 #include "session2.h"
 #include "network.h"
 #include "lib/resolve.h"
 // #include "libknot/quic/quic.h"
 #include "libdnssec/random.h"
+#include <libknot/wire.h>
 #include <libknot/xdp/tcp_iobuf.h>
 #include <stdint.h>
 #include <contrib/ucw/heap.h>
@@ -162,10 +163,14 @@ kr_quic_conn_t *kr_quic_table_add(ngtcp2_conn *ngconn, const ngtcp2_cid *cid,
        if (conn == NULL)
                return NULL;
 
+       /* FIXME magic numbers */
+
+
        conn->conn = ngconn;
        conn->quic_table = table;
        conn->stream_inprocess = -1;
        conn->qlog_fd = -1;
+       wire_buf_init(&conn->unwrap_buf, 1200);
 
        conn->next_expiry = UINT64_MAX;
        if (!heap_insert(table->expiry_heap, (heap_val_t *)conn)) {
@@ -788,6 +793,8 @@ static int pl_quic_sess_init(struct session2 *session, void *sess_data, void *pa
                quic->conn_count = 0;
        }
 
+       // TODO set setings?
+
        return 0;
 }
 
@@ -1087,7 +1094,7 @@ finish:
 /**
  */
 static int handle_packet(struct pl_quic_sess_data *quic,
-               struct protolayer_iter_ctx *pkt_ctx,
+               struct protolayer_iter_ctx *ctx,
                const uint8_t *pkt, size_t pktlen, ngtcp2_cid *dcid,
                struct kr_quic_conn **out_conn)
 {
@@ -1104,11 +1111,12 @@ static int handle_packet(struct pl_quic_sess_data *quic,
        uint32_t supported_quic[1] = { NGTCP2_PROTO_VER_V1 };
        if (ret == NGTCP2_ERR_VERSION_NEGOTIATION) {
                // FIXME: This will be broken by trimming the pkt below
+               // this will need to be sent out immediatelly
                ngtcp2_pkt_write_version_negotiation(
-                       wire_buf_free_space(pkt_ctx->payload.wire_buf),
-                       wire_buf_free_space_length(pkt_ctx->payload.wire_buf),
+                       wire_buf_free_space(ctx->payload.wire_buf),
+                       wire_buf_free_space_length(ctx->payload.wire_buf),
                        random(),
-                       // FIXME: Maybe switch
+                       // WARNING Maybe switch
                        decoded_cids.scid,
                        decoded_cids.scidlen,
                        decoded_cids.dcid,
@@ -1133,13 +1141,13 @@ static int handle_packet(struct pl_quic_sess_data *quic,
        qconn = kr_quic_table_lookup(dcid, quic->conn_table);
 
        if (!qconn) {
-               if ((ret = quic_init_server_conn(quic->conn_table, pkt_ctx,
+               if ((ret = quic_init_server_conn(quic->conn_table, ctx,
                                 UINT64_MAX - 1, &scid, dcid, decoded_cids,
                                 pkt, pktlen, &qconn)) != kr_ok()) {
                        return ret;
                }
 
-               // if ((ret = wire_buf_trim(pkt_ctx->payload.wire_buf, pktlen)) != 0) {
+               // if ((ret = wire_buf_trim(ctx->payload.wire_buf, pktlen)) != 0) {
                //      kr_log_error(DOQ, "wirebuf failed to trim: %s (%d)\n",
                //                      kr_strerror(ret), ret);
                //      return kr_error(ret);
@@ -1165,8 +1173,10 @@ static int handle_packet(struct pl_quic_sess_data *quic,
        ret = ngtcp2_conn_read_pkt(qconn->conn,
                        path,
                        &pi,
-                       wire_buf_data(pkt_ctx->payload.wire_buf),
-                       wire_buf_data_length(pkt_ctx->payload.wire_buf),
+                       pkt,
+                       pktlen,
+                       // wire_buf_data(ctx->payload.wire_buf),
+                       // wire_buf_data_length(ctx->payload.wire_buf),
                        now);
 
        if (ret == NGTCP2_ERR_DRAINING) { // doq received CONNECTION_CLOSE from the counterpart
@@ -1193,17 +1203,17 @@ static int handle_packet(struct pl_quic_sess_data *quic,
 
        ngtcp2_conn_handle_expiry(qconn->conn, now);
 
-       if (wire_buf_trim(pkt_ctx->payload.wire_buf, wire_buf_data_length(pkt_ctx->payload.wire_buf))) {
+       if (wire_buf_trim(ctx->payload.wire_buf, pktlen)) {
                kr_log_error(DOQ, "Failed to trim wire_buf\n");
                return ret;
        }
 
-       if (kr_fails_assert(wire_buf_data_length(pkt_ctx->payload.wire_buf) == 0)) {
+       if (kr_fails_assert(wire_buf_data_length(ctx->payload.wire_buf) == 0)) {
                kr_log_error(DOQ, "read pkt should consume the entire packet\n");
                return -1; /* TODO errcode */
        }
 
-       // pkt_ctx->comm->target = &dcid;
+       // ctx->comm->target = &dcid;
 
        // kr_quic_send(quic->conn_table, qconn, quic, ctx, QUIC_MAX_SEND_PER_RECV, 0);
        return kr_ok();
@@ -1211,78 +1221,141 @@ static int handle_packet(struct pl_quic_sess_data *quic,
 
 void __attribute__ ((noinline)) empty_call(void) { }
 
+static int collect_queries(struct kr_quic_conn *qconn)
+{
+
+       // if (0/* use dns_dgram (iovec input)*/) {
+       //      // Perhaps make permanent for each conn;
+       //      struct iovec *iov = mm_calloc(&ctx->pool,
+       //                      qconn->streams_pending, sizeof(struct iovec));
+       //      kr_require(iov);
+       //      struct protolayer_payload payload = protolayer_payload_iovec(iov,
+       //                      0/* increase upon fill */, false);
+       //
+       //      int64_t stream_id;
+       //      struct kr_quic_stream *stream;
+       //      while (qconn != NULL && (stream = kr_quic_stream_get_process(qconn, &stream_id)) != NULL) {
+       //              assert(stream->inbufs != NULL);
+       //              assert(stream->inbufs->n_inbufs > 0);
+       //
+       //              uint16_t msg_size = knot_wire_read_u16(wire_buf_data(&stream->pers_inbuf));
+       //              payload.iovec.iov[payload.iovec.cnt].iov_base =
+       //                      wire_buf_data(&stream->pers_inbuf) + sizeof(uint16_t);
+       //
+       //              payload.iovec.iov[payload.iovec.cnt].iov_len = msg_size;
+       //              ++payload.iovec.cnt;
+       //      }
+       //
+       //      struct protolayer_iter_ctx *ctx_head = queue_head(quic->unwrap_queue);
+       //      if (!kr_fails_assert(ctx == ctx_head))
+       //              queue_pop(quic->unwrap_queue);
+       //
+       //      if (payload.iovec.cnt > 0) {
+       //              ctx->payload = payload;
+       //              return protolayer_continue(ctx);
+       //      }
+       //
+       //      mm_free(&ctx->pool, iov);
+       //      return protolayer_break(ctx, kr_error(ENODATA));
+
+       kr_require(wire_buf_data_length(&qconn->unwrap_buf) == 0);
+       size_t free_space = wire_buf_free_space_length(&qconn->unwrap_buf);
+
+       int64_t stream_id;
+       struct kr_quic_stream *stream;
+       while (qconn != NULL
+               && (stream = kr_quic_stream_get_process(qconn,
+                               &stream_id)) != NULL) {
+
+               size_t to_write = wire_buf_data_length(&stream->pers_inbuf);
+               kr_assert(to_write > 0);
+               if (to_write > free_space) {
+                       kr_log_error(DOQ, "unwrap buf is not big enough\n");
+                       return kr_error(ENOMEM);
+               }
+
+               memcpy(wire_buf_free_space(&qconn->unwrap_buf),
+                       wire_buf_data(&stream->pers_inbuf), to_write);
+               wire_buf_consume(&qconn->unwrap_buf, to_write);
+               wire_buf_trim(&stream->pers_inbuf, to_write);
+               /* TODO inspect assembly diff compared to repeated
+                * call of wire_buf_free_space_length
+                * should make no difference though*/
+               free_space -= to_write;
+       }
+
+       return wire_buf_data(&qconn->unwrap_buf) > 0
+               ? kr_ok() : kr_error(ENODATA);
+}
+
 static enum protolayer_iter_cb_result pl_quic_unwrap(void *sess_data,
                void *iter_data, struct protolayer_iter_ctx *ctx)
 {
-       int ret;
-       struct pl_quic_sess_data *quic = sess_data;
+       int rv = kr_error(ENOSPC);
        kr_quic_conn_t *qconn = NULL;
+       struct pl_quic_sess_data *quic = sess_data;
 
        queue_push(quic->unwrap_queue, ctx);
 
        while (protolayer_queue_has_payload(&quic->unwrap_queue)) {
-               struct protolayer_iter_ctx *pkt_ctx = queue_head(quic->unwrap_queue);
-
-               queue_pop(quic->unwrap_queue);
-
-               kr_assert(pkt_ctx->payload.type == PROTOLAYER_PAYLOAD_WIRE_BUF);
-               size_t pktlen = wire_buf_data_length(pkt_ctx->payload.wire_buf);
-               const uint8_t *pkt = wire_buf_data(pkt_ctx->payload.wire_buf);
+               kr_assert(ctx->payload.type == PROTOLAYER_PAYLOAD_WIRE_BUF);
                ngtcp2_cid dcid = { 0 };
 
-               ret = handle_packet(quic,
-                               /** FIXME */pkt_ctx,
-                               pkt, pktlen, &dcid, &qconn);
+               int ret = handle_packet(quic,
+                               ctx,
+                               wire_buf_data(ctx->payload.wire_buf),
+                               wire_buf_data_length(ctx->payload.wire_buf),
+                               &dcid, &qconn);
+
                if (ret != kr_ok()) {
-                       protolayer_break(ctx, ret);
+                       goto fail;
                }
 
                /* TODO Verify this doesn't leak */
-               pkt_ctx->comm->target = mm_calloc(&ctx->pool, sizeof(ngtcp2_cid), 1);
-               /* TODO log failed allocation "iteration ctx ran out of memory" */
-               kr_require(pkt_ctx->comm->target);
-               memcpy(pkt_ctx->comm->target, &dcid, sizeof(ngtcp2_cid));
-
-               if (qconn->stream_inprocess >= 0) {
-                       // This branch is only accessed once a stream has
-                       // finished receiving a query (stream_inprocess received FIN)
-                       // TODO: protolayer_continue with the query in the first finished stream
-                       empty_call();
-                       struct kr_quic_stream *stream = kr_quic_conn_get_stream(
-                               qconn, qconn->stream_inprocess, true
-                       );
-
-                       if (stream == NULL) {
-                               return KNOT_ENOENT;
-                       }
+               ctx->comm->target = malloc(sizeof(ngtcp2_cid));
+               /* TODO log failed allocation "iterctx ran out of memory" */
+               kr_require(ctx->comm->target);
+               memcpy(ctx->comm->target, &dcid, sizeof(ngtcp2_cid));
+
+               if (qconn->stream_inprocess == -1) {
+                       /* This will produce a reposponse and pass it towards
+                        * lower layers (UDP or anything between the current
+                        * layer and udp). */
+                       kr_quic_send(quic->conn_table, qconn, quic,
+                                       ctx, QUIC_MAX_SEND_PER_RECV, 0);
+                       rv = kr_ok();
+                       goto fail;
+               }
 
-                       pkt_ctx->payload.wire_buf = &stream->pers_inbuf;
-                       kr_log_info(DOQ, "Proceeding protolayer_continue in quic\n");
-                       return protolayer_continue(pkt_ctx);
+               if (kr_fails_assert(queue_len(quic->unwrap_queue) == 1)) {
+                       ret = kr_error(EINVAL);
+                       goto fail;
                }
+       }
 
-               // ctx->comm->target = pkt_ctx->comm->target;
+       struct protolayer_iter_ctx *ctx_head = queue_head(quic->unwrap_queue);
+       if (!kr_fails_assert(ctx == ctx_head))
+               queue_pop(quic->unwrap_queue);
 
-               if (qconn->flags & KR_QUIC_CONN_HANDSHAKE_DONE) {
-                               // && qconn->flags & ) {
-                       // kr_log_info(DOQ, "Proceeding to next layer\n");
-                       kr_quic_send(quic->conn_table, qconn, quic, ctx, QUIC_MAX_SEND_PER_RECV, 0);
-                       // return protolayer_continue(pkt_ctx);
-               } else {
-                       // proceed with nodata handshake process
-                       kr_quic_send(quic->conn_table, qconn, quic, ctx, QUIC_MAX_SEND_PER_RECV, 0);
-               }
+       if ((rv = collect_queries(qconn)) == kr_ok()) {
+               ctx->payload = protolayer_payload_wire_buf(&qconn->unwrap_buf,
+                               false);
+               return protolayer_continue(ctx);
        }
 
-       // return protolayer_break(ctx, 0);
-       return protolayer_continue(ctx);
+       return protolayer_break(ctx, rv);
+
+fail:
+       ctx_head = queue_head(quic->unwrap_queue);
+       if (!kr_fails_assert(ctx == ctx_head))
+               queue_pop(quic->unwrap_queue);
+       return protolayer_break(ctx, rv);
 }
 
 /* TODO perhaps also move to quic_stream */
 static int send_stream(struct protolayer_iter_ctx *ctx,
                        kr_quic_conn_t *qconn, int64_t stream_id,
-                       uint8_t *data, size_t len, bool fin, ngtcp2_ssize *sent,
-                      )
+                       uint8_t *data, size_t len, bool fin, ngtcp2_ssize *sent)
 {
        assert(stream_id >= 0 || (data == NULL && len == 0));
 
@@ -1291,18 +1364,21 @@ static int send_stream(struct protolayer_iter_ctx *ctx,
                kr_log_info(DOQ, "Openning bidirectional stream no: %zu\n",
                                stream_id);
 
-               int ret = ngtcp2_conn_open_bidi_stream(qconn->conn, &opened, NULL);
+               int ret = ngtcp2_conn_open_bidi_stream(qconn->conn,
+                               &opened, NULL);
                if (ret != kr_ok()) {
                        /** This should not happen */
                        kr_log_info(DOQ, "remote endpoint isn't ready for streams: %s (%d)\n",
                                        ngtcp2_strerror(ret), ret);
                        return ret;
                }
-               assert((bool)(opened == stream_id) == kr_quic_stream_exists(qconn, stream_id));
+               assert((bool)(opened == stream_id)
+                               == kr_quic_stream_exists(qconn, stream_id));
        }
 
-       uint32_t fl = ((stream_id >= 0 && fin) ? NGTCP2_WRITE_STREAM_FLAG_FIN :
-                                                NGTCP2_WRITE_STREAM_FLAG_NONE);
+       uint32_t fl = ((stream_id >= 0 && fin) ? NGTCP2_WRITE_STREAM_FLAG_FIN
+                                              : NGTCP2_WRITE_STREAM_FLAG_NONE);
+
        ngtcp2_vec vec = { .base = data, .len = len };
        ngtcp2_pkt_info pi = { 0 };
 
@@ -1310,111 +1386,122 @@ static int send_stream(struct protolayer_iter_ctx *ctx,
 
        ngtcp2_conn_info info = { 0 };
        ngtcp2_conn_get_conn_info(qconn->conn, &info);
-
        int nwrite = 0;
-       nwrite = ngtcp2_conn_writev_stream(qconn->conn, path, &pi,
-                       wire_buf_free_space(ctx->payload.wire_buf),
-                       wire_buf_free_space_length(ctx->payload.wire_buf),
-                       sent, fl, stream_id, &vec,
-                       (stream_id >= 0 ? 1 : 0), quic_timestamp());
-
-       /* TODO:
-        * This packet may contain frames other than STREAM frame. The
-        * packet might not contain STREAM frame if other frames
-        * occupy the packet. In that case, *pdatalen would
-        * be -1 if pdatalen is not NULL.
-        */
-       // TODO: abstract error printing, likely shared across mane ngtcp2_ calls
-       if (nwrite < 0) {
-               switch (nwrite) {
-                       case NGTCP2_ERR_NOMEM:
-                               kr_log_error(DOQ, "write failed: %s (%d)",
-                                               ngtcp2_strerror(nwrite), nwrite);
-                               // TODO terminal
-
-                       case NGTCP2_ERR_STREAM_NOT_FOUND:
-                               kr_log_error(DOQ, "write stream failed to find: %s (%d)\n",
-                                               ngtcp2_strerror(nwrite), nwrite);
-                               // TODO terminal
-
-                       case NGTCP2_ERR_STREAM_SHUT_WR:
-                               kr_log_error(DOQ, "local write endpoint is shut or stream is beeing reset: %s (%d)\n",
-                                               ngtcp2_strerror(nwrite), nwrite);
-                               // TODO attempt later once (if reset)
-
-                       case NGTCP2_ERR_PKT_NUM_EXHAUSTED:
-                               kr_log_error(DOQ, "no more pkt numbers available: %s (%d)\n",
-                                               ngtcp2_strerror(nwrite), nwrite);
-                               // TODO terminal or reset pktn
-
-                       case NGTCP2_ERR_CALLBACK_FAILURE:
-                               kr_log_error(DOQ, "user callback failed: %s (%d)\n",
-                                               ngtcp2_strerror(nwrite), nwrite);
-                               // TODO attempt later
-
-                       case NGTCP2_ERR_INVALID_ARGUMENT:
-                               kr_log_error(DOQ, "The total length of stream data is too large: %s (%d)\n",
-                                               ngtcp2_strerror(nwrite), nwrite);
-                               // TODO attempt differently
-
-                       case NGTCP2_ERR_STREAM_DATA_BLOCKED:
-                               kr_log_error(DOQ, "stream is blocked due to flow controll: %s (%d)\n",
-                                               ngtcp2_strerror(nwrite), nwrite);
-                               // TODO attempt later
-
-                        /* only when NGTCP2_WRITE_STREAM_FLAG_MORE (currently not used) */
-                       case NGTCP2_ERR_WRITE_MORE:
-                               kr_log_error(DOQ, "should not happen: %s (%d)\n",
-                                               ngtcp2_strerror(nwrite), nwrite);
-                               kr_require(false);
-                       default:
-                               kr_log_error(DOQ, "unknown error in writev_stream: %s (%d)\n",
-                                               ngtcp2_strerror(nwrite), nwrite);
-                               kr_require(false);
-               }
 
-       } else if (*sent >= 0) { /** FIXME */ }
+       struct wire_buf *wb = mm_alloc(&ctx->pool, sizeof(struct wire_buf));
+       kr_require(wb);
 
-       if (nwrite == 0) {
-               return nwrite;
-       }
+       wire_buf_init(wb, 1200/* TODO perhaps maxudp payload */);
+       struct protolayer_payload pl = protolayer_payload_wire_buf(wb, false);
 
-       if (len) {
-               /* FIXME this is horrible */
-               if (wire_buf_trim(ctx->payload.wire_buf, len) != kr_ok()) {
-                       kr_log_error(DOQ, "Wrire buf failed to trim: %s (%d)\n",
+       do {
+               nwrite = ngtcp2_conn_writev_stream(qconn->conn, path, &pi,
+                               wire_buf_free_space(pl.wire_buf),
+                               wire_buf_free_space_length(pl.wire_buf),
+                               sent, fl, stream_id, &vec,
+                               (stream_id >= 0 ? 1 : 0), quic_timestamp());
+
+               /* TODO:
+                * This packet may contain frames other than STREAM frame. The
+                * packet might not contain STREAM frame if other frames
+                * occupy the packet. In that case, *pdatalen would
+                * be -1 if pdatalen is not NULL.
+                */
+               // TODO: abstract error printing, likely shared across mane ngtcp2_ calls
+               if (nwrite < 0) {
+                       mm_free(&ctx->pool, wb);
+                       switch (nwrite) {
+                               case NGTCP2_ERR_NOMEM:
+                                       kr_log_error(DOQ, "write failed: %s (%d)",
+                                                       ngtcp2_strerror(nwrite), nwrite);
+                                       // TODO terminal
+
+                               case NGTCP2_ERR_STREAM_NOT_FOUND:
+                                       kr_log_error(DOQ, "write stream failed to find: %s (%d)\n",
+                                                       ngtcp2_strerror(nwrite), nwrite);
+                                       // TODO terminal
+
+                               case NGTCP2_ERR_STREAM_SHUT_WR:
+                                       kr_log_error(DOQ, "local write endpoint is shut or stream is beeing reset: %s (%d)\n",
+                                                       ngtcp2_strerror(nwrite), nwrite);
+                                       return kr_ok();
+                                       // TODO attempt later once (if reset)
+
+                               case NGTCP2_ERR_PKT_NUM_EXHAUSTED:
+                                       kr_log_error(DOQ, "no more pkt numbers available: %s (%d)\n",
+                                                       ngtcp2_strerror(nwrite), nwrite);
+                                       // TODO terminal or reset pktn
+
+                               case NGTCP2_ERR_CALLBACK_FAILURE:
+                                       kr_log_error(DOQ, "user callback failed: %s (%d)\n",
+                                                       ngtcp2_strerror(nwrite), nwrite);
+                                       // TODO attempt later
+
+                               case NGTCP2_ERR_INVALID_ARGUMENT:
+                                       kr_log_error(DOQ, "The total length of stream data is too large: %s (%d)\n",
+                                                       ngtcp2_strerror(nwrite), nwrite);
+                                       // TODO attempt differently
+
+                               case NGTCP2_ERR_STREAM_DATA_BLOCKED:
+                                       kr_log_error(DOQ, "stream is blocked due to flow controll: %s (%d)\n",
+                                                       ngtcp2_strerror(nwrite), nwrite);
+                                       // TODO attempt later
+
+                                /* only when NGTCP2_WRITE_STREAM_FLAG_MORE (currently not used) */
+                               case NGTCP2_ERR_WRITE_MORE:
+                                       kr_log_error(DOQ, "should not happen: %s (%d)\n",
+                                                       ngtcp2_strerror(nwrite), nwrite);
+                                       kr_require(false);
+                               default:
+                                       kr_log_error(DOQ, "unknown error in writev_stream: %s (%d)\n",
+                                                       ngtcp2_strerror(nwrite), nwrite);
+                                       kr_require(false);
+                       }
+
+               } else if (*sent >= 0) {
+                       /* TODO this data has to be kept untill ack arives */
+                       vec.len -= *sent;
+               }
+
+               if (wire_buf_consume(pl.wire_buf, nwrite) != kr_ok()) {
+                       kr_log_error(DOQ, "Wire_buf failed to consume: %s (%d)\n",
                                        ngtcp2_strerror(nwrite), nwrite);
-                       return -1;
+                       mm_free(&ctx->pool, wb);
+                       return nwrite;
                }
-       }
 
-       if (wire_buf_consume(ctx->payload.wire_buf, nwrite) != kr_ok()) {
-               kr_log_error(DOQ, "Wire_buf failed to consume: %s (%d)\n",
-                               ngtcp2_strerror(nwrite), nwrite);
+       } while (nwrite && vec.len > 0);
+
+       ctx->payload = pl;
+
+       /* called from wrap, return the written amount and continue */
+       if (len) {
                return nwrite;
        }
 
        /* case HS has not finished, we have to switch to wrap direction
         * without proceeding to the resolve layer */
-       if (nwrite || *sent) {
-               // written++;
+       if (nwrite || *sent)  {
+               /* session2_wrap requires nonempty payload
+                * thats why we call it here, instead
+                * off calling wrap_after from unwrap and filling
+                * the buffer via this function called from wrap */
                int wrap_ret = session2_wrap_after(ctx->session,
                                PROTOLAYER_TYPE_QUIC,
-               // int wrap_ret = session2_wrap(ctx->session,
-
                                ctx->payload,
                                ctx->comm,
-                               // NULL,/*req*/
                                ctx->finished_cb,
                                ctx->finished_cb_baton);
 
 
-               return 1;
+               return nwrite;
        } else {
                kr_require(nwrite || *sent);
        }
 
-       return 0;
+       /* make sure return is negative 
+        * though the code shouldn't currently get here */
+       return nwrite <= 0 ? nwrite : -1;
 }
 
 // maybe rename kr_quic_respond?
@@ -1422,7 +1509,8 @@ int kr_quic_send(kr_quic_table_t *quic_table /* FIXME maybe unused */,
        kr_quic_conn_t *conn,
        /* kr_quic_reply_t *reply */void *sess_data,
        struct protolayer_iter_ctx *ctx,
-       unsigned max_msgs, kr_quic_send_flag_t flags)
+       unsigned max_msgs,
+       kr_quic_send_flag_t flags)
 {
        pl_quic_sess_data_t *quic = (pl_quic_sess_data_t *)sess_data;
 
@@ -1430,6 +1518,7 @@ int kr_quic_send(kr_quic_table_t *quic_table /* FIXME maybe unused */,
                return kr_error(EINVAL);
        } else if ((conn->flags & KR_QUIC_CONN_BLOCKED) && !(flags & KR_QUIC_SEND_IGNORE_BLOCKED)) {
                return kr_error(EINVAL);
+       // TODO
        // } else if (reply->handle_ret > 0) {
        //      return send_special(quic_table, reply, conn);
        } else if (conn == NULL) {
@@ -1445,43 +1534,53 @@ int kr_quic_send(kr_quic_table_t *quic_table /* FIXME maybe unused */,
        unsigned sent_msgs = 0, stream_msgs = 0, ignore_last = ((flags & KR_QUIC_SEND_IGNORE_LASTBYTE) ? 1 : 0);
        int ret = 1;
 
-       /* KnotDNS stores data to be written into a replay in the unsent_obuf
-        * since this will be called from pl_quic_wrap, and I'll have the
-        * payload in the form of a iovec anyway, I can just change the
-        * conditionals a bit and call send_Stream with the payload iovec */
+       /* This is really wasteful, we always have payload for one stream. */
        for (int64_t si = 0; si < conn->streams_count && sent_msgs < max_msgs; /* NO INCREMENT */) {
                int64_t stream_id = 4 * (conn->first_stream_id + si);
 
                ngtcp2_ssize sent = 0;
                size_t uf = conn->streams[si].unsent_offset;
                kr_quic_obuf_t *uo = conn->streams[si].unsent_obuf;
-
-               kr_require(ctx->payload.type == PROTOLAYER_PAYLOAD_WIRE_BUF);
-
                if (uo == NULL) {
                        si++;
                        // continue;
                }
 
                // bool fin = (((node_t *)uo->node.next)->next == NULL) && ignore_last == 0;
-               // size_t len = 5534u; /* size of the following buffer */
-               // uint8_t *data = /* alloc buffer */ NULL;
-               // ret = quic_package_payload(quic_table, ctx->payload, sess_data,
-               //              NULL, stream_id, data, len, fin, &sent);
-
-               // ret = send_stream(ctx, conn, stream_id,
-               //                   (uint8_t *)uo->buf + uf, uo->len - uf - ignore_last,
-               //                   0/* FIXME `fin` probably for client side "request sent" */,
-               //                &sent);
-
-               ret = send_stream(ctx, conn, stream_id,
-                               wire_buf_data(ctx->payload.wire_buf),
-                               wire_buf_data_length(ctx->payload.wire_buf),
-                               // ctx->req->answer->wire,
-                               // ctx->req->answer->size,
-                               0/* FIXME `fin` probably for client side "request sent" */,
-                               &sent);
-
+               // ret = send_stream(ctx, conn, stream_id, (uint8_t *)uo->buf + uf,
+               //                uo->len - uf - ignore_last, fin, &sent);
+
+               if (ctx->payload.type == PROTOLAYER_PAYLOAD_WIRE_BUF) {
+                       ret = send_stream(ctx, conn, stream_id,
+                                       wire_buf_data(ctx->payload.wire_buf),
+                                       wire_buf_data_length(ctx->payload.wire_buf),
+                                       // ctx->req->answer->wire,
+                                       // ctx->req->answer->size,
+                                       0/* FIXME `fin` probably for client side "request sent" */,
+                                       &sent);
+
+               } else if (ctx->payload.type == PROTOLAYER_PAYLOAD_IOVEC) {
+                       kr_require(ctx->payload.iovec.cnt == 2);
+                       ret = send_stream(ctx, conn, stream_id,
+                                       ctx->payload.iovec.iov[1].iov_base,
+                                       ctx->payload.iovec.iov[1].iov_len,
+                                       // ctx->req->answer->wire,
+                                       // ctx->req->answer->size,
+                                       0/* fixme `fin` probably for client side "request sent" */,
+                                       &sent);
+
+               } else if (ctx->payload.type == PROTOLAYER_PAYLOAD_BUFFER) {
+                       ret = send_stream(ctx, conn, stream_id,
+                                       ctx->payload.buffer.buf,
+                                       ctx->payload.buffer.len,
+                                       // ctx->req->answer->wire,
+                                       // ctx->req->answer->size,
+                                       0/* fixme `fin` probably for client side "request sent" */,
+                                       &sent);
+               } else {
+                       kr_assert(false && "Invalid payload type\n");
+                       return kr_error(EINVAL);
+               }
 
                if (ret < 0) {
                        return ret;
@@ -1492,11 +1591,9 @@ int kr_quic_send(kr_quic_table_t *quic_table /* FIXME maybe unused */,
                if (sent > 0 && ignore_last > 0) {
                        sent++;
                }
-
-               if (sent > 0) {
-                       // TODO
-                       // kr_quic_stream_mark_sent(conn, stream_id, sent);
-               }
+               // if (sent > 0) {
+               //      kr_quic_stream_mark_sent(conn, stream_id, sent);
+               // }
 
                if (stream_msgs >= max_msgs / conn->streams_count) {
                        stream_msgs = 0;
@@ -1513,6 +1610,9 @@ int kr_quic_send(kr_quic_table_t *quic_table /* FIXME maybe unused */,
        return ret;
 }
 
+/* For now we assume any iovec payload we get
+ * will just be a single (the second iovec, first one holds size)
+ * giant buffer. FIXME if proper iovec support ever comes. */
 static enum protolayer_iter_cb_result pl_quic_wrap(
                void *sess_data, void *iter_data,
                struct protolayer_iter_ctx *ctx)
@@ -1521,9 +1621,11 @@ static enum protolayer_iter_cb_result pl_quic_wrap(
        queue_push(quic->wrap_queue, ctx);
        ngtcp2_cid *dcid = ctx->comm_storage.target;
 
+       kr_log_info(DOQ, "Quic wrap prototype: %s\n",
+                       protolayer_payload_name(ctx->payload.type));
+
        while (protolayer_queue_has_payload(&quic->wrap_queue)) {
                struct protolayer_iter_ctx *data = queue_head(quic->wrap_queue);
-               kr_log_info(DOQ, "queue_len: %zu\n", queue_len(quic->wrap_queue));
 
                queue_pop(quic->wrap_queue);
                if (!data || !data->comm || !data->comm->target) {
@@ -1535,34 +1637,27 @@ static enum protolayer_iter_cb_result pl_quic_wrap(
                kr_quic_conn_t *conn = kr_quic_table_lookup(dcid, quic->conn_table);
                if (!conn) {
                        kr_log_warning(DOQ, "Missing associated connection\n");
-                       int ret = kr_quic_send(quic->conn_table,
-                                       conn, sess_data, ctx, 1, 0);
+                       /* There is nothing we can do */
                        return protolayer_break(ctx, EINVAL /* TODO */);
-                       // return -1; // TODO
                }
 
+               /* Here we will actually have payload to be sent out
+                * TODO assert that requirement? */
                kr_quic_send(quic->conn_table,
                                conn,
                                sess_data,
-                               data,
+                               ctx,
                                QUIC_MAX_SEND_PER_RECV,
                                0 /* no flags */);
 
                kr_log_info(DOQ, "About to continue from quic_wrap: %s\n",
                                protolayer_payload_name(data->payload.type));
 
-               return protolayer_continue(data);
-               // data->async_mode = true;
-               // protolayer_async();
-               // return protolayer_continue(data);
-               // kr_log_info(DOQ, "protolayer_continue returned %d\n", ret);
-               // return protolayer_async();
+               return protolayer_continue(ctx);
        }
 
-       /* We had nothing to send TODO error*/
+       /* We had nothing to send TODO error */
        return protolayer_break(ctx, kr_ok());
-       // return protolayer_continue(ctx);
-       // return protolayer_break(ctx, PROTOLAYER_RET_NORMAL);
 }
 
 static enum protolayer_event_cb_result pl_quic_event_unwrap(
@@ -1589,6 +1684,8 @@ static void pl_quic_request_init(struct session2 *session,
        req->qsource.comm_flags.quic = true;
        struct pl_quic_sess_data *quic = sess_data;
        quic->req = req;
+
+       // req->qsource.stream_id = session->comm_storage.target;
 }
 
 __attribute__((constructor))
index 52f87c256ae83cc742f7384ac67e77b4e030eb7d..60480a02daf4006e556c1b8e5736ca234c15471e 100644 (file)
@@ -14,6 +14,7 @@
 #include <gnutls/x509.h>
 #include <gnutls/gnutls.h>
 #include <gnutls/crypto.h>
+#include "lib/generic/queue.h"
 #include "lib/log.h"
 #include "session2.h"
 #include "network.h"
 
 #include <worker.h>
 
+// those are equivalent to contrib/ucw/lists.h  just must not be included.
+// typedef struct kr_quic_ucw_node {
+//     struct kr_quic_ucw_node *next, *prev;
+// } kr_quic_ucw_node_t;
+// typedef struct kr_quic_ucw_list {
+//     kr_quic_ucw_node_t head, tail;
+// } kr_quic_ucw_list_t;
+
 
 #define MAX_QUIC_FRAME_SIZE 65536
 
@@ -41,6 +50,8 @@ typedef enum {
        VERIFIED,  // RTT-1
 } quic_state_t;
 
+typedef queue_t(struct kr_quic_obuf *) quic_out_q;
+
 /** RFC 9250 4.3.  DoQ Error Codes */
 typedef enum {
        /*! No error.  This is used when the connection or stream needs to be
@@ -138,7 +149,8 @@ typedef struct kr_quic_table {
 } kr_quic_table_t;
 
 typedef struct kr_quic_obuf {
-       /*ucw_*/node_t node;
+       // struct kr_quic_ucw_list *node;
+       struct node node;
        size_t len;
        // struct wire_buf buf;?
        char buf[];
@@ -157,52 +169,26 @@ typedef struct kr_tcp_inbufs_upd_res {
        struct iovec inbufs[];
 } kr_tcp_inbufs_udp_res_t;
 
-/**
- * Inbufs are useless for us since we always receive the entire pkt
- * as a wire_buf. That means ngtcp2_conn_read_pkt should happily
- * consume it and we can trim pkt_len off the wire_buf.
- *
- * In output we can also manage with just a single buffer,
- * though this buffer has to remain over N pkts (protolayer cascades).
- * We can store it in the connection.
- *
- * The reason we only need one output buffer thought we deal with two "types" of
- * output data is because they share their outcome/what we do with them.
- *
- * First the output buffer has to store all data we sent out, only forgetting
- * it once the data is explicitly acked bu the remote endpoint. We clear
- * this acked data on each pkt read. If there is something left (something
- * hasn't been acked) we behave the same as if the output buffer was empty.
- * We just append the new response and send it all.
- *
- * Developer NOTE
- * I might actually need inbuf queue. It might be possible
- * that I'll receive more than one kr_quic_stream_recv_data
- * callbacks for a single pkt. Then I could not handle with
- * a single simple buffer. TODO: investigate
- *
- * I actually need a permanent buffer for inbuf. There is no guarantee that
- * the DNS query will arrive in a single sream pkt. And I'd lose that
- * bit then since I'd attempt to send it. damn, I really have to think
- * about this more, the handshake pkt do behave differently to stream ones
- *
- * WARNING Turns out ngtcp2 buffers the unacked packets internally,
- * so there is no reason for us to store them 
- *
- */
 struct kr_quic_stream {
-       struct iovec inbuf;
+       // struct iovec inbuf;
        struct wire_buf pers_inbuf;
        // struct kr_tcp_inbufs_upd_res *inbufs;
 
        size_t firstib_consumed;
-       // holds pointers to head and tail of knot_quic_obuf_t
-       /*ucw_*/queue_t(uint8_t *) outbufs;
+       /* ucw */struct list outbufs;
+       // /* ucw */struct kr_quic_ucw_list outbufs;
+       // /*ucw_*/queue_t(struct kr_quic_obuf) outbufs;
        // /*ucw_*/list_t outbufs;
+
+       /* FIXME Properly implement everywhere
+        * kr_quic_stream_ack_data uses this to check the
+        * stream is really finished, without proper handling
+        * no stream will ever be deleted */
        size_t obufs_size;
 
-       struct wire_buf *outbuf;
-       kr_quic_obuf_t *unsent_obuf;
+       // struct wire_buf *outbuf;
+       struct kr_quic_obuf *unsent_obuf;
+       // kr_quic_obuf_t *unsent_obuf;
        size_t first_offset;
        size_t unsent_offset;
 };
@@ -223,15 +209,17 @@ typedef struct kr_quic_conn {
        // crypto callbacks
        ngtcp2_crypto_conn_ref crypto_ref;
 
-        // QUIC stream abstraction
-        // TODO sentinel for streams?
+       // QUIC stream abstraction
+       // TODO sentinel for streams?
        struct kr_quic_stream *streams;
-        // number of allocated streams structures
+       // number of allocated streams structures
        int16_t streams_count;
-        // index of first stream that has complete incomming data to be processed (aka inbuf_fin)
+       // index of first stream that has complete incomming data to be processed (aka inbuf_fin)
        int16_t stream_inprocess;
-        // stream_id/4 of first allocated stream
+       // stream_id/4 of first allocated stream
        int64_t first_stream_id;
+       // count of streams with finished queries pending a resolution
+       uint16_t streams_pending;
 
        ngtcp2_ccerr last_error;
        kr_quic_conn_flag_t flags;
@@ -250,6 +238,8 @@ typedef struct kr_quic_conn {
        // FIXME: could this be removed?
        struct kr_quic_table *quic_table;
 
+       struct wire_buf unwrap_buf
+
 } kr_quic_conn_t;
 
 typedef struct pl_quic_sess_data {
@@ -263,6 +253,7 @@ typedef struct pl_quic_sess_data {
        protolayer_iter_ctx_queue_t resend_queue;
 
        kr_quic_table_t *conn_table;
+       uint64_t first_stream_id;
 
        struct kr_request *req;
        // quic_state_t state;