struct kr_quic_conn *qconn = (struct kr_quic_conn *)user_data;
assert(ctx->conn == conn);
- kr_log_info(DOQ, "recved stream data: %s\n", data);
int ret = kr_quic_stream_recv_data(qconn, stream_id, data, datalen,
(flags & NGTCP2_STREAM_DATA_FLAG_FIN));
return ret == KNOT_EOK ? 0 : NGTCP2_ERR_CALLBACK_FAILURE;
}
-
-// TODO Will likely be removed once the proper buffer scheme for
-// pl is figured out
-uint64_t buffer_alloc_size(uint64_t buffer_len)
-{
- if (buffer_len == 0) {
- return 0;
- }
- buffer_len -= 1;
- buffer_len |= 0x3f; // the result will be at least 64
- buffer_len |= (buffer_len >> 1);
- buffer_len |= (buffer_len >> 2);
- buffer_len |= (buffer_len >> 4);
- buffer_len |= (buffer_len >> 8);
- buffer_len |= (buffer_len >> 16);
- buffer_len |= (buffer_len >> 32);
- return buffer_len + 1;
-}
-
void kr_quic_table_rem2(kr_quic_cid_t **pcid, kr_quic_table_t *table)
{
kr_quic_cid_t *cid = *pcid;
// ignore for now; allow any
params.max_idle_timeout = 0;
- params.stateless_reset_token_present = 1;
- params.active_connection_id_limit = 8;
+ // params.stateless_reset_token_present = 1;
+ // params.active_connection_id_limit = 8;
if (odcid != NULL) {
params.original_dcid = *odcid;
void __attribute__ ((noinline)) empty_call(void) { }
-static int collect_queries(struct kr_quic_conn *qconn)
+static int collect_queries(struct protolayer_iter_ctx *ctx,
+ struct kr_quic_conn *qconn, struct quic_target *target)
{
// if (0/* use dns_dgram (iovec input)*/) {
kr_require(wire_buf_data_length(&qconn->unwrap_buf) == 0);
size_t free_space = wire_buf_free_space_length(&qconn->unwrap_buf);
+ uint16_t queries_agregated = 0;
int64_t stream_id;
struct kr_quic_stream *stream;
&stream_id)) != NULL) {
size_t to_write = wire_buf_data_length(&stream->pers_inbuf);
+ ctx->payload = protolayer_payload_wire_buf(&stream->pers_inbuf,
+ false);
+
+ target->stream_id = stream_id;
+ ctx->comm->target = target;
+ // session2_unwrap_after(ctx->session, PROTOLAYER_TYPE_QUIC,
+ // protolayer_payload_wire_buf(&stream->pers_inbuf, false),
+ // ctx->comm,
+ // ctx->finished_cb,
+ // ctx->finished_cb_baton);
+
kr_assert(to_write > 0);
if (to_write > free_space) {
kr_log_error(DOQ, "unwrap buf is not big enough\n");
wire_buf_data(&stream->pers_inbuf), to_write);
wire_buf_consume(&qconn->unwrap_buf, to_write);
wire_buf_trim(&stream->pers_inbuf, to_write);
- /* TODO inspect assembly diff compared to repeated
- * call of wire_buf_free_space_length
- * should make no difference though*/
free_space -= to_write;
+ ++queries_agregated;
}
- return wire_buf_data(&qconn->unwrap_buf) > 0
- ? kr_ok() : kr_error(ENODATA);
+ return queries_agregated;
}
static enum protolayer_iter_cb_result pl_quic_unwrap(void *sess_data,
struct pl_quic_sess_data *quic = sess_data;
queue_push(quic->unwrap_queue, ctx);
+ /* TODO Verify this doesn't leak */
+ struct quic_target *target = malloc(sizeof(struct quic_target));
+ /* TODO log failed allocation "iterctx ran out of memory" */
+ kr_require(target);
while (protolayer_queue_has_payload(&quic->unwrap_queue)) {
kr_assert(ctx->payload.type == PROTOLAYER_PAYLOAD_WIRE_BUF);
goto fail;
}
- /* TODO Verify this doesn't leak */
- ctx->comm->target = malloc(sizeof(ngtcp2_cid));
- /* TODO log failed allocation "iterctx ran out of memory" */
- kr_require(ctx->comm->target);
- memcpy(ctx->comm->target, &dcid, sizeof(ngtcp2_cid));
+ memcpy(&target->dcid, &dcid, sizeof(ngtcp2_cid));
if (qconn->stream_inprocess == -1) {
/* This will produce a reposponse and pass it towards
if (!kr_fails_assert(ctx == ctx_head))
queue_pop(quic->unwrap_queue);
- if ((rv = collect_queries(qconn)) == kr_ok()) {
+ // rv = collect_queries(ctx, qconn, target);
+ if ((rv = collect_queries(ctx, qconn, target)) > 0) {
ctx->payload = protolayer_payload_wire_buf(&qconn->unwrap_buf,
false);
return protolayer_continue(ctx);
}
+ // return protolayer_continue(ctx);
return protolayer_break(ctx, rv);
fail:
ctx_head = queue_head(quic->unwrap_queue);
if (!kr_fails_assert(ctx == ctx_head))
queue_pop(quic->unwrap_queue);
+ free(target); // We asume the folowing protolayers are synchronous
return protolayer_break(ctx, rv);
}
int ret = ngtcp2_conn_open_bidi_stream(qconn->conn,
&opened, NULL);
if (ret != kr_ok()) {
- /** This should not happen */
- kr_log_info(DOQ, "remote endpoint isn't ready for streams: %s (%d)\n",
+ kr_log_warning(DOQ, "remote endpoint isn't ready for streams: %s (%d)\n",
ngtcp2_strerror(ret), ret);
return ret;
}
- assert((bool)(opened == stream_id)
- == kr_quic_stream_exists(qconn, stream_id));
+ kr_require((bool)(opened == stream_id) == kr_quic_stream_exists(qconn, stream_id));
}
uint32_t fl = ((stream_id >= 0 && fin) ? NGTCP2_WRITE_STREAM_FLAG_FIN
kr_require(nwrite || *sent);
}
- /* make sure return is negative
+ /* make sure return is negative
* though the code shouldn't currently get here */
return nwrite <= 0 ? nwrite : -1;
}
unsigned sent_msgs = 0, stream_msgs = 0, ignore_last = ((flags & KR_QUIC_SEND_IGNORE_LASTBYTE) ? 1 : 0);
int ret = 1;
- /* This is really wasteful, we always have payload for one stream. */
for (int64_t si = 0; si < conn->streams_count && sent_msgs < max_msgs; /* NO INCREMENT */) {
int64_t stream_id = 4 * (conn->first_stream_id + si);
kr_quic_obuf_t *uo = conn->streams[si].unsent_obuf;
if (uo == NULL) {
si++;
- // continue;
+ continue;
}
- // bool fin = (((node_t *)uo->node.next)->next == NULL) && ignore_last == 0;
- // ret = send_stream(ctx, conn, stream_id, (uint8_t *)uo->buf + uf,
- // uo->len - uf - ignore_last, fin, &sent);
-
- if (ctx->payload.type == PROTOLAYER_PAYLOAD_WIRE_BUF) {
- ret = send_stream(ctx, conn, stream_id,
- wire_buf_data(ctx->payload.wire_buf),
- wire_buf_data_length(ctx->payload.wire_buf),
- // ctx->req->answer->wire,
- // ctx->req->answer->size,
- 0/* FIXME `fin` probably for client side "request sent" */,
- &sent);
-
- } else if (ctx->payload.type == PROTOLAYER_PAYLOAD_IOVEC) {
- kr_require(ctx->payload.iovec.cnt == 2);
- ret = send_stream(ctx, conn, stream_id,
- ctx->payload.iovec.iov[1].iov_base,
- ctx->payload.iovec.iov[1].iov_len,
- // ctx->req->answer->wire,
- // ctx->req->answer->size,
- 0/* fixme `fin` probably for client side "request sent" */,
- &sent);
-
- } else if (ctx->payload.type == PROTOLAYER_PAYLOAD_BUFFER) {
- ret = send_stream(ctx, conn, stream_id,
- ctx->payload.buffer.buf,
- ctx->payload.buffer.len,
- // ctx->req->answer->wire,
- // ctx->req->answer->size,
- 0/* fixme `fin` probably for client side "request sent" */,
- &sent);
- } else {
- kr_assert(false && "Invalid payload type\n");
- return kr_error(EINVAL);
- }
+ bool fin = (((node_t *)uo->node.next)->next == NULL) && ignore_last == 0;
+ ret = send_stream(ctx, conn, stream_id, uo->buf + uf,
+ uo->len - uf - ignore_last, fin, &sent);
if (ret < 0) {
return ret;
if (sent > 0 && ignore_last > 0) {
sent++;
}
- // if (sent > 0) {
- // kr_quic_stream_mark_sent(conn, stream_id, sent);
- // }
+ if (sent > 0) {
+ kr_quic_stream_mark_sent(conn, stream_id, sent);
+ }
if (stream_msgs >= max_msgs / conn->streams_count) {
stream_msgs = 0;
}
ngtcp2_conn_update_pkt_tx_time(conn->conn, quic_timestamp());
- return ret;
+ return sent_msgs;
}
/* For now we assume any iovec payload we get
void *sess_data, void *iter_data,
struct protolayer_iter_ctx *ctx)
{
+ int rv;
pl_quic_sess_data_t *quic = sess_data;
queue_push(quic->wrap_queue, ctx);
- ngtcp2_cid *dcid = ctx->comm_storage.target;
+ struct quic_target *target = ctx->comm_storage.target;
+ ngtcp2_cid *dcid = &target->dcid;
+ uint64_t stream_id = target->stream_id;
kr_log_info(DOQ, "Quic wrap prototype: %s\n",
protolayer_payload_name(ctx->payload.type));
return protolayer_break(ctx, EINVAL /* TODO */);
}
+
+ knot_pkt_t *ans = kr_request_ensure_answer(ctx->req);
+ kr_require(ans != NULL);
+
+ kr_require(data->payload.type == PROTOLAYER_PAYLOAD_IOVEC);
+ kr_quic_stream_add_data(conn, stream_id,
+ &data->payload);
+ // data->payload.iovec.iov[i].iov_base,
+ // data->payload.iovec.iov[i].iov_len);
+
/* Here we will actually have payload to be sent out
- * TODO assert that requirement? */
- kr_quic_send(quic->conn_table,
+ * TODO assert that? */
+ rv = kr_quic_send(quic->conn_table,
conn,
sess_data,
ctx,
kr_log_info(DOQ, "About to continue from quic_wrap: %s\n",
protolayer_payload_name(data->payload.type));
+ if (rv == 0)
+ break;
+
return protolayer_continue(ctx);
}
{
kr_log_warning(DOQ, "IN request init\n");
req->qsource.comm_flags.quic = true;
- struct pl_quic_sess_data *quic = sess_data;
- quic->req = req;
+ // struct pl_quic_sess_data *quic = sess_data;
+ // quic->req = req;
// req->qsource.stream_id = session->comm_storage.target;
}