}
}
-/* Returns the current Rx buffer instance for <qcs> stream. */
+/* Returns the Rx buffer instance covering <qcs> stream current read offset.
+ * May be NULL if not already allocated, or if the first allocated buffer only
+ * covers a future offset.
+ */
static struct qc_stream_rxbuf *qcs_get_curr_rxbuf(struct qcs *qcs)
{
struct eb64_node *node;
struct qc_stream_rxbuf *buf;
node = eb64_first(&qcs->rx.bufs);
+ if (!node)
+ return NULL;
+
buf = container_of(node, struct qc_stream_rxbuf, off_node);
+ if (qcs->rx.offset < buf->off_node.key) {
+ /* First buffer was allocated for a future offset only: nothing
+ * is readable at the current offset yet.
+ */
+ return NULL;
+ }
+
+ /* Ensures obsolete buffers are not kept inside QCS. */
+ BUG_ON(buf->off_end < qcs->rx.offset);
return buf;
}
-/* Returns the amount of data readable at <qcs> stream current offset. */
+/* Returns the amount of data readable at <qcs> stream on the current Rx
+ * buffer only. Note that this does not account for hypothetical contiguous
+ * data divided across other Rx buffer instances.
+ */
static ncb_sz_t qcs_rx_avail_data(struct qcs *qcs)
{
struct qc_stream_rxbuf *b = qcs_get_curr_rxbuf(qcs);
if (qcc->glitches != prev_glitches)
session_add_glitch_ctr(qcc->conn->owner, qcc->glitches - prev_glitches);
+ /* TODO not enough data in current rxbuf, merging required with next buffer */
+ BUG_ON(rxbuf && !ret && qcs->rx.offset + ncb_data(&rxbuf->ncb, 0) == rxbuf->off_end);
+
if (ret < 0) {
TRACE_ERROR("decoding error", QMUX_EV_QCS_RECV, qcc->conn, qcs);
goto err;
if (ret)
qcs_consume(qcs, ret, rxbuf);
- if (ncb_is_empty(&rxbuf->ncb))
+ if (ncb_is_empty(&rxbuf->ncb)) {
qcs_free_rxbuf(qcs, rxbuf);
+
+ /* Close QCS remotely if only one Rx buffer remains and
+ * all data with FIN already stored in it. This is
+ * necessary to be performed before app_ops rcv_buf to
+ * ensure FIN is correctly signalled.
+ */
+ if (qcs->flags & QC_SF_SIZE_KNOWN && !eb_is_empty(&qcs->rx.bufs)) {
+ const ncb_sz_t avail = qcs_rx_avail_data(qcs);
+ if (qcs->rx.offset + avail == qcs->rx.offset_max)
+ qcs_close_remote(qcs);
+ }
+ }
}
if (ret || (!b_data(&b) && fin))
return 1;
}
+/* Retrieves the Rx buffer instance usable to store STREAM data starting at
+ * <offset>. It is dynamically allocated if not already instantiated. <len>
+ * must contain the size of the STREAM frame. It may be reduced by the
+ * function if data is too large relative to the buffer starting offset.
+ * Another buffer instance should be allocated to store the remaining data.
+ *
+ * Returns the buffer instance or NULL in case of error.
+ */
+static struct qc_stream_rxbuf *qcs_get_rxbuf(struct qcs *qcs, uint64_t offset,
+ uint64_t *len)
+{
+ struct qcc *qcc = qcs->qcc;
+ struct eb64_node *node;
+ struct qc_stream_rxbuf *buf;
+ struct ncbuf *ncbuf;
+
+ TRACE_ENTER(QMUX_EV_QCS_RECV, qcs->qcc->conn, qcs);
+
+ /* Lookup the highest buffer whose starting offset is <= <offset>. */
+ node = eb64_lookup_le(&qcs->rx.bufs, offset);
+ if (node)
+ buf = container_of(node, struct qc_stream_rxbuf, off_node);
+
+ if (!node || offset >= buf->off_end) {
+ /* No buffer covers <offset>: allocate one, aligned on the fixed
+ * buffer size so instances never overlap.
+ */
+ const uint64_t aligned_off = offset - (offset % qmux_stream_rx_bufsz());
+
+ TRACE_DEVEL("allocating a new entry", QMUX_EV_QCS_RECV, qcs->qcc->conn, qcs);
+ buf = pool_alloc(pool_head_qc_stream_rxbuf);
+ if (!buf) {
+ TRACE_ERROR("qcs rxbuf alloc error", QMUX_EV_QCC_RECV, qcc->conn, qcs);
+ goto err;
+ }
+
+ buf->ncb = NCBUF_NULL;
+ buf->off_node.key = aligned_off;
+ buf->off_end = aligned_off + qmux_stream_rx_bufsz();
+ eb64_insert(&qcs->rx.bufs, &buf->off_node);
+ }
+
+ ncbuf = &buf->ncb;
+ if (!qcs_get_ncbuf(qcs, ncbuf) || ncb_is_null(ncbuf)) {
+ TRACE_ERROR("receive ncbuf alloc failure", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
+ qcc_set_error(qcc, QC_ERR_INTERNAL_ERROR, 0);
+ goto err;
+ }
+
+ /* Clamp <len> so the frame does not spill past this buffer's end. */
+ if (offset + *len > buf->off_end)
+ *len = buf->off_end - offset;
+
+ TRACE_LEAVE(QMUX_EV_QCS_RECV, qcs->qcc->conn, qcs);
+ return buf;
+
+ err:
+ TRACE_DEVEL("leaving on error", QMUX_EV_QCS_RECV, qcs->qcc->conn, qcs);
+ return NULL;
+}
+
/* Handle a new STREAM frame for stream with id <id>. Payload is pointed by
* <data> with length <len> and represents the offset <offset>. <fin> is set if
* the QUIC frame FIN bit is set.
char fin, char *data)
{
const int fin_standalone = (!len && fin);
- struct ncbuf *ncbuf;
struct qcs *qcs;
- enum ncb_ret ret;
+ enum ncb_ret ncb_ret;
+ uint64_t left;
+ int ret;
TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn);
offset = qcs->rx.offset;
}
- if (len) {
- if (eb_is_empty(&qcs->rx.bufs)) {
- struct qc_stream_rxbuf *buf;
- buf = pool_alloc(pool_head_qc_stream_rxbuf);
- if (!buf) {
- TRACE_ERROR("rxbuf alloc failure", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
- qcc_set_error(qcc, QC_ERR_INTERNAL_ERROR, 0);
- goto err;
- }
-
- buf->ncb = NCBUF_NULL;
- buf->off_node.key = qcs->rx.offset;
- buf->off_end = qcs->rx.offset + qmux_stream_rx_bufsz();
- eb64_insert(&qcs->rx.bufs, &buf->off_node);
+ left = len;
+ while (left) {
+ struct qc_stream_rxbuf *buf;
+ ncb_sz_t ncb_off;
- ncbuf = &buf->ncb;
- }
- else {
- struct qc_stream_rxbuf *buf = qcs_get_curr_rxbuf(qcs);
- ncbuf = &buf->ncb;
- }
-
- if (!qcs_get_ncbuf(qcs, ncbuf) || ncb_is_null(ncbuf)) {
- TRACE_ERROR("receive ncbuf alloc failure", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
+ buf = qcs_get_rxbuf(qcs, offset, &len);
+ if (!buf) {
+ TRACE_ERROR("rxbuf alloc failure", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
qcc_set_error(qcc, QC_ERR_INTERNAL_ERROR, 0);
goto err;
}
- ret = ncb_add(ncbuf, offset - qcs->rx.offset, data, len, NCB_ADD_COMPARE);
- switch (ret) {
+ /* For oldest buffer, ncb_advance() may already have been performed. */
+ ncb_off = offset - MAX(qcs->rx.offset, buf->off_node.key);
+
+ ncb_ret = ncb_add(&buf->ncb, ncb_off, data, len, NCB_ADD_COMPARE);
+ switch (ncb_ret) {
case NCB_RET_OK:
break;
qcc->conn, qcs);
return 1;
}
+
+ offset += len;
+ data += len;
+ left -= len;
+ len = left;
}
if (fin)
qcs_close_remote(qcs);
}
- if ((qcs_rx_avail_data(qcs) && !(qcs->flags & QC_SF_DEM_FULL)) ||
- unlikely(fin_standalone && qcs_is_close_remote(qcs))) {
- qcc_decode_qcs(qcc, qcs);
+ while ((qcs_rx_avail_data(qcs) && !(qcs->flags & QC_SF_DEM_FULL)) ||
+ unlikely(fin_standalone && qcs_is_close_remote(qcs))) {
+
+ ret = qcc_decode_qcs(qcc, qcs);
LIST_DEL_INIT(&qcs->el_recv);
qcc_refresh_timeout(qcc);
+
+ if (ret <= 0)
+ break;
+
+ BUG_ON_HOT(fin_standalone); /* On fin_standalone <ret> should be 0, which ensures no infinite loop. */
}
out:
static int qcc_io_recv(struct qcc *qcc)
{
struct qcs *qcs;
+ int ret;
TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn);
qcs = LIST_ELEM(qcc->recv_list.n, struct qcs *, el_recv);
/* No need to add an uni local stream in recv_list. */
BUG_ON(quic_stream_is_uni(qcs->id) && quic_stream_is_local(qcc, qcs->id));
- qcc_decode_qcs(qcc, qcs);
- LIST_DEL_INIT(&qcs->el_recv);
+
+ while (qcs_rx_avail_data(qcs) && !(qcs->flags & QC_SF_DEM_FULL)) {
+ ret = qcc_decode_qcs(qcc, qcs);
+ LIST_DEL_INIT(&qcs->el_recv);
+
+ if (ret <= 0)
+ break;
+ }
}
TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn);