From: Frantisek Tobias Date: Mon, 7 Jul 2025 08:15:21 +0000 (+0200) Subject: daemon/quic: move stream functions to separate file, replace quic_ctx with struct... X-Git-Tag: v6.2.0~2^2~60 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=ef7e1264a749d30cb1ff889f22116ee7f882d247;p=thirdparty%2Fknot-resolver.git daemon/quic: move stream functions to separate file, replace quic_ctx with struct kr_quic_conn everywhere --- diff --git a/daemon/meson.build b/daemon/meson.build index b9523ebd0..7f1ff0676 100644 --- a/daemon/meson.build +++ b/daemon/meson.build @@ -24,6 +24,7 @@ kresd_src = files([ 'worker.c', 'zimport.c', 'quic.c', + 'quic_stream.c', ]) if nghttp2.found() diff --git a/daemon/quic.c b/daemon/quic.c index dcb78a4fa..d70a06d73 100644 --- a/daemon/quic.c +++ b/daemon/quic.c @@ -2,7 +2,6 @@ * SPDX-License-Identifier: GPL-3.0-or-later */ -#include "quic.h" #include #include #include @@ -39,13 +38,14 @@ // FIXME: For now just to perform the pin check once HS finishes #include +#include "quic.h" +#include "quic_stream.h" + static uint64_t cid2hash(const ngtcp2_cid *cid, kr_quic_table_t *table); kr_quic_conn_t *kr_quic_table_lookup(const ngtcp2_cid *cid, kr_quic_table_t *table); kr_quic_cid_t **kr_quic_table_lookup2(const ngtcp2_cid *cid, kr_quic_table_t *table); kr_quic_cid_t **kr_quic_table_insert(kr_quic_conn_t *conn, const ngtcp2_cid *cid, kr_quic_table_t *table); -kr_quic_stream_t *kr_quic_conn_get_stream(kr_quic_conn_t *conn, - int64_t stream_id, bool create); static int pl_quic_client_init(struct session2 *session, pl_quic_sess_data_t *quic, tls_client_param_t *param); @@ -54,9 +54,15 @@ int kr_quic_send(kr_quic_table_t *quic_table, kr_quic_conn_t *conn, /* kr_quic_reply_t *reply */void *sess_data, struct protolayer_iter_ctx *ctx, unsigned max_msgs, kr_quic_send_flag_t flags); - uint64_t quic_timestamp(void); +#define set_application_error(ctx, error_code, reason, reason_len) \ + ngtcp2_ccerr_set_application_error(&(ctx)->last_err, \ + error_code, reason, reason_len) +#define set_transport_error(ctx, error_code, reason, reason_len) \ + ngtcp2_ccerr_set_transport_error(&(ctx)->last_err, \ + error_code, reason, reason_len) + static int cmp_expiry_heap_nodes(void *c1, void *c2) { if (((kr_quic_conn_t *)c1)->next_expiry < ((kr_quic_conn_t *)c2)->next_expiry) @@ -164,134 +170,23 @@ kr_quic_cid_t **kr_quic_table_insert(kr_quic_conn_t *conn, const ngtcp2_cid *cid return addto; } -// int kr_quic_stream_recv_data(kr_quic_conn_t *conn, int64_t stream_id, -// const uint8_t *data, size_t len, bool fin) -// { -// if (len == 0 || conn == NULL || data == NULL) { -// return KNOT_EINVAL; -// } -// -// kr_quic_stream_t *stream = kr_quic_conn_get_stream(conn, stream_id, true); -// if (stream == NULL) { -// return KNOT_ENOENT; -// } -// -// struct iovec in = { (void *)data, len }; -// ssize_t prev_ibufs_size = conn->ibufs_size; -// int ret = kr_tcp_inbufs_upd(&stream->inbuf, in, true, -// &stream->inbufs, &conn->ibufs_size); -// conn->quic_table->ibufs_size += (ssize_t)conn->ibufs_size - prev_ibufs_size; -// if (ret != KNOT_EOK) { -// return ret; -// } -// -// if (fin && stream->inbufs == NULL) { -// return KNOT_ESEMCHECK; -// } -// -// if (stream->inbufs != NULL) { -// stream_inprocess(conn, stream); -// } -// return KNOT_EOK; -// } - static int kr_recv_stream_data_cb(ngtcp2_conn *conn, uint32_t flags, int64_t stream_id, uint64_t offset, const uint8_t *data, size_t datalen, void *user_data, void *stream_user_data) { - (void)conn; - (void)flags; - (void)offset; - (void)stream_user_data; - - quic_ctx_t *ctx = (quic_ctx_t *)user_data; - - if (stream_id != ctx->stream.id) { - // const uint8_t msg[] = "Unknown stream"; - // set_application_error(ctx, DOQ_PROTOCOL_ERROR, msg, sizeof(msg) - 1); - return NGTCP2_ERR_CALLBACK_FAILURE; - } - - struct iovec in = { - .iov_base = (uint8_t *)data, - .iov_len = datalen - }; + (void)(stream_user_data); // always NULL + (void)(offset); // QUIC shall ensure that data arrive in-order - int ret = 0; - // int ret = knot_tcp_inbufs_upd(&ctx->stream.in_buffer, in, true, - // &ctx->stream.in_parsed, - // &ctx->stream.in_parsed_total); + struct kr_quic_conn *qconn = (struct kr_quic_conn *)user_data; + assert(ctx->conn == conn); - if (ret != KNOT_EOK) { - // const uint8_t msg[] = "Malformed payload"; - // set_application_error(ctx, DOQ_PROTOCOL_ERROR, msg, sizeof(msg) - 1); - return NGTCP2_ERR_CALLBACK_FAILURE; - } + int ret = kr_quic_stream_recv_data(qconn, stream_id, data, datalen, + (flags & NGTCP2_STREAM_DATA_FLAG_FIN)); - ctx->stream.in_parsed_it = 0; - return 0; + return ret == KNOT_EOK ? 0 : NGTCP2_ERR_CALLBACK_FAILURE; } -kr_quic_stream_t *kr_quic_conn_get_stream(kr_quic_conn_t *conn, - int64_t stream_id, bool create) -{ - if (stream_id % 4 != 0 || conn == NULL) { - return NULL; - } - stream_id /= 4; - - if (conn->first_stream_id > stream_id) { - return NULL; - } - if (conn->streams_count > stream_id - conn->first_stream_id) { - return &conn->streams[stream_id - conn->first_stream_id]; - } - - if (create) { - size_t new_streams_count; - kr_quic_stream_t *new_streams; - - if (conn->streams_count == 0) { - new_streams = malloc(sizeof(new_streams[0])); - if (new_streams == NULL) { - return NULL; - } - new_streams_count = 1; - conn->first_stream_id = stream_id; - } else { - new_streams_count = stream_id + 1 - conn->first_stream_id; - if (new_streams_count > MAX_STREAMS_PER_CONN) { - return NULL; - } - new_streams = realloc(conn->streams, new_streams_count * sizeof(*new_streams)); - if (new_streams == NULL) { - return NULL; - } - } - - for (kr_quic_stream_t *si = new_streams; - si < new_streams + conn->streams_count; si++) { - if (si->obufs_size == 0) { - init_list(&si->outbufs); - } else { - fix_list(&si->outbufs); - } - } - - for (kr_quic_stream_t *si = new_streams + conn->streams_count; - si < new_streams + new_streams_count; si++) { - memset(si, 0, sizeof(*si)); - init_list(&si->outbufs); - } - conn->streams = new_streams; - conn->streams_count = new_streams_count; - - return &conn->streams[stream_id - conn->first_stream_id]; - } - return NULL; -} - // TODO Will likely be removed once the proper buffer scheme for // pl is figured out uint64_t buffer_alloc_size(uint64_t buffer_len) @@ -310,79 +205,6 @@ uint64_t buffer_alloc_size(uint64_t buffer_len) return buffer_len + 1; } -void kr_quic_stream_ack_data(kr_quic_conn_t *conn, int64_t stream_id, - size_t end_acked, bool keep_stream) -{ - kr_quic_stream_t *s = kr_quic_conn_get_stream(conn, stream_id, false); - if (s == NULL) { - return; - } - - list_t *obs = &s->outbufs; - - kr_quic_obuf_t *first; - while (!EMPTY_LIST(*obs) && end_acked >= (first = HEAD(*obs))->len + s->first_offset) { - rem_node((node_t *)first); - s->obufs_size -= first->len; - conn->obufs_size -= first->len; - conn->quic_table->obufs_size -= first->len; - s->first_offset += first->len; - free(first); - if (s->unsent_obuf == first) { - s->unsent_obuf = EMPTY_LIST(*obs) ? NULL : HEAD(*obs); - s->unsent_offset = 0; - } - } - - if (EMPTY_LIST(*obs) && !keep_stream) { - // stream_outprocess(conn, s); - memset(s, 0, sizeof(*s)); - init_list(&s->outbufs); - while (s = &conn->streams[0], s->inbuf.iov_len == 0 && s->inbufs == NULL && s->obufs_size == 0) { - assert(conn->streams_count > 0); - conn->streams_count--; - - if (conn->streams_count == 0) { - free(conn->streams); - conn->streams = 0; - conn->first_stream_id = 0; - break; - } else { - conn->first_stream_id++; - conn->stream_inprocess--; - memmove(s, s + 1, sizeof(*s) * conn->streams_count); - // possible realloc to shrink allocated space, but probably useless - for (kr_quic_stream_t *si = s; si < s + conn->streams_count; si++) { - if (si->obufs_size == 0) { - init_list(&si->outbufs); - } else { - fix_list(&si->outbufs); - } - } - } - } - } -} - -void kr_quic_conn_stream_free(kr_quic_conn_t *conn, int64_t stream_id) -{ - kr_quic_stream_t *s = kr_quic_conn_get_stream(conn, stream_id, false); - if (s != NULL && s->inbuf.iov_len > 0) { - free(s->inbuf.iov_base); - conn->ibufs_size -= buffer_alloc_size(s->inbuf.iov_len); - conn->quic_table->ibufs_size -= buffer_alloc_size(s->inbuf.iov_len); - memset(&s->inbuf, 0, sizeof(s->inbuf)); - } - - while (s != NULL && s->inbufs != NULL) { - void *tofree = s->inbufs; - s->inbufs = s->inbufs->next; - free(tofree); - } - - kr_quic_stream_ack_data(conn, stream_id, SIZE_MAX, false); -} - void kr_quic_table_rem2(kr_quic_cid_t **pcid, kr_quic_table_t *table) { kr_quic_cid_t *cid = *pcid; @@ -1309,32 +1131,7 @@ static enum protolayer_iter_cb_result pl_quic_unwrap(void *sess_data, return protolayer_continue(ctx); } -static bool stream_exists(kr_quic_conn_t *conn, int64_t stream_id) -{ - // Taken from Knot, TODO fix if we use stream_user_data - // TRICK, we never use stream_user_data - return (ngtcp2_conn_set_stream_user_data(conn->conn, stream_id, NULL) == NGTCP2_NO_ERROR); -} - -void kr_quic_stream_mark_sent(kr_quic_conn_t *conn, int64_t stream_id, - size_t amount_sent) -{ - kr_quic_stream_t *s = kr_quic_conn_get_stream(conn, stream_id, false); - if (s == NULL) { - return; - } - - s->unsent_offset += amount_sent; - assert(s->unsent_offset <= s->unsent_obuf->len); - if (s->unsent_offset == s->unsent_obuf->len) { - s->unsent_offset = 0; - s->unsent_obuf = (kr_quic_obuf_t *)s->unsent_obuf->node.next; - if (s->unsent_obuf->node.next == NULL) { // already behind the tail of list - s->unsent_obuf = NULL; - } - } -} - +/* TODO perhaps also move to quic_stream */ static int send_stream(kr_quic_table_t *quic_table, struct protolayer_iter_ctx *ctx, kr_quic_conn_t *qconn, int64_t stream_id, uint8_t *data, size_t len, bool fin, ngtcp2_ssize *sent) @@ -1342,7 +1139,7 @@ static int send_stream(kr_quic_table_t *quic_table, struct protolayer_iter_ctx * (void)quic_table; assert(stream_id >= 0 || (data == NULL && len == 0)); - while (stream_id >= 0 && !stream_exists(qconn, stream_id)) { + while (stream_id >= 0 && !kr_quic_stream_exists(qconn, stream_id)) { int64_t opened = 0; kr_log_info(DOQ, "Openning bidirectional stream no: %zu\n", stream_id); @@ -1351,7 +1148,7 @@ static int send_stream(kr_quic_table_t *quic_table, struct protolayer_iter_ctx * if (ret != kr_ok()) { return ret; } - assert((bool)(opened == stream_id) == stream_exists(qconn, stream_id)); + assert((bool)(opened == stream_id) == kr_quic_stream_exists(qconn, stream_id)); } uint32_t fl = ((stream_id >= 0 && fin) ? NGTCP2_WRITE_STREAM_FLAG_FIN : @@ -1444,6 +1241,7 @@ static int send_stream(kr_quic_table_t *quic_table, struct protolayer_iter_ctx * int wrap_ret = session2_wrap(ctx->session, ctx->payload, ctx->comm, + NULL,/*req*/ ctx->finished_cb, ctx->finished_cb_baton); @@ -1543,7 +1341,8 @@ static enum protolayer_iter_cb_result pl_quic_wrap( while (protolayer_queue_has_payload(&quic->wrap_queue)) { struct protolayer_iter_ctx *data = queue_head(quic->wrap_queue); - queue_push(quic->resend_queue, data); + // queue_push(quic->resend_queue, data); + kr_log_info(DOQ, "queue_len: %zu\n", queue_len(quic->wrap_queue)); queue_pop(quic->wrap_queue); @@ -1596,7 +1395,7 @@ static void pl_quic_request_init(struct session2 *session, { kr_log_warning(DOQ, "IN request init\n"); req->qsource.comm_flags.quic = true; - pl_quic_sess_data_t *quic = sess_data; + struct pl_quic_sess_data *quic = sess_data; quic->req = req; } diff --git a/daemon/quic.h b/daemon/quic.h index a4ffe6bb5..f9852c009 100644 --- a/daemon/quic.h +++ b/daemon/quic.h @@ -41,6 +41,29 @@ typedef enum { VERIFIED, // RTT-1 } quic_state_t; +typedef enum { + /*! No error. This is used when the connection or stream needs to be + closed, but there is no error to signal. */ + DOQ_NO_ERROR = 0x0, + /*! The DoQ implementation encountered an internal error and is + incapable of pursuing the transaction or the connection. */ + DOQ_INTERNAL_ERROR = 0x1, + /*! The DoQ implementation encountered a protocol error and is forcibly + aborting the connection. */ + DOQ_PROTOCOL_ERROR = 0x2, + /*! A DoQ client uses this to signal that it wants to cancel an + outstanding transaction. */ + DOQ_REQUEST_CANCELLED = 0x3, + /*! A DoQ implementation uses this to signal when closing a connection + due to excessive load. */ + DOQ_EXCESSIVE_LOAD = 0x4, + /*! A DoQ implementation uses this in the absence of a more specific + error code. */ + DOQ_UNSPECIFIED_ERROR = 0x5, + /*! Alternative error code used for tests. */ + DOQ_ERROR_RESERVED = 0xd098ea5e +} quic_doq_error_t; + /*! \brief QUIC parameters. */ typedef struct { @@ -67,11 +90,10 @@ typedef enum { KR_QUIC_SEND_IGNORE_BLOCKED = (1 << 1), } kr_quic_send_flag_t; -struct quic_ctx; // TODO maybe rename to something more in line with iter_data struct pl_quic_state { struct protolayer_data h; - struct quic_ctx *quic_ctx; + struct kr_quic_conn *qconn; /* struct ortt_ NOTE: Or some other data */ ; }; @@ -93,6 +115,7 @@ typedef struct { typedef struct kr_quic_table { kr_quic_table_flag_t flags; + /* general "settings" for connections */ size_t size; size_t usage; size_t pointers; @@ -131,51 +154,19 @@ typedef struct kr_tcp_inbufs_upd_res { struct iovec inbufs[]; } kr_tcp_inbufs_udp_res_t; -typedef struct kr_quic_stream { - /** the inbuf for small, singlepacket messages - * while the latter is for larger comunications, but still... */ +struct kr_quic_stream { struct iovec inbuf; struct kr_tcp_inbufs_upd_res *inbufs; size_t firstib_consumed; - /*ucw_*/list_t outbufs; + /*ucw_*/queue_t(uint8_t *) outbufs; + // /*ucw_*/list_t outbufs; size_t obufs_size; kr_quic_obuf_t *unsent_obuf; size_t first_offset; size_t unsent_offset; -} kr_quic_stream_t; - -typedef struct quic_ctx { - ngtcp2_crypto_conn_ref conn_ref; - - // // Parameters - quic_params_t params; - - // Context - ngtcp2_settings settings; - struct { - int64_t id; - uint64_t out_ack; - struct iovec in_buffer; - struct knot_tcp_inbufs_upd_res *in_parsed; - size_t in_parsed_it; - size_t in_parsed_total; - } stream; - ngtcp2_ccerr last_err; - uint8_t secret[32]; - - // tls_ctx_t *tls; - - // convenient struct to store Connection ID, its associated path, and stateless reset token. - ngtcp2_cid_token dcid_token; - ngtcp2_cid_token scid_token; - // ngtcp2_cid_token odcid_token; // maybe? - ngtcp2_conn *conn; - ngtcp2_pkt_info pi; - ngtcp2_path path; - quic_state_t state; -} quic_ctx_t; +}; typedef struct kr_quic_conn { int heap_node_placeholder; // MUST be first field of the struct @@ -194,7 +185,7 @@ typedef struct kr_quic_conn { ngtcp2_crypto_conn_ref crypto_ref; // QUIC stream abstraction - kr_quic_stream_t *streams; + struct kr_quic_stream *streams; // number of allocated streams structures int16_t streams_count; // index of first stream that has complete incomming data to be processed (aka inbuf_fin) @@ -221,24 +212,9 @@ typedef struct kr_quic_conn { typedef struct pl_quic_sess_data { struct protolayer_data h; - // ngtcp2_conn *conns; This one might be wrong - // ngtcp2_crypto_conn_ref conn_ref; - // Parameters quic_params_t params; ngtcp2_settings settings; - // Context - // struct { - // int64_t id; - // uint64_t out_ack; - // struct iovec in_buffer; - // struct knot_tcp_inbufs_upd_res *in_parsed; - // size_t in_parsed_it; - // size_t in_parsed_total; - // } stream; - // ngtcp2_ccerr last_err; - // uint8_t secret[32]; - uint32_t conn_count; protolayer_iter_ctx_queue_t unwrap_queue; protolayer_iter_ctx_queue_t wrap_queue; @@ -247,6 +223,5 @@ typedef struct pl_quic_sess_data { kr_quic_table_t *conn_table; struct kr_request *req; - quic_state_t state; - // struct wire_buf wire_buf; + // quic_state_t state; } pl_quic_sess_data_t; diff --git a/daemon/quic_stream.c b/daemon/quic_stream.c new file mode 100644 index 000000000..10c78029a --- /dev/null +++ b/daemon/quic_stream.c @@ -0,0 +1,228 @@ +/* Copyright (C) CZ.NIC, z.s.p.o. + * SPDX-License-Identifier: GPL-3.0-or-later + */ + +#include "lib/generic/queue.h" +#include "quic.h" +#include +#include "quic_stream.h" + +typedef queue_t(kr_quic_obuf_t *) q_stream_buf; + +static void stream_outprocess(struct kr_quic_conn *conn, struct kr_quic_stream *stream) +{ + if (stream != &conn->streams[conn->stream_inprocess]) { + return; + } + + for (int16_t idx = conn->stream_inprocess + 1; idx < conn->streams_count; idx++) { + stream = &conn->streams[idx]; + if (stream->inbufs != NULL) { + conn->stream_inprocess = stream - conn->streams; + return; + } + } + conn->stream_inprocess = -1; +} + +void kr_quic_stream_ack_data(struct kr_quic_conn *conn, int64_t stream_id, + size_t end_acked, bool keep_stream) +{ + struct kr_quic_stream *s = kr_quic_conn_get_stream(conn, stream_id, false); + if (s == NULL) { + return; + } + + q_stream_buf *obs = (q_stream_buf *)&s->outbufs; + + kr_quic_obuf_t *first; + + while (queue_len(*obs) != 0 && end_acked >= (first = queue_head(*obs))->len + s->first_offset) { + queue_pop(*obs); + assert(queue_head(*obs) != first); // help CLANG analyzer understand what rem_node did and that further usage of HEAD(*obs) is safe + s->obufs_size -= first->len; + conn->obufs_size -= first->len; + conn->quic_table->obufs_size -= first->len; + s->first_offset += first->len; + free(first); + if (s->unsent_obuf == first) { + s->unsent_obuf = queue_len(*obs) == 0 ? NULL : queue_head(*obs); + s->unsent_offset = 0; + } + } + + if (queue_len(*obs) == 0 && !keep_stream) { + stream_outprocess(conn, s); + memset(s, 0, sizeof(*s)); + init_list((list_t *)&s->outbufs); + while (s = &conn->streams[0], s->inbuf.iov_len == 0 && s->inbufs == NULL && s->obufs_size == 0) { + assert(conn->streams_count > 0); + conn->streams_count--; + + if (conn->streams_count == 0) { + free(conn->streams); + conn->streams = 0; + conn->first_stream_id = 0; + break; + } else { + conn->first_stream_id ++; + conn->stream_inprocess--; + memmove(s, s + 1, sizeof(*s) * conn->streams_count); + // possible realloc to shrink allocated space, but probably useless + for (struct kr_quic_stream *si = s; si < s + conn->streams_count; si++) { + if (si->obufs_size == 0) { + queue_init(si->outbufs); + // init_list((list_t *)&si->outbufs); + } else { + // fix_list((list_t *)&si->outbufs); + } + } + } + } + } +} + +void kr_quic_conn_stream_free(kr_quic_conn_t *conn, int64_t stream_id) +{ + // TODO: + // struct kr_quic_stream *s = kr_quic_conn_get_stream(conn, stream_id, false); + // if (s != NULL && s->inbuf.iov_len > 0) { + // free(s->inbuf.iov_base); + // conn->ibufs_size -= buffer_alloc_size(s->inbuf.iov_len); + // conn->quic_table->ibufs_size -= buffer_alloc_size(s->inbuf.iov_len); + // memset(&s->inbuf, 0, sizeof(s->inbuf)); + // } + // + // while (s != NULL && s->inbufs != NULL) { + // void *tofree = s->inbufs; + // s->inbufs = s->inbufs->next; + // free(tofree); + // } + + // TODO: + // kr_quic_stream_ack_data(conn, stream_id, SIZE_MAX, false); +} + +bool kr_quic_stream_exists(kr_quic_conn_t *conn, int64_t stream_id) +{ + // Taken from Knot, TODO fix if we use stream_user_data + // TRICK, we never use stream_user_data + return (ngtcp2_conn_set_stream_user_data(conn->conn, stream_id, NULL) == NGTCP2_NO_ERROR); +} + +struct kr_quic_stream *kr_quic_conn_get_stream(kr_quic_conn_t *conn, + int64_t stream_id, bool create) +{ + if (stream_id % 4 != 0 || conn == NULL) { + return NULL; + } + stream_id /= 4; + + if (conn->first_stream_id > stream_id) { + return NULL; + } + if (conn->streams_count > stream_id - conn->first_stream_id) { + return &conn->streams[stream_id - conn->first_stream_id]; + } + + if (create) { + size_t new_streams_count; + struct kr_quic_stream *new_streams; + + if (conn->streams_count == 0) { + new_streams = malloc(sizeof(new_streams[0])); + if (new_streams == NULL) { + return NULL; + } + new_streams_count = 1; + conn->first_stream_id = stream_id; + } else { + new_streams_count = stream_id + 1 - conn->first_stream_id; + if (new_streams_count > MAX_STREAMS_PER_CONN) { + return NULL; + } + new_streams = realloc(conn->streams, new_streams_count * sizeof(*new_streams)); + if (new_streams == NULL) { + return NULL; + } + } + + for (struct kr_quic_stream *si = new_streams; + si < new_streams + conn->streams_count; si++) { + if (si->obufs_size == 0) { + queue_init(si->outbufs); + // init_list(&si->outbufs); + } else { + // fix_list(&si->outbufs); + } + } + + for (struct kr_quic_stream *si = new_streams + conn->streams_count; + si < new_streams + new_streams_count; si++) { + memset(si, 0, sizeof(*si)); + queue_init(si->outbufs); + // init_list(&si->outbufs); + } + + conn->streams = new_streams; + conn->streams_count = new_streams_count; + + return &conn->streams[stream_id - conn->first_stream_id]; + } + + return NULL; +} + +void kr_quic_stream_mark_sent(kr_quic_conn_t *conn, + int64_t stream_id, size_t amount_sent) +{ + struct kr_quic_stream *s = kr_quic_conn_get_stream(conn, stream_id, false); + if (s == NULL) { + return; + } + + s->unsent_offset += amount_sent; + assert(s->unsent_offset <= s->unsent_obuf->len); + if (s->unsent_offset == s->unsent_obuf->len) { + s->unsent_offset = 0; + s->unsent_obuf = (kr_quic_obuf_t *)s->unsent_obuf->node.next; + if (s->unsent_obuf->node.next == NULL) { // already behind the tail of list + s->unsent_obuf = NULL; + } + } +} + +int kr_quic_stream_recv_data(struct kr_quic_conn *qconn, int64_t stream_id, + const uint8_t *data, size_t len, bool fin) +{ + if (len == 0 || qconn == NULL || data == NULL) { + return KNOT_EINVAL; + } + + struct kr_quic_stream *stream = kr_quic_conn_get_stream(qconn, stream_id, true); + if (stream == NULL) { + return KNOT_ENOENT; + } + + struct iovec in = { (void *)data, len }; + ssize_t prev_ibufs_size = qconn->ibufs_size; + // TODO: + // int ret = kr_tcp_inbufs_upd(&stream->inbuf, in, true, + // &stream->inbufs, &conn->ibufs_size); + int ret = KNOT_EOK; + + qconn->quic_table->ibufs_size += (ssize_t)qconn->ibufs_size - prev_ibufs_size; + if (ret != KNOT_EOK) { + return ret; + } + + if (fin && stream->inbufs == NULL) { + return KNOT_ESEMCHECK; + } + + if (stream->inbufs != NULL) { + // TODO: + // stream_inprocess(conn, stream); + } + return KNOT_EOK; +} diff --git a/daemon/quic_stream.h b/daemon/quic_stream.h new file mode 100644 index 000000000..e849dc3fa --- /dev/null +++ b/daemon/quic_stream.h @@ -0,0 +1,27 @@ +/* Copyright (C) CZ.NIC, z.s.p.o. + * SPDX-License-Identifier: GPL-3.0-or-later + */ + +#pragma once + +#include "quic.h" + +bool kr_quic_stream_exists(kr_quic_conn_t *conn, int64_t stream_id); + +/** We have to buffer all data that has been send but still waits + * to be acked, move the pointer of "yet to be acked" data */ +// void kr_quic_stream_ack_data(struct kr_quic_conn *conn, int64_t stream_id, +// size_t end_acked, bool keep_stream); +void kr_quic_stream_mark_sent(kr_quic_conn_t *conn, + int64_t stream_id, size_t amount_sent); + +struct kr_quic_stream *quic_conn_get_stream(struct kr_quic_conn *conn, + int64_t stream_id, bool create); + +int kr_quic_stream_recv_data(kr_quic_conn_t *conn, int64_t stream_id, + const uint8_t *data, size_t len, bool fin); + +void kr_quic_conn_stream_free(kr_quic_conn_t *conn, int64_t stream_id); + +struct kr_quic_stream *kr_quic_conn_get_stream(kr_quic_conn_t *conn, + int64_t stream_id, bool create);