struct qc_stream_desc *qc_stream_desc_new(uint64_t id, enum qcs_type, void *ctx,
struct quic_conn *qc);
-void qc_stream_desc_release(struct qc_stream_desc *stream, uint64_t final_size);
+void qc_stream_desc_release(struct qc_stream_desc *stream, uint64_t final_size,
+ void *new_ctx);
int qc_stream_desc_ack(struct qc_stream_desc **stream, size_t offset, size_t len, int fin);
void qc_stream_desc_free(struct qc_stream_desc *stream, int closing);
stream->notify_send = cb;
}
+/* Subscribe for room notification on <stream>. */
+static inline void qc_stream_desc_sub_room(struct qc_stream_desc *stream,
+ void (*cb)(struct qc_stream_desc *s, uint64_t offset))
+{
+ stream->notify_room = cb;
+}
+
#endif /* USE_QUIC */
#endif /* _HAPROXY_QUIC_STREAM_H_ */
DECLARE_POOL(pool_head_qcs, "qcs", sizeof(struct qcs));
static void qmux_ctrl_send(struct qc_stream_desc *, uint64_t data, uint64_t offset);
+static void qmux_ctrl_room(struct qc_stream_desc *, uint64_t room);
static void qcs_free_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf)
{
/* Release qc_stream_desc buffer from quic-conn layer. */
if (qcs->stream) {
qc_stream_desc_sub_send(qcs->stream, NULL);
- qc_stream_desc_release(qcs->stream, qcs->tx.fc.off_real);
+ qc_stream_desc_release(qcs->stream, qcs->tx.fc.off_real, qcc);
}
/* Free Rx buffer. */
}
qc_stream_desc_sub_send(qcs->stream, qmux_ctrl_send);
+ qc_stream_desc_sub_room(qcs->stream, qmux_ctrl_room);
}
if (qcc->app_ops->attach && qcc->app_ops->attach(qcs, qcc->ctx)) {
return qcc->tx.buf_in_flight >= qc->path->cwnd;
}
+static void qmux_ctrl_room(struct qc_stream_desc *stream, uint64_t room)
+{
+ /* Context is different for active and released streams. */
+ struct qcc *qcc = !(stream->flags & QC_SD_FL_RELEASE) ?
+ ((struct qcs *)stream->ctx)->qcc : stream->ctx;
+ qcc_notify_buf(qcc, room);
+}
+
/* Report that one or several stream-desc buffers have been released for <qcc>
* connection. <free_size> represent the sum of freed buffers sizes. May also
* be used to notify about congestion window increase, in which case
/* Cannot use if some data already transferred for this stream. */
BUG_ON(qcs->stream->ack_offset || !LIST_ISEMPTY(&qcs->stream->buf_list));
- qcs->stream->flags |= QC_SD_FL_OOB_BUF;
+ qcs->flags |= QC_SF_TXBUB_OOB;
+ qc_stream_desc_sub_room(qcs->stream, NULL);
}
struct stconn *qcs_attach_sc(struct qcs *qcs, struct buffer *buf, char fin)
{
struct qcc *qcc = qcs->qcc;
struct buffer *out = qc_stream_buf_get(qcs->stream);
- const int unlimited = qcs->stream->flags & QC_SD_FL_OOB_BUF;
/* Stream must not try to reallocate a buffer if currently waiting for one. */
BUG_ON(LIST_INLIST(&qcs->el_buf));
*err = 0;
if (!out) {
- if (likely(!unlimited)) {
+ if (likely(!(qcs->flags & QC_SF_TXBUB_OOB))) {
if ((qcc->flags & QC_CF_CONN_FULL)) {
LIST_APPEND(&qcc->buf_wait_list, &qcs->el_buf);
tot_time_start(&qcs->timer.buf);
goto out;
}
- if (likely(!unlimited))
+ if (likely(!(qcs->flags & QC_SF_TXBUB_OOB)))
qcc->tx.buf_in_flight += b_size(out);
}
{
struct qcc *qcc = qcs->qcc;
struct buffer *out = qc_stream_buf_get(qcs->stream);
- const int unlimited = qcs->stream->flags & QC_SD_FL_OOB_BUF;
/* Stream must not try to reallocate a buffer if currently waiting for one. */
BUG_ON(LIST_INLIST(&qcs->el_buf));
- if (likely(!unlimited)) {
+ if (likely(!(qcs->flags & QC_SF_TXBUB_OOB))) {
/* Reduce buffer window. As such there is always some space
* left for a new buffer allocation.
*/
goto out;
}
- if (likely(!unlimited))
+ if (likely(!(qcs->flags & QC_SF_TXBUB_OOB)))
qcc->tx.buf_in_flight += b_size(out);
out:
{
struct connection *conn = qcc->conn;
struct eb64_node *node;
+ struct quic_conn *qc = conn->handle.qc;
TRACE_ENTER(QMUX_EV_QCC_END, conn);
qcs_free(qcs);
}
+ /* unsubscribe from all remaining qc_stream_desc */
+ node = eb64_first(&qc->streams_by_id);
+ while (node) {
+ struct qc_stream_desc *stream = eb64_entry(node, struct qc_stream_desc, by_id);
+ qc_stream_desc_sub_room(stream, NULL);
+ node = eb64_next(node);
+ }
+
tasklet_free(qcc->wait_event.tasklet);
if (conn && qcc->wait_event.events) {
conn->xprt->unsubscribe(conn, conn->xprt_ctx,
static void qc_stream_buf_free(struct qc_stream_desc *stream,
struct qc_stream_buf **stream_buf)
{
- struct quic_conn *qc = stream->qc;
struct buffer *buf = &(*stream_buf)->buf;
uint64_t free_size;
*stream_buf = NULL;
/* notify MUX about available buffers. */
- if (qc->mux_state == QC_MUX_READY) {
- if (!(stream->flags & QC_SD_FL_OOB_BUF)) {
- /* notify MUX about available buffers. */
- qcc_notify_buf(qc->qcc, free_size);
- }
- }
+ if (stream->notify_room)
+ stream->notify_room(stream, free_size);
}
/* Allocate a new stream descriptor with id <id>. The caller is responsible to
stream->flags = 0;
stream->ctx = ctx;
stream->notify_send = NULL;
+ stream->notify_room = NULL;
return stream;
}
* <final_size> corresponds to the last offset sent for this stream. If there
* is unsent data present, they will be remove first to guarantee that buffer
* is freed after receiving all acknowledges.
+ *
+ * It is expected that upper layer instance related to <stream> may disappear
+ * after this operation. As such, <new_ctx> must be set to reassociate <stream>
+ * for notifications.
*/
void qc_stream_desc_release(struct qc_stream_desc *stream,
- uint64_t final_size)
+ uint64_t final_size, void *new_ctx)
{
/* A stream can be released only one time. */
BUG_ON(stream->flags & QC_SD_FL_RELEASE);
stream->flags |= QC_SD_FL_RELEASE;
- stream->ctx = NULL;
+ stream->ctx = new_ctx;
if (stream->buf) {
struct qc_stream_buf *stream_buf = stream->buf;