/* flow-control fields set by the peer which we must respect. */
struct {
+ uint64_t msd_bidi_l; /* initial max-stream-data for peer local streams */
+ uint64_t msd_bidi_r; /* initial max-stream-data for peer remote streams */
} rfctl;
struct {
#define QC_SF_FIN_STREAM 0x00000002 /* FIN bit must be set for last frame of the stream */
#define QC_SF_BLK_MROOM 0x00000004 /* app layer is blocked waiting for room in the qcs.tx.buf */
#define QC_SF_DETACH 0x00000008 /* cs is detached but there is remaining data to send */
+#define QC_SF_BLK_SFCTL 0x00000010 /* stream blocked due to stream flow control limit */
struct qcs {
struct qcc *qcc;
uint64_t ack_offset; /* last acked ordered byte offset */
struct buffer buf; /* transmit buffer before sending via xprt */
struct buffer xprt_buf; /* buffer for xprt sending, cleared on ACK. */
+ uint64_t msd; /* fctl bytes limit to respect on emission */
} tx;
struct eb64_node by_id; /* place in qcc's streams_by_id */
eb64_insert(&qcc->streams_by_id, &qcs->by_id);
qcc->strms[type].nb_streams++;
+ /* If stream is local, use peer remote-limit, or else the opposite. */
+ /* TODO use uni limit for unidirectional streams */
+ qcs->tx.msd = quic_stream_is_local(qcc, id) ? qcc->rfctl.msd_bidi_r :
+ qcc->rfctl.msd_bidi_l;
+
qcs->rx.buf = BUF_NULL;
qcs->rx.app_buf = BUF_NULL;
qcs->rx.offset = 0;
* <payload> to <out> buffer. The STREAM frame payload points to the <out>
* buffer. The frame is then pushed to <frm_list>. If <fin> is set, and the
* <payload> buf is emptied after transfer, FIN bit is set on the STREAM frame.
+ * Transfer is automatically adjusted to not exceed the stream flow-control
+ * limit.
*
* Returns the total bytes of newly transferred data or a negative error code.
*/
head = qcs->tx.sent_offset - qcs->tx.ack_offset;
left = qcs->tx.offset - qcs->tx.sent_offset;
to_xfer = QUIC_MIN(b_data(payload), b_room(out));
+
+ BUG_ON_HOT(qcs->tx.offset > qcs->tx.msd);
+ /* do not exceed flow control limit */
+ if (qcs->tx.offset + to_xfer > qcs->tx.msd)
+ to_xfer = qcs->tx.msd - qcs->tx.offset;
+
if (!left && !to_xfer)
goto out;
/* increase offset on stream */
qcs->tx.sent_offset += diff;
+ BUG_ON_HOT(qcs->tx.sent_offset > qcs->tx.msd);
+ if (qcs->tx.sent_offset == qcs->tx.msd)
+ qcs->flags |= QC_SF_BLK_SFCTL;
}
/* Wrapper for send on transport layer. Send a list of frames <frms> for the
continue;
}
+ if (qcs->flags & QC_SF_BLK_SFCTL) {
+ node = eb64_next(node);
+ continue;
+ }
+
if (b_data(buf) || b_data(out)) {
int ret;
char fin = qcs->flags & QC_SF_FIN_STREAM;
struct session *sess, struct buffer *input)
{
struct qcc *qcc;
- struct quic_transport_params *lparams;
+ struct quic_transport_params *lparams, *rparams;
qcc = pool_alloc(pool_head_qcc);
if (!qcc)
qcc->lfctl.ms_bidi = qcc->lfctl.ms_bidi_init = lparams->initial_max_streams_bidi;
qcc->lfctl.cl_bidi_r = 0;
+ rparams = &conn->qc->tx.params;
+ qcc->rfctl.msd_bidi_l = rparams->initial_max_stream_data_bidi_local;
+ qcc->rfctl.msd_bidi_r = rparams->initial_max_stream_data_bidi_remote;
+
qcc->wait_event.tasklet = tasklet_new();
if (!qcc->wait_event.tasklet)
goto fail_no_tasklet;