/* Safe to use even if already removed from the list. */
LIST_DEL_INIT(&qcs->el_opening);
+ LIST_DEL_INIT(&qcs->el_send);
/* Release stream endpoint descriptor. */
BUG_ON(qcs->sd && !se_fl_test(qcs->sd, SE_FL_ORPHAN));
* These fields must be initialed before.
*/
LIST_INIT(&qcs->el_opening);
+ LIST_INIT(&qcs->el_send);
qcs->start = TICK_ETERNITY;
/* store transport layer stream descriptor in qcc tree */
tasklet_wakeup(qcc->wait_event.tasklet);
}
+/* Register <qcs> stream for emission of STREAM, STOP_SENDING or RESET_STREAM. */
+void qcc_send_stream(struct qcs *qcs)
+{
+ struct qcc *qcc = qcs->qcc;
+
+ TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs);
+
+ /* Cannot send if already closed. */
+ BUG_ON(qcs_is_close_local(qcs));
+
+ if (!LIST_INLIST(&qcs->el_send))
+ LIST_APPEND(&qcs->qcc->send_list, &qcs->el_send);
+
+ TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs);
+}
+
/* Prepare for the emission of STOP_SENDING on <qcs>. */
void qcc_abort_stream_read(struct qcs *qcs)
{
if (qcs->flags & QC_SF_BLK_SFCTL) {
qcs->flags &= ~QC_SF_BLK_SFCTL;
+ /* TODO optim: only wakeup IO-CB if stream has data to sent. */
tasklet_wakeup(qcc->wait_event.tasklet);
}
}
}
}
- if (qcs->tx.offset == qcs->tx.sent_offset && !b_data(&qcs->tx.buf) &&
- qcs->flags & (QC_SF_FIN_STREAM|QC_SF_DETACH)) {
- /* Close stream locally. */
- qcs_close_local(qcs);
- /* Reset flag to not emit multiple FIN STREAM frames. */
- qcs->flags &= ~QC_SF_FIN_STREAM;
+ if (qcs->tx.offset == qcs->tx.sent_offset && !b_data(&qcs->tx.buf)) {
+ /* Remove stream from send_list if all was sent. */
+ LIST_DEL_INIT(&qcs->el_send);
+ TRACE_STATE("stream sent done", QMUX_EV_QCS_SEND, qcc->conn, qcs);
+
+ if (qcs->flags & (QC_SF_FIN_STREAM|QC_SF_DETACH)) {
+ /* Close stream locally. */
+ qcs_close_local(qcs);
+ /* Reset flag to not emit multiple FIN STREAM frames. */
+ qcs->flags &= ~QC_SF_FIN_STREAM;
+ }
}
out:
int xfer = 0;
char fin = 0;
+ /* Cannot send STREAM on remote unidirectional streams. */
+ BUG_ON(quic_stream_is_uni(qcs->id) && quic_stream_is_remote(qcc, qcs->id));
+
/* Allocate <out> buffer if necessary. */
if (!out) {
if (qcc->flags & QC_CF_CONN_FULL)
qcc->flags |= QC_CF_APP_FINAL;
}
- /* loop through all streams, construct STREAM frames if data available.
- * TODO optimize the loop to favor streams which are not too heavy.
+ /* Loop through all streams for STOP_SENDING/RESET_STREAM sending. Each
+ * frame is send individually to guarantee emission.
+ *
+ * TODO Optimize sending by multiplexing several frames in one datagram.
*/
node = eb64_first(&qcc->streams_by_id);
while (node) {
- int ret;
uint64_t id;
qcs = eb64_entry(node, struct qcs, by_id);
continue;
}
- if (qcs_is_close_local(qcs)) {
- node = eb64_next(node);
- continue;
- }
+ node = eb64_next(node);
+ }
- if (qcs->flags & QC_SF_BLK_SFCTL) {
- node = eb64_next(node);
- continue;
- }
+ /* Send STREAM data for registered streams. */
+ list_for_each_entry(qcs, &qcc->send_list, el_send) {
+ /* Stream must not be present in send_list if it has nothing to send. */
+ BUG_ON(!b_data(&qcs->tx.buf) &&
+ qcs->tx.sent_offset == qcs->tx.offset &&
+ !qcs_stream_fin(qcs));
- /* Check if there is something to send. */
- if (!b_data(&qcs->tx.buf) && !qcs_stream_fin(qcs) &&
- !qc_stream_buf_get(qcs->stream)) {
- node = eb64_next(node);
+ if (qcs_is_close_local(qcs)) {
+ LIST_DEL_INIT(&qcs->el_send);
continue;
}
- ret = _qc_send_qcs(qcs, &frms);
- total += ret;
- node = eb64_next(node);
+ if (!(qcs->flags & QC_SF_BLK_SFCTL))
+ total += _qc_send_qcs(qcs, &frms);
}
if (qc_send_frames(qcc, &frms)) {
/* Deallocate frames that the transport layer has rejected. */
if (!LIST_ISEMPTY(&frms)) {
struct quic_frame *frm, *frm2;
+
list_for_each_entry_safe(frm, frm2, &frms, list) {
LIST_DELETE(&frm->list);
pool_free(pool_head_quic_frame, frm);
goto fail_no_tasklet;
}
+ LIST_INIT(&qcc->send_list);
LIST_INIT(&qcc->send_retry_list);
qcc->wait_event.tasklet->process = qc_io_cb;
qcs->flags |= QC_SF_FIN_STREAM;
if (ret || fin) {
+ qcc_send_stream(qcs);
if (!(qcs->qcc->wait_event.events & SUB_RETRY_SEND))
tasklet_wakeup(qcs->qcc->wait_event.tasklet);
}