if (qcs->tx.offset == qcs->tx.sent_offset && b_full(&qcs->stream->buf->buf)) {
qc_stream_buf_release(qcs->stream);
- tasklet_wakeup(qcc->wait_event.tasklet);
+
+ /* reschedule send if buffers available */
+ if (qc_stream_buf_avail(qcc->conn->handle.qc)) {
+ tasklet_wakeup(qcc->wait_event.tasklet);
+ }
+ else {
+ qcc->flags |= QC_CF_CONN_FULL;
+ }
}
}
continue;
}
- if (!out) {
- struct connection *conn = qcc->conn;
+ if (!out && (qcc->flags & QC_CF_CONN_FULL)) {
+ node = eb64_next(node);
+ continue;
+ }
- out = qc_stream_buf_alloc(qcs->stream,
- qcs->tx.offset);
+ if (!out) {
+ out = qc_stream_buf_alloc(qcs->stream, qcs->tx.offset);
if (!out) {
- conn->xprt->subscribe(conn, conn->xprt_ctx,
- SUB_RETRY_SEND, &qcc->wait_event);
+ qcc->flags |= QC_CF_CONN_FULL;
node = eb64_next(node);
continue;
}
* Returns the count of byte removed from stream. Do not forget to check if
* <stream> is NULL after invocation.
*/
-int qc_stream_desc_ack(struct qc_stream_desc **stream, size_t offset,
- size_t len)
+int qc_stream_desc_ack(struct qc_stream_desc **stream, size_t offset, size_t len)
{
struct qc_stream_desc *s = *stream;
struct qc_stream_buf *stream_buf;
+ struct quic_conn *qc = s->qc;
struct buffer *buf;
size_t diff;
pool_free(pool_head_quic_conn_stream_buf, stream_buf);
offer_buffers(NULL, 1);
+ /* notify MUX about available buffers. */
+ --qc->stream_buf_count;
+ if (qc->mux_state == QC_MUX_READY) {
+ if (qc->qcc->flags & QC_CF_CONN_FULL) {
+ qc->qcc->flags &= ~QC_CF_CONN_FULL;
+ tasklet_wakeup(qc->qcc->wait_event.tasklet);
+ }
+ }
+
/* Free stream instance if already released and no buffers left. */
if (s->release && LIST_ISEMPTY(&s->buf_list)) {
qc_stream_desc_free(s);
void qc_stream_desc_free(struct qc_stream_desc *stream)
{
struct qc_stream_buf *buf, *buf_back;
+ struct quic_conn *qc = stream->qc;
struct eb64_node *frm_node;
unsigned int free_count = 0;
}
}
- if (free_count)
+ if (free_count) {
offer_buffers(NULL, free_count);
+ qc->stream_buf_count -= free_count;
+ if (qc->mux_state == QC_MUX_READY) {
+ /* notify MUX about available buffers. */
+ if (qc->qcc->flags & QC_CF_CONN_FULL) {
+ qc->qcc->flags &= ~QC_CF_CONN_FULL;
+ tasklet_wakeup(qc->qcc->wait_event.tasklet);
+ }
+ }
+ }
+
/* qc_stream_desc might be freed before having received all its ACKs.
* This is the case if some frames were retransmitted.
*/
return &stream->buf->buf;
}
-/* Allocate a new current buffer for <stream>. This function is not allowed if
- * current buffer is not NULL prior to this call. The new buffer represents
- * stream payload at offset <offset>.
+/* Check if a new stream buffer can be allocated for the connection <qc>.
+ * Returns a boolean.
+ */
+int qc_stream_buf_avail(struct quic_conn *qc)
+{
+ /* TODO use a global tune settings for max */
+ return qc->stream_buf_count < 30;
+}
+
+/* Allocate a new current buffer for <stream>. The buffer limit count for the
+ * connection is checked first. This function is not allowed if current buffer
+ * is not NULL prior to this call. The new buffer represents stream payload at
+ * offset <offset>.
*
* Returns the buffer or NULL.
*/
struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream,
uint64_t offset)
{
+ struct quic_conn *qc = stream->qc;
+
/* current buffer must be released first before allocate a new one. */
BUG_ON(stream->buf);
+ if (!qc_stream_buf_avail(qc))
+ return NULL;
+
+ ++qc->stream_buf_count;
+
stream->buf_offset = offset;
stream->buf = pool_alloc(pool_head_quic_conn_stream_buf);
if (!stream->buf)