struct qcc *qcc = qcs->qcc;
struct quic_frame *frm;
int head, total;
+ uint64_t base_off;
TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs);
- /* cf buffer schema in qcs_xfer_data */
- head = qcs->tx.sent_offset - qcs->stream->ack_offset;
+ /* if ack_offset < buf_offset, it points to an older buffer. */
+ base_off = MAX(qcs->stream->buf_offset, qcs->stream->ack_offset);
+ BUG_ON(qcs->tx.sent_offset < base_off);
+
+ head = qcs->tx.sent_offset - base_off;
total = b_data(out) - head;
+ BUG_ON(total < 0);
+
if (!total) {
TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs);
return 0;
}
+ BUG_ON(qcs->tx.sent_offset >= qcs->tx.offset);
+ BUG_ON(qcs->tx.sent_offset + total > qcs->tx.offset);
frm = pool_zalloc(pool_head_quic_frame);
if (!frm)
uint64_t diff;
BUG_ON(offset > qcs->tx.sent_offset);
+ BUG_ON(offset >= qcs->tx.offset);
/* check if the STREAM frame has already been notified. It can happen
* for retransmission.
/* increase offset on stream */
qcs->tx.sent_offset += diff;
BUG_ON_HOT(qcs->tx.sent_offset > qcs->tx.msd);
+ BUG_ON_HOT(qcs->tx.sent_offset > qcs->tx.offset);
if (qcs->tx.sent_offset == qcs->tx.msd)
qcs->flags |= QC_SF_BLK_SFCTL;
+
+ if (qcs->tx.offset == qcs->tx.sent_offset && b_full(&qcs->stream->buf->buf)) {
+ qc_stream_buf_release(qcs->stream);
+ tasklet_wakeup(qcc->wait_event.tasklet);
+ }
}
/* Wrapper for send on transport layer. Send a list of frames <frms> for the
while (node) {
struct qcs *qcs = eb64_entry(node, struct qcs, by_id);
struct buffer *buf = &qcs->tx.buf;
- struct buffer *out = &qcs->stream->buf;
+ struct buffer *out = qc_stream_buf_get(qcs->stream);
/* TODO
* for the moment, unidirectional streams have their own
continue;
}
+ if (!b_data(buf) && !out) {
+ node = eb64_next(node);
+ continue;
+ }
+
+ if (!out) {
+ struct connection *conn = qcc->conn;
+
+ out = qc_stream_buf_alloc(qcs->stream,
+ qcs->tx.offset);
+ if (!out) {
+ conn->xprt->subscribe(conn, conn->xprt_ctx,
+ SUB_RETRY_SEND, &qcc->wait_event);
+ node = eb64_next(node);
+ continue;
+ }
+ }
+
/* Prepare <out> buffer with data from <buf>. */
if (b_data(buf)) {
int ret = qcs_xfer_data(qcs, out, buf,
}
/* Build a new STREAM frame with <out> buffer. */
- if (b_data(out)) {
+ if (b_data(out) && qcs->tx.sent_offset != qcs->tx.offset) {
int ret;
char fin = !!(qcs->flags & QC_SF_FIN_STREAM);
DECLARE_STATIC_POOL(pool_head_quic_conn_stream, "qc_stream_desc",
sizeof(struct qc_stream_desc));
+DECLARE_STATIC_POOL(pool_head_quic_conn_stream_buf, "qc_stream_buf",
+ sizeof(struct qc_stream_buf));
+
/* Allocate a new stream descriptor with id <id>. The caller is responsible to
* store the stream in the appropriate tree.
eb64_insert(&qc->streams_by_id, &stream->by_id);
stream->qc = qc;
- stream->buf = BUF_NULL;
+ stream->buf = NULL;
+ LIST_INIT(&stream->buf_list);
+ stream->buf_offset = 0;
+
stream->acked_frms = EB_ROOT;
stream->ack_offset = 0;
stream->release = 0;
stream->release = 1;
stream->ctx = NULL;
- if (!b_data(&stream->buf))
+ if (LIST_ISEMPTY(&stream->buf_list)) {
+ /* if no buffer left we can free the stream. */
qc_stream_desc_free(stream);
+ }
+ else {
+ /* A released stream does not use <stream.buf>. */
+ stream->buf = NULL;
+ }
}
-/* Free the stream descriptor <stream> buffer. This function should be used
- * when all its data have been acknowledged. If the stream was released by the
- * upper layer, the stream descriptor will be freed.
- *
- * Returns 0 if the stream was not freed else non-zero.
+/* Free the stream descriptor <stream> content. This function should be used
+ * when all its data have been acknowledged or on full connection closing. It
+ * must only be called after the stream is released.
*/
-int qc_stream_desc_free(struct qc_stream_desc *stream)
+void qc_stream_desc_free(struct qc_stream_desc *stream)
{
- b_free(&stream->buf);
- offer_buffers(NULL, 1);
+ struct qc_stream_buf *buf, *buf_back;
+ struct eb64_node *frm_node;
+ unsigned int free_count = 0;
- if (stream->release) {
- /* Free frames still waiting for an ACK. Even if the stream buf
- * is NULL, some frames could still be not acknowledged. This
- * is notably the case for retransmission where multiple frames
- * points to the same buffer content.
- */
- struct eb64_node *frm_node = eb64_first(&stream->acked_frms);
- while (frm_node) {
- struct quic_stream *strm;
- struct quic_frame *frm;
-
- strm = eb64_entry(&frm_node->node, struct quic_stream, offset);
-
- frm_node = eb64_next(frm_node);
- eb64_delete(&strm->offset);
-
- frm = container_of(strm, struct quic_frame, stream);
- LIST_DELETE(&frm->list);
- quic_tx_packet_refdec(frm->pkt);
- pool_free(pool_head_quic_frame, frm);
+ /* This function only deals with released streams. */
+ BUG_ON(!stream->release);
+
+ /* free remaining stream buffers */
+ list_for_each_entry_safe(buf, buf_back, &stream->buf_list, list) {
+ if (!(b_data(&buf->buf))) {
+ b_free(&buf->buf);
+ LIST_DELETE(&buf->list);
+ pool_free(pool_head_quic_conn_stream_buf, buf);
+
+ ++free_count;
}
+ }
+
+ if (free_count)
+ offer_buffers(NULL, free_count);
+
+ /* qc_stream_desc might be freed before having received all its ACKs.
+ * This is the case if some frames were retransmitted.
+ */
+ frm_node = eb64_first(&stream->acked_frms);
+ while (frm_node) {
+ struct quic_stream *strm;
+ struct quic_frame *frm;
+
+ strm = eb64_entry(&frm_node->node, struct quic_stream, offset);
- eb64_delete(&stream->by_id);
- pool_free(pool_head_quic_conn_stream, stream);
+ frm_node = eb64_next(frm_node);
+ eb64_delete(&strm->offset);
+ frm = container_of(strm, struct quic_frame, stream);
+ LIST_DELETE(&frm->list);
+ quic_tx_packet_refdec(frm->pkt);
+ pool_free(pool_head_quic_frame, frm);
+ }
+
+ eb64_delete(&stream->by_id);
+ pool_free(pool_head_quic_conn_stream, stream);
+}
+
+/* Return the current buffer of <stream>. May be NULL if not allocated. */
+struct buffer *qc_stream_buf_get(struct qc_stream_desc *stream)
+{
+ if (!stream->buf)
+ return NULL;
+
+ return &stream->buf->buf;
+}
+
+/* Allocate a new current buffer for <stream>. This function is not allowed if
+ * current buffer is not NULL prior to this call. The new buffer represents
+ * stream payload at offset <offset>.
+ *
+ * Returns the buffer or NULL.
+ */
+struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream,
+ uint64_t offset)
+{
+ /* current buffer must be released first before allocate a new one. */
+ BUG_ON(stream->buf);
+
+ stream->buf_offset = offset;
+ stream->buf = pool_alloc(pool_head_quic_conn_stream_buf);
+ if (!stream->buf)
+ return NULL;
+
+ stream->buf->buf = BUF_NULL;
+ LIST_APPEND(&stream->buf_list, &stream->buf->list);
+
+ return &stream->buf->buf;
+}
+
+/* Release the current buffer of <stream>. It will be kept internally by
+ * the <stream>. The current buffer cannot be NULL.
+ */
+void qc_stream_buf_release(struct qc_stream_desc *stream)
+{
+ /* current buffer already released */
+ BUG_ON(!stream->buf);
+
+ stream->buf = NULL;
+ stream->buf_offset = 0;
+}
+
+/* Free the oldest buffer of <stream>. If the stream was already released and
+ * does not references any buffers, it is freed. This function must only be
+ * called if at least one buffer is present. Use qc_stream_desc_free() to free
+ * a released stream.
+ *
+ * Returns 0 if the stream is not yet freed else 1.
+ */
+int qc_stream_desc_free_buf(struct qc_stream_desc *stream)
+{
+ struct qc_stream_buf *stream_buf;
+
+ BUG_ON(LIST_ISEMPTY(&stream->buf_list) && !stream->buf);
+
+ if (!LIST_ISEMPTY(&stream->buf_list)) {
+ /* get first buffer from buf_list and remove it */
+ stream_buf = LIST_NEXT(&stream->buf_list, struct qc_stream_buf *,
+ list);
+ LIST_DELETE(&stream_buf->list);
+ }
+ else {
+ stream_buf = stream->buf;
+ stream->buf = NULL;
+ }
+
+ b_free(&stream_buf->buf);
+ pool_free(pool_head_quic_conn_stream_buf, stream_buf);
+
+ offer_buffers(NULL, 1);
+
+ /* If qc_stream_desc is released and does not contains any buffers, we
+ * can free it now.
+ */
+ if (stream->release && LIST_ISEMPTY(&stream->buf_list)) {
+ qc_stream_desc_free(stream);
return 1;
}