'worker.c',
'zimport.c',
'quic.c',
+ 'quic_stream.c',
])
if nghttp2.found()
* SPDX-License-Identifier: GPL-3.0-or-later
*/
-#include "quic.h"
#include <asm-generic/errno-base.h>
#include <asm-generic/errno.h>
#include <bits/types/struct_iovec.h>
// FIXME: For now just to perform the pin check once HS finishes
#include <libknot/quic/tls_common.h>
+#include "quic.h"
+#include "quic_stream.h"
+
static uint64_t cid2hash(const ngtcp2_cid *cid, kr_quic_table_t *table);
kr_quic_conn_t *kr_quic_table_lookup(const ngtcp2_cid *cid, kr_quic_table_t *table);
kr_quic_cid_t **kr_quic_table_lookup2(const ngtcp2_cid *cid, kr_quic_table_t *table);
kr_quic_cid_t **kr_quic_table_insert(kr_quic_conn_t *conn, const ngtcp2_cid *cid,
kr_quic_table_t *table);
-kr_quic_stream_t *kr_quic_conn_get_stream(kr_quic_conn_t *conn,
- int64_t stream_id, bool create);
static int pl_quic_client_init(struct session2 *session,
pl_quic_sess_data_t *quic,
tls_client_param_t *param);
/* kr_quic_reply_t *reply */void *sess_data,
struct protolayer_iter_ctx *ctx,
unsigned max_msgs, kr_quic_send_flag_t flags);
-
uint64_t quic_timestamp(void);
+#define set_application_error(ctx, error_code, reason, reason_len) \
+ ngtcp2_ccerr_set_application_error(&(ctx)->last_err, \
+ error_code, reason, reason_len)
+#define set_transport_error(ctx, error_code, reason, reason_len) \
+ ngtcp2_ccerr_set_transport_error(&(ctx)->last_err, \
+ error_code, reason, reason_len)
+
static int cmp_expiry_heap_nodes(void *c1, void *c2)
{
if (((kr_quic_conn_t *)c1)->next_expiry < ((kr_quic_conn_t *)c2)->next_expiry)
return addto;
}
-// int kr_quic_stream_recv_data(kr_quic_conn_t *conn, int64_t stream_id,
-// const uint8_t *data, size_t len, bool fin)
-// {
-// if (len == 0 || conn == NULL || data == NULL) {
-// return KNOT_EINVAL;
-// }
-//
-// kr_quic_stream_t *stream = kr_quic_conn_get_stream(conn, stream_id, true);
-// if (stream == NULL) {
-// return KNOT_ENOENT;
-// }
-//
-// struct iovec in = { (void *)data, len };
-// ssize_t prev_ibufs_size = conn->ibufs_size;
-// int ret = kr_tcp_inbufs_upd(&stream->inbuf, in, true,
-// &stream->inbufs, &conn->ibufs_size);
-// conn->quic_table->ibufs_size += (ssize_t)conn->ibufs_size - prev_ibufs_size;
-// if (ret != KNOT_EOK) {
-// return ret;
-// }
-//
-// if (fin && stream->inbufs == NULL) {
-// return KNOT_ESEMCHECK;
-// }
-//
-// if (stream->inbufs != NULL) {
-// stream_inprocess(conn, stream);
-// }
-// return KNOT_EOK;
-// }
-
static int kr_recv_stream_data_cb(ngtcp2_conn *conn, uint32_t flags,
int64_t stream_id, uint64_t offset, const uint8_t *data,
size_t datalen, void *user_data, void *stream_user_data)
{
- (void)conn;
- (void)flags;
- (void)offset;
- (void)stream_user_data;
-
- quic_ctx_t *ctx = (quic_ctx_t *)user_data;
-
- if (stream_id != ctx->stream.id) {
- // const uint8_t msg[] = "Unknown stream";
- // set_application_error(ctx, DOQ_PROTOCOL_ERROR, msg, sizeof(msg) - 1);
- return NGTCP2_ERR_CALLBACK_FAILURE;
- }
-
- struct iovec in = {
- .iov_base = (uint8_t *)data,
- .iov_len = datalen
- };
+ (void)(stream_user_data); // always NULL
+ (void)(offset); // QUIC shall ensure that data arrive in-order
- int ret = 0;
- // int ret = knot_tcp_inbufs_upd(&ctx->stream.in_buffer, in, true,
- // &ctx->stream.in_parsed,
- // &ctx->stream.in_parsed_total);
+ struct kr_quic_conn *qconn = (struct kr_quic_conn *)user_data;
+ assert(ctx->conn == conn);
- if (ret != KNOT_EOK) {
- // const uint8_t msg[] = "Malformed payload";
- // set_application_error(ctx, DOQ_PROTOCOL_ERROR, msg, sizeof(msg) - 1);
- return NGTCP2_ERR_CALLBACK_FAILURE;
- }
+ int ret = kr_quic_stream_recv_data(qconn, stream_id, data, datalen,
+ (flags & NGTCP2_STREAM_DATA_FLAG_FIN));
- ctx->stream.in_parsed_it = 0;
- return 0;
+ return ret == KNOT_EOK ? 0 : NGTCP2_ERR_CALLBACK_FAILURE;
}
-kr_quic_stream_t *kr_quic_conn_get_stream(kr_quic_conn_t *conn,
- int64_t stream_id, bool create)
-{
- if (stream_id % 4 != 0 || conn == NULL) {
- return NULL;
- }
- stream_id /= 4;
-
- if (conn->first_stream_id > stream_id) {
- return NULL;
- }
- if (conn->streams_count > stream_id - conn->first_stream_id) {
- return &conn->streams[stream_id - conn->first_stream_id];
- }
-
- if (create) {
- size_t new_streams_count;
- kr_quic_stream_t *new_streams;
-
- if (conn->streams_count == 0) {
- new_streams = malloc(sizeof(new_streams[0]));
- if (new_streams == NULL) {
- return NULL;
- }
- new_streams_count = 1;
- conn->first_stream_id = stream_id;
- } else {
- new_streams_count = stream_id + 1 - conn->first_stream_id;
- if (new_streams_count > MAX_STREAMS_PER_CONN) {
- return NULL;
- }
- new_streams = realloc(conn->streams, new_streams_count * sizeof(*new_streams));
- if (new_streams == NULL) {
- return NULL;
- }
- }
-
- for (kr_quic_stream_t *si = new_streams;
- si < new_streams + conn->streams_count; si++) {
- if (si->obufs_size == 0) {
- init_list(&si->outbufs);
- } else {
- fix_list(&si->outbufs);
- }
- }
-
- for (kr_quic_stream_t *si = new_streams + conn->streams_count;
- si < new_streams + new_streams_count; si++) {
- memset(si, 0, sizeof(*si));
- init_list(&si->outbufs);
- }
- conn->streams = new_streams;
- conn->streams_count = new_streams_count;
-
- return &conn->streams[stream_id - conn->first_stream_id];
- }
- return NULL;
-}
-
// TODO Will likely be removed once the proper buffer scheme for
// pl is figured out
uint64_t buffer_alloc_size(uint64_t buffer_len)
return buffer_len + 1;
}
-void kr_quic_stream_ack_data(kr_quic_conn_t *conn, int64_t stream_id,
- size_t end_acked, bool keep_stream)
-{
- kr_quic_stream_t *s = kr_quic_conn_get_stream(conn, stream_id, false);
- if (s == NULL) {
- return;
- }
-
- list_t *obs = &s->outbufs;
-
- kr_quic_obuf_t *first;
- while (!EMPTY_LIST(*obs) && end_acked >= (first = HEAD(*obs))->len + s->first_offset) {
- rem_node((node_t *)first);
- s->obufs_size -= first->len;
- conn->obufs_size -= first->len;
- conn->quic_table->obufs_size -= first->len;
- s->first_offset += first->len;
- free(first);
- if (s->unsent_obuf == first) {
- s->unsent_obuf = EMPTY_LIST(*obs) ? NULL : HEAD(*obs);
- s->unsent_offset = 0;
- }
- }
-
- if (EMPTY_LIST(*obs) && !keep_stream) {
- // stream_outprocess(conn, s);
- memset(s, 0, sizeof(*s));
- init_list(&s->outbufs);
- while (s = &conn->streams[0], s->inbuf.iov_len == 0 && s->inbufs == NULL && s->obufs_size == 0) {
- assert(conn->streams_count > 0);
- conn->streams_count--;
-
- if (conn->streams_count == 0) {
- free(conn->streams);
- conn->streams = 0;
- conn->first_stream_id = 0;
- break;
- } else {
- conn->first_stream_id++;
- conn->stream_inprocess--;
- memmove(s, s + 1, sizeof(*s) * conn->streams_count);
- // possible realloc to shrink allocated space, but probably useless
- for (kr_quic_stream_t *si = s; si < s + conn->streams_count; si++) {
- if (si->obufs_size == 0) {
- init_list(&si->outbufs);
- } else {
- fix_list(&si->outbufs);
- }
- }
- }
- }
- }
-}
-
-void kr_quic_conn_stream_free(kr_quic_conn_t *conn, int64_t stream_id)
-{
- kr_quic_stream_t *s = kr_quic_conn_get_stream(conn, stream_id, false);
- if (s != NULL && s->inbuf.iov_len > 0) {
- free(s->inbuf.iov_base);
- conn->ibufs_size -= buffer_alloc_size(s->inbuf.iov_len);
- conn->quic_table->ibufs_size -= buffer_alloc_size(s->inbuf.iov_len);
- memset(&s->inbuf, 0, sizeof(s->inbuf));
- }
-
- while (s != NULL && s->inbufs != NULL) {
- void *tofree = s->inbufs;
- s->inbufs = s->inbufs->next;
- free(tofree);
- }
-
- kr_quic_stream_ack_data(conn, stream_id, SIZE_MAX, false);
-}
-
void kr_quic_table_rem2(kr_quic_cid_t **pcid, kr_quic_table_t *table)
{
kr_quic_cid_t *cid = *pcid;
return protolayer_continue(ctx);
}
-static bool stream_exists(kr_quic_conn_t *conn, int64_t stream_id)
-{
- // Taken from Knot, TODO fix if we use stream_user_data
- // TRICK, we never use stream_user_data
- return (ngtcp2_conn_set_stream_user_data(conn->conn, stream_id, NULL) == NGTCP2_NO_ERROR);
-}
-
-void kr_quic_stream_mark_sent(kr_quic_conn_t *conn, int64_t stream_id,
- size_t amount_sent)
-{
- kr_quic_stream_t *s = kr_quic_conn_get_stream(conn, stream_id, false);
- if (s == NULL) {
- return;
- }
-
- s->unsent_offset += amount_sent;
- assert(s->unsent_offset <= s->unsent_obuf->len);
- if (s->unsent_offset == s->unsent_obuf->len) {
- s->unsent_offset = 0;
- s->unsent_obuf = (kr_quic_obuf_t *)s->unsent_obuf->node.next;
- if (s->unsent_obuf->node.next == NULL) { // already behind the tail of list
- s->unsent_obuf = NULL;
- }
- }
-}
-
+/* TODO perhaps also move to quic_stream */
static int send_stream(kr_quic_table_t *quic_table, struct protolayer_iter_ctx *ctx,
kr_quic_conn_t *qconn, int64_t stream_id,
uint8_t *data, size_t len, bool fin, ngtcp2_ssize *sent)
(void)quic_table;
assert(stream_id >= 0 || (data == NULL && len == 0));
- while (stream_id >= 0 && !stream_exists(qconn, stream_id)) {
+ while (stream_id >= 0 && !kr_quic_stream_exists(qconn, stream_id)) {
int64_t opened = 0;
kr_log_info(DOQ, "Openning bidirectional stream no: %zu\n",
stream_id);
if (ret != kr_ok()) {
return ret;
}
- assert((bool)(opened == stream_id) == stream_exists(qconn, stream_id));
+ assert((bool)(opened == stream_id) == kr_quic_stream_exists(qconn, stream_id));
}
uint32_t fl = ((stream_id >= 0 && fin) ? NGTCP2_WRITE_STREAM_FLAG_FIN :
int wrap_ret = session2_wrap(ctx->session,
ctx->payload,
ctx->comm,
+ NULL,/*req*/
ctx->finished_cb,
ctx->finished_cb_baton);
while (protolayer_queue_has_payload(&quic->wrap_queue)) {
struct protolayer_iter_ctx *data = queue_head(quic->wrap_queue);
- queue_push(quic->resend_queue, data);
+ // queue_push(quic->resend_queue, data);
+
kr_log_info(DOQ, "queue_len: %zu\n", queue_len(quic->wrap_queue));
queue_pop(quic->wrap_queue);
{
kr_log_warning(DOQ, "IN request init\n");
req->qsource.comm_flags.quic = true;
- pl_quic_sess_data_t *quic = sess_data;
+ struct pl_quic_sess_data *quic = sess_data;
quic->req = req;
}
VERIFIED, // RTT-1
} quic_state_t;
+typedef enum {
+ /*! No error. This is used when the connection or stream needs to be
+ closed, but there is no error to signal. */
+ DOQ_NO_ERROR = 0x0,
+ /*! The DoQ implementation encountered an internal error and is
+ incapable of pursuing the transaction or the connection. */
+ DOQ_INTERNAL_ERROR = 0x1,
+ /*! The DoQ implementation encountered a protocol error and is forcibly
+ aborting the connection. */
+ DOQ_PROTOCOL_ERROR = 0x2,
+ /*! A DoQ client uses this to signal that it wants to cancel an
+ outstanding transaction. */
+ DOQ_REQUEST_CANCELLED = 0x3,
+ /*! A DoQ implementation uses this to signal when closing a connection
+ due to excessive load. */
+ DOQ_EXCESSIVE_LOAD = 0x4,
+ /*! A DoQ implementation uses this in the absence of a more specific
+ error code. */
+ DOQ_UNSPECIFIED_ERROR = 0x5,
+ /*! Alternative error code used for tests. */
+ DOQ_ERROR_RESERVED = 0xd098ea5e
+} quic_doq_error_t;
+
/*! \brief QUIC parameters. */
typedef struct {
KR_QUIC_SEND_IGNORE_BLOCKED = (1 << 1),
} kr_quic_send_flag_t;
-struct quic_ctx;
// TODO maybe rename to something more in line with iter_data
struct pl_quic_state {
struct protolayer_data h;
- struct quic_ctx *quic_ctx;
+ struct kr_quic_conn *qconn;
/* struct ortt_ NOTE: Or some other data */ ;
};
typedef struct kr_quic_table {
kr_quic_table_flag_t flags;
+ /* general "settings" for connections */
size_t size;
size_t usage;
size_t pointers;
struct iovec inbufs[];
} kr_tcp_inbufs_udp_res_t;
-typedef struct kr_quic_stream {
- /** the inbuf for small, singlepacket messages
- * while the latter is for larger comunications, but still... */
+struct kr_quic_stream {
struct iovec inbuf;
struct kr_tcp_inbufs_upd_res *inbufs;
size_t firstib_consumed;
- /*ucw_*/list_t outbufs;
+ /*ucw_*/queue_t(uint8_t *) outbufs;
+ // /*ucw_*/list_t outbufs;
size_t obufs_size;
kr_quic_obuf_t *unsent_obuf;
size_t first_offset;
size_t unsent_offset;
-} kr_quic_stream_t;
-
-typedef struct quic_ctx {
- ngtcp2_crypto_conn_ref conn_ref;
-
- // // Parameters
- quic_params_t params;
-
- // Context
- ngtcp2_settings settings;
- struct {
- int64_t id;
- uint64_t out_ack;
- struct iovec in_buffer;
- struct knot_tcp_inbufs_upd_res *in_parsed;
- size_t in_parsed_it;
- size_t in_parsed_total;
- } stream;
- ngtcp2_ccerr last_err;
- uint8_t secret[32];
-
- // tls_ctx_t *tls;
-
- // convenient struct to store Connection ID, its associated path, and stateless reset token.
- ngtcp2_cid_token dcid_token;
- ngtcp2_cid_token scid_token;
- // ngtcp2_cid_token odcid_token; // maybe?
- ngtcp2_conn *conn;
- ngtcp2_pkt_info pi;
- ngtcp2_path path;
- quic_state_t state;
-} quic_ctx_t;
+};
typedef struct kr_quic_conn {
int heap_node_placeholder; // MUST be first field of the struct
ngtcp2_crypto_conn_ref crypto_ref;
// QUIC stream abstraction
- kr_quic_stream_t *streams;
+ struct kr_quic_stream *streams;
// number of allocated streams structures
int16_t streams_count;
// index of first stream that has complete incomming data to be processed (aka inbuf_fin)
typedef struct pl_quic_sess_data {
struct protolayer_data h;
- // ngtcp2_conn *conns; This one might be wrong
- // ngtcp2_crypto_conn_ref conn_ref;
- // Parameters
quic_params_t params;
ngtcp2_settings settings;
- // Context
- // struct {
- // int64_t id;
- // uint64_t out_ack;
- // struct iovec in_buffer;
- // struct knot_tcp_inbufs_upd_res *in_parsed;
- // size_t in_parsed_it;
- // size_t in_parsed_total;
- // } stream;
- // ngtcp2_ccerr last_err;
- // uint8_t secret[32];
-
uint32_t conn_count;
protolayer_iter_ctx_queue_t unwrap_queue;
protolayer_iter_ctx_queue_t wrap_queue;
kr_quic_table_t *conn_table;
struct kr_request *req;
- quic_state_t state;
- // struct wire_buf wire_buf;
+ // quic_state_t state;
} pl_quic_sess_data_t;
--- /dev/null
+/* Copyright (C) CZ.NIC, z.s.p.o. <knot-resolver@labs.nic.cz>
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ */
+
+#include "lib/generic/queue.h"
+#include "quic.h"
+#include <stdint.h>
+#include "quic_stream.h"
+
+typedef queue_t(kr_quic_obuf_t *) q_stream_buf;
+
+static void stream_outprocess(struct kr_quic_conn *conn, struct kr_quic_stream *stream)
+{
+ if (stream != &conn->streams[conn->stream_inprocess]) {
+ return;
+ }
+
+ for (int16_t idx = conn->stream_inprocess + 1; idx < conn->streams_count; idx++) {
+ stream = &conn->streams[idx];
+ if (stream->inbufs != NULL) {
+ conn->stream_inprocess = stream - conn->streams;
+ return;
+ }
+ }
+ conn->stream_inprocess = -1;
+}
+
+void kr_quic_stream_ack_data(struct kr_quic_conn *conn, int64_t stream_id,
+ size_t end_acked, bool keep_stream)
+{
+ struct kr_quic_stream *s = kr_quic_conn_get_stream(conn, stream_id, false);
+ if (s == NULL) {
+ return;
+ }
+
+ q_stream_buf *obs = (q_stream_buf *)&s->outbufs;
+
+ kr_quic_obuf_t *first;
+
+ while (queue_len(*obs) != 0 && end_acked >= (first = queue_head(*obs))->len + s->first_offset) {
+ queue_pop(*obs);
+ assert(queue_head(*obs) != first); // help CLANG analyzer understand what rem_node did and that further usage of HEAD(*obs) is safe
+ s->obufs_size -= first->len;
+ conn->obufs_size -= first->len;
+ conn->quic_table->obufs_size -= first->len;
+ s->first_offset += first->len;
+ free(first);
+ if (s->unsent_obuf == first) {
+ s->unsent_obuf = queue_len(*obs) == 0 ? NULL : queue_head(*obs);
+ s->unsent_offset = 0;
+ }
+ }
+
+ if (queue_len(*obs) == 0 && !keep_stream) {
+ stream_outprocess(conn, s);
+ memset(s, 0, sizeof(*s));
+ init_list((list_t *)&s->outbufs);
+ while (s = &conn->streams[0], s->inbuf.iov_len == 0 && s->inbufs == NULL && s->obufs_size == 0) {
+ assert(conn->streams_count > 0);
+ conn->streams_count--;
+
+ if (conn->streams_count == 0) {
+ free(conn->streams);
+ conn->streams = 0;
+ conn->first_stream_id = 0;
+ break;
+ } else {
+ conn->first_stream_id ++;
+ conn->stream_inprocess--;
+ memmove(s, s + 1, sizeof(*s) * conn->streams_count);
+ // possible realloc to shrink allocated space, but probably useless
+ for (struct kr_quic_stream *si = s; si < s + conn->streams_count; si++) {
+ if (si->obufs_size == 0) {
+ queue_init(si->outbufs);
+ // init_list((list_t *)&si->outbufs);
+ } else {
+ // fix_list((list_t *)&si->outbufs);
+ }
+ }
+ }
+ }
+ }
+}
+
+void kr_quic_conn_stream_free(kr_quic_conn_t *conn, int64_t stream_id)
+{
+ // TODO:
+ // struct kr_quic_stream *s = kr_quic_conn_get_stream(conn, stream_id, false);
+ // if (s != NULL && s->inbuf.iov_len > 0) {
+ // free(s->inbuf.iov_base);
+ // conn->ibufs_size -= buffer_alloc_size(s->inbuf.iov_len);
+ // conn->quic_table->ibufs_size -= buffer_alloc_size(s->inbuf.iov_len);
+ // memset(&s->inbuf, 0, sizeof(s->inbuf));
+ // }
+ //
+ // while (s != NULL && s->inbufs != NULL) {
+ // void *tofree = s->inbufs;
+ // s->inbufs = s->inbufs->next;
+ // free(tofree);
+ // }
+
+ // TODO:
+ // kr_quic_stream_ack_data(conn, stream_id, SIZE_MAX, false);
+}
+
+bool kr_quic_stream_exists(kr_quic_conn_t *conn, int64_t stream_id)
+{
+ // Taken from Knot, TODO fix if we use stream_user_data
+ // TRICK, we never use stream_user_data
+ return (ngtcp2_conn_set_stream_user_data(conn->conn, stream_id, NULL) == NGTCP2_NO_ERROR);
+}
+
+struct kr_quic_stream *kr_quic_conn_get_stream(kr_quic_conn_t *conn,
+ int64_t stream_id, bool create)
+{
+ if (stream_id % 4 != 0 || conn == NULL) {
+ return NULL;
+ }
+ stream_id /= 4;
+
+ if (conn->first_stream_id > stream_id) {
+ return NULL;
+ }
+ if (conn->streams_count > stream_id - conn->first_stream_id) {
+ return &conn->streams[stream_id - conn->first_stream_id];
+ }
+
+ if (create) {
+ size_t new_streams_count;
+ struct kr_quic_stream *new_streams;
+
+ if (conn->streams_count == 0) {
+ new_streams = malloc(sizeof(new_streams[0]));
+ if (new_streams == NULL) {
+ return NULL;
+ }
+ new_streams_count = 1;
+ conn->first_stream_id = stream_id;
+ } else {
+ new_streams_count = stream_id + 1 - conn->first_stream_id;
+ if (new_streams_count > MAX_STREAMS_PER_CONN) {
+ return NULL;
+ }
+ new_streams = realloc(conn->streams, new_streams_count * sizeof(*new_streams));
+ if (new_streams == NULL) {
+ return NULL;
+ }
+ }
+
+ for (struct kr_quic_stream *si = new_streams;
+ si < new_streams + conn->streams_count; si++) {
+ if (si->obufs_size == 0) {
+ queue_init(si->outbufs);
+ // init_list(&si->outbufs);
+ } else {
+ // fix_list(&si->outbufs);
+ }
+ }
+
+ for (struct kr_quic_stream *si = new_streams + conn->streams_count;
+ si < new_streams + new_streams_count; si++) {
+ memset(si, 0, sizeof(*si));
+ queue_init(si->outbufs);
+ // init_list(&si->outbufs);
+ }
+
+ conn->streams = new_streams;
+ conn->streams_count = new_streams_count;
+
+ return &conn->streams[stream_id - conn->first_stream_id];
+ }
+
+ return NULL;
+}
+
+void kr_quic_stream_mark_sent(kr_quic_conn_t *conn,
+ int64_t stream_id, size_t amount_sent)
+{
+ struct kr_quic_stream *s = kr_quic_conn_get_stream(conn, stream_id, false);
+ if (s == NULL) {
+ return;
+ }
+
+ s->unsent_offset += amount_sent;
+ assert(s->unsent_offset <= s->unsent_obuf->len);
+ if (s->unsent_offset == s->unsent_obuf->len) {
+ s->unsent_offset = 0;
+ s->unsent_obuf = (kr_quic_obuf_t *)s->unsent_obuf->node.next;
+ if (s->unsent_obuf->node.next == NULL) { // already behind the tail of list
+ s->unsent_obuf = NULL;
+ }
+ }
+}
+
+int kr_quic_stream_recv_data(struct kr_quic_conn *qconn, int64_t stream_id,
+ const uint8_t *data, size_t len, bool fin)
+{
+ if (len == 0 || qconn == NULL || data == NULL) {
+ return KNOT_EINVAL;
+ }
+
+ struct kr_quic_stream *stream = kr_quic_conn_get_stream(qconn, stream_id, true);
+ if (stream == NULL) {
+ return KNOT_ENOENT;
+ }
+
+ struct iovec in = { (void *)data, len };
+ ssize_t prev_ibufs_size = qconn->ibufs_size;
+ // TODO:
+ // int ret = kr_tcp_inbufs_upd(&stream->inbuf, in, true,
+ // &stream->inbufs, &conn->ibufs_size);
+ int ret = KNOT_EOK;
+
+ qconn->quic_table->ibufs_size += (ssize_t)qconn->ibufs_size - prev_ibufs_size;
+ if (ret != KNOT_EOK) {
+ return ret;
+ }
+
+ if (fin && stream->inbufs == NULL) {
+ return KNOT_ESEMCHECK;
+ }
+
+ if (stream->inbufs != NULL) {
+ // TODO:
+ // stream_inprocess(conn, stream);
+ }
+ return KNOT_EOK;
+}
--- /dev/null
+/* Copyright (C) CZ.NIC, z.s.p.o. <knot-resolver@labs.nic.cz>
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ */
+
+#pragma once
+
+#include "quic.h"
+
+bool kr_quic_stream_exists(kr_quic_conn_t *conn, int64_t stream_id);
+
+/** We have to buffer all data that has been send but still waits
+ * to be acked, move the pointer of "yet to be acked" data */
+// void kr_quic_stream_ack_data(struct kr_quic_conn *conn, int64_t stream_id,
+// size_t end_acked, bool keep_stream);
+void kr_quic_stream_mark_sent(kr_quic_conn_t *conn,
+ int64_t stream_id, size_t amount_sent);
+
+struct kr_quic_stream *quic_conn_get_stream(struct kr_quic_conn *conn,
+ int64_t stream_id, bool create);
+
+int kr_quic_stream_recv_data(kr_quic_conn_t *conn, int64_t stream_id,
+ const uint8_t *data, size_t len, bool fin);
+
+void kr_quic_conn_stream_free(kr_quic_conn_t *conn, int64_t stream_id);
+
+struct kr_quic_stream *kr_quic_conn_get_stream(kr_quic_conn_t *conn,
+ int64_t stream_id, bool create);