uint64_t offset; /* absolute current base offset of ncbuf */
struct ncbuf ncbuf; /* receive buffer - can handle out-of-order offset frames */
struct buffer app_buf; /* receive buffer used by conn_stream layer */
- uint64_t msd; /* fctl bytes limit to enforce */
+ uint64_t msd; /* current max-stream-data limit to enforce */
+ uint64_t msd_init; /* initial max-stream-data */
} rx;
struct {
uint64_t offset; /* last offset of data ready to be sent */
int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es);
void qcs_notify_recv(struct qcs *qcs);
void qcs_notify_send(struct qcs *qcs);
+void qcs_consume(struct qcs *qcs, uint64_t bytes);
int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
char fin, char *data);
h3_debug_printf(stderr, "%s: ftype: %lu, flen: %lu\n",
__func__, ftype, flen);
- ncb_advance(rxbuf, hlen);
h3s->demux_frame_type = ftype;
h3s->demux_frame_len = flen;
- qcs->rx.offset += hlen;
+ qcs_consume(qcs, hlen);
}
flen = h3s->demux_frame_len;
}
if (ret) {
- ncb_advance(rxbuf, ret);
BUG_ON(h3s->demux_frame_len < ret);
h3s->demux_frame_len -= ret;
- qcs->rx.offset += ret;
+ qcs_consume(qcs, ret);
}
}
if (flen > b_data(&b))
break;
- ncb_advance(rxbuf, hlen);
- h3_uqs->qcs->rx.offset += hlen;
+ qcs_consume(h3_uqs->qcs, hlen);
/* From here, a frame must not be truncated */
switch (ftype) {
case H3_FT_CANCEL_PUSH:
h3->err = H3_FRAME_UNEXPECTED;
return 0;
}
- ncb_advance(rxbuf, flen);
- h3_uqs->qcs->rx.offset += flen;
+ qcs_consume(h3_uqs->qcs, flen);
}
/* Handle the case where remaining data are present in the buffer. This
if (!b_quic_dec_int(&strm_type, &b, &len) || strm_type > H3_UNI_STRM_TP_MAX)
return 0;
- ncb_advance(rxbuf, len);
- qcs->rx.offset += len;
+ qcs_consume(qcs, len);
/* Note that for all the uni-streams below, this is an error to receive two times the
* same type of uni-stream (even for Push stream which is not supported at this time.
if (!cs)
return -1;
-
- qcs->rx.offset += ncb_data(rxbuf, 0);
- ncb_advance(rxbuf, ncb_data(rxbuf, 0));
+ qcs_consume(qcs, ncb_data(rxbuf, 0));
b_free(&htx_buf);
if (fin)
/* TODO use uni limit for unidirectional streams */
qcs->rx.msd = quic_stream_is_local(qcc, id) ? qcc->lfctl.msd_bidi_l :
qcc->lfctl.msd_bidi_r;
+ qcs->rx.msd_init = qcs->rx.msd;
qcs->tx.buf = BUF_NULL;
qcs->tx.offset = 0;
}
}
+/* Remove <bytes> from <qcs> Rx buffer. This must be called by transcoders
+ * after STREAM parsing. Flow-control for received offsets may be allocated for
+ * the peer if needed.
+ */
+void qcs_consume(struct qcs *qcs, uint64_t bytes)
+{
+ struct qcc *qcc = qcs->qcc;
+ struct quic_frame *frm;
+ enum ncb_ret ret;
+
+ ret = ncb_advance(&qcs->rx.ncbuf, bytes);
+ if (ret) {
+ ABORT_NOW(); /* should not happens because removal only in data */
+ }
+
+ qcs->rx.offset += bytes;
+ if (qcs->rx.msd - qcs->rx.offset < qcs->rx.msd_init / 2) {
+ frm = pool_zalloc(pool_head_quic_frame);
+ BUG_ON(!frm); /* TODO handle this properly */
+
+ qcs->rx.msd = qcs->rx.offset + qcs->rx.msd_init;
+
+ LIST_INIT(&frm->reflist);
+ frm->type = QUIC_FT_MAX_STREAM_DATA;
+ frm->max_stream_data.id = qcs->id;
+ frm->max_stream_data.max_stream_data = qcs->rx.msd;
+
+ LIST_APPEND(&qcc->lfctl.frms, &frm->list);
+ tasklet_wakeup(qcc->wait_event.tasklet);
+ }
+}
+
/* Retrieve as an ebtree node the stream with <id> as ID, possibly allocates
* several streams, depending on the already open ones.
* Return this node if succeeded, NULL if not.
return 1;
}
- /* TODO initial max-stream-data reached. Implement MAX_STREAM_DATA emission. */
- BUG_ON(offset + len == qcs->rx.msd);
-
if (fin)
qcs->flags |= QC_SF_FIN_RECV;