#include <asm-generic/errno-base.h>
#include <asm-generic/errno.h>
#include <bits/types/struct_iovec.h>
-#include <errno.h>
#include <gnutls/x509.h>
#include <gnutls/gnutls.h>
#include <gnutls/crypto.h>
#include "lib/defines.h"
#include "lib/log.h"
#include "lib/resolve-impl.h"
+// #include "mempattern.h"
#include "session2.h"
#include "network.h"
#include "lib/resolve.h"
// #include "libknot/quic/quic.h"
#include "libdnssec/random.h"
+#include <libknot/wire.h>
#include <libknot/xdp/tcp_iobuf.h>
#include <stdint.h>
#include <contrib/ucw/heap.h>
if (conn == NULL)
return NULL;
+ /* FIXME magic numbers */
+
+
conn->conn = ngconn;
conn->quic_table = table;
conn->stream_inprocess = -1;
conn->qlog_fd = -1;
+ wire_buf_init(&conn->unwrap_buf, 1200);
conn->next_expiry = UINT64_MAX;
if (!heap_insert(table->expiry_heap, (heap_val_t *)conn)) {
quic->conn_count = 0;
}
+ // TODO set setings?
+
return 0;
}
/**
*/
static int handle_packet(struct pl_quic_sess_data *quic,
- struct protolayer_iter_ctx *pkt_ctx,
+ struct protolayer_iter_ctx *ctx,
const uint8_t *pkt, size_t pktlen, ngtcp2_cid *dcid,
struct kr_quic_conn **out_conn)
{
uint32_t supported_quic[1] = { NGTCP2_PROTO_VER_V1 };
if (ret == NGTCP2_ERR_VERSION_NEGOTIATION) {
// FIXME: This will be broken by trimming the pkt below
+ // this will need to be sent out immediatelly
ngtcp2_pkt_write_version_negotiation(
- wire_buf_free_space(pkt_ctx->payload.wire_buf),
- wire_buf_free_space_length(pkt_ctx->payload.wire_buf),
+ wire_buf_free_space(ctx->payload.wire_buf),
+ wire_buf_free_space_length(ctx->payload.wire_buf),
random(),
- // FIXME: Maybe switch
+ // WARNING Maybe switch
decoded_cids.scid,
decoded_cids.scidlen,
decoded_cids.dcid,
qconn = kr_quic_table_lookup(dcid, quic->conn_table);
if (!qconn) {
- if ((ret = quic_init_server_conn(quic->conn_table, pkt_ctx,
+ if ((ret = quic_init_server_conn(quic->conn_table, ctx,
UINT64_MAX - 1, &scid, dcid, decoded_cids,
pkt, pktlen, &qconn)) != kr_ok()) {
return ret;
}
- // if ((ret = wire_buf_trim(pkt_ctx->payload.wire_buf, pktlen)) != 0) {
+ // if ((ret = wire_buf_trim(ctx->payload.wire_buf, pktlen)) != 0) {
// kr_log_error(DOQ, "wirebuf failed to trim: %s (%d)\n",
// kr_strerror(ret), ret);
// return kr_error(ret);
ret = ngtcp2_conn_read_pkt(qconn->conn,
path,
&pi,
- wire_buf_data(pkt_ctx->payload.wire_buf),
- wire_buf_data_length(pkt_ctx->payload.wire_buf),
+ pkt,
+ pktlen,
+ // wire_buf_data(ctx->payload.wire_buf),
+ // wire_buf_data_length(ctx->payload.wire_buf),
now);
if (ret == NGTCP2_ERR_DRAINING) { // doq received CONNECTION_CLOSE from the counterpart
ngtcp2_conn_handle_expiry(qconn->conn, now);
- if (wire_buf_trim(pkt_ctx->payload.wire_buf, wire_buf_data_length(pkt_ctx->payload.wire_buf))) {
+ if (wire_buf_trim(ctx->payload.wire_buf, pktlen)) {
kr_log_error(DOQ, "Failed to trim wire_buf\n");
return ret;
}
- if (kr_fails_assert(wire_buf_data_length(pkt_ctx->payload.wire_buf) == 0)) {
+ if (kr_fails_assert(wire_buf_data_length(ctx->payload.wire_buf) == 0)) {
kr_log_error(DOQ, "read pkt should consume the entire packet\n");
return -1; /* TODO errcode */
}
- // pkt_ctx->comm->target = &dcid;
+ // ctx->comm->target = &dcid;
// kr_quic_send(quic->conn_table, qconn, quic, ctx, QUIC_MAX_SEND_PER_RECV, 0);
return kr_ok();
void __attribute__ ((noinline)) empty_call(void) { }
+static int collect_queries(struct kr_quic_conn *qconn)
+{
+
+ // if (0/* use dns_dgram (iovec input)*/) {
+ // // Perhaps make permanent for each conn;
+ // struct iovec *iov = mm_calloc(&ctx->pool,
+ // qconn->streams_pending, sizeof(struct iovec));
+ // kr_require(iov);
+ // struct protolayer_payload payload = protolayer_payload_iovec(iov,
+ // 0/* increase upon fill */, false);
+ //
+ // int64_t stream_id;
+ // struct kr_quic_stream *stream;
+ // while (qconn != NULL && (stream = kr_quic_stream_get_process(qconn, &stream_id)) != NULL) {
+ // assert(stream->inbufs != NULL);
+ // assert(stream->inbufs->n_inbufs > 0);
+ //
+ // uint16_t msg_size = knot_wire_read_u16(wire_buf_data(&stream->pers_inbuf));
+ // payload.iovec.iov[payload.iovec.cnt].iov_base =
+ // wire_buf_data(&stream->pers_inbuf) + sizeof(uint16_t);
+ //
+ // payload.iovec.iov[payload.iovec.cnt].iov_len = msg_size;
+ // ++payload.iovec.cnt;
+ // }
+ //
+ // struct protolayer_iter_ctx *ctx_head = queue_head(quic->unwrap_queue);
+ // if (!kr_fails_assert(ctx == ctx_head))
+ // queue_pop(quic->unwrap_queue);
+ //
+ // if (payload.iovec.cnt > 0) {
+ // ctx->payload = payload;
+ // return protolayer_continue(ctx);
+ // }
+ //
+ // mm_free(&ctx->pool, iov);
+ // return protolayer_break(ctx, kr_error(ENODATA));
+
+ kr_require(wire_buf_data_length(&qconn->unwrap_buf) == 0);
+ size_t free_space = wire_buf_free_space_length(&qconn->unwrap_buf);
+
+ int64_t stream_id;
+ struct kr_quic_stream *stream;
+ while (qconn != NULL
+ && (stream = kr_quic_stream_get_process(qconn,
+ &stream_id)) != NULL) {
+
+ size_t to_write = wire_buf_data_length(&stream->pers_inbuf);
+ kr_assert(to_write > 0);
+ if (to_write > free_space) {
+ kr_log_error(DOQ, "unwrap buf is not big enough\n");
+ return kr_error(ENOMEM);
+ }
+
+ memcpy(wire_buf_free_space(&qconn->unwrap_buf),
+ wire_buf_data(&stream->pers_inbuf), to_write);
+ wire_buf_consume(&qconn->unwrap_buf, to_write);
+ wire_buf_trim(&stream->pers_inbuf, to_write);
+ /* TODO inspect assembly diff compared to repeated
+ * call of wire_buf_free_space_length
+ * should make no difference though*/
+ free_space -= to_write;
+ }
+
+ return wire_buf_data(&qconn->unwrap_buf) > 0
+ ? kr_ok() : kr_error(ENODATA);
+}
+
static enum protolayer_iter_cb_result pl_quic_unwrap(void *sess_data,
void *iter_data, struct protolayer_iter_ctx *ctx)
{
- int ret;
- struct pl_quic_sess_data *quic = sess_data;
+ int rv = kr_error(ENOSPC);
kr_quic_conn_t *qconn = NULL;
+ struct pl_quic_sess_data *quic = sess_data;
queue_push(quic->unwrap_queue, ctx);
while (protolayer_queue_has_payload(&quic->unwrap_queue)) {
- struct protolayer_iter_ctx *pkt_ctx = queue_head(quic->unwrap_queue);
-
- queue_pop(quic->unwrap_queue);
-
- kr_assert(pkt_ctx->payload.type == PROTOLAYER_PAYLOAD_WIRE_BUF);
- size_t pktlen = wire_buf_data_length(pkt_ctx->payload.wire_buf);
- const uint8_t *pkt = wire_buf_data(pkt_ctx->payload.wire_buf);
+ kr_assert(ctx->payload.type == PROTOLAYER_PAYLOAD_WIRE_BUF);
ngtcp2_cid dcid = { 0 };
- ret = handle_packet(quic,
- /** FIXME */pkt_ctx,
- pkt, pktlen, &dcid, &qconn);
+ int ret = handle_packet(quic,
+ ctx,
+ wire_buf_data(ctx->payload.wire_buf),
+ wire_buf_data_length(ctx->payload.wire_buf),
+ &dcid, &qconn);
+
if (ret != kr_ok()) {
- protolayer_break(ctx, ret);
+ goto fail;
}
/* TODO Verify this doesn't leak */
- pkt_ctx->comm->target = mm_calloc(&ctx->pool, sizeof(ngtcp2_cid), 1);
- /* TODO log failed allocation "iteration ctx ran out of memory" */
- kr_require(pkt_ctx->comm->target);
- memcpy(pkt_ctx->comm->target, &dcid, sizeof(ngtcp2_cid));
-
- if (qconn->stream_inprocess >= 0) {
- // This branch is only accessed once a stream has
- // finished receiving a query (stream_inprocess received FIN)
- // TODO: protolayer_continue with the query in the first finished stream
- empty_call();
- struct kr_quic_stream *stream = kr_quic_conn_get_stream(
- qconn, qconn->stream_inprocess, true
- );
-
- if (stream == NULL) {
- return KNOT_ENOENT;
- }
+ ctx->comm->target = malloc(sizeof(ngtcp2_cid));
+ /* TODO log failed allocation "iterctx ran out of memory" */
+ kr_require(ctx->comm->target);
+ memcpy(ctx->comm->target, &dcid, sizeof(ngtcp2_cid));
+
+ if (qconn->stream_inprocess == -1) {
+ /* This will produce a reposponse and pass it towards
+ * lower layers (UDP or anything between the current
+ * layer and udp). */
+ kr_quic_send(quic->conn_table, qconn, quic,
+ ctx, QUIC_MAX_SEND_PER_RECV, 0);
+ rv = kr_ok();
+ goto fail;
+ }
- pkt_ctx->payload.wire_buf = &stream->pers_inbuf;
- kr_log_info(DOQ, "Proceeding protolayer_continue in quic\n");
- return protolayer_continue(pkt_ctx);
+ if (kr_fails_assert(queue_len(quic->unwrap_queue) == 1)) {
+ ret = kr_error(EINVAL);
+ goto fail;
}
+ }
- // ctx->comm->target = pkt_ctx->comm->target;
+ struct protolayer_iter_ctx *ctx_head = queue_head(quic->unwrap_queue);
+ if (!kr_fails_assert(ctx == ctx_head))
+ queue_pop(quic->unwrap_queue);
- if (qconn->flags & KR_QUIC_CONN_HANDSHAKE_DONE) {
- // && qconn->flags & ) {
- // kr_log_info(DOQ, "Proceeding to next layer\n");
- kr_quic_send(quic->conn_table, qconn, quic, ctx, QUIC_MAX_SEND_PER_RECV, 0);
- // return protolayer_continue(pkt_ctx);
- } else {
- // proceed with nodata handshake process
- kr_quic_send(quic->conn_table, qconn, quic, ctx, QUIC_MAX_SEND_PER_RECV, 0);
- }
+ if ((rv = collect_queries(qconn)) == kr_ok()) {
+ ctx->payload = protolayer_payload_wire_buf(&qconn->unwrap_buf,
+ false);
+ return protolayer_continue(ctx);
}
- // return protolayer_break(ctx, 0);
- return protolayer_continue(ctx);
+ return protolayer_break(ctx, rv);
+
+fail:
+ ctx_head = queue_head(quic->unwrap_queue);
+ if (!kr_fails_assert(ctx == ctx_head))
+ queue_pop(quic->unwrap_queue);
+ return protolayer_break(ctx, rv);
}
/* TODO perhaps also move to quic_stream */
static int send_stream(struct protolayer_iter_ctx *ctx,
kr_quic_conn_t *qconn, int64_t stream_id,
- uint8_t *data, size_t len, bool fin, ngtcp2_ssize *sent,
- )
+ uint8_t *data, size_t len, bool fin, ngtcp2_ssize *sent)
{
assert(stream_id >= 0 || (data == NULL && len == 0));
kr_log_info(DOQ, "Openning bidirectional stream no: %zu\n",
stream_id);
- int ret = ngtcp2_conn_open_bidi_stream(qconn->conn, &opened, NULL);
+ int ret = ngtcp2_conn_open_bidi_stream(qconn->conn,
+ &opened, NULL);
if (ret != kr_ok()) {
/** This should not happen */
kr_log_info(DOQ, "remote endpoint isn't ready for streams: %s (%d)\n",
ngtcp2_strerror(ret), ret);
return ret;
}
- assert((bool)(opened == stream_id) == kr_quic_stream_exists(qconn, stream_id));
+ assert((bool)(opened == stream_id)
+ == kr_quic_stream_exists(qconn, stream_id));
}
- uint32_t fl = ((stream_id >= 0 && fin) ? NGTCP2_WRITE_STREAM_FLAG_FIN :
- NGTCP2_WRITE_STREAM_FLAG_NONE);
+ uint32_t fl = ((stream_id >= 0 && fin) ? NGTCP2_WRITE_STREAM_FLAG_FIN
+ : NGTCP2_WRITE_STREAM_FLAG_NONE);
+
ngtcp2_vec vec = { .base = data, .len = len };
ngtcp2_pkt_info pi = { 0 };
ngtcp2_conn_info info = { 0 };
ngtcp2_conn_get_conn_info(qconn->conn, &info);
-
int nwrite = 0;
- nwrite = ngtcp2_conn_writev_stream(qconn->conn, path, &pi,
- wire_buf_free_space(ctx->payload.wire_buf),
- wire_buf_free_space_length(ctx->payload.wire_buf),
- sent, fl, stream_id, &vec,
- (stream_id >= 0 ? 1 : 0), quic_timestamp());
-
- /* TODO:
- * This packet may contain frames other than STREAM frame. The
- * packet might not contain STREAM frame if other frames
- * occupy the packet. In that case, *pdatalen would
- * be -1 if pdatalen is not NULL.
- */
- // TODO: abstract error printing, likely shared across mane ngtcp2_ calls
- if (nwrite < 0) {
- switch (nwrite) {
- case NGTCP2_ERR_NOMEM:
- kr_log_error(DOQ, "write failed: %s (%d)",
- ngtcp2_strerror(nwrite), nwrite);
- // TODO terminal
-
- case NGTCP2_ERR_STREAM_NOT_FOUND:
- kr_log_error(DOQ, "write stream failed to find: %s (%d)\n",
- ngtcp2_strerror(nwrite), nwrite);
- // TODO terminal
-
- case NGTCP2_ERR_STREAM_SHUT_WR:
- kr_log_error(DOQ, "local write endpoint is shut or stream is beeing reset: %s (%d)\n",
- ngtcp2_strerror(nwrite), nwrite);
- // TODO attempt later once (if reset)
-
- case NGTCP2_ERR_PKT_NUM_EXHAUSTED:
- kr_log_error(DOQ, "no more pkt numbers available: %s (%d)\n",
- ngtcp2_strerror(nwrite), nwrite);
- // TODO terminal or reset pktn
-
- case NGTCP2_ERR_CALLBACK_FAILURE:
- kr_log_error(DOQ, "user callback failed: %s (%d)\n",
- ngtcp2_strerror(nwrite), nwrite);
- // TODO attempt later
-
- case NGTCP2_ERR_INVALID_ARGUMENT:
- kr_log_error(DOQ, "The total length of stream data is too large: %s (%d)\n",
- ngtcp2_strerror(nwrite), nwrite);
- // TODO attempt differently
-
- case NGTCP2_ERR_STREAM_DATA_BLOCKED:
- kr_log_error(DOQ, "stream is blocked due to flow controll: %s (%d)\n",
- ngtcp2_strerror(nwrite), nwrite);
- // TODO attempt later
-
- /* only when NGTCP2_WRITE_STREAM_FLAG_MORE (currently not used) */
- case NGTCP2_ERR_WRITE_MORE:
- kr_log_error(DOQ, "should not happen: %s (%d)\n",
- ngtcp2_strerror(nwrite), nwrite);
- kr_require(false);
- default:
- kr_log_error(DOQ, "unknown error in writev_stream: %s (%d)\n",
- ngtcp2_strerror(nwrite), nwrite);
- kr_require(false);
- }
- } else if (*sent >= 0) { /** FIXME */ }
+ struct wire_buf *wb = mm_alloc(&ctx->pool, sizeof(struct wire_buf));
+ kr_require(wb);
- if (nwrite == 0) {
- return nwrite;
- }
+ wire_buf_init(wb, 1200/* TODO perhaps maxudp payload */);
+ struct protolayer_payload pl = protolayer_payload_wire_buf(wb, false);
- if (len) {
- /* FIXME this is horrible */
- if (wire_buf_trim(ctx->payload.wire_buf, len) != kr_ok()) {
- kr_log_error(DOQ, "Wrire buf failed to trim: %s (%d)\n",
+ do {
+ nwrite = ngtcp2_conn_writev_stream(qconn->conn, path, &pi,
+ wire_buf_free_space(pl.wire_buf),
+ wire_buf_free_space_length(pl.wire_buf),
+ sent, fl, stream_id, &vec,
+ (stream_id >= 0 ? 1 : 0), quic_timestamp());
+
+ /* TODO:
+ * This packet may contain frames other than STREAM frame. The
+ * packet might not contain STREAM frame if other frames
+ * occupy the packet. In that case, *pdatalen would
+ * be -1 if pdatalen is not NULL.
+ */
+ // TODO: abstract error printing, likely shared across mane ngtcp2_ calls
+ if (nwrite < 0) {
+ mm_free(&ctx->pool, wb);
+ switch (nwrite) {
+ case NGTCP2_ERR_NOMEM:
+ kr_log_error(DOQ, "write failed: %s (%d)",
+ ngtcp2_strerror(nwrite), nwrite);
+ // TODO terminal
+
+ case NGTCP2_ERR_STREAM_NOT_FOUND:
+ kr_log_error(DOQ, "write stream failed to find: %s (%d)\n",
+ ngtcp2_strerror(nwrite), nwrite);
+ // TODO terminal
+
+ case NGTCP2_ERR_STREAM_SHUT_WR:
+ kr_log_error(DOQ, "local write endpoint is shut or stream is beeing reset: %s (%d)\n",
+ ngtcp2_strerror(nwrite), nwrite);
+ return kr_ok();
+ // TODO attempt later once (if reset)
+
+ case NGTCP2_ERR_PKT_NUM_EXHAUSTED:
+ kr_log_error(DOQ, "no more pkt numbers available: %s (%d)\n",
+ ngtcp2_strerror(nwrite), nwrite);
+ // TODO terminal or reset pktn
+
+ case NGTCP2_ERR_CALLBACK_FAILURE:
+ kr_log_error(DOQ, "user callback failed: %s (%d)\n",
+ ngtcp2_strerror(nwrite), nwrite);
+ // TODO attempt later
+
+ case NGTCP2_ERR_INVALID_ARGUMENT:
+ kr_log_error(DOQ, "The total length of stream data is too large: %s (%d)\n",
+ ngtcp2_strerror(nwrite), nwrite);
+ // TODO attempt differently
+
+ case NGTCP2_ERR_STREAM_DATA_BLOCKED:
+ kr_log_error(DOQ, "stream is blocked due to flow controll: %s (%d)\n",
+ ngtcp2_strerror(nwrite), nwrite);
+ // TODO attempt later
+
+ /* only when NGTCP2_WRITE_STREAM_FLAG_MORE (currently not used) */
+ case NGTCP2_ERR_WRITE_MORE:
+ kr_log_error(DOQ, "should not happen: %s (%d)\n",
+ ngtcp2_strerror(nwrite), nwrite);
+ kr_require(false);
+ default:
+ kr_log_error(DOQ, "unknown error in writev_stream: %s (%d)\n",
+ ngtcp2_strerror(nwrite), nwrite);
+ kr_require(false);
+ }
+
+ } else if (*sent >= 0) {
+ /* TODO this data has to be kept untill ack arives */
+ vec.len -= *sent;
+ }
+
+ if (wire_buf_consume(pl.wire_buf, nwrite) != kr_ok()) {
+ kr_log_error(DOQ, "Wire_buf failed to consume: %s (%d)\n",
ngtcp2_strerror(nwrite), nwrite);
- return -1;
+ mm_free(&ctx->pool, wb);
+ return nwrite;
}
- }
- if (wire_buf_consume(ctx->payload.wire_buf, nwrite) != kr_ok()) {
- kr_log_error(DOQ, "Wire_buf failed to consume: %s (%d)\n",
- ngtcp2_strerror(nwrite), nwrite);
+ } while (nwrite && vec.len > 0);
+
+ ctx->payload = pl;
+
+ /* called from wrap, return the written amount and continue */
+ if (len) {
return nwrite;
}
/* case HS has not finished, we have to switch to wrap direction
* without proceeding to the resolve layer */
- if (nwrite || *sent) {
- // written++;
+ if (nwrite || *sent) {
+ /* session2_wrap requires nonempty payload
+ * thats why we call it here, instead
+ * off calling wrap_after from unwrap and filling
+ * the buffer via this function called from wrap */
int wrap_ret = session2_wrap_after(ctx->session,
PROTOLAYER_TYPE_QUIC,
- // int wrap_ret = session2_wrap(ctx->session,
-
ctx->payload,
ctx->comm,
- // NULL,/*req*/
ctx->finished_cb,
ctx->finished_cb_baton);
- return 1;
+ return nwrite;
} else {
kr_require(nwrite || *sent);
}
- return 0;
+ /* make sure return is negative
+ * though the code shouldn't currently get here */
+ return nwrite <= 0 ? nwrite : -1;
}
// maybe rename kr_quic_respond?
kr_quic_conn_t *conn,
/* kr_quic_reply_t *reply */void *sess_data,
struct protolayer_iter_ctx *ctx,
- unsigned max_msgs, kr_quic_send_flag_t flags)
+ unsigned max_msgs,
+ kr_quic_send_flag_t flags)
{
pl_quic_sess_data_t *quic = (pl_quic_sess_data_t *)sess_data;
return kr_error(EINVAL);
} else if ((conn->flags & KR_QUIC_CONN_BLOCKED) && !(flags & KR_QUIC_SEND_IGNORE_BLOCKED)) {
return kr_error(EINVAL);
+ // TODO
// } else if (reply->handle_ret > 0) {
// return send_special(quic_table, reply, conn);
} else if (conn == NULL) {
unsigned sent_msgs = 0, stream_msgs = 0, ignore_last = ((flags & KR_QUIC_SEND_IGNORE_LASTBYTE) ? 1 : 0);
int ret = 1;
- /* KnotDNS stores data to be written into a replay in the unsent_obuf
- * since this will be called from pl_quic_wrap, and I'll have the
- * payload in the form of a iovec anyway, I can just change the
- * conditionals a bit and call send_Stream with the payload iovec */
+ /* This is really wasteful, we always have payload for one stream. */
for (int64_t si = 0; si < conn->streams_count && sent_msgs < max_msgs; /* NO INCREMENT */) {
int64_t stream_id = 4 * (conn->first_stream_id + si);
ngtcp2_ssize sent = 0;
size_t uf = conn->streams[si].unsent_offset;
kr_quic_obuf_t *uo = conn->streams[si].unsent_obuf;
-
- kr_require(ctx->payload.type == PROTOLAYER_PAYLOAD_WIRE_BUF);
-
if (uo == NULL) {
si++;
// continue;
}
// bool fin = (((node_t *)uo->node.next)->next == NULL) && ignore_last == 0;
- // size_t len = 5534u; /* size of the following buffer */
- // uint8_t *data = /* alloc buffer */ NULL;
- // ret = quic_package_payload(quic_table, ctx->payload, sess_data,
- // NULL, stream_id, data, len, fin, &sent);
-
- // ret = send_stream(ctx, conn, stream_id,
- // (uint8_t *)uo->buf + uf, uo->len - uf - ignore_last,
- // 0/* FIXME `fin` probably for client side "request sent" */,
- // &sent);
-
- ret = send_stream(ctx, conn, stream_id,
- wire_buf_data(ctx->payload.wire_buf),
- wire_buf_data_length(ctx->payload.wire_buf),
- // ctx->req->answer->wire,
- // ctx->req->answer->size,
- 0/* FIXME `fin` probably for client side "request sent" */,
- &sent);
-
+ // ret = send_stream(ctx, conn, stream_id, (uint8_t *)uo->buf + uf,
+ // uo->len - uf - ignore_last, fin, &sent);
+
+ if (ctx->payload.type == PROTOLAYER_PAYLOAD_WIRE_BUF) {
+ ret = send_stream(ctx, conn, stream_id,
+ wire_buf_data(ctx->payload.wire_buf),
+ wire_buf_data_length(ctx->payload.wire_buf),
+ // ctx->req->answer->wire,
+ // ctx->req->answer->size,
+ 0/* FIXME `fin` probably for client side "request sent" */,
+ &sent);
+
+ } else if (ctx->payload.type == PROTOLAYER_PAYLOAD_IOVEC) {
+ kr_require(ctx->payload.iovec.cnt == 2);
+ ret = send_stream(ctx, conn, stream_id,
+ ctx->payload.iovec.iov[1].iov_base,
+ ctx->payload.iovec.iov[1].iov_len,
+ // ctx->req->answer->wire,
+ // ctx->req->answer->size,
+ 0/* fixme `fin` probably for client side "request sent" */,
+ &sent);
+
+ } else if (ctx->payload.type == PROTOLAYER_PAYLOAD_BUFFER) {
+ ret = send_stream(ctx, conn, stream_id,
+ ctx->payload.buffer.buf,
+ ctx->payload.buffer.len,
+ // ctx->req->answer->wire,
+ // ctx->req->answer->size,
+ 0/* fixme `fin` probably for client side "request sent" */,
+ &sent);
+ } else {
+ kr_assert(false && "Invalid payload type\n");
+ return kr_error(EINVAL);
+ }
if (ret < 0) {
return ret;
if (sent > 0 && ignore_last > 0) {
sent++;
}
-
- if (sent > 0) {
- // TODO
- // kr_quic_stream_mark_sent(conn, stream_id, sent);
- }
+ // if (sent > 0) {
+ // kr_quic_stream_mark_sent(conn, stream_id, sent);
+ // }
if (stream_msgs >= max_msgs / conn->streams_count) {
stream_msgs = 0;
return ret;
}
+/* For now we assume any iovec payload we get
+ * will just be a single (the second iovec, first one holds size)
+ * giant buffer. FIXME if proper iovec support ever comes. */
static enum protolayer_iter_cb_result pl_quic_wrap(
void *sess_data, void *iter_data,
struct protolayer_iter_ctx *ctx)
queue_push(quic->wrap_queue, ctx);
ngtcp2_cid *dcid = ctx->comm_storage.target;
+ kr_log_info(DOQ, "Quic wrap prototype: %s\n",
+ protolayer_payload_name(ctx->payload.type));
+
while (protolayer_queue_has_payload(&quic->wrap_queue)) {
struct protolayer_iter_ctx *data = queue_head(quic->wrap_queue);
- kr_log_info(DOQ, "queue_len: %zu\n", queue_len(quic->wrap_queue));
queue_pop(quic->wrap_queue);
if (!data || !data->comm || !data->comm->target) {
kr_quic_conn_t *conn = kr_quic_table_lookup(dcid, quic->conn_table);
if (!conn) {
kr_log_warning(DOQ, "Missing associated connection\n");
- int ret = kr_quic_send(quic->conn_table,
- conn, sess_data, ctx, 1, 0);
+ /* There is nothing we can do */
return protolayer_break(ctx, EINVAL /* TODO */);
- // return -1; // TODO
}
+ /* Here we will actually have payload to be sent out
+ * TODO assert that requirement? */
kr_quic_send(quic->conn_table,
conn,
sess_data,
- data,
+ ctx,
QUIC_MAX_SEND_PER_RECV,
0 /* no flags */);
kr_log_info(DOQ, "About to continue from quic_wrap: %s\n",
protolayer_payload_name(data->payload.type));
- return protolayer_continue(data);
- // data->async_mode = true;
- // protolayer_async();
- // return protolayer_continue(data);
- // kr_log_info(DOQ, "protolayer_continue returned %d\n", ret);
- // return protolayer_async();
+ return protolayer_continue(ctx);
}
- /* We had nothing to send TODO error*/
+ /* We had nothing to send TODO error */
return protolayer_break(ctx, kr_ok());
- // return protolayer_continue(ctx);
- // return protolayer_break(ctx, PROTOLAYER_RET_NORMAL);
}
static enum protolayer_event_cb_result pl_quic_event_unwrap(
req->qsource.comm_flags.quic = true;
struct pl_quic_sess_data *quic = sess_data;
quic->req = req;
+
+ // req->qsource.stream_id = session->comm_storage.target;
}
__attribute__((constructor))