/* Maximum size of stream Rx buffer. */
#define QC_S_RX_BUF_SZ (global.tune.bufsize - NCB_RESERVED_SZ)
+/* QUIC stream states
+ *
+ * On initialization a stream is put on idle state. It is opened as soon as
+ * data has been successfully sent or received on it.
+ *
+ * A bidirectional stream has two channels which can be closed separately. The
+ * local channel is closed when the STREAM frame with FIN or a RESET_STREAM has
+ * been emitted. The remote channel is closed as soon as all data from the peer
+ * has been received. The stream goes instantely to the close state once both
+ * channels are closed.
+ *
+ * A unidirectional stream has only one channel of communication. Thus, it does
+ * not use half closed states and transition directly from open to close state.
+ */
+enum qcs_state {
+ QC_SS_IDLE = 0, /* initial state */
+ QC_SS_OPEN, /* opened */
+ QC_SS_HLOC, /* half-closed local */
+ QC_SS_HREM, /* half-closed remote */
+ QC_SS_CLO, /* closed */
+} __attribute__((packed));
+
struct qcs {
struct qcc *qcc;
struct sedesc *sd;
uint32_t flags; /* QC_SF_* */
+ enum qcs_state st; /* QC_SS_* state */
void *ctx; /* app-ops context */
struct {
qcs->qcc = qcc;
qcs->sd = NULL;
qcs->flags = QC_SF_NONE;
+ qcs->st = QC_SS_IDLE;
qcs->ctx = NULL;
/* Allocate transport layer stream descriptor. Only needed for TX. */
return qcs->sd ? qcs->sd->sc : NULL;
}
+/* Mark a stream as open if it was idle. This can be used on every
+ * successful emission/reception operation to update the stream state.
+ */
+static void qcs_idle_open(struct qcs *qcs)
+{
+ /* This operation must not be used if the stream is already closed. */
+ BUG_ON_HOT(qcs->st == QC_SS_CLO);
+
+ if (qcs->st == QC_SS_IDLE) {
+ qcs->st = QC_SS_OPEN;
+ TRACE_DEVEL("opening stream", QMUX_EV_QCS_NEW, qcs->qcc->conn, qcs);
+ }
+}
+
+/* Close the local channel of <qcs> instance. */
+static void qcs_close_local(struct qcs *qcs)
+{
+ /* The stream must have already been opened. */
+ BUG_ON_HOT(qcs->st == QC_SS_IDLE);
+
+ /* This operation cannot be used multiple times. */
+ BUG_ON_HOT(qcs->st == QC_SS_HLOC || qcs->st == QC_SS_CLO);
+
+ if (quic_stream_is_bidi(qcs->id)) {
+ qcs->st = (qcs->st == QC_SS_HREM) ? QC_SS_CLO : QC_SS_HLOC;
+ }
+ else {
+ /* Only local uni streams are valid for this operation. */
+ BUG_ON_HOT(quic_stream_is_remote(qcs->qcc, qcs->id));
+ qcs->st = QC_SS_CLO;
+ }
+
+ TRACE_DEVEL("closing stream locally", QMUX_EV_QCS_END, qcs->qcc->conn, qcs);
+}
+
+/* Close the remote channel of <qcs> instance. */
+static void qcs_close_remote(struct qcs *qcs)
+{
+ /* The stream must have already been opened. */
+ BUG_ON_HOT(qcs->st == QC_SS_IDLE);
+
+ /* This operation cannot be used multiple times. */
+ BUG_ON_HOT(qcs->st == QC_SS_HREM || qcs->st == QC_SS_CLO);
+
+ if (quic_stream_is_bidi(qcs->id)) {
+ qcs->st = (qcs->st == QC_SS_HLOC) ? QC_SS_CLO : QC_SS_HREM;
+ }
+ else {
+ /* Only remote uni streams are valid for this operation. */
+ BUG_ON_HOT(quic_stream_is_local(qcs->qcc, qcs->id));
+ qcs->st = QC_SS_CLO;
+ }
+
+ TRACE_DEVEL("closing stream remotely", QMUX_EV_QCS_END, qcs->qcc->conn, qcs);
+}
+
+static int qcs_is_close_local(struct qcs *qcs)
+{
+ return qcs->st == QC_SS_HLOC || qcs->st == QC_SS_CLO;
+}
+
+static __maybe_unused int qcs_is_close_remote(struct qcs *qcs)
+{
+ return qcs->st == QC_SS_HREM || qcs->st == QC_SS_CLO;
+}
+
struct buffer *qc_get_buf(struct qcs *qcs, struct buffer *bptr)
{
struct buffer *buf = b_alloc(bptr);
return 0;
}
+ qcs_idle_open(qcs);
+
if (offset + len > qcs->rx.offset_max) {
uint64_t diff = offset + len - qcs->rx.offset_max;
qcs->rx.offset_max = offset + len;
if (fin)
qcs->flags |= QC_SF_SIZE_KNOWN;
+ if (qcs->flags & QC_SF_SIZE_KNOWN && !ncb_is_fragmented(&qcs->rx.ncbuf))
+ qcs_close_remote(qcs);
+
if (ncb_data(&qcs->rx.ncbuf, 0) && !(qcs->flags & QC_SF_DEM_FULL))
qcc_decode_qcs(qcc, qcs);
if (offset + data < qcs->tx.sent_offset)
return;
+ qcs_idle_open(qcs);
+
diff = offset + data - qcs->tx.sent_offset;
if (diff) {
/* increase offset sum on connection */
}
if (qcs->tx.offset == qcs->tx.sent_offset && qcs_stream_fin(qcs)) {
+ /* Close stream locally. */
+ qcs_close_local(qcs);
/* Reset flag to not emit multiple FIN STREAM frames. */
qcs->flags &= ~QC_SF_FIN_STREAM;
}
continue;
}
+ if (qcs_is_close_local(qcs)) {
+ node = eb64_next(node);
+ continue;
+ }
+
if (qcs->flags & QC_SF_BLK_SFCTL) {
node = eb64_next(node);
continue;
struct qcs *qcs = eb64_entry(node, struct qcs, by_id);
node = eb64_next(node);
+ /* Release not attached closed streams. */
+ if (qcs->st == QC_SS_CLO && !qcs_sc(qcs)) {
+ TRACE_DEVEL("purging closed stream", QMUX_EV_QCC_WAKE, qcs->qcc->conn, qcs);
+ qcs_destroy(qcs);
+ release = 1;
+ continue;
+ }
+
/* Release detached streams with empty buffer. */
if (qcs->flags & QC_SF_DETACH) {
if (!b_data(&qcs->tx.buf) &&
TRACE_ENTER(QMUX_EV_STRM_END, qcc->conn, qcs);
+ /* TODO this BUG_ON_HOT() is not correct as the stconn layer may detach
+ * from the stream even if it is not closed remotely at the QUIC layer.
+ * This happens for example when a stream must be closed due to a
+ * rejected request. To better handle these cases, it will be required
+ * to implement shutr/shutw MUX operations. Once this is done, this
+ * BUG_ON_HOT() statement can be adjusted.
+ */
+ //BUG_ON_HOT(!qcs_is_close_remote(qcs));
--qcc->nb_sc;
if ((b_data(&qcs->tx.buf) || qcs->tx.offset > qcs->tx.sent_offset) &&
TRACE_ENTER(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
+ if (qcs_is_close_local(qcs)) {
+ ret = count;
+ count = 0;
+ goto end;
+ }
+
ret = qcs->qcc->app_ops->snd_buf(sc, buf, count, flags);
+ end:
TRACE_LEAVE(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
return ret;
}
+static char *qcs_st_to_str(enum qcs_state st)
+{
+ switch (st) {
+ case QC_SS_IDLE: return "IDL";
+ case QC_SS_OPEN: return "OPN";
+ case QC_SS_HLOC: return "HCL";
+ case QC_SS_HREM: return "HCR";
+ case QC_SS_CLO: return "CLO";
+ default: return "???";
+ }
+}
+
static void qmux_trace_frm(const struct quic_frame *frm)
{
switch (frm->type) {
chunk_appendf(&trace_buf, " : qcc=%p(F)", qcc);
if (qcs)
- chunk_appendf(&trace_buf, " qcs=%p(%llu)", qcs, (ull)qcs->id);
+ chunk_appendf(&trace_buf, " qcs=%p .id=%llu .st=%s",
+ qcs, (ull)qcs->id,
+ qcs_st_to_str(qcs->st));
if (mask & QMUX_EV_QCC_NQCS) {
const uint64_t *id = a3;