]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
daemon/quic: move stream functions to separate file, replace quic_ctx with struct...
authorFrantisek Tobias <frantisek.tobias@nic.cz>
Mon, 7 Jul 2025 08:15:21 +0000 (10:15 +0200)
committerFrantisek Tobias <frantisek.tobias@nic.cz>
Wed, 7 Jan 2026 13:38:01 +0000 (14:38 +0100)
daemon/meson.build
daemon/quic.c
daemon/quic.h
daemon/quic_stream.c [new file with mode: 0644]
daemon/quic_stream.h [new file with mode: 0644]

index b9523ebd08f1ab8add156ff266f21f94f7e18da7..7f1ff0676c4e236afea106bc7a2737739b9af20d 100644 (file)
@@ -24,6 +24,7 @@ kresd_src = files([
   'worker.c',
   'zimport.c',
   'quic.c',
+  'quic_stream.c',
 ])
 
 if nghttp2.found()
index dcb78a4fac5b703495608bf64bf6eb8ba196e15a..d70a06d73511082c08d4b8f70e8936001bf0a8c2 100644 (file)
@@ -2,7 +2,6 @@
  *  SPDX-License-Identifier: GPL-3.0-or-later
  */
 
-#include "quic.h"
 #include <asm-generic/errno-base.h>
 #include <asm-generic/errno.h>
 #include <bits/types/struct_iovec.h>
 // FIXME: For now just to perform the pin check once HS finishes
 #include <libknot/quic/tls_common.h>
 
+#include "quic.h"
+#include "quic_stream.h"
+
 static uint64_t cid2hash(const ngtcp2_cid *cid, kr_quic_table_t *table);
 kr_quic_conn_t *kr_quic_table_lookup(const ngtcp2_cid *cid, kr_quic_table_t *table);
 kr_quic_cid_t **kr_quic_table_lookup2(const ngtcp2_cid *cid, kr_quic_table_t *table);
 kr_quic_cid_t **kr_quic_table_insert(kr_quic_conn_t *conn, const ngtcp2_cid *cid,
                                     kr_quic_table_t *table);
-kr_quic_stream_t *kr_quic_conn_get_stream(kr_quic_conn_t *conn,
-                                         int64_t stream_id, bool create);
 static int pl_quic_client_init(struct session2 *session,
                               pl_quic_sess_data_t *quic,
                               tls_client_param_t *param);
@@ -54,9 +54,15 @@ int kr_quic_send(kr_quic_table_t *quic_table, kr_quic_conn_t *conn,
                    /* kr_quic_reply_t *reply */void *sess_data,
                   struct protolayer_iter_ctx *ctx,
                   unsigned max_msgs, kr_quic_send_flag_t flags);
-
 uint64_t quic_timestamp(void);
 
