From: Frantisek Tobias Date: Mon, 9 Jun 2025 12:17:14 +0000 (+0200) Subject: daemon/quic: implement send stream functions X-Git-Tag: v6.2.0~2^2~62 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=ff4c99ecfef1a5f86320555a4bd28ccd385daf1a;p=thirdparty%2Fknot-resolver.git daemon/quic: implement send stream functions --- diff --git a/daemon/quic.c b/daemon/quic.c index 6e1db1a8e..75aa15c39 100644 --- a/daemon/quic.c +++ b/daemon/quic.c @@ -7,6 +7,7 @@ #include #include #include "lib/log.h" +#include "lib/resolve-impl.h" #include "session2.h" #include "network.h" #include "lib/resolve.h" @@ -426,14 +427,14 @@ static int get_new_connection_id(ngtcp2_conn *conn, ngtcp2_cid *cid, static int handshake_confirmed_cb(ngtcp2_conn *conn, void *ctx) { (void)conn; - kr_log_info(DOQ, "Handshake confirmed\n"); + kr_log_info(DOQ, "Handshake QUIC\n"); // ctx->state = CONNECTED; return kr_ok(); } static int handshake_completed_cb(ngtcp2_conn *conn, void *user_data) { - kr_log_info(DOQ, "Handshake completed\n"); + kr_log_info(DOQ, "Handshake QUIC\n"); kr_quic_conn_t *ctx = (kr_quic_conn_t *)user_data; assert(ctx->conn == conn); @@ -1263,6 +1264,7 @@ static enum protolayer_iter_cb_result pl_quic_unwrap(void *sess_data, } } + // Drive handshake // if (ctx->comm->target != NULL) { // kr_log_info(DOQ, "rewriting iter_ctx->comm->target of value: %p\n", // ctx->comm->target); @@ -1270,89 +1272,103 @@ static enum protolayer_iter_cb_result pl_quic_unwrap(void *sess_data, pkt_ctx->comm->target = &dcid; - // kr_quic_send(quic->conn_table, conn, NULL, pkt_ctx, -1, -1); + kr_quic_send(quic->conn_table, conn, NULL, pkt_ctx, -1, -1); - if (!ngtcp2_conn_get_handshake_completed(conn->conn)) { - uint64_t now = quic_timestamp(); - kr_require(conn->conn); - - const ngtcp2_path *path = ngtcp2_conn_get_path(conn->conn); - ngtcp2_pkt_info pi = { .ecn = NGTCP2_ECN_NOT_ECT, }; - - /* Just for logging, pass through since conn_write will fail anyways*/ - if (wire_buf_free_space_length(pkt_ctx->payload.wire_buf) < 1200) { - kr_log_error(DOQ, "insufficient free space in wire-buf\n"); - } - - ret = ngtcp2_conn_write_pkt(conn->conn, path, &pi, - wire_buf_free_space(pkt_ctx->payload.wire_buf), - wire_buf_free_space_length(pkt_ctx->payload.wire_buf), - now); - kr_log_info(DOQ, "Written into wire_buf: %d\n", ret); - - if (ret <= 0) { - kr_log_error(DOQ, "Failed to write %s (%d)\n", ngtcp2_strerror(ret), ret); - // TODO: retry? - protolayer_break(ctx, ret); - } - - if ((ret = wire_buf_consume(pkt_ctx->payload.wire_buf, ret)) != 0) { - kr_log_error(DOQ, "wirebuf failed to consume: %s (%d)\n", - kr_strerror(ret), ret); - return kr_error(ret); - } - - pkt_ctx->comm->target = &dcid; - queue_push(quic->wrap_queue, pkt_ctx); - kr_log_info(DOQ, "DBG wrap: pkt_ctx: %p\n", pkt_ctx->comm->target); - - quic->h.session->outgoing = !quic->h.session->outgoing; - ret = session2_wrap(quic->h.session, - pkt_ctx->payload, - pkt_ctx->comm, - pkt_ctx->finished_cb, - pkt_ctx->finished_cb_baton); - - - kr_log_info(DOQ, "Result of session2_wrap: %s\n", - ret >= 0 ? "succeeded" : "failed"); - - ret = ngtcp2_conn_read_pkt(conn->conn, - &path, - &pi, - wire_buf_data(pkt_ctx->payload.wire_buf), - wire_buf_data_length(pkt_ctx->payload.wire_buf), - now); - - ngtcp2_conn_handle_expiry(conn->conn, now); - if (ret == NGTCP2_ERR_DRAINING) { // doq received CONNECTION_CLOSE from the counterpart - kr_quic_table_rem(conn, quic->conn_table); - ret = KNOT_EOK; - return ret; + kr_log_info(DOQ, "protolayer_has payload: %d\n", + protolayer_queue_has_payload(&quic->unwrap_queue)); - } else if (ngtcp2_err_is_fatal(ret)) { // connection doomed - kr_log_error(DOQ, "fatal error in ngtcp2_conn_read_pkt: %s (%d)", ngtcp2_strerror(ret), ret); - if (ret == NGTCP2_ERR_CALLBACK_FAILURE) { - ret = KNOT_EBADCERTKEY; - } else { - ret = KNOT_ECONN; - } - - kr_quic_table_rem(conn, quic->conn_table); - return ret; + // if (!ngtcp2_conn_get_handshake_completed(conn->conn)) { + // uint64_t now = quic_timestamp(); + // kr_require(conn->conn); + // + // const ngtcp2_path *path = ngtcp2_conn_get_path(conn->conn); + // ngtcp2_pkt_info pi = { .ecn = NGTCP2_ECN_NOT_ECT, }; + // + // /* Just for logging, pass through since conn_write will fail anyways*/ + // if (wire_buf_free_space_length(pkt_ctx->payload.wire_buf) < 1200) { + // kr_log_error(DOQ, "insufficient free space in wire-buf\n"); + // } + // + // ret = ngtcp2_conn_write_pkt(conn->conn, path, &pi, + // wire_buf_free_space(pkt_ctx->payload.wire_buf), + // wire_buf_free_space_length(pkt_ctx->payload.wire_buf), + // now); + // kr_log_info(DOQ, "Written into wire_buf: %d\n", ret); + // + // if (ret <= 0) { + // kr_log_error(DOQ, "Failed to write %s (%d)\n", ngtcp2_strerror(ret), ret); + // // TODO: retry? + // protolayer_break(ctx, ret); + // } + // + // if ((ret = wire_buf_consume(pkt_ctx->payload.wire_buf, ret)) != 0) { + // kr_log_error(DOQ, "wirebuf failed to consume: %s (%d)\n", + // kr_strerror(ret), ret); + // return kr_error(ret); + // } + // + // pkt_ctx->comm->target = &dcid; + // queue_push(quic->wrap_queue, pkt_ctx); + // kr_log_info(DOQ, "DBG wrap: pkt_ctx: %p\n", pkt_ctx->comm->target); + // + // quic->h.session->outgoing = !quic->h.session->outgoing; + // ret = session2_wrap(quic->h.session, + // pkt_ctx->payload, + // pkt_ctx->comm, + // pkt_ctx->finished_cb, + // pkt_ctx->finished_cb_baton); + // + // + // kr_log_info(DOQ, "Result of session2_wrap: %s\n", + // ret >= 0 ? "succeeded" : "failed"); + // + // // ret = ngtcp2_conn_read_pkt(conn->conn, + // // &path, + // // &pi, + // // wire_buf_data(pkt_ctx->payload.wire_buf), + // // wire_buf_data_length(pkt_ctx->payload.wire_buf), + // // now); + // + // ngtcp2_conn_handle_expiry(conn->conn, now); + // + // // if (ret == NGTCP2_ERR_DRAINING) { // doq received CONNECTION_CLOSE from the counterpart + // // kr_quic_table_rem(conn, quic->conn_table); + // // ret = KNOT_EOK; + // // return ret; + // // + // // } else if (ngtcp2_err_is_fatal(ret)) { // connection doomed + // // kr_log_error(DOQ, "fatal error in ngtcp2_conn_read_pkt: %s (%d)", ngtcp2_strerror(ret), ret); + // // if (ret == NGTCP2_ERR_CALLBACK_FAILURE) { + // // ret = KNOT_EBADCERTKEY; + // // } else { + // // ret = KNOT_ECONN; + // // } + // // + // // kr_quic_table_rem(conn, quic->conn_table); + // // return ret; + // // + // // } else if (ret != NGTCP2_NO_ERROR) { // non-fatal error, discard packet + // // kr_log_error(DOQ, "discarding recieved pkt: %s (%d)", ngtcp2_strerror(ret), ret); + // // ret = KNOT_EOK; + // // return ret; + // // } + // // + // // if (wire_buf_trim(pkt_ctx->payload.wire_buf, + // // ret)) { + // // kr_log_error(DOQ, "Failed to trim wire_buf\n"); + // // return ret; + // // } + // } else { + // pkt_ctx->comm->target = &dcid; + // + // kr_quic_send(quic->conn_table, conn, NULL, pkt_ctx, -1, -1); + // + // kr_log_info(DOQ, "protolayer_has payload: %d\n", + // protolayer_queue_has_payload(&quic->unwrap_queue)); + // } - } else if (ret != NGTCP2_NO_ERROR) { // non-fatal error, discard packet - kr_log_error(DOQ, "discarding recieved pkt: %s (%d)", ngtcp2_strerror(ret), ret); - ret = KNOT_EOK; - return ret; - } - if (wire_buf_trim(pkt_ctx->payload.wire_buf, - ret)) { - kr_log_error(DOQ, "Failed to trim wire_buf\n"); - return ret; - } - } + // protolayer_continue(pkt_ctx); } kr_log_info(DOQ, "handshake completed :%d\n", ngtcp2_conn_get_handshake_completed(conn->conn)); @@ -1363,7 +1379,7 @@ static enum protolayer_iter_cb_result pl_quic_unwrap(void *sess_data, return protolayer_break(ctx, 0); kr_log_info(DOQ, "returning protolayer_finished, hopefully quic_wrap gets called\n"); - protolayer_break(ctx, 0); + return protolayer_break(ctx, 0); // protolayer_continue(ctx); } @@ -1402,6 +1418,9 @@ static int send_stream(kr_quic_table_t *quic_table, struct protolayer_iter_ctx * while (stream_id >= 0 && !stream_exists(relay, stream_id)) { int64_t opened = 0; + kr_log_info(DOQ, "Openning bidirectional stream no: %zu\n", + stream_id); + int ret = ngtcp2_conn_open_bidi_stream(relay->conn, &opened, NULL); if (ret != kr_ok()) { return ret; @@ -1419,23 +1438,35 @@ static int send_stream(kr_quic_table_t *quic_table, struct protolayer_iter_ctx * ngtcp2_vec vec = { .base = data, .len = len }; ngtcp2_pkt_info pi = { 0 }; - struct sockaddr_storage path_loc = { 0 }, path_rem = { 0 }; - ngtcp2_path path = { .local = { .addr = (struct sockaddr *)&path_loc, .addrlen = sizeof(path_loc) }, - .remote = { .addr = (struct sockaddr *)&path_rem, .addrlen = sizeof(path_rem) }, - .user_data = NULL }; + // struct sockaddr_storage path_loc = { 0 }, path_rem = { 0 }; + const ngtcp2_path *path = ngtcp2_conn_get_path(relay->conn); + + // bool find_path = (ctx->comm->src_addr == NULL); + // assert(find_path == (bool)(ctx->comm->dst_addr == NULL)); - bool find_path = (ctx->comm->src_addr == NULL); - assert(find_path == (bool)(ctx->comm->dst_addr == NULL)); + ngtcp2_conn_info info = { 0 }; + ngtcp2_conn_get_conn_info(relay->conn, &info); - int ret = ngtcp2_conn_writev_stream(relay->conn, find_path ? &path : NULL, &pi, - wire_buf_free_space(ctx->payload.wire_buf), + // int ret = ngtcp2_conn_writev_stream(relay->conn, find_path ? &path : NULL, &pi, + int ret = ngtcp2_conn_writev_stream(relay->conn, path, &pi, + wire_buf_free_space(ctx->payload.wire_buf), wire_buf_free_space_length(ctx->payload.wire_buf), - sent, fl, stream_id, &vec, - (stream_id >= 0 ? 1 : 0), quic_timestamp()); - if (ret <= 0) { + sent, fl, stream_id, &vec, + (stream_id >= 0 ? 1 : 0), quic_timestamp()); + + if (ret < 0) { // rpl->free_reply(rpl); + kr_log_info(DOQ, "Failed to write: %s (%d)", + ngtcp2_strerror(ret), ret); return ret; } + + if (wire_buf_consume(ctx->payload.wire_buf, ret) != 0) { + kr_log_error(DOQ, "Wire_buf failed to consume: %s (%d)\n", + kr_strerror(ret), ret); + return -1; + } + if (*sent < 0) { *sent = 0; } @@ -1448,6 +1479,10 @@ static int send_stream(kr_quic_table_t *quic_table, struct protolayer_iter_ctx * ctx->finished_cb, ctx->finished_cb_baton); + if (ret >= 0) { + return 1; + } + // rpl->out_payload->iov_len = ret; // rpl->ecn = pi.ecn; // if (find_path) { @@ -1568,22 +1603,21 @@ static enum protolayer_iter_cb_result pl_quic_wrap( kr_quic_conn_t *conn = kr_quic_table_lookup(dcid, quic->conn_table); if (!conn) { - kr_log_info(DOQ, "No conn found!\n"); + kr_log_warning(DOQ, "Missing associated connection\n"); + int ret = kr_quic_send(quic->conn_table, + conn, sess_data, ctx, 1, 0); return -1; // TODO } - if (/* no connection present */ 0) { - kr_log_warning(DOQ, "Missing associated connection\n"); - // int ret = kr_quic_send(quic->conn_table, - // conn, sess_data, ctx->payload, 1, 0); - - // if (ret != KNOT_EOK) - // // kr_log_error(DOQ, "knot_quic_send failed (%d)", ret); - // ; - } + data->async_mode = true; + protolayer_async(); + int ret = protolayer_continue(data); + kr_log_info(DOQ, "protolayer_continue returned %d\n", ret); + // return protolayer_async(); } - return protolayer_continue(ctx); + // return protolayer_continue(ctx); + return protolayer_break(ctx, PROTOLAYER_RET_NORMAL); } static enum protolayer_event_cb_result pl_quic_event_unwrap(