struct session *sess;
struct qcc *qcc;
struct eb64_node by_id; /* place in qcc's streams_by_id */
- struct eb_root frms;
uint64_t id; /* stream ID */
uint32_t flags; /* QC_SF_* */
struct {
uint64_t offset; /* the current offset of received data */
uint64_t bytes; /* number of bytes received */
struct buffer buf; /* receive buffer, always valid (buf_empty or real buffer) */
+ struct eb_root frms; /* received frames ordered by their offsets */
} rx;
struct {
enum qcs_tx_st st; /* TX state */
uint64_t max_data; /* maximum number of bytes which may be sent */
uint64_t offset; /* the current offset of data to send */
uint64_t bytes; /* number of bytes sent */
+ uint64_t ack_offset; /* last acked ordered byte offset */
+ struct eb_root acked_frms; /* acked frames ordered by their offsets */
struct buffer buf; /* transmit buffer, always valid (buf_empty or real buffer) */
struct buffer mbuf[QCC_MBUF_CNT];
uint64_t left; /* data currently stored in mbuf waiting for send */
#include <haproxy/list.h>
+#include <import/eb64tree.h>
+
/* QUIC frame types. */
enum quic_frame_type {
QUIC_FT_PADDING = 0x00,
struct quic_stream {
uint64_t id;
- uint64_t offset;
+ struct qcs *qcs;
+ struct buffer *buf;
+ struct eb64_node offset;
uint64_t len;
const unsigned char *data;
};
case QUIC_FT_STREAM_8 ... QUIC_FT_STREAM_F: {
struct quic_stream *f = &frm->stream;
len += 1 + quic_int_getsize(f->id) +
- ((frm->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT) ? quic_int_getsize(f->offset) : 0) +
+ ((frm->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT) ? quic_int_getsize(f->offset.key) : 0) +
((frm->type & QUIC_STREAM_FRAME_TYPE_LEN_BIT) ? quic_int_getsize(f->len) : 0) + f->len;
break;
}
qcs->qcc = qcc;
qcs->cs = NULL;
qcs->id = qcs->by_id.key = id;
- qcs->frms = EB_ROOT_UNIQUE;
qcs->flags = QC_SF_NONE;
qcs->rx.buf = BUF_NULL;
qcs->rx.st = QC_RX_SS_IDLE;
qcs->rx.bytes = qcs->rx.offset = 0;
qcs->rx.max_data = qcc->strms[qcs_type].rx.max_data;
-
qcs->rx.buf = BUF_NULL;
+ qcs->rx.frms = EB_ROOT_UNIQUE;
+
qcs->tx.st = QC_TX_SS_IDLE;
- qcs->tx.bytes = qcs->tx.offset = 0;
+ qcs->tx.bytes = qcs->tx.offset = qcs->tx.ack_offset = 0;
+ qcs->tx.acked_frms = EB_ROOT_UNIQUE;
qcs->tx.max_data = qcc->strms[qcs_type].tx.max_data;
- qcs->tx.buf = BUF_NULL;
+ qcs->tx.buf = BUF_NULL;
br_init(qcs->tx.mbuf, sizeof(qcs->tx.mbuf) / sizeof(qcs->tx.mbuf[0]));
qcs->tx.left = 0;
qcs->qcc = qcc;
qcs->cs = NULL;
qcs->id = qcs->by_id.key = next_id;
- qcs->frms = EB_ROOT_UNIQUE;
qcs->flags = QC_SF_NONE;
- qcs->tx.st = QC_TX_SS_IDLE;
- qcs->tx.max_data = qcc->strms[qcs_type].tx.max_data;
- qcs->tx.offset = qcs->tx.bytes = 0;
- qcs->tx.buf = BUF_NULL;
+ qcs->tx.st = QC_TX_SS_IDLE;
+ qcs->tx.max_data = qcc->strms[qcs_type].tx.max_data;
+ qcs->tx.offset = qcs->tx.bytes = qcs->tx.ack_offset = 0;
+ qcs->tx.acked_frms = EB_ROOT_UNIQUE;
+ qcs->tx.buf = BUF_NULL;
br_init(qcs->tx.mbuf, sizeof(qcs->tx.mbuf) / sizeof(qcs->tx.mbuf[0]));
qcs->tx.left = 0;
qcs->qcc = qcc;
qcs->id = qcs->by_id.key = id;
- qcs->frms = EB_ROOT_UNIQUE;
qcs->flags = QC_SF_NONE;
qcs->rx.st = QC_RX_SS_IDLE;
qcs->rx.max_data = qcc->strms[qcs_type].rx.max_data;
qcs->rx.offset = qcs->rx.bytes = 0;
qcs->rx.buf = BUF_NULL;
+ qcs->rx.frms = EB_ROOT_UNIQUE;
br_init(qcs->tx.mbuf, sizeof(qcs->tx.mbuf) / sizeof(qcs->tx.mbuf[0]));
qcs->tx.left = 0;
static int qcs_push_frame(struct qcs *qcs, struct buffer *payload, int fin, uint64_t offset)
{
struct quic_frame *frm;
- struct buffer buf = BUF_NULL;
+ struct buffer *buf = &qcs->tx.buf;
+ struct quic_enc_level *qel = &qcs->qcc->conn->qc->els[QUIC_TLS_ENC_LEVEL_APP];
int total = 0;
- qc_get_buf(qcs->qcc, &buf);
- total = b_xfer(&buf, payload, b_data(payload));
-
+ qc_get_buf(qcs->qcc, buf);
+ total = b_force_xfer(buf, payload, QUIC_MIN(b_data(payload), b_room(buf)));
frm = pool_zalloc(pool_head_quic_frame);
if (!frm)
goto err;
frm->type |= QUIC_STREAM_FRAME_TYPE_FIN_BIT;
if (offset) {
frm->type |= QUIC_STREAM_FRAME_TYPE_OFF_BIT;
- frm->stream.offset = offset;
+ frm->stream.offset.key = offset;
}
+ frm->stream.qcs = qcs;
+ frm->stream.buf = buf;
frm->stream.id = qcs->by_id.key;
if (total) {
frm->type |= QUIC_STREAM_FRAME_TYPE_LEN_BIT;
frm->stream.len = total;
- frm->stream.data = (unsigned char *)b_head(&buf);
}
- struct quic_enc_level *qel = &qcs->qcc->conn->qc->els[QUIC_TLS_ENC_LEVEL_APP];
MT_LIST_APPEND(&qel->pktns->tx.frms, &frm->mt_list);
fprintf(stderr, "%s: total=%d fin=%d offset=%lu\n", __func__, total, fin, offset);
return total;
struct quic_frame *frm, struct quic_conn *conn)
{
struct quic_stream *stream = &frm->stream;
+ size_t offset, block1, block2;
+ struct buffer b;
if (!quic_enc_int(buf, end, stream->id) ||
- ((frm->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT) && !quic_enc_int(buf, end, stream->offset)) ||
+ ((frm->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT) && !quic_enc_int(buf, end, stream->offset.key)) ||
((frm->type & QUIC_STREAM_FRAME_TYPE_LEN_BIT) &&
(!quic_enc_int(buf, end, stream->len) || end - *buf < stream->len)))
return 0;
- memcpy(*buf, stream->data, stream->len);
- *buf += stream->len;
+ /* Buffer copy */
+ b = *stream->buf;
+ offset = (frm->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT) ?
+ stream->offset.key & (b_size(stream->buf) - 1): 0;
+ block1 = b_wrap(&b) - (b_orig(&b) + offset);
+ if (block1 > stream->len)
+ block1 = stream->len;
+ block2 = stream->len - block1;
+ memcpy(*buf, b_orig(&b) + offset, block1);
+ *buf += block1;
+ if (block2) {
+ memcpy(*buf, b_orig(&b), block2);
+ *buf += block2;
+ }
return 1;
}
/* Offset parsing */
if (!(frm->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT)) {
- stream->offset = 0;
+ stream->offset.key = 0;
}
- else if (!quic_dec_int(&stream->offset, buf, end))
+ else if (!quic_dec_int((uint64_t *)&stream->offset.key, buf, end))
return 0;
/* Length parsing */
!!(s->id & QUIC_STREAM_FRAME_ID_DIR_BIT),
!!(frm->type & QUIC_STREAM_FRAME_TYPE_FIN_BIT),
(unsigned long long)s->id,
- (unsigned long long)s->offset,
+ (unsigned long long)s->offset.key,
(unsigned long long)s->len);
}
}
return 1;
}
+/* Remove from <qcs> stream the acknowledged frames.
+ * Never fails.
+ */
+static void qcs_try_to_consume(struct qcs *qcs)
+{
+ struct eb64_node *frm_node;
+
+ frm_node = eb64_first(&qcs->tx.acked_frms);
+ while (frm_node) {
+ struct quic_stream *strm;
+
+ strm = eb64_entry(&frm_node->node, struct quic_stream, offset);
+ if (strm->offset.key != qcs->tx.ack_offset)
+ break;
+
+ b_del(strm->buf, strm->len);
+ qcs->tx.ack_offset += strm->len;
+ frm_node = eb64_next(frm_node);
+ eb64_delete(&strm->offset);
+ }
+}
+
/* Treat <frm> frame whose packet it is attached to has just been acknowledged. */
static inline void qc_treat_acked_tx_frm(struct quic_frame *frm,
struct ssl_sock_ctx *ctx)
{
+
TRACE_PROTO("Removing frame", QUIC_EV_CONN_PRSAFRM, ctx->conn, frm);
- LIST_DELETE(&frm->list);
- pool_free(pool_head_quic_frame, frm);
+ switch (frm->type) {
+ case QUIC_FT_STREAM_8 ... QUIC_FT_STREAM_F:
+ {
+ struct qcs *qcs = frm->stream.qcs;
+ struct quic_stream *strm = &frm->stream;
+
+ if (qcs->tx.ack_offset == strm->offset.key) {
+ b_del(strm->buf, strm->len);
+ qcs->tx.ack_offset += strm->len;
+ LIST_DELETE(&frm->list);
+ pool_free(pool_head_quic_frame, frm);
+ }
+ else {
+ eb64_insert(&qcs->tx.acked_frms, &strm->offset);
+ }
+ qcs_try_to_consume(qcs);
+ }
+ break;
+ default:
+ LIST_DELETE(&frm->list);
+ pool_free(pool_head_quic_frame, frm);
+ }
}
/* Remove <largest> down to <smallest> node entries from <pkts> tree of TX packet,
frm = pool_alloc(pool_head_quic_rx_strm_frm);
if (frm) {
- frm->offset_node.key = stream_frm->offset;
+ frm->offset_node.key = stream_frm->offset.key;
frm->len = stream_frm->len;
frm->data = stream_frm->data;
frm->pkt = pkt;
try = strm_frm->len;
memcpy(b_tail(buf), strm_frm->data, try);
strm_frm->len -= try;
- strm_frm->offset += try;
+ strm_frm->offset.key += try;
b_add(buf, try);
ret += try;
}
}
strm = eb64_entry(&strm_node->node, struct qcs, by_id);
- frm_node = eb64_lookup(&strm->frms, strm_frm->offset);
+ frm_node = eb64_lookup(&strm->rx.frms, strm_frm->offset.key);
/* FIXME: handle the case where this frame overlap others */
if (frm_node) {
TRACE_PROTO("Already existing stream data",
goto out;
}
- if (strm_frm->offset == strm->rx.offset) {
+ if (strm_frm->offset.key == strm->rx.offset) {
int ret;
if (!qc_get_buf(qc->qcc, &strm->rx.buf))
return 0;
}
- eb64_insert(&strm->frms, &frm->offset_node);
+ eb64_insert(&strm->rx.frms, &frm->offset_node);
quic_rx_packet_refinc(pkt);
out:
}
strm = eb64_entry(&strm_node->node, struct qcs, by_id);
- frm_node = eb64_lookup(&strm->frms, strm_frm->offset);
+ frm_node = eb64_lookup(&strm->rx.frms, strm_frm->offset.key);
/* FIXME: handle the case where this frame overlap others */
if (frm_node) {
TRACE_PROTO("Already existing stream data",
}
strm_frm_len = strm_frm->len;
- if (strm_frm->offset == strm->rx.offset) {
+ if (strm_frm->offset.key == strm->rx.offset) {
int ret;
if (!qc_get_buf(qc->qcc, &strm->rx.buf))
if (ret)
ruqs_notify_recv(strm);
- strm_frm->offset += ret;
+ strm_frm->offset.key += ret;
}
/* Take this frame into an account for the stream flow control */
strm->rx.offset += strm_frm_len;
return 0;
}
- eb64_insert(&strm->frms, &frm->offset_node);
+ eb64_insert(&strm->rx.frms, &frm->offset_node);
quic_rx_packet_refinc(pkt);
out:
* excepting the variable ones. Note that +1 is for the type of this frame.
*/
hlen = 1 + quic_int_getsize(cf->stream.id) +
- ((cf->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT) ? quic_int_getsize(cf->stream.offset) : 0);
+ ((cf->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT) ? quic_int_getsize(cf->stream.offset.key) : 0);
/* Compute the data length of this STREAM frame. */
avail_room = room - hlen - *len;
if ((ssize_t)avail_room <= 0)
- continue;
+ break;
if (cf->type & QUIC_STREAM_FRAME_TYPE_LEN_BIT) {
dlen = max_available_room(avail_room, &dlen_sz);
}
new_cf->type = cf->type;
+ new_cf->stream.qcs = cf->stream.qcs;
+ new_cf->stream.buf = cf->stream.buf;
new_cf->stream.id = cf->stream.id;
if (cf->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT)
new_cf->stream.offset = cf->stream.offset;
cf->type |= QUIC_STREAM_FRAME_TYPE_OFF_BIT;
/* Consume <dlen> bytes of the current frame. */
cf->stream.len -= dlen;
- cf->stream.offset += dlen;
+ cf->stream.offset.key += dlen;
cf->stream.data += dlen;
}
break;