{ .mask = QMUX_EV_STRM_END, .name = "strm_end", .desc = "detaching app-layer stream" },
#define QMUX_EV_SEND_FRM (1ULL << 13)
{ .mask = QMUX_EV_SEND_FRM, .name = "send_frm", .desc = "sending QUIC frame" },
-/* special event dedicated to qcs_push_frame */
-#define QMUX_EV_QCS_PUSH_FRM (1ULL << 14)
- { .mask = QMUX_EV_QCS_PUSH_FRM, .name = "qcs_push_frm", .desc = "qcs_push_frame" },
+/* special event dedicated to qcs_xfer_data */
+#define QMUX_EV_QCS_XFER_DATA (1ULL << 14)
+ { .mask = QMUX_EV_QCS_XFER_DATA, .name = "qcs_xfer_data", .desc = "qcs_xfer_data" },
+/* special event dedicated to qcs_build_stream_frm */
+#define QMUX_EV_QCS_BUILD_STRM (1ULL << 15)
+ { .mask = QMUX_EV_QCS_BUILD_STRM, .name = "qcs_build_stream_frm", .desc = "qcs_build_stream_frm" },
{ }
};
-/* custom arg for QMUX_EV_QCS_PUSH_FRM */
-struct qcs_push_frm_trace_arg {
- size_t sent;
+/* custom arg for QMUX_EV_QCS_XFER_DATA */
+struct qcs_xfer_data_trace_arg {
+ size_t prep;
int xfer;
+};
+/* custom arg for QMUX_EV_QCS_BUILD_STRM */
+struct qcs_build_stream_trace_arg {
+ size_t len;
char fin;
uint64_t offset;
};
*
* Returns the total bytes of newly transferred data or a negative error code.
*/
-static int qcs_push_frame(struct qcs *qcs, struct buffer *out,
- struct buffer *payload, int fin,
- struct list *frm_list, uint64_t max_data)
+static int qcs_xfer_data(struct qcs *qcs, struct buffer *out,
+ struct buffer *payload, uint64_t max_data)
{
struct qcc *qcc = qcs->qcc;
- struct quic_frame *frm;
- int head, left, to_xfer;
+ int left, to_xfer;
int total = 0;
TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs);
BUG_ON_HOT(qcs->tx.sent_offset < qcs->stream->ack_offset);
BUG_ON_HOT(qcs->tx.offset < qcs->tx.sent_offset);
- head = qcs->tx.sent_offset - qcs->stream->ack_offset;
left = qcs->tx.offset - qcs->tx.sent_offset;
to_xfer = QUIC_MIN(b_data(payload), b_room(out));
if (!left && !to_xfer)
goto out;
+ total = b_force_xfer(out, payload, to_xfer);
+
+ out:
+ {
+ struct qcs_xfer_data_trace_arg arg = {
+ .prep = b_data(out), .xfer = total,
+ };
+ TRACE_LEAVE(QMUX_EV_QCS_SEND|QMUX_EV_QCS_XFER_DATA,
+ qcc->conn, qcs, &arg);
+ }
+
+ return total;
+
+ err:
+ TRACE_DEVEL("leaving in error", QMUX_EV_QCS_SEND, qcc->conn, qcs);
+ return -1;
+}
+
+static int qcs_build_stream_frm(struct qcs *qcs, struct buffer *out, char fin,
+ struct list *frm_list)
+{
+ struct qcc *qcc = qcs->qcc;
+ struct quic_frame *frm;
+ int head, total;
+
+ TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs);
+
+ /* cf buffer schema in qcs_xfer_data */
+ head = qcs->tx.sent_offset - qcs->stream->ack_offset;
+ total = b_data(out) - head;
+ if (!total) {
+ TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs);
+ return 0;
+ }
+
frm = pool_zalloc(pool_head_quic_frame);
if (!frm)
goto err;
- total = b_force_xfer(out, payload, to_xfer);
-
frm->type = QUIC_FT_STREAM_8;
frm->stream.stream = qcs->stream;
frm->stream.id = qcs->id;
frm->stream.data = (unsigned char *)b_peek(out, head);
/* FIN is positioned only when the buffer has been totally emptied. */
- fin = fin && !b_data(payload);
if (fin)
frm->type |= QUIC_STREAM_FRAME_TYPE_FIN_BIT;
frm->stream.offset.key = qcs->tx.sent_offset;
}
- if (left + total) {
- frm->type |= QUIC_STREAM_FRAME_TYPE_LEN_BIT;
- frm->stream.len = left + total;
- }
+ frm->type |= QUIC_STREAM_FRAME_TYPE_LEN_BIT;
+ frm->stream.len = total;
LIST_APPEND(frm_list, &frm->list);
out:
{
- struct qcs_push_frm_trace_arg arg = {
- .sent = b_data(out), .xfer = total, .fin = fin,
- .offset = qcs->tx.sent_offset
+ struct qcs_build_stream_trace_arg arg = {
+ .len = frm->stream.len, .fin = fin,
+ .offset = frm->stream.offset.key,
};
- TRACE_LEAVE(QMUX_EV_QCS_SEND|QMUX_EV_QCS_PUSH_FRM,
+ TRACE_LEAVE(QMUX_EV_QCS_SEND|QMUX_EV_QCS_BUILD_STRM,
qcc->conn, qcs, &arg);
}
continue;
}
- if (b_data(buf) || b_data(out)) {
- int ret;
- char fin = !!(qcs->flags & QC_SF_FIN_STREAM);
-
- ret = qcs_push_frame(qcs, out, buf, fin, &frms,
- qcc->tx.sent_offsets + total);
+ /* Prepare <out> buffer with data from <buf>. */
+ if (b_data(buf)) {
+ int ret = qcs_xfer_data(qcs, out, buf,
+ qcc->tx.sent_offsets + total);
BUG_ON(ret < 0); /* TODO handle this properly */
if (ret > 0) {
qcs->tx.offset += ret;
total += ret;
+ }
- /* Subscribe if not all data can be send. */
- if (b_data(buf)) {
- qcc->conn->xprt->subscribe(qcc->conn, qcc->conn->xprt_ctx,
- SUB_RETRY_SEND, &qcc->wait_event);
- }
+ /* Subscribe if not all data can be transfered. */
+ if (b_data(buf)) {
+ qcc->conn->xprt->subscribe(qcc->conn, qcc->conn->xprt_ctx,
+ SUB_RETRY_SEND, &qcc->wait_event);
}
+
+ /* Build a new STREAM frame with <out> buffer. */
+ if (b_data(out)) {
+ int ret;
+ char fin = !!(qcs->flags & QC_SF_FIN_STREAM);
+
+ /* FIN is set if all incoming data were transfered. */
+ fin = !!(fin && !b_data(buf));
+ ret = qcs_build_stream_frm(qcs, out, fin, &frms);
+ BUG_ON(ret < 0); /* TODO handle this properly */
+ }
+
node = eb64_next(node);
}
if (mask & QMUX_EV_SEND_FRM)
qmux_trace_frm(a3);
- if (mask & QMUX_EV_QCS_PUSH_FRM) {
- const struct qcs_push_frm_trace_arg *arg = a3;
- chunk_appendf(&trace_buf, " sent=%lu xfer=%d fin=%d offset=%lu",
- arg->sent, arg->xfer, arg->fin, arg->offset);
+ if (mask & QMUX_EV_QCS_XFER_DATA) {
+ const struct qcs_xfer_data_trace_arg *arg = a3;
+ chunk_appendf(&trace_buf, " prep=%lu xfer=%d",
+ arg->prep, arg->xfer);
+ }
+
+ if (mask & QMUX_EV_QCS_BUILD_STRM) {
+ const struct qcs_build_stream_trace_arg *arg = a3;
+ chunk_appendf(&trace_buf, " len=%lu fin=%d offset=%lu",
+ arg->len, arg->fin, arg->offset);
}
}
}