+#define set_application_error(ctx, error_code, reason, reason_len) \
+       ngtcp2_ccerr_set_application_error(&(ctx)->last_err, \
+               error_code, reason, reason_len)
+#define set_transport_error(ctx, error_code, reason, reason_len) \
+       ngtcp2_ccerr_set_transport_error(&(ctx)->last_err, \
+               error_code, reason, reason_len)
+
 static int cmp_expiry_heap_nodes(void *c1, void *c2)
 {
        if (((kr_quic_conn_t *)c1)->next_expiry < ((kr_quic_conn_t *)c2)->next_expiry)
@@ -164,134 +170,23 @@ kr_quic_cid_t **kr_quic_table_insert(kr_quic_conn_t *conn, const ngtcp2_cid *cid
        return addto;
 }
 
-// int kr_quic_stream_recv_data(kr_quic_conn_t *conn, int64_t stream_id,
-//                                const uint8_t *data, size_t len, bool fin)
-// {
-//     if (len == 0 || conn == NULL || data == NULL) {
-//             return KNOT_EINVAL;
-//     }
-//
-//     kr_quic_stream_t *stream = kr_quic_conn_get_stream(conn, stream_id, true);
-//     if (stream == NULL) {
-//             return KNOT_ENOENT;
-//     }
-//
-//     struct iovec in = { (void *)data, len };
-//     ssize_t prev_ibufs_size = conn->ibufs_size;
-//     int ret = kr_tcp_inbufs_upd(&stream->inbuf, in, true,
-//                                   &stream->inbufs, &conn->ibufs_size);
-//     conn->quic_table->ibufs_size += (ssize_t)conn->ibufs_size - prev_ibufs_size;
-//     if (ret != KNOT_EOK) {
-//             return ret;
-//     }
-//
-//     if (fin && stream->inbufs == NULL) {
-//             return KNOT_ESEMCHECK;
-//     }
-//
-//     if (stream->inbufs != NULL) {
-//             stream_inprocess(conn, stream);
-//     }
-//     return KNOT_EOK;
-// }
-
 static int kr_recv_stream_data_cb(ngtcp2_conn *conn, uint32_t flags,
        int64_t stream_id, uint64_t offset, const uint8_t *data,
        size_t datalen, void *user_data, void *stream_user_data)
 {
-       (void)conn;
-       (void)flags;
-       (void)offset;
-       (void)stream_user_data;
-
-       quic_ctx_t *ctx = (quic_ctx_t *)user_data;
-
-       if (stream_id != ctx->stream.id) {
-               // const uint8_t msg[] = "Unknown stream";
-               // set_application_error(ctx, DOQ_PROTOCOL_ERROR, msg, sizeof(msg) - 1);
-               return NGTCP2_ERR_CALLBACK_FAILURE;
-       }
-
-       struct iovec in = {
-               .iov_base = (uint8_t *)data,
-               .iov_len = datalen
-       };
+       (void)(stream_user_data); // always NULL
+       (void)(offset); // QUIC shall ensure that data arrive in-order
 
-       int ret = 0;
-       // int ret = knot_tcp_inbufs_upd(&ctx->stream.in_buffer, in, true,
-       //                               &ctx->stream.in_parsed,
-       //                               &ctx->stream.in_parsed_total);
+       struct kr_quic_conn *qconn = (struct kr_quic_conn *)user_data;
+       assert(ctx->conn == conn);
 
-       if (ret != KNOT_EOK) {
-               // const uint8_t msg[] = "Malformed payload";
-               // set_application_error(ctx, DOQ_PROTOCOL_ERROR, msg, sizeof(msg) - 1);
-               return NGTCP2_ERR_CALLBACK_FAILURE;
-       }
+       int ret = kr_quic_stream_recv_data(qconn, stream_id, data, datalen,
+                                            (flags & NGTCP2_STREAM_DATA_FLAG_FIN));
 
-       ctx->stream.in_parsed_it = 0;
-       return 0;
+       return ret == KNOT_EOK ? 0 : NGTCP2_ERR_CALLBACK_FAILURE;
 }
 
 
-kr_quic_stream_t *kr_quic_conn_get_stream(kr_quic_conn_t *conn,
-                                              int64_t stream_id, bool create)
-{
-       if (stream_id % 4 != 0 || conn == NULL) {
-               return NULL;
-       }
-       stream_id /= 4;
-
-       if (conn->first_stream_id > stream_id) {
-               return NULL;
-       }
-       if (conn->streams_count > stream_id - conn->first_stream_id) {
-               return &conn->streams[stream_id - conn->first_stream_id];
-       }
-
-       if (create) {
-               size_t new_streams_count;
-               kr_quic_stream_t *new_streams;
-
-               if (conn->streams_count == 0) {
-                       new_streams = malloc(sizeof(new_streams[0]));
-                       if (new_streams == NULL) {
-                               return NULL;
-                       }
-                       new_streams_count = 1;
-                       conn->first_stream_id = stream_id;
-               } else {
-                       new_streams_count = stream_id + 1 - conn->first_stream_id;
-                       if (new_streams_count > MAX_STREAMS_PER_CONN) {
-                               return NULL;
-                       }
-                       new_streams = realloc(conn->streams, new_streams_count * sizeof(*new_streams));
-                       if (new_streams == NULL) {
-                               return NULL;
-                       }
-               }
-
-               for (kr_quic_stream_t *si = new_streams;
-                    si < new_streams + conn->streams_count; si++) {
-                       if (si->obufs_size == 0) {
-                               init_list(&si->outbufs);
-                       } else {
-                               fix_list(&si->outbufs);
-                       }
-               }
-
-               for (kr_quic_stream_t *si = new_streams + conn->streams_count;
-                    si < new_streams + new_streams_count; si++) {
-                       memset(si, 0, sizeof(*si));
-                       init_list(&si->outbufs);
-               }
-               conn->streams = new_streams;
-               conn->streams_count = new_streams_count;
-
-               return &conn->streams[stream_id - conn->first_stream_id];
-       }
-       return NULL;
-}
-
 // TODO Will likely be removed once the proper buffer scheme for
 // pl is figured out
 uint64_t buffer_alloc_size(uint64_t buffer_len)
@@ -310,79 +205,6 @@ uint64_t buffer_alloc_size(uint64_t buffer_len)
        return buffer_len + 1;
 }
 
