if (!qcs)
goto out;
- /* allocate transport layer stream descriptor */
- stream = qc_stream_desc_new(id, qcs);
+ /* allocate transport layer stream descriptor
+ *
+ * TODO qc_stream_desc is only useful for Tx buffering. It should not
+ * be required for unidirectional remote streams.
+ */
+ stream = qc_stream_desc_new(id, qcs, qcc->conn->handle.qc);
if (!stream) {
pool_free(pool_head_qcs, qcs);
qcs = NULL;
qcs->endp->ctx = qcc->conn;
qcs->endp->flags |= (CS_EP_T_MUX|CS_EP_ORPHAN|CS_EP_NOT_FIRST);
- qcs->id = id;
+ qcs->id = qcs->by_id.key = id;
/* store transport layer stream descriptor in qcc tree */
- eb64_insert(&qcc->streams_by_id, &stream->by_id);
+ eb64_insert(&qcc->streams_by_id, &qcs->by_id);
qcc->strms[type].nb_streams++;
BUG_ON(!qcs->qcc->strms[qcs_id_type(qcs->id)].nb_streams);
--qcs->qcc->strms[qcs_id_type(qcs->id)].nb_streams;
- /* stream desc must be removed from MUX tree before release it */
- eb64_delete(&qcs->stream->by_id);
- qc_stream_desc_release(qcs->stream, qcs->qcc->conn->handle.qc);
+ qc_stream_desc_release(qcs->stream);
+
BUG_ON(qcs->endp && !(qcs->endp->flags & CS_EP_ORPHAN));
cs_endpoint_free(qcs->endp);
+
+ eb64_delete(&qcs->by_id);
pool_free(pool_head_qcs, qcs);
}
*/
struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id)
{
- struct qc_stream_desc *stream;
unsigned int strm_type;
int64_t sub_id;
- struct eb64_node *strm_node;
+ struct eb64_node *node;
struct qcs *qcs = NULL;
strm_type = id & QCS_ID_TYPE_MASK;
sub_id = id >> QCS_ID_TYPE_SHIFT;
- strm_node = NULL;
+ node = NULL;
if (quic_stream_is_local(qcc, id)) {
/* Local streams: this stream must be already opened. */
- strm_node = eb64_lookup(&qcc->streams_by_id, id);
- if (!strm_node) {
+ node = eb64_lookup(&qcc->streams_by_id, id);
+ if (!node) {
/* unknown stream id */
goto out;
}
- stream = eb64_entry(strm_node, struct qc_stream_desc, by_id);
- qcs = stream->ctx;
+ qcs = eb64_entry(node, struct qcs, by_id);
}
else {
/* Remote streams. */
qcs = tmp_qcs;
}
else {
- strm_node = eb64_lookup(strms, id);
- if (strm_node) {
- stream = eb64_entry(strm_node, struct qc_stream_desc, by_id);
- qcs = stream->ctx;
- }
+ node = eb64_lookup(strms, id);
+ if (node)
+ qcs = eb64_entry(node, struct qcs, by_id);
}
}
*/
int qcc_recv_max_stream_data(struct qcc *qcc, uint64_t id, uint64_t max)
{
- struct qc_stream_desc *stream;
struct qcs *qcs;
struct eb64_node *node;
node = eb64_lookup(&qcc->streams_by_id, id);
if (node) {
- stream = eb64_entry(node, struct qc_stream_desc, by_id);
- qcs = stream->ctx;
+ qcs = eb64_entry(node, struct qcs, by_id);
if (max > qcs->tx.msd) {
qcs->tx.msd = max;
/* liberate remaining qcs instances */
node = eb64_first(&qcc->streams_by_id);
while (node) {
- struct qc_stream_desc *stream = eb64_entry(node, struct qc_stream_desc, by_id);
+ struct qcs *qcs = eb64_entry(node, struct qcs, by_id);
node = eb64_next(node);
- qcs_free(stream->ctx);
+ qcs_free(qcs);
}
pool_free(pool_head_qcc, qcc);
*/
node = eb64_first(&qcc->streams_by_id);
while (node) {
- struct qc_stream_desc *stream = eb64_entry(node, struct qc_stream_desc, by_id);
- struct qcs *qcs = stream->ctx;
+ struct qcs *qcs = eb64_entry(node, struct qcs, by_id);
struct buffer *buf = &qcs->tx.buf;
struct buffer *out = &qcs->stream->buf;
node = eb64_first(&qcc->streams_by_id);
while (node) {
- struct qc_stream_desc *stream = eb64_entry(node, struct qc_stream_desc, by_id);
- struct qcs *qcs = stream->ctx;
+ struct qcs *qcs = eb64_entry(node, struct qcs, by_id);
node = eb64_next(node);
if (qcs->flags & QC_SF_DETACH) {
*/
static int qc_wake_some_streams(struct qcc *qcc)
{
- struct qc_stream_desc *stream;
struct qcs *qcs;
struct eb64_node *node;
for (node = eb64_first(&qcc->streams_by_id); node;
node = eb64_next(node)) {
- stream = eb64_entry(node, struct qc_stream_desc, by_id);
- qcs = stream->ctx;
+ qcs = eb64_entry(node, struct qcs, by_id);
if (!qcs->cs)
continue;
*
* Returns the newly allocated instance on success or else NULL.
*/
-struct qc_stream_desc *qc_stream_desc_new(uint64_t id, void *ctx)
+struct qc_stream_desc *qc_stream_desc_new(uint64_t id, void *ctx,
+ struct quic_conn *qc)
{
struct qc_stream_desc *stream;
return NULL;
stream->by_id.key = id;
- stream->by_id.node.leaf_p = NULL;
+ eb64_insert(&qc->streams_by_id, &stream->by_id);
stream->buf = BUF_NULL;
stream->acked_frms = EB_ROOT;
return stream;
}
-/* Mark the stream descriptor <stream> as released by the upper layer. It will
- * be freed as soon as all its buffered data are acknowledged. In the meantime,
- * the stream is stored in the <qc> tree : thus it must have been removed from
- * any other tree before calling this function.
+/* Mark the stream descriptor <stream> as released. It will be freed as soon as
+ * all its buffered data are acknowledged.
*/
-void qc_stream_desc_release(struct qc_stream_desc *stream,
- struct quic_conn *qc)
+void qc_stream_desc_release(struct qc_stream_desc *stream)
{
- BUG_ON(stream->by_id.node.leaf_p);
+ /* A stream can be released only one time. */
+ BUG_ON(stream->release);
stream->release = 1;
stream->ctx = NULL;
if (!b_data(&stream->buf))
qc_stream_desc_free(stream);
- else
- eb64_insert(&qc->streams_by_id, &stream->by_id);
}
/* Free the stream descriptor <stream> buffer. This function should be used
/* do not use strm_frm->stream as the qc_stream_desc instance
* might be freed at this stage. Use the id to do a proper
- * lookup. First search in the MUX then in the released stream
- * list.
+ * lookup.
*
* TODO if lookup operation impact on the perf is noticeable,
* implement a refcount on qc_stream_desc instances.
*/
- if (qc->mux_state == QC_MUX_READY)
- stream = qcc_get_stream(qc->qcc, strm_frm->id);
- if (!stream) {
- node = eb64_lookup(&qc->streams_by_id, strm_frm->id);
- stream = eb64_entry(node, struct qc_stream_desc, by_id);
- }
-
- if (!stream) {
+ node = eb64_lookup(&qc->streams_by_id, strm_frm->id);
+ if (!node) {
TRACE_PROTO("acked stream for released stream", QUIC_EV_CONN_ACKSTRM, qc, strm_frm);
LIST_DELETE(&frm->list);
quic_tx_packet_refdec(frm->pkt);
/* early return */
return;
}
+ stream = eb64_entry(node, struct qc_stream_desc, by_id);
TRACE_PROTO("acked stream", QUIC_EV_CONN_ACKSTRM, qc, strm_frm, stream);
if (strm_frm->offset.key <= stream->ack_offset) {