struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
{
struct qcs *qcs;
+ struct qc_stream_desc *stream;
TRACE_ENTER(QMUX_EV_QCS_NEW, qcc->conn);
if (!qcs)
goto out;
+ /* allocate transport layer stream descriptor */
+ stream = qc_stream_desc_new(qcc->conn->qc, id, qcs);
+ if (!stream) {
+ pool_free(pool_head_qcs, qcs);
+ qcs = NULL;
+ goto out;
+ }
+
+ qcs->stream = stream;
qcs->qcc = qcc;
qcs->cs = NULL;
qcs->flags = QC_SF_NONE;
qcs->rx.frms = EB_ROOT_UNIQUE;
qcs->tx.buf = BUF_NULL;
- qcs->tx.xprt_buf = BUF_NULL;
qcs->tx.offset = 0;
qcs->tx.sent_offset = 0;
- qcs->tx.ack_offset = 0;
- qcs->tx.acked_frms = EB_ROOT;
qcs->wait_event.tasklet = NULL;
qcs->wait_event.events = 0;
{
b_free(&qcs->rx.buf);
b_free(&qcs->tx.buf);
- b_free(&qcs->tx.xprt_buf);
BUG_ON(!qcs->qcc->strms[qcs_id_type(qcs->by_id.key)].nb_streams);
--qcs->qcc->strms[qcs_id_type(qcs->by_id.key)].nb_streams;
+ qc_stream_desc_release(qcs->stream);
+
eb64_delete(&qcs->by_id);
pool_free(pool_head_qcs, qcs);
}
for (i = largest_id + 1; i <= sub_id; i++) {
uint64_t id = (i << QCS_ID_TYPE_SHIFT) | strm_type;
enum qcs_type type = id & QCS_ID_DIR_BIT ? QCS_CLT_UNI : QCS_CLT_BIDI;
+
tmp_qcs = qcs_new(qcc, id, type);
if (!tmp_qcs) {
/* allocation failure */
* |xxxxxxxxxxxxxxxxx|
*/
- BUG_ON_HOT(qcs->tx.sent_offset < qcs->tx.ack_offset);
+ BUG_ON_HOT(qcs->tx.sent_offset < qcs->stream->ack_offset);
BUG_ON_HOT(qcs->tx.offset < qcs->tx.sent_offset);
- head = qcs->tx.sent_offset - qcs->tx.ack_offset;
+ head = qcs->tx.sent_offset - qcs->stream->ack_offset;
left = qcs->tx.offset - qcs->tx.sent_offset;
to_xfer = QUIC_MIN(b_data(payload), b_room(out));
total = b_force_xfer(out, payload, to_xfer);
frm->type = QUIC_FT_STREAM_8;
- frm->stream.qcs = (struct qcs *)qcs;
+ frm->stream.stream = qcs->stream;
frm->stream.id = qcs->by_id.key;
frm->stream.buf = out;
frm->stream.data = (unsigned char *)b_peek(out, head);
while (node) {
struct qcs *qcs = container_of(node, struct qcs, by_id);
struct buffer *buf = &qcs->tx.buf;
- struct buffer *out = &qcs->tx.xprt_buf;
+ struct buffer *out = &qcs->stream->buf;
/* TODO
* for the moment, unidirectional streams have their own
node = eb64_next(node);
if (qcs->flags & QC_SF_DETACH) {
- if ((!b_data(&qcs->tx.buf) && !b_data(&qcs->tx.xprt_buf))) {
+ if (!b_data(&qcs->tx.buf) &&
+ qcs->tx.offset == qcs->tx.sent_offset) {
qcs_destroy(qcs);
release = 1;
}
* managment between xprt and mux is reorganized.
*/
- if ((b_data(&qcs->tx.buf) || b_data(&qcs->tx.xprt_buf))) {
+ if (b_data(&qcs->tx.buf) || qcs->tx.offset > qcs->tx.sent_offset) {
TRACE_DEVEL("leaving with remaining data, detaching qcs", QMUX_EV_STRM_END, qcc->conn, qcs);
qcs->flags |= QC_SF_DETACH;
return;
if (mask & QUIC_EV_CONN_ACKSTRM) {
const struct quic_stream *s = a2;
- const struct qcs *qcs = a3;
+ const struct qc_stream_desc *stream = a3;
if (s)
chunk_appendf(&trace_buf, " off=%llu len=%llu", (ull)s->offset.key, (ull)s->len);
- if (qcs)
- chunk_appendf(&trace_buf, " ack_offset=%llu", (ull)qcs->tx.ack_offset);
+ if (stream)
+ chunk_appendf(&trace_buf, " ack_offset=%llu", (ull)stream->ack_offset);
}
if (mask & QUIC_EV_CONN_RTTUPDT) {
return 0;
}
-/* Remove from <qcs> stream the acknowledged frames.
+/* Remove from <stream> the acknowledged frames.
*
* Returns 1 if at least one frame was removed else 0.
*/
-static int qcs_try_to_consume(struct qcs *qcs)
+static int quic_stream_try_to_consume(struct quic_conn *qc,
+ struct qc_stream_desc *stream)
{
int ret;
struct eb64_node *frm_node;
ret = 0;
- frm_node = eb64_first(&qcs->tx.acked_frms);
+ frm_node = eb64_first(&stream->acked_frms);
while (frm_node) {
struct quic_stream *strm;
struct quic_frame *frm;
strm = eb64_entry(&frm_node->node, struct quic_stream, offset);
- if (strm->offset.key > qcs->tx.ack_offset)
+ if (strm->offset.key > stream->ack_offset)
break;
TRACE_PROTO("stream consumed", QUIC_EV_CONN_ACKSTRM,
- qcs->qcc->conn->qc, strm, qcs);
- if (strm->offset.key + strm->len > qcs->tx.ack_offset) {
+ qc, strm, stream);
+
+ if (strm->offset.key + strm->len > stream->ack_offset) {
const size_t diff = strm->offset.key + strm->len -
- qcs->tx.ack_offset;
- qcs->tx.ack_offset += diff;
+ stream->ack_offset;
+ stream->ack_offset += diff;
b_del(strm->buf, diff);
ret = 1;
-
- if (!b_data(strm->buf)) {
- b_free(strm->buf);
- offer_buffers(NULL, 1);
- }
}
frm_node = eb64_next(frm_node);
pool_free(pool_head_quic_frame, frm);
}
+ if (!b_data(&stream->buf))
+ qc_stream_desc_free(stream);
+
return ret;
}
switch (frm->type) {
case QUIC_FT_STREAM_8 ... QUIC_FT_STREAM_F:
{
- struct quic_stream *strm = &frm->stream;
+ struct quic_stream *strm_frm = &frm->stream;
struct eb64_node *node = NULL;
- struct qcs *qcs = NULL;
+ struct qc_stream_desc *stream = NULL;
- /* do not use strm->qcs as the qcs instance might be freed at
- * this stage. Use the id to do a proper lookup.
+ /* do not use strm_frm->stream as the qc_stream_desc instance
+ * might be freed at this stage. Use the id to do a proper
+ * lookup.
*
* TODO if lookup operation impact on the perf is noticeable,
- * implement a refcount on qcs instances.
+ * implement a refcount on qc_stream_desc instances.
*/
- if (qc->mux_state == QC_MUX_READY) {
- node = eb64_lookup(&qc->qcc->streams_by_id, strm->id);
- qcs = eb64_entry(node, struct qcs, by_id);
- }
+ node = eb64_lookup(&qc->streams_by_id, strm_frm->id);
+ stream = eb64_entry(node, struct qc_stream_desc, by_id);
- if (!qcs) {
- TRACE_PROTO("acked stream for released stream", QUIC_EV_CONN_ACKSTRM, qc, strm);
+ if (!stream) {
+ TRACE_PROTO("acked stream for released stream", QUIC_EV_CONN_ACKSTRM, qc, strm_frm);
LIST_DELETE(&frm->list);
quic_tx_packet_refdec(frm->pkt);
pool_free(pool_head_quic_frame, frm);
return;
}
- TRACE_PROTO("acked stream", QUIC_EV_CONN_ACKSTRM, qc, strm, qcs);
- if (strm->offset.key <= qcs->tx.ack_offset) {
- if (strm->offset.key + strm->len > qcs->tx.ack_offset) {
- const size_t diff = strm->offset.key + strm->len -
- qcs->tx.ack_offset;
- qcs->tx.ack_offset += diff;
- b_del(strm->buf, diff);
+ TRACE_PROTO("acked stream", QUIC_EV_CONN_ACKSTRM, qc, strm_frm, stream);
+ if (strm_frm->offset.key <= stream->ack_offset) {
+ if (strm_frm->offset.key + strm_frm->len > stream->ack_offset) {
+ const size_t diff = strm_frm->offset.key + strm_frm->len -
+ stream->ack_offset;
+ stream->ack_offset += diff;
+ b_del(strm_frm->buf, diff);
stream_acked = 1;
- if (!b_data(strm->buf)) {
- b_free(strm->buf);
- offer_buffers(NULL, 1);
+ if (!b_data(strm_frm->buf)) {
+ if (qc_stream_desc_free(stream)) {
+ /* early return */
+ return;
+ }
}
}
TRACE_PROTO("stream consumed", QUIC_EV_CONN_ACKSTRM,
- qcs->qcc->conn->qc, strm, qcs);
+ qc, strm_frm, stream);
LIST_DELETE(&frm->list);
quic_tx_packet_refdec(frm->pkt);
pool_free(pool_head_quic_frame, frm);
}
else {
- eb64_insert(&qcs->tx.acked_frms, &strm->offset);
+ eb64_insert(&stream->acked_frms, &strm_frm->offset);
}
- stream_acked |= qcs_try_to_consume(qcs);
+ stream_acked |= quic_stream_try_to_consume(qc, stream);
}
break;
default:
LIST_DELETE(&cf->list);
LIST_APPEND(outlist, &cf->list);
- qcc_streams_sent_done(cf->stream.qcs,
- cf->stream.len,
- cf->stream.offset.key);
+ /* The MUX stream might be released at this
+ * stage. This can most notably happen on
+ * retransmission.
+ */
+ if (qc->mux_state == QC_MUX_READY &&
+ !cf->stream.stream->release) {
+ qcc_streams_sent_done(cf->stream.stream->ctx,
+ cf->stream.len,
+ cf->stream.offset.key);
+ }
}
else {
struct quic_frame *new_cf;
}
new_cf->type = cf->type;
- new_cf->stream.qcs = cf->stream.qcs;
+ new_cf->stream.stream = cf->stream.stream;
new_cf->stream.buf = cf->stream.buf;
new_cf->stream.id = cf->stream.id;
if (cf->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT)
cf->stream.offset.key += dlen;
cf->stream.data = (unsigned char *)b_peek(&cf_buf, dlen);
- qcc_streams_sent_done(new_cf->stream.qcs,
- new_cf->stream.len,
- new_cf->stream.offset.key);
+ /* The MUX stream might be released at this
+ * stage. This can most notably happen on
+ * retransmission.
+ */
+ if (qc->mux_state == QC_MUX_READY &&
+ !cf->stream.stream->release) {
+ qcc_streams_sent_done(new_cf->stream.stream->ctx,
+ new_cf->stream.len,
+ new_cf->stream.offset.key);
+ }
}
/* TODO the MUX is notified about the frame sending via