*/
struct buffer *buf;
- struct eb64_node offset;
+ uint64_t offset;
uint64_t len;
/* for TX pointer into <buf> field.
case QUIC_FT_STREAM_8 ... QUIC_FT_STREAM_F: {
struct qf_stream *f = &frm->stream;
len += 1 + quic_int_getsize(f->id) +
- ((frm->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT) ? quic_int_getsize(f->offset.key) : 0) +
+ ((frm->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT) ? quic_int_getsize(f->offset) : 0) +
((frm->type & QUIC_STREAM_FRAME_TYPE_LEN_BIT) ? quic_int_getsize(f->len) : 0) + f->len;
break;
}
struct buffer cf_buf;
/* Set offset bit if not already there. */
- strm_frm->offset.key += data;
+ strm_frm->offset += data;
frm->type |= QUIC_STREAM_FRAME_TYPE_OFF_BIT;
strm_frm->len -= data;
* can be freed in strict order.
*/
struct qc_stream_buf {
- struct eb_root acked_frms; /* storage for out-of-order ACKs */
+ struct eb_root ack_tree; /* storage for out-of-order ACKs */
struct eb64_node offset_node; /* node for qc_stream_desc buf tree */
struct buffer buf; /* STREAM payload */
int sbuf;
void *ctx; /* notify context */
};
+/* Represents a range of acknowledged data that cannot be immediately deleted. */
+struct qc_stream_ack {
+ struct eb64_node offset_node; /* range starting offset, used as attach point to streambuf <ack_tree>. */
+ uint64_t len; /* length of the acknowledged range */
+ int fin; /* set if the related STREAM frame had FIN bit set */
+};
+
#endif /* USE_QUIC */
#endif /* _HAPROXY_QUIC_STREAM_T_H_ */
#include <haproxy/quic_stream-t.h>
struct quic_conn;
-struct quic_frame;
struct qc_stream_desc *qc_stream_desc_new(uint64_t id, enum qcs_type, void *ctx,
struct quic_conn *qc);
void qc_stream_desc_release(struct qc_stream_desc *stream, uint64_t final_size,
void *new_ctx);
-int qc_stream_desc_ack(struct qc_stream_desc *stream, struct quic_frame *frm);
+int qc_stream_desc_ack(struct qc_stream_desc *stream,
+ uint64_t offset, uint64_t len, int fin);
void qc_stream_desc_free(struct qc_stream_desc *stream, int closing);
struct buffer *qc_stream_buf_get(struct qc_stream_desc *stream);
frm->stream.stream = qcs->stream;
frm->stream.id = qcs->id;
- frm->stream.offset.key = 0;
+ frm->stream.offset = 0;
frm->stream.dup = 0;
if (total) {
if (qcs->tx.fc.off_real) {
frm->type |= QUIC_STREAM_FRAME_TYPE_OFF_BIT;
- frm->stream.offset.key = qcs->tx.fc.off_real;
+ frm->stream.offset = qcs->tx.fc.off_real;
}
/* Always set length bit as we do not know if there is remaining frames
{
struct qcs_build_stream_trace_arg arg = {
.len = frm->stream.len, .fin = fin,
- .offset = frm->stream.offset.key,
+ .offset = frm->stream.offset,
};
TRACE_LEAVE(QMUX_EV_QCS_SEND|QMUX_EV_QCS_BUILD_STRM,
qcc->conn, qcs, &arg);
chunk_appendf(&trace_buf, " uni=%d fin=%d id=%llu off=%llu len=%llu",
!!(strm_frm->id & QUIC_STREAM_FRAME_ID_DIR_BIT),
!!(frm->type & QUIC_STREAM_FRAME_TYPE_FIN_BIT),
- (ull)strm_frm->id, (ull)strm_frm->offset.key, (ull)strm_frm->len);
+ (ull)strm_frm->id, (ull)strm_frm->offset, (ull)strm_frm->len);
break;
}
case QUIC_FT_MAX_DATA:
/* Caller must set OFF bit if and only if a non-null offset is used. */
BUG_ON(!!(frm->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT) !=
- !!strm_frm->offset.key);
+ !!strm_frm->offset);
if (!quic_enc_int(pos, end, strm_frm->id) ||
- ((frm->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT) && !quic_enc_int(pos, end, strm_frm->offset.key)) ||
+ ((frm->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT) && !quic_enc_int(pos, end, strm_frm->offset)) ||
((frm->type & QUIC_STREAM_FRAME_TYPE_LEN_BIT) &&
(!quic_enc_int(pos, end, strm_frm->len) || end - *pos < strm_frm->len)))
return 0;
return 0;
/* Offset parsing */
- if (!(frm->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT)) {
- strm_frm->offset.key = 0;
- }
- else if (!quic_dec_int((uint64_t *)&strm_frm->offset.key, pos, end))
+ if (!(frm->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT))
+ strm_frm->offset = 0;
+ else if (!quic_dec_int((uint64_t *)&strm_frm->offset, pos, end))
return 0;
/* Length parsing */
}
/* Frame cannot advertise FIN for a smaller data range. */
- BUG_ON(frm_fin && frm->offset.key + frm->len < s->ack_offset);
+ BUG_ON(frm_fin && frm->offset + frm->len < s->ack_offset);
- if (frm->offset.key + frm->len < s->ack_offset ||
- (frm->offset.key + frm->len == s->ack_offset &&
+ if (frm->offset + frm->len < s->ack_offset ||
+ (frm->offset + frm->len == s->ack_offset &&
(!frm_fin || !(s->flags & QC_SD_FL_WAIT_FOR_FIN)))) {
TRACE_DEVEL("STREAM frame already acked : fully acked range", QUIC_EV_CONN_PRSAFRM, qc, f);
return 1;
}
- if (frm->offset.key < s->ack_offset &&
- frm->offset.key + frm->len > s->ack_offset) {
+ if (frm->offset < s->ack_offset &&
+ frm->offset + frm->len > s->ack_offset) {
/* Data range partially acked, remove it from STREAM frame. */
- const uint64_t diff = s->ack_offset - frm->offset.key;
+ const uint64_t diff = s->ack_offset - frm->offset;
TRACE_DEVEL("updated partially acked frame", QUIC_EV_CONN_PRSAFRM, qc, f);
qc_stream_frm_mv_fwd(f, diff);
}
/* Handle <frm> frame whose packet it is attached to has just been acknowledged. The memory allocated
* for this frame will be at least released in every cases.
- * Never fail.
+ *
+ * Returns 1 on sucess else 0.
*/
-static void qc_handle_newly_acked_frm(struct quic_conn *qc, struct quic_frame *frm)
+static int qc_handle_newly_acked_frm(struct quic_conn *qc, struct quic_frame *frm)
{
+ int ret = 0;
+
TRACE_ENTER(QUIC_EV_CONN_PRSAFRM, qc);
TRACE_PROTO("RX ack TX frm", QUIC_EV_CONN_PRSAFRM, qc, frm);
struct qf_stream *strm_frm = &frm->stream;
struct eb64_node *node = NULL;
struct qc_stream_desc *stream = NULL;
+ int ack;
/* do not use strm_frm->stream as the qc_stream_desc instance
* might be freed at this stage. Use the id to do a proper
/* early return */
goto leave;
}
- stream = eb64_entry(node, struct qc_stream_desc, by_id);
-
- if (!qc_stream_desc_ack(stream, frm)) {
- TRACE_DEVEL("stream consumed on ACK received", QUIC_EV_CONN_ACKSTRM,
- qc, strm_frm, stream);
+ else {
+ stream = eb64_entry(node, struct qc_stream_desc, by_id);
+
+ ack = qc_stream_desc_ack(stream, strm_frm->offset,
+ strm_frm->len,
+ frm->type & QUIC_STREAM_FRAME_TYPE_FIN_BIT);
+ if (!ack) {
+ TRACE_DEVEL("stream consumed on ACK received",
+ QUIC_EV_CONN_ACKSTRM, qc, strm_frm, stream);
+
+ if (qc_stream_desc_done(stream)) {
+ /* no need to continue if stream freed. */
+ TRACE_DEVEL("stream released and freed", QUIC_EV_CONN_ACKSTRM, qc);
+ qc_check_close_on_released_mux(qc);
+ }
- if (qc_stream_desc_done(stream)) {
- /* no need to continue if stream freed. */
- TRACE_DEVEL("stream released and freed", QUIC_EV_CONN_ACKSTRM, qc);
- qc_check_close_on_released_mux(qc);
+ qc_release_frm(qc, frm);
+ }
+ else if (ack > 0) {
+ TRACE_DEVEL("handled out-of-order stream ACK",
+ QUIC_EV_CONN_ACKSTRM, qc, strm_frm, stream);
+ qc_release_frm(qc, frm);
+ }
+ else {
+ /* Fatal error during qc_stream_desc_ack(). */
+ goto leave;
}
-
- qc_release_frm(qc, frm);
- }
- else {
- TRACE_DEVEL("handled out-of-order stream ACK", QUIC_EV_CONN_ACKSTRM,
- qc, strm_frm, stream);
}
}
break;
qc_release_frm(qc, frm);
}
+ ret = 1;
leave:
TRACE_LEAVE(QUIC_EV_CONN_PRSAFRM, qc);
+ return ret;
}
/* Collect newly acknowledged TX packets from <pkts> ebtree into <newly_acked_pkts>
TRACE_LEAVE(QUIC_EV_CONN_PRSAFRM, qc);
}
-/* Handle <newly_acked_pkts> list of newly acknowledged TX packets */
-static void qc_handle_newly_acked_pkts(struct quic_conn *qc,
- unsigned int *pkt_flags, struct list *newly_acked_pkts)
+/* Handle <newly_acked_pkts> list of newly acknowledged TX packets.
+ *
+ * Returns 1 on sucess else 0.
+ */
+static int qc_handle_newly_acked_pkts(struct quic_conn *qc,
+ unsigned int *pkt_flags, struct list *newly_acked_pkts)
{
struct quic_tx_packet *pkt, *tmp;
+ int ret = 0;
TRACE_ENTER(QUIC_EV_CONN_PRSAFRM, qc);
*pkt_flags |= pkt->flags;
TRACE_DEVEL("Removing packet #", QUIC_EV_CONN_PRSAFRM, qc, NULL, &pkt->pn_node.key);
- list_for_each_entry_safe(frm, frmbak, &pkt->frms, list)
- qc_handle_newly_acked_frm(qc, frm);
+ list_for_each_entry_safe(frm, frmbak, &pkt->frms, list) {
+ if (!qc_handle_newly_acked_frm(qc, frm))
+ goto leave;
+ }
/* If there are others packet in the same datagram <pkt> is attached to,
* detach the previous one and the next one from <pkt>.
*/
eb64_delete(&pkt->pn_node);
}
+ ret = 1;
leave:
TRACE_LEAVE(QUIC_EV_CONN_PRSAFRM, qc);
+ return ret;
}
/* Handle all frames sent from <pkt> packet and reinsert them in the same order
} while (1);
if (!LIST_ISEMPTY(&newly_acked_pkts)) {
- qc_handle_newly_acked_pkts(qc, &pkt_flags, &newly_acked_pkts);
+ if (!qc_handle_newly_acked_pkts(qc, &pkt_flags, &newly_acked_pkts))
+ goto leave;
+
if (new_largest_acked_pn && (pkt_flags & QUIC_FL_TX_PACKET_ACK_ELICITING)) {
*rtt_sample = tick_remain(time_sent, now_ms);
qel->pktns->rx.largest_acked_pn = ack_frm->largest_ack;
TRACE_ENTER(QUIC_EV_CONN_PRSFRM, qc);
ret = qcc_recv(qc->qcc, strm_frm->id, strm_frm->len,
- strm_frm->offset.key, fin, (char *)strm_frm->data);
+ strm_frm->offset, fin, (char *)strm_frm->data);
/* frame rejected - packet must not be acknowledeged */
TRACE_LEAVE(QUIC_EV_CONN_PRSFRM, qc);
#include <haproxy/mux_quic.h>
#include <haproxy/pool.h>
#include <haproxy/quic_conn.h>
-#include <haproxy/quic_frame-t.h>
#include <haproxy/task.h>
DECLARE_STATIC_POOL(pool_head_quic_stream_desc, "qc_stream_desc",
sizeof(struct qc_stream_desc));
DECLARE_STATIC_POOL(pool_head_quic_stream_buf, "qc_stream_buf",
sizeof(struct qc_stream_buf));
+DECLARE_STATIC_POOL(pool_head_quic_stream_ack, "qc_stream_ack",
+ sizeof(struct qc_stream_ack));
static struct pool_head *pool_head_sbuf;
uint64_t room;
/* Caller is responsible to remove buffered ACK frames before destroying a buffer instance. */
- BUG_ON(!eb_is_empty(&(*stream_buf)->acked_frms));
+ BUG_ON(!eb_is_empty(&(*stream_buf)->ack_tree));
eb64_delete(&(*stream_buf)->offset_node);
stream->flags &= ~QC_SD_FL_WAIT_FOR_FIN;
}
- if (!b_data(&buf->buf) && eb_is_empty(&buf->acked_frms)) {
+ if (!b_data(&buf->buf) && eb_is_empty(&buf->ack_tree)) {
qc_stream_buf_free(stream, &buf);
/* Retrieve next buffer instance. */
buf = !eb_is_empty(&stream->buf_tree) ?
static void qc_stream_buf_consume(struct qc_stream_buf *stream_buf,
struct qc_stream_desc *stream)
{
- struct quic_conn *qc = stream->qc;
- struct eb64_node *frm_node;
- struct qf_stream *strm_frm;
- struct quic_frame *frm;
- uint64_t offset, len;
- int fin;
-
- frm_node = eb64_first(&stream_buf->acked_frms);
- while (frm_node) {
- strm_frm = eb64_entry(frm_node, struct qf_stream, offset);
- frm = container_of(strm_frm, struct quic_frame, stream);
-
- offset = strm_frm->offset.key;
- len = strm_frm->len;
- fin = frm->type & QUIC_STREAM_FRAME_TYPE_FIN_BIT;
-
- if (offset > stream->ack_offset)
+ struct qc_stream_ack *ack;
+ struct eb64_node *ack_node;
+
+ ack_node = eb64_first(&stream_buf->ack_tree);
+ while (ack_node) {
+ ack = eb64_entry(ack_node, struct qc_stream_ack, offset_node);
+ if (ack->offset_node.key > stream->ack_offset)
break;
- /* Delete frame before acknowledged it. This prevents BUG_ON()
- * on non-empty acked_frms tree when stream_buf is empty and removed.
+ /* Delete range before acknowledged it. This prevents BUG_ON()
+ * on non-empty ack_tree tree when stream_buf is empty and removed.
*/
- eb64_delete(frm_node);
- stream_buf = qc_stream_buf_ack(stream_buf, stream, offset, len, fin);
- qc_release_frm(qc, frm);
+ eb64_delete(ack_node);
+ stream_buf = qc_stream_buf_ack(stream_buf, stream,
+ ack->offset_node.key, ack->len, ack->fin);
+ pool_free(pool_head_quic_stream_ack, ack);
- frm_node = stream_buf ? eb64_first(&stream_buf->acked_frms) : NULL;
+ ack_node = stream_buf ? eb64_first(&stream_buf->ack_tree) : NULL;
}
}
* Returns 0 if the frame has been handled and can be removed.
* Returns a positive value if acknowledgement is out-of-order and
* corresponding STREAM frame has been buffered.
+ * Returns a negative value on fatal error.
*/
-int qc_stream_desc_ack(struct qc_stream_desc *stream, struct quic_frame *frm)
+int qc_stream_desc_ack(struct qc_stream_desc *stream,
+ uint64_t offset, uint64_t len, int fin)
{
- struct qf_stream *strm_frm = &frm->stream;
- const uint64_t offset = strm_frm->offset.key;
- const uint64_t len = strm_frm->len;
- const int fin = frm->type & QUIC_STREAM_FRAME_TYPE_FIN_BIT;
-
struct qc_stream_buf *stream_buf = NULL;
struct eb64_node *buf_node;
int ret = 0;
stream->flags &= ~QC_SD_FL_WAIT_FOR_FIN;
}
else if (offset > stream->ack_offset) {
+ struct qc_stream_ack *ack;
+
buf_node = eb64_lookup_le(&stream->buf_tree, offset);
BUG_ON(!buf_node); /* Cannot acknowledged a STREAM frame for a non existing buffer. */
stream_buf = eb64_entry(buf_node, struct qc_stream_buf, offset_node);
- eb64_insert(&stream_buf->acked_frms, &strm_frm->offset);
+
+ ack = pool_alloc(pool_head_quic_stream_ack);
+ if (!ack)
+ return -1;
+
+ ack->offset_node.key = offset;
+ ack->len = len;
+ ack->fin = fin;
+
+ eb64_insert(&stream_buf->ack_tree, &ack->offset_node);
ret = 1;
}
else if (offset + len > stream->ack_offset) {
void qc_stream_desc_free(struct qc_stream_desc *stream, int closing)
{
struct qc_stream_buf *buf;
- struct quic_conn *qc = stream->qc;
- struct eb64_node *frm_node, *buf_node;
+ struct eb64_node *ack_node, *buf_node;
unsigned int free_count = 0;
/* This function only deals with released streams. */
BUG_ON(b_data(&buf->buf) && !closing);
/* qc_stream_desc might be freed before having received all its ACKs. */
- while (!eb_is_empty(&buf->acked_frms)) {
- struct qf_stream *strm_frm;
- struct quic_frame *frm;
+ while (!eb_is_empty(&buf->ack_tree)) {
+ struct qc_stream_ack *ack;
- frm_node = eb64_first(&buf->acked_frms);
- eb64_delete(frm_node);
+ ack_node = eb64_first(&buf->ack_tree);
+ eb64_delete(ack_node);
- strm_frm = eb64_entry(frm_node, struct qf_stream, offset);
- frm = container_of(strm_frm, struct quic_frame, stream);
- qc_release_frm(qc, frm);
+ ack = eb64_entry(ack_node, struct qc_stream_ack, offset_node);
+ pool_free(pool_head_quic_stream_ack, ack);
}
if (buf->sbuf)
if (!stream->buf)
return NULL;
- stream->buf->acked_frms = EB_ROOT;
+ stream->buf->ack_tree = EB_ROOT;
stream->buf->buf = BUF_NULL;
stream->buf->offset_node.key = offset;
const struct qc_stream_desc *stream = a3;
if (strm_frm)
- chunk_appendf(&trace_buf, " off=%llu len=%llu", (ull)strm_frm->offset.key, (ull)strm_frm->len);
+ chunk_appendf(&trace_buf, " off=%llu len=%llu", (ull)strm_frm->offset, (ull)strm_frm->len);
if (stream)
chunk_appendf(&trace_buf, " ack_offset=%llu", (ull)stream->ack_offset);
}
* excepting the variable ones. Note that +1 is for the type of this frame.
*/
hlen = 1 + quic_int_getsize(cf->stream.id) +
- ((cf->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT) ? quic_int_getsize(cf->stream.offset.key) : 0);
+ ((cf->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT) ? quic_int_getsize(cf->stream.offset) : 0);
/* Compute the data length of this STREAM frame. */
avail_room = room - hlen;
if ((ssize_t)avail_room <= 0)
LIST_APPEND(outlist, &cf->list);
qc_stream_desc_send(cf->stream.stream,
- cf->stream.offset.key,
+ cf->stream.offset,
cf->stream.len);
}
else {
b_size(cf->stream.buf),
(char *)cf->stream.data - b_orig(cf->stream.buf), 0);
cf->stream.len -= dlen;
- cf->stream.offset.key += dlen;
+ cf->stream.offset += dlen;
cf->stream.data = (unsigned char *)b_peek(&cf_buf, dlen);
qc_stream_desc_send(new_cf->stream.stream,
- new_cf->stream.offset.key,
+ new_cf->stream.offset,
new_cf->stream.len);
}