DECLARE_POOL(pool_head_qcc, "qcc", sizeof(struct qcc));
DECLARE_POOL(pool_head_qcs, "qcs", sizeof(struct qcs));
+DECLARE_STATIC_POOL(pool_head_qc_stream_rxbuf, "qc_stream_rxbuf", sizeof(struct qc_stream_rxbuf));
static void qmux_ctrl_send(struct qc_stream_desc *, uint64_t data, uint64_t offset);
static void qmux_ctrl_room(struct qc_stream_desc *, uint64_t room);
return !(quic_tune.options & QUIC_TUNE_NO_PACING);
}
-static void qcs_free_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf)
+/* Free <rxbuf> instance and its inner data storage attached to <qcs> stream. */
+static void qcs_free_rxbuf(struct qcs *qcs, struct qc_stream_rxbuf *rxbuf)
{
+ struct ncbuf *ncbuf;
struct buffer buf;
- if (ncb_is_null(ncbuf))
+ if (!rxbuf)
return;
- buf = b_make(ncbuf->area, ncbuf->size, 0, 0);
- b_free(&buf);
- offer_buffers(NULL, 1);
+ ncbuf = &rxbuf->ncb;
- *ncbuf = NCBUF_NULL;
+ if (!ncb_is_null(ncbuf)) {
+ buf = b_make(ncbuf->area, ncbuf->size, 0, 0);
+ b_free(&buf);
+ offer_buffers(NULL, 1);
+ }
/* Reset DEM_FULL as buffer is released. This ensures mux is not woken
* up from rcv_buf stream callback when demux was previously blocked.
*/
qcs->flags &= ~QC_SF_DEM_FULL;
+
+ pool_free(pool_head_qc_stream_rxbuf, rxbuf);
+ qcs->rx.buf = NULL;
}
/* Free <qcs> instance. This function is reserved for internal usage : it must
}
/* Free Rx buffer. */
- qcs_free_ncbuf(qcs, &qcs->rx.ncbuf);
+ qcs_free_rxbuf(qcs, qcs->rx.buf);
/* Remove qcs from qcc tree. */
eb64_delete(&qcs->by_id);
qfctl_init(&qcs->tx.fc, 0);
}
- qcs->rx.ncbuf = NCBUF_NULL;
+ qcs->rx.buf = NULL;
qcs->rx.app_buf = BUF_NULL;
qcs->rx.offset = qcs->rx.offset_max = 0;
return 1;
}
-/* Simple function to duplicate a buffer */
-static inline struct buffer qcs_b_dup(const struct ncbuf *b)
+/* Convert <b> out-of-order storage into a contiguous buffer. */
+static inline struct buffer qcs_b_dup(const struct qc_stream_rxbuf *b)
+{
+ if (b) {
+ const struct ncbuf *ncb = &b->ncb;
+ return b_make(ncb_orig(ncb), ncb->size, ncb->head, ncb_data(ncb, 0));
+ }
+ else {
+ return BUF_NULL;
+ }
+}
+
+/* Returns the amount of data readable at <qcs> stream current offset. */
+static ncb_sz_t qcs_rx_avail_data(struct qcs *qcs)
{
- return b_make(ncb_orig(b), b->size, b->head, ncb_data(b, 0));
+ return qcs->rx.buf ? ncb_data(&qcs->rx.buf->ncb, 0) : 0;
}
/* Remove <bytes> from <qcs> Rx buffer. Flow-control for received offsets may
{
struct qcc *qcc = qcs->qcc;
struct quic_frame *frm;
- struct ncbuf *buf = &qcs->rx.ncbuf;
+ struct ncbuf *ncbuf = &qcs->rx.buf->ncb;
enum ncb_ret ret;
TRACE_ENTER(QMUX_EV_QCS_RECV, qcc->conn, qcs);
- ret = ncb_advance(buf, bytes);
+ ret = ncb_advance(ncbuf, bytes);
if (ret) {
ABORT_NOW(); /* should not happens because removal only in data */
}
- if (ncb_is_empty(buf))
- qcs_free_ncbuf(qcs, buf);
+ if (ncb_is_empty(ncbuf))
+ qcs_free_rxbuf(qcs, qcs->rx.buf);
qcs->rx.offset += bytes;
/* Not necessary to emit a MAX_STREAM_DATA if all data received. */
TRACE_ENTER(QMUX_EV_QCS_RECV, qcc->conn, qcs);
- b = qcs_b_dup(&qcs->rx.ncbuf);
+ b = qcs_b_dup(qcs->rx.buf);
/* Signal FIN to application if STREAM FIN received with all data. */
if (qcs_is_close_remote(qcs))
char fin, char *data)
{
const int fin_standalone = (!len && fin);
+ struct ncbuf *ncbuf;
struct qcs *qcs;
enum ncb_ret ret;
}
if (len) {
- if (!qcs_get_ncbuf(qcs, &qcs->rx.ncbuf) || ncb_is_null(&qcs->rx.ncbuf)) {
+ if (!qcs->rx.buf) {
+ struct qc_stream_rxbuf *buf;
+ buf = pool_alloc(pool_head_qc_stream_rxbuf);
+ if (!buf) {
+ TRACE_ERROR("rxbuf alloc failure", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
+ qcc_set_error(qcc, QC_ERR_INTERNAL_ERROR, 0);
+ goto err;
+ }
+
+ buf->ncb = NCBUF_NULL;
+ buf->off = qcs->rx.offset;
+ buf->off_end = buf->off + qmux_stream_rx_bufsz();
+ qcs->rx.buf = buf;
+ ncbuf = &buf->ncb;
+ }
+ else {
+ ncbuf = &qcs->rx.buf->ncb;
+ }
+
+ if (!qcs_get_ncbuf(qcs, ncbuf) || ncb_is_null(ncbuf)) {
TRACE_ERROR("receive ncbuf alloc failure", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
qcc_set_error(qcc, QC_ERR_INTERNAL_ERROR, 0);
goto err;
}
- ret = ncb_add(&qcs->rx.ncbuf, offset - qcs->rx.offset, data, len, NCB_ADD_COMPARE);
+ ret = ncb_add(ncbuf, offset - qcs->rx.offset, data, len, NCB_ADD_COMPARE);
switch (ret) {
case NCB_RET_OK:
break;
qcs->flags |= QC_SF_SIZE_KNOWN;
if (qcs->flags & QC_SF_SIZE_KNOWN &&
- qcs->rx.offset_max == qcs->rx.offset + ncb_data(&qcs->rx.ncbuf, 0)) {
+ qcs->rx.offset_max == qcs->rx.offset + qcs_rx_avail_data(qcs)) {
qcs_close_remote(qcs);
}
- if ((ncb_data(&qcs->rx.ncbuf, 0) && !(qcs->flags & QC_SF_DEM_FULL)) ||
+ if ((qcs_rx_avail_data(qcs) && !(qcs->flags & QC_SF_DEM_FULL)) ||
unlikely(fin_standalone && qcs_is_close_remote(qcs))) {
qcc_decode_qcs(qcc, qcs);
LIST_DEL_INIT(&qcs->el_recv);
*/
qcs->flags |= QC_SF_SIZE_KNOWN|QC_SF_RECV_RESET;
qcs_close_remote(qcs);
- qcs_free_ncbuf(qcs, &qcs->rx.ncbuf);
+ qcs_free_rxbuf(qcs, qcs->rx.buf);
out:
if (qcc->glitches != prev_glitches)
/* Ensure DEM_FULL is only set if there is available data to
* ensure we never do unnecessary wakeup here.
*/
- BUG_ON(!ncb_data(&qcs->rx.ncbuf, 0));
+ BUG_ON(!qcs_rx_avail_data(qcs));
qcs->flags &= ~QC_SF_DEM_FULL;
if (!(qcc->flags & QC_CF_ERRL)) {