/* At least 9 bytes to store frame type + length as a varint max size */
if (b_room(res) < 9) {
+ /* TODO */
TRACE_STATE("not enough room for trailers frame", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
- qcs->flags |= QC_SF_BLK_MROOM;
- goto end;
+ ABORT_NOW();
}
/* Force buffer realignment as size required to encode headers is unknown. */
headers_buf = b_make(b_peek(res, b_data(res) + 9), b_contig_space(res) - 9, 0, 0);
if (qpack_encode_field_section_line(&headers_buf)) {
+ /* TODO */
TRACE_STATE("not enough room for trailers section line", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
- qcs->flags |= QC_SF_BLK_MROOM;
- goto end;
+ ABORT_NOW();
}
tail = b_tail(&headers_buf);
}
if (qpack_encode_header(&headers_buf, list[hdr].n, list[hdr].v)) {
+ /* TODO */
TRACE_STATE("not enough room for all trailers", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
- qcs->flags |= QC_SF_BLK_MROOM;
- goto end;
+ ABORT_NOW();
}
}
if (fsize > count)
fsize = count;
- while (1) {
- b_reset(&outbuf);
- outbuf = b_make(b_tail(res), b_contig_space(res), 0, 0);
- if (b_size(&outbuf) > hsize || !b_space_wraps(res))
- break;
- b_slow_realign(res, trash.area, b_data(res));
- }
+ /* TODO buffer can be realign only if no data waiting for ACK. */
+ outbuf = b_make(b_tail(res), b_contig_space(res), 0, 0);
- /* Not enough room for headers and at least one data byte, block the
- * stream. It is expected that the stream connector layer will subscribe
- * on SEND.
- */
if (b_size(&outbuf) <= hsize) {
+ /* TODO */
TRACE_STATE("not enough room for data frame", H3_EV_TX_FRAME|H3_EV_TX_DATA, qcs->qcc->conn, qcs);
- qcs->flags |= QC_SF_BLK_MROOM;
- goto end;
+ ABORT_NOW();
}
if (b_size(&outbuf) < hsize + fsize)
htx = htx_from_buf(buf);
- while (count && !htx_is_empty(htx) &&
- !(qcs->flags & QC_SF_BLK_MROOM) && !h3c->err) {
+ while (count && !htx_is_empty(htx) && !h3c->err) {
idx = htx_get_head(htx);
blk = htx_get_blk(htx, idx);
/* h3 DATA headers : 1-byte frame type + varint frame length */
hsize = 1 + QUIC_VARINT_MAX_SIZE;
- while (1) {
- if (b_contig_space(res) >= hsize || !b_space_wraps(res))
- break;
- b_slow_realign(res, trash.area, b_data(res));
- }
+ /* TODO buffer can be realign only if no data waiting for ACK. */
/* Not enough room for headers and at least one data byte, block the
* stream. It is expected that the stream connector layer will subscribe
* on SEND.
*/
if (b_contig_space(res) <= hsize) {
- qcs->flags |= QC_SF_BLK_MROOM;
+ /* TODO */
qcs->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
goto end;
}
size_t count)
{
enum htx_blk_type btype;
- struct htx *htx;
+ struct htx *htx = NULL;
struct htx_blk *blk;
int32_t idx;
uint32_t bsize, fsize;
- struct buffer *res, outbuf;
+ struct buffer *res = NULL;
size_t total = 0;
- res = qcc_get_stream_txbuf(qcs);
- outbuf = b_make(b_tail(res), b_contig_space(res), 0, 0);
-
htx = htx_from_buf(buf);
- while (count && !htx_is_empty(htx) && !(qcs->flags & QC_SF_BLK_MROOM)) {
+ while (count && !htx_is_empty(htx)) {
/* Not implemented : QUIC on backend side */
idx = htx_get_head(htx);
blk = htx_get_blk(htx, idx);
switch (btype) {
case HTX_BLK_DATA:
+ res = qcc_get_stream_txbuf(qcs);
+ if (!res) {
+ /* TODO */
+ ABORT_NOW();
+ }
+
if (unlikely(fsize == count &&
!b_data(res) &&
htx_nbblks(htx) == 1 && btype == HTX_BLK_DATA)) {
if (fsize > count)
fsize = count;
- if (b_room(&outbuf) < fsize)
- fsize = b_room(&outbuf);
+ if (b_contig_space(res) < fsize)
+ fsize = b_contig_space(res);
if (!fsize) {
- qcs->flags |= QC_SF_BLK_MROOM;
- goto end;
+ /* TODO */
+ ABORT_NOW();
}
- b_putblk(&outbuf, htx_get_blk_ptr(htx, blk), fsize);
+ b_putblk(res, htx_get_blk_ptr(htx, blk), fsize);
total += fsize;
count -= fsize;
}
end:
- b_add(res, b_data(&outbuf));
htx_to_buf(htx, buf);
return total;
static size_t hq_interop_nego_ff(struct qcs *qcs, size_t count)
{
- struct buffer *res = qcc_get_stream_txbuf(qcs);
+ int ret = 0;
+ struct buffer *res;
- if (!b_room(res)) {
- qcs->flags |= QC_SF_BLK_MROOM;
+ res = qcc_get_stream_txbuf(qcs);
+ if (!res) {
qcs->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
goto end;
+ /* TODO */
+ ABORT_NOW();
+ }
+
+ if (!b_room(res)) {
+ /* TODO */
+ ABORT_NOW();
}
/* No header required for HTTP/0.9, no need to reserve an offset. */
qcs->sd->iobuf.offset = 0;
qcs->sd->iobuf.data = 0;
+ ret = MIN(count, b_contig_space(res));
end:
- return MIN(b_contig_space(res), count);
+ return ret;
}
static size_t hq_interop_done_ff(struct qcs *qcs)
/* Release qc_stream_desc buffer from quic-conn layer. */
qc_stream_desc_release(qcs->stream, qcs->tx.fc.off_real);
- /* Free Rx/Tx buffers. */
+ /* Free Rx buffer. */
qcs_free_ncbuf(qcs, &qcs->rx.ncbuf);
- b_free(&qcs->tx.buf);
/* Remove qcs from qcc tree. */
eb64_delete(&qcs->by_id);
}
qcs->rx.msd_init = qcs->rx.msd;
- qcs->tx.buf = BUF_NULL;
qcs->tx.offset = 0;
qcs->wait_event.tasklet = NULL;
*/
struct buffer *qcc_get_stream_txbuf(struct qcs *qcs)
{
- return b_alloc(&qcs->tx.buf);
+ struct qcc *qcc = qcs->qcc;
+ int buf_avail;
+ struct buffer *out = qc_stream_buf_get(qcs->stream);
+
+ if (!out) {
+ out = qc_stream_buf_alloc(qcs->stream, qcs->tx.fc.off_real,
+ &buf_avail);
+ if (!out)
+ goto out;
+
+ if (!b_alloc(out)) {
+ TRACE_ERROR("buffer alloc failure", QMUX_EV_QCS_SEND, qcc->conn, qcs);
+ goto out;
+ }
+ }
+
+ out:
+ return out;
}
/* Wakes up every streams of <qcc> which are currently waiting for sending but
const int soft_blocked = qfctl_sblocked(&qcc->tx.fc);
qcc->tx.fc.off_soft -= (qcs->tx.fc.off_soft - qcs->tx.fc.off_real);
+
if (soft_blocked && !qfctl_sblocked(&qcc->tx.fc))
qcc_notify_fctl(qcc);
}
if (count) {
qfctl_sinc(&qcc->tx.fc, count);
qfctl_sinc(&qcs->tx.fc, count);
+
+ qcs->tx.offset += count;
+ qcs->qcc->tx.offsets += count;
}
TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs);
TRACE_LEAVE(QMUX_EV_QCS_END, conn);
}
-/* Transfer as much as possible data on <qcs> from <in> to <out>.
- *
- * Returns the total bytes of transferred data or a negative error code.
- */
-static int qcs_xfer_data(struct qcs *qcs, struct buffer *out, struct buffer *in)
-{
- struct qcc *qcc = qcs->qcc;
- int to_xfer;
- int total = 0;
-
- TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs);
-
- if (!b_alloc(out)) {
- TRACE_ERROR("buffer alloc failure", QMUX_EV_QCS_SEND, qcc->conn, qcs);
- goto err;
- }
-
- /*
- * QCS out buffer diagram
- * head left to_xfer
- * -------------> ----------> ----->
- * --------------------------------------------------
- * |...............|xxxxxxxxxxx|<<<<<
- * --------------------------------------------------
- * ^ ack-off ^ sent-off ^ off
- *
- * STREAM frame
- * ^ ^
- * |xxxxxxxxxxxxxxxxx|
- */
-
- BUG_ON_HOT(qcs->tx.fc.off_real < qcs->stream->ack_offset);
- BUG_ON_HOT(qcs->tx.offset < qcs->tx.fc.off_real);
- BUG_ON_HOT(qcc->tx.offsets < qcc->tx.fc.off_real);
-
- to_xfer = QUIC_MIN(b_data(in), b_room(out));
-
- if (!to_xfer)
- goto out;
-
- total = b_force_xfer(out, in, to_xfer);
-
- out:
- {
- struct qcs_xfer_data_trace_arg arg = {
- .prep = b_data(out), .xfer = total,
- };
- TRACE_LEAVE(QMUX_EV_QCS_SEND|QMUX_EV_QCS_XFER_DATA,
- qcc->conn, qcs, &arg);
- }
-
- return total;
-
- err:
- TRACE_DEVEL("leaving on error", QMUX_EV_QCS_SEND, qcc->conn, qcs);
- return -1;
-}
-
/* Prepare a STREAM frame for <qcs> instance using <out> as payload. The frame
* is appended in <frm_list>. Set <fin> if this is supposed to be the last
* stream frame. If <out> is NULL an empty STREAM frame is built : this may be
return -1;
}
-/* Check after transferring data from qcs.tx.buf if FIN must be set on the next
- * STREAM frame for <qcs>.
- *
- * Returns true if FIN must be set else false.
- */
-static int qcs_stream_fin(struct qcs *qcs)
-{
- return qcs->flags & QC_SF_FIN_STREAM && !b_data(&qcs->tx.buf);
-}
-
/* Return true if <qcs> has data to send in new STREAM frames. */
static forceinline int qcs_need_sending(struct qcs *qcs)
{
- return b_data(&qcs->tx.buf) || qcs->tx.fc.off_real < qcs->tx.offset ||
- qcs_stream_fin(qcs);
+ return qcs->tx.fc.off_real < qcs->tx.offset ||
+ qcs->flags & QC_SF_FIN_STREAM;
}
/* This function must be called by the upper layer to inform about the sending
increment_send_rate(diff, 0);
}
- if (qcs->tx.offset == qcs->tx.fc.off_real && !b_data(&qcs->tx.buf)) {
+ if (qcs->tx.offset == qcs->tx.fc.off_real) {
/* Remove stream from send_list if all was sent. */
LIST_DEL_INIT(&qcs->el_send);
TRACE_STATE("stream sent done", QMUX_EV_QCS_SEND, qcc->conn, qcs);
return 0;
}
-/* Used internally by qcc_io_send function. Proceed to send for <qcs>. This will
- * transfer data from qcs buffer to its quic_stream counterpart. A STREAM frame
- * is then generated and inserted in <frms> list. Frame length will be
- * truncated if greater than <window_conn>. This allows to prepare several
- * frames in a loop while respecting connection flow control window.
+/* Used internally by qcc_io_send function. Proceed to send for <qcs>. A STREAM
+ * frame is generated poiting to QCS stream descriptor content and inserted in
+ * <frms> list. Frame length will be truncated if greater than <window_conn>.
+ * This allows to prepare several frames in a loop while respecting connection
+ * flow control window.
*
* Returns the payload length of the STREAM frame or a negative error code.
*/
static int qcs_send(struct qcs *qcs, struct list *frms, uint64_t window_conn)
{
struct qcc *qcc = qcs->qcc;
- struct buffer *buf = &qcs->tx.buf;
struct buffer *out = qc_stream_buf_get(qcs->stream);
- int xfer = 0, flen = 0, buf_avail;
- char fin = 0;
+ int flen = 0;
+ const char fin = qcs->flags & QC_SF_FIN_STREAM;
TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs);
/* Cannot send STREAM on remote unidirectional streams. */
BUG_ON(quic_stream_is_uni(qcs->id) && quic_stream_is_remote(qcc, qcs->id));
- if (b_data(buf)) {
- /* Allocate <out> buffer if not already done. */
- if (!out) {
- if (qcc->flags & QC_CF_CONN_FULL)
- goto out;
-
- out = qc_stream_buf_alloc(qcs->stream, qcs->tx.offset,
- &buf_avail);
- if (!out) {
- if (buf_avail) {
- TRACE_ERROR("stream desc alloc failure", QMUX_EV_QCS_SEND, qcc->conn, qcs);
- goto err;
- }
-
- TRACE_STATE("hitting stream desc buffer limit", QMUX_EV_QCS_SEND, qcc->conn, qcs);
- qcc->flags |= QC_CF_CONN_FULL;
- goto out;
- }
- }
-
- /* Transfer data from <buf> to <out>. */
- xfer = qcs_xfer_data(qcs, out, buf);
- if (xfer < 0)
- goto err;
-
- if (xfer > 0) {
- qcs_notify_send(qcs);
- qcs->flags &= ~QC_SF_BLK_MROOM;
- }
-
- qcs->tx.offset += xfer;
- qcc->tx.offsets += xfer;
+ /* This function must not be called if there is nothing to send. */
+ BUG_ON(!fin && !qcs_need_sending(qcs));
- /* If out buffer is empty, QCS offsets must be equal. */
- BUG_ON(!b_data(out) && qcs->tx.fc.off_real != qcs->tx.offset);
+ /* Skip STREAM frame allocation if already subscribed for send.
+ * Happens on sendto transient error or network congestion.
+ */
+ if (qcc->wait_event.events & SUB_RETRY_SEND) {
+ TRACE_DEVEL("already subscribed for sending",
+ QMUX_EV_QCS_SEND, qcc->conn, qcs);
+ goto err;
}
- /* FIN is set if all incoming data were transferred. */
- fin = qcs_stream_fin(qcs);
-
/* Build a new STREAM frame with <out> buffer. */
- if (qcs->tx.fc.off_real != qcs->tx.offset || fin) {
- /* Skip STREAM frame allocation if already subscribed for send.
- * Happens on sendto transient error or network congestion.
- */
- if (qcc->wait_event.events & SUB_RETRY_SEND) {
- TRACE_DEVEL("already subscribed for sending",
- QMUX_EV_QCS_SEND, qcc->conn, qcs);
- goto err;
- }
-
- flen = qcs_build_stream_frm(qcs, out, fin, frms, window_conn);
- if (flen < 0)
- goto err;
- }
+ flen = qcs_build_stream_frm(qcs, out, fin, frms, window_conn);
+ if (flen < 0)
+ goto err;
out:
TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs);
size_t count, int flags)
{
struct qcs *qcs = __sc_mux_strm(sc);
- const size_t old_data = b_data(&qcs->tx.buf);
+ struct buffer *out = qc_stream_buf_get(qcs->stream);
+ const size_t old_data = out ? b_data(out) : 0;
size_t ret = 0;
char fin;
}
if (ret || fin) {
- qcc_send_stream(qcs, 0, b_data(&qcs->tx.buf) - old_data);
+ const size_t data = b_data(qc_stream_buf_get(qcs->stream)) - (old_data);
+ if (data || fin)
+ qcc_send_stream(qcs, 0, data);
if (!(qcs->qcc->wait_event.events & SUB_RETRY_SEND))
tasklet_wakeup(qcs->qcc->wait_event.tasklet);
}
data += sd->iobuf.offset;
total = qcs->qcc->app_ops->done_ff(qcs);
- qcc_send_stream(qcs, 0, data);
+ if (data || qcs->flags & QC_SF_FIN_STREAM)
+ qcc_send_stream(qcs, 0, data);
if (!(qcs->qcc->wait_event.events & SUB_RETRY_SEND))
tasklet_wakeup(qcc->wait_event.tasklet);
end:
- if (!b_data(&qcs->tx.buf))
- b_free(&qcs->tx.buf);
-
TRACE_LEAVE(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
return total;
}