-void kr_quic_stream_ack_data(kr_quic_conn_t *conn, int64_t stream_id,
-                               size_t end_acked, bool keep_stream)
-{
-       kr_quic_stream_t *s = kr_quic_conn_get_stream(conn, stream_id, false);
-       if (s == NULL) {
-               return;
-       }
-
-       list_t *obs = &s->outbufs;
-
-       kr_quic_obuf_t *first;
-       while (!EMPTY_LIST(*obs) && end_acked >= (first = HEAD(*obs))->len + s->first_offset) {
-               rem_node((node_t *)first);
-               s->obufs_size -= first->len;
-               conn->obufs_size -= first->len;
-               conn->quic_table->obufs_size -= first->len;
-               s->first_offset += first->len;
-               free(first);
-               if (s->unsent_obuf == first) {
-                       s->unsent_obuf = EMPTY_LIST(*obs) ? NULL : HEAD(*obs);
-                       s->unsent_offset = 0;
-               }
-       }
-
-       if (EMPTY_LIST(*obs) && !keep_stream) {
-               // stream_outprocess(conn, s);
-               memset(s, 0, sizeof(*s));
-               init_list(&s->outbufs);
-               while (s = &conn->streams[0], s->inbuf.iov_len == 0 && s->inbufs == NULL && s->obufs_size == 0) {
-                       assert(conn->streams_count > 0);
-                       conn->streams_count--;
-
-                       if (conn->streams_count == 0) {
-                               free(conn->streams);
-                               conn->streams = 0;
-                               conn->first_stream_id = 0;
-                               break;
-                       } else {
-                               conn->first_stream_id++;
-                               conn->stream_inprocess--;
-                               memmove(s, s + 1, sizeof(*s) * conn->streams_count);
-                               // possible realloc to shrink allocated space, but probably useless
-                               for (kr_quic_stream_t *si = s;  si < s + conn->streams_count; si++) {
-                                       if (si->obufs_size == 0) {
-                                               init_list(&si->outbufs);
-                                       } else {
-                                               fix_list(&si->outbufs);
-                                       }
-                               }
-                       }
-               }
-       }
-}
-
-void kr_quic_conn_stream_free(kr_quic_conn_t *conn, int64_t stream_id)
-{
-       kr_quic_stream_t *s = kr_quic_conn_get_stream(conn, stream_id, false);
-       if (s != NULL && s->inbuf.iov_len > 0) {
-               free(s->inbuf.iov_base);
-               conn->ibufs_size -= buffer_alloc_size(s->inbuf.iov_len);
-               conn->quic_table->ibufs_size -= buffer_alloc_size(s->inbuf.iov_len);
-               memset(&s->inbuf, 0, sizeof(s->inbuf));
-       }
-
-       while (s != NULL && s->inbufs != NULL) {
-               void *tofree = s->inbufs;
-               s->inbufs = s->inbufs->next;
-               free(tofree);
-       }
-
-       kr_quic_stream_ack_data(conn, stream_id, SIZE_MAX, false);
-}
-
 void kr_quic_table_rem2(kr_quic_cid_t **pcid, kr_quic_table_t *table)
 {
        kr_quic_cid_t *cid = *pcid;
@@ -1309,32 +1131,7 @@ static enum protolayer_iter_cb_result pl_quic_unwrap(void *sess_data,
        return protolayer_continue(ctx);
 }
 
-static bool stream_exists(kr_quic_conn_t *conn, int64_t stream_id)
-{
-       // Taken from Knot, TODO fix if we use stream_user_data
-       // TRICK, we never use stream_user_data
-       return (ngtcp2_conn_set_stream_user_data(conn->conn, stream_id, NULL) == NGTCP2_NO_ERROR);
-}
-
-void kr_quic_stream_mark_sent(kr_quic_conn_t *conn, int64_t stream_id,
-                                size_t amount_sent)
-{
-       kr_quic_stream_t *s = kr_quic_conn_get_stream(conn, stream_id, false);
-       if (s == NULL) {
-               return;
-       }
-
-       s->unsent_offset += amount_sent;
-       assert(s->unsent_offset <= s->unsent_obuf->len);
-       if (s->unsent_offset == s->unsent_obuf->len) {
-               s->unsent_offset = 0;
-               s->unsent_obuf = (kr_quic_obuf_t *)s->unsent_obuf->node.next;
-               if (s->unsent_obuf->node.next == NULL) { // already behind the tail of list
-                       s->unsent_obuf = NULL;
-               }
-       }
-}
-
+/* TODO perhaps also move to quic_stream */
 static int send_stream(kr_quic_table_t *quic_table, struct protolayer_iter_ctx *ctx,
                        kr_quic_conn_t *qconn, int64_t stream_id,
                        uint8_t *data, size_t len, bool fin, ngtcp2_ssize *sent)
@@ -1342,7 +1139,7 @@ static int send_stream(kr_quic_table_t *quic_table, struct protolayer_iter_ctx *
        (void)quic_table;
        assert(stream_id >= 0 || (data == NULL && len == 0));
 
-       while (stream_id >= 0 && !stream_exists(qconn, stream_id)) {
+       while (stream_id >= 0 && !kr_quic_stream_exists(qconn, stream_id)) {
                int64_t opened = 0;
                kr_log_info(DOQ, "Openning bidirectional stream no: %zu\n",
                                stream_id);
@@ -1351,7 +1148,7 @@ static int send_stream(kr_quic_table_t *quic_table, struct protolayer_iter_ctx *
                if (ret != kr_ok()) {
                        return ret;
                }
-               assert((bool)(opened == stream_id) == stream_exists(qconn, stream_id));
+               assert((bool)(opened == stream_id) == kr_quic_stream_exists(qconn, stream_id));
        }
 
        uint32_t fl = ((stream_id >= 0 && fin) ? NGTCP2_WRITE_STREAM_FLAG_FIN :
@@ -1444,6 +1241,7 @@ static int send_stream(kr_quic_table_t *quic_table, struct protolayer_iter_ctx *
                int wrap_ret = session2_wrap(ctx->session,
                                ctx->payload,
                                ctx->comm,
+                               NULL,/*req*/
                                ctx->finished_cb,
                                ctx->finished_cb_baton);
 
@@ -1543,7 +1341,8 @@ static enum protolayer_iter_cb_result pl_quic_wrap(
 
        while (protolayer_queue_has_payload(&quic->wrap_queue)) {
                struct protolayer_iter_ctx *data = queue_head(quic->wrap_queue);
-               queue_push(quic->resend_queue, data);
+               // queue_push(quic->resend_queue, data);
+
                kr_log_info(DOQ, "queue_len: %zu\n", queue_len(quic->wrap_queue));
 
                queue_pop(quic->wrap_queue);
@@ -1596,7 +1395,7 @@ static void pl_quic_request_init(struct session2 *session,
 {
        kr_log_warning(DOQ, "IN request init\n");
        req->qsource.comm_flags.quic = true;
-       pl_quic_sess_data_t *quic = sess_data;
+       struct pl_quic_sess_data *quic = sess_data;
        quic->req = req;
 }
 
index a4ffe6bb55d6efdd41a85d5f6d5ea592a25b6247..f9852c0099d51e923ab4c8083a24a1fc9add49e3 100644 (file)
@@ -41,6 +41,29 @@ typedef enum {
        VERIFIED,  // RTT-1
 } quic_state_t;
 
+typedef enum {
+       /*! No error.  This is used when the connection or stream needs to be
+           closed, but there is no error to signal. */
+       DOQ_NO_ERROR = 0x0,
+       /*! The DoQ implementation encountered an internal error and is
+           incapable of pursuing the transaction or the connection. */
+       DOQ_INTERNAL_ERROR = 0x1,
+       /*! The DoQ implementation encountered a protocol error and is forcibly
+           aborting the connection. */
+       DOQ_PROTOCOL_ERROR = 0x2,
+       /*! A DoQ client uses this to signal that it wants to cancel an
+           outstanding transaction. */
+       DOQ_REQUEST_CANCELLED = 0x3,
+       /*! A DoQ implementation uses this to signal when closing a connection
+           due to excessive load. */
+       DOQ_EXCESSIVE_LOAD = 0x4,
+       /*!  A DoQ implementation uses this in the absence of a more specific
+            error code. */
+       DOQ_UNSPECIFIED_ERROR = 0x5,
+       /*! Alternative error code used for tests. */
+       DOQ_ERROR_RESERVED = 0xd098ea5e
+} quic_doq_error_t;
+
 
 /*! \brief QUIC parameters. */
 typedef struct {
@@ -67,11 +90,10 @@ typedef enum {
        KR_QUIC_SEND_IGNORE_BLOCKED  = (1 << 1),
 } kr_quic_send_flag_t;
 
-struct quic_ctx;
 // TODO maybe rename to something more in line with iter_data
 struct pl_quic_state {
        struct protolayer_data h;
-       struct quic_ctx *quic_ctx;
+       struct kr_quic_conn *qconn;
        /* struct ortt_ NOTE: Or some other data */ ;
 };
 
@@ -93,6 +115,7 @@ typedef struct {
 
 typedef struct kr_quic_table {
        kr_quic_table_flag_t flags;
+       /* general "settings" for connections */
        size_t size;
        size_t usage;
        size_t pointers;
@@ -131,51 +154,19 @@ typedef struct kr_tcp_inbufs_upd_res {
        struct iovec inbufs[];
 } kr_tcp_inbufs_udp_res_t;
 
-typedef struct kr_quic_stream {
-       /** the inbuf for small, singlepacket messages
-        * while the latter is for larger comunications, but still... */
+struct kr_quic_stream {
        struct iovec inbuf;
        struct kr_tcp_inbufs_upd_res *inbufs;
 
        size_t firstib_consumed;
-       /*ucw_*/list_t outbufs;
+       /*ucw_*/queue_t(uint8_t *) outbufs;
+       // /*ucw_*/list_t outbufs;
        size_t obufs_size;
 
        kr_quic_obuf_t *unsent_obuf;
        size_t first_offset;
        size_t unsent_offset;
-} kr_quic_stream_t;
-
-typedef struct quic_ctx {
-       ngtcp2_crypto_conn_ref conn_ref;
-
-       // // Parameters
-       quic_params_t params;
-
-       // Context
-       ngtcp2_settings settings;
-       struct {
-               int64_t id;
-               uint64_t out_ack;
-               struct iovec in_buffer;
-               struct knot_tcp_inbufs_upd_res *in_parsed;
-               size_t in_parsed_it;
-               size_t in_parsed_total;
-       } stream;
-       ngtcp2_ccerr last_err;
-       uint8_t secret[32];
-
-       // tls_ctx_t *tls;
-
-       // convenient struct to store Connection ID, its associated path, and stateless reset token.
-       ngtcp2_cid_token dcid_token;
-       ngtcp2_cid_token scid_token;
-       // ngtcp2_cid_token odcid_token; // maybe?
-       ngtcp2_conn *conn;
-       ngtcp2_pkt_info pi;
-       ngtcp2_path path;
-       quic_state_t state;
-} quic_ctx_t;
+};
 
 typedef struct kr_quic_conn {
        int heap_node_placeholder; // MUST be first field of the struct
@@ -194,7 +185,7 @@ typedef struct kr_quic_conn {
        ngtcp2_crypto_conn_ref crypto_ref;
 
         // QUIC stream abstraction
-       kr_quic_stream_t *streams;
+       struct kr_quic_stream *streams;
         // number of allocated streams structures
        int16_t streams_count;
         // index of first stream that has complete incomming data to be processed (aka inbuf_fin)
@@ -221,24 +212,9 @@ typedef struct kr_quic_conn {
 
 typedef struct pl_quic_sess_data {
        struct protolayer_data h;
-       // ngtcp2_conn *conns; This one might be wrong
-       // ngtcp2_crypto_conn_ref conn_ref;
-       // Parameters
        quic_params_t params;
        ngtcp2_settings settings;
 
-       // Context
-       // struct {
-       //      int64_t id;
-       //      uint64_t out_ack;
-       //      struct iovec in_buffer;
-       //      struct knot_tcp_inbufs_upd_res *in_parsed;
-       //      size_t in_parsed_it;
-       //      size_t in_parsed_total;
-       // } stream;
-       // ngtcp2_ccerr last_err;
-       // uint8_t secret[32];
-
        uint32_t conn_count;
        protolayer_iter_ctx_queue_t unwrap_queue;
        protolayer_iter_ctx_queue_t wrap_queue;
@@ -247,6 +223,5 @@ typedef struct pl_quic_sess_data {
        kr_quic_table_t *conn_table;
 
        struct kr_request *req;
-       quic_state_t state;
-       // struct wire_buf wire_buf;
+       // quic_state_t state;
 } pl_quic_sess_data_t;
diff --git a/daemon/quic_stream.c b/daemon/quic_stream.c
new file mode 100644 (file)
index 0000000..10c7802
--- /dev/null
@@ -0,0 +1,228 @@
+/*  Copyright (C) CZ.NIC, z.s.p.o. <knot-resolver@labs.nic.cz>
+ *  SPDX-License-Identifier: GPL-3.0-or-later
+ */
+
+#include "lib/generic/queue.h"
+#include "quic.h"
+#include <stdint.h>
+#include "quic_stream.h"
+
+typedef queue_t(kr_quic_obuf_t *) q_stream_buf;
+
+static void stream_outprocess(struct kr_quic_conn *conn, struct kr_quic_stream *stream)
+{
+       if (stream != &conn->streams[conn->stream_inprocess]) {
+               return;
+       }
+
+       for (int16_t idx = conn->stream_inprocess + 1; idx < conn->streams_count; idx++) {
+               stream = &conn->streams[idx];
+               if (stream->inbufs != NULL) {
+                       conn->stream_inprocess = stream - conn->streams;
+                       return;
+               }
+       }
+       conn->stream_inprocess = -1;
+}
+
+void kr_quic_stream_ack_data(struct kr_quic_conn *conn, int64_t stream_id,
+                               size_t end_acked, bool keep_stream)
+{
+       struct kr_quic_stream *s = kr_quic_conn_get_stream(conn, stream_id, false);
+       if (s == NULL) {
+               return;
+       }
+
+       q_stream_buf *obs = (q_stream_buf *)&s->outbufs;
+
+       kr_quic_obuf_t *first;
+
+       while (queue_len(*obs) != 0 && end_acked >= (first = queue_head(*obs))->len + s->first_offset) {
+               queue_pop(*obs);
+               assert(queue_head(*obs) != first); // help CLANG analyzer understand what rem_node did and that further usage of HEAD(*obs) is safe
+               s->obufs_size -= first->len;
+               conn->obufs_size -= first->len;
+               conn->quic_table->obufs_size -= first->len;
+               s->first_offset += first->len;
+               free(first);
+               if (s->unsent_obuf == first) {
+                       s->unsent_obuf = queue_len(*obs) == 0 ? NULL : queue_head(*obs);
+                       s->unsent_offset = 0;
+               }
+       }
+
+       if (queue_len(*obs) == 0 && !keep_stream) {
+               stream_outprocess(conn, s);
+               memset(s, 0, sizeof(*s));
+               init_list((list_t *)&s->outbufs);
+               while (s = &conn->streams[0], s->inbuf.iov_len == 0 && s->inbufs == NULL && s->obufs_size == 0) {
+                       assert(conn->streams_count > 0);
+                       conn->streams_count--;
+
+                       if (conn->streams_count == 0) {
+                               free(conn->streams);
+                               conn->streams = 0;
+                               conn->first_stream_id = 0;
+                               break;
+                       } else {
+                               conn->first_stream_id ++;
+                               conn->stream_inprocess--;
+                               memmove(s, s + 1, sizeof(*s) * conn->streams_count);
+                               // possible realloc to shrink allocated space, but probably useless
+                               for (struct kr_quic_stream *si = s;  si < s + conn->streams_count; si++) {
+                                       if (si->obufs_size == 0) {
+                                               queue_init(si->outbufs);
+                                               // init_list((list_t *)&si->outbufs);
+                                       } else {
+                                               // fix_list((list_t *)&si->outbufs);
+                                       }
+                               }
+                       }
+               }
+       }
+}
+
+void kr_quic_conn_stream_free(kr_quic_conn_t *conn, int64_t stream_id)
+{
+       // TODO:
+       // struct kr_quic_stream *s = kr_quic_conn_get_stream(conn, stream_id, false);
+       // if (s != NULL && s->inbuf.iov_len > 0) {
+       //      free(s->inbuf.iov_base);
+       //      conn->ibufs_size -= buffer_alloc_size(s->inbuf.iov_len);
+       //      conn->quic_table->ibufs_size -= buffer_alloc_size(s->inbuf.iov_len);
+       //      memset(&s->inbuf, 0, sizeof(s->inbuf));
+       // }
+       //
+       // while (s != NULL && s->inbufs != NULL) {
+       //      void *tofree = s->inbufs;
+       //      s->inbufs = s->inbufs->next;
+       //      free(tofree);
+       // }
+
+       // TODO:
+       // kr_quic_stream_ack_data(conn, stream_id, SIZE_MAX, false);
+}
+
+bool kr_quic_stream_exists(kr_quic_conn_t *conn, int64_t stream_id)
+{
+       // Taken from Knot, TODO fix if we use stream_user_data
+       // TRICK, we never use stream_user_data
+       return (ngtcp2_conn_set_stream_user_data(conn->conn, stream_id, NULL) == NGTCP2_NO_ERROR);
+}
+
+struct kr_quic_stream *kr_quic_conn_get_stream(kr_quic_conn_t *conn,
+               int64_t stream_id, bool create)
+{
+       if (stream_id % 4 != 0 || conn == NULL) {
+               return NULL;
+       }
+       stream_id /= 4;
+
+       if (conn->first_stream_id > stream_id) {
+               return NULL;
+       }
+       if (conn->streams_count > stream_id - conn->first_stream_id) {
+               return &conn->streams[stream_id - conn->first_stream_id];
+       }
+
+       if (create) {
+               size_t new_streams_count;
+               struct kr_quic_stream *new_streams;
+
+               if (conn->streams_count == 0) {
+                       new_streams = malloc(sizeof(new_streams[0]));
+                       if (new_streams == NULL) {
+                               return NULL;
+                       }
+                       new_streams_count = 1;
+                       conn->first_stream_id = stream_id;
+               } else {
+                       new_streams_count = stream_id + 1 - conn->first_stream_id;
+                       if (new_streams_count > MAX_STREAMS_PER_CONN) {
+                               return NULL;
+                       }
+                       new_streams = realloc(conn->streams, new_streams_count * sizeof(*new_streams));
+                       if (new_streams == NULL) {
+                               return NULL;
+                       }
+               }
+
+               for (struct kr_quic_stream *si = new_streams;
+                    si < new_streams + conn->streams_count; si++) {
+                       if (si->obufs_size == 0) {
+                               queue_init(si->outbufs);
+                               // init_list(&si->outbufs);
+                       } else {
+                               // fix_list(&si->outbufs);
+                       }
+               }
+
+               for (struct kr_quic_stream *si = new_streams + conn->streams_count;
+                    si < new_streams + new_streams_count; si++) {
+                       memset(si, 0, sizeof(*si));
+                       queue_init(si->outbufs);
+                       // init_list(&si->outbufs);
+               }
+
+               conn->streams = new_streams;
+               conn->streams_count = new_streams_count;
+
+               return &conn->streams[stream_id - conn->first_stream_id];
+       }
+
+       return NULL;
+}
+
+void kr_quic_stream_mark_sent(kr_quic_conn_t *conn,
+               int64_t stream_id, size_t amount_sent)
+{
+       struct kr_quic_stream *s = kr_quic_conn_get_stream(conn, stream_id, false);
+       if (s == NULL) {
+               return;
+       }
+
+       s->unsent_offset += amount_sent;
+       assert(s->unsent_offset <= s->unsent_obuf->len);
+       if (s->unsent_offset == s->unsent_obuf->len) {
+               s->unsent_offset = 0;
+               s->unsent_obuf = (kr_quic_obuf_t *)s->unsent_obuf->node.next;
+               if (s->unsent_obuf->node.next == NULL) { // already behind the tail of list
+                       s->unsent_obuf = NULL;
+               }
+       }
+}
+
+int kr_quic_stream_recv_data(struct kr_quic_conn *qconn, int64_t stream_id,
+                               const uint8_t *data, size_t len, bool fin)
+{
+       if (len == 0 || qconn == NULL || data == NULL) {
+               return KNOT_EINVAL;
+       }
+
+       struct kr_quic_stream *stream = kr_quic_conn_get_stream(qconn, stream_id, true);
+       if (stream == NULL) {
+               return KNOT_ENOENT;
+       }
+
+       struct iovec in = { (void *)data, len };
+       ssize_t prev_ibufs_size = qconn->ibufs_size;
+       // TODO:
+       // int ret = kr_tcp_inbufs_upd(&stream->inbuf, in, true,
+       //                               &stream->inbufs, &conn->ibufs_size);
+       int ret = KNOT_EOK;
+
+       qconn->quic_table->ibufs_size += (ssize_t)qconn->ibufs_size - prev_ibufs_size;
+       if (ret != KNOT_EOK) {
+               return ret;
+       }
+
+       if (fin && stream->inbufs == NULL) {
+               return KNOT_ESEMCHECK;
+       }
+
+       if (stream->inbufs != NULL) {
+               // TODO:
+               // stream_inprocess(conn, stream);
+       }
+       return KNOT_EOK;
+}
diff --git a/daemon/quic_stream.h b/daemon/quic_stream.h
new file mode 100644 (file)
index 0000000..e849dc3
--- /dev/null
@@ -0,0 +1,27 @@
+/*  Copyright (C) CZ.NIC, z.s.p.o. <knot-resolver@labs.nic.cz>
+ *  SPDX-License-Identifier: GPL-3.0-or-later
+ */
+
+#pragma once
+
+#include "quic.h"
+
+bool kr_quic_stream_exists(kr_quic_conn_t *conn, int64_t stream_id);
+
+/** We have to buffer all data that has been send but still waits
+ * to be acked, move the pointer of "yet to be acked" data */
+// void kr_quic_stream_ack_data(struct kr_quic_conn *conn, int64_t stream_id,
+//                                size_t end_acked, bool keep_stream);
+void kr_quic_stream_mark_sent(kr_quic_conn_t *conn,
+               int64_t stream_id, size_t amount_sent);
+
+struct kr_quic_stream *quic_conn_get_stream(struct kr_quic_conn *conn,
+               int64_t stream_id, bool create);
+
+int kr_quic_stream_recv_data(kr_quic_conn_t *conn, int64_t stream_id,
+               const uint8_t *data, size_t len, bool fin);
+
+void kr_quic_conn_stream_free(kr_quic_conn_t *conn, int64_t stream_id);
+
+struct kr_quic_stream *kr_quic_conn_get_stream(kr_quic_conn_t *conn,
+               int64_t stream_id, bool create);