From: Willy Tarreau Date: Wed, 28 Aug 2024 09:38:32 +0000 (+0200) Subject: MINOR: mux-h2: count within a connection, how many streams are receiving data X-Git-Tag: v3.1-dev10~88 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=721ea5b06cccc9f01bc6c901c3b777e1f8ed8bea;p=thirdparty%2Fhaproxy.git MINOR: mux-h2: count within a connection, how many streams are receiving data A stream is receiving data from after the HEADERS frame missing END_STREAM, to the end of the stream or HREM (the presence of END_STREAM). We're now adding a flag to the stream that indicates this state, as well as a counter in the connection of streams currently receiving data. The purpose will be to gauge at any instant the number of streams that might have to share the available bandwidth and buffers count in order not to allocate too much flow control to any single stream. For now the counter is kept up to date, and is reported in "show fd". --- diff --git a/include/haproxy/mux_h2-t.h b/include/haproxy/mux_h2-t.h index ccb40b2e24..2b9f151be7 100644 --- a/include/haproxy/mux_h2-t.h +++ b/include/haproxy/mux_h2-t.h @@ -130,6 +130,8 @@ static forceinline char *h2c_show_flags(char *buf, size_t len, const char *delim #define H2_SF_TUNNEL_ABRT 0x00100000 // A tunnel attempt was aborted #define H2_SF_MORE_HTX_DATA 0x00200000 // more data expected from HTX +#define H2_SF_EXPECT_RXDATA 0x00400000 // more data expected from the peer + /* This function is used to report flags in debugging tools. Please reflect * below any single-bit flag addition above in the same order via the diff --git a/src/mux_h2.c b/src/mux_h2.c index 744dea5ea2..b662222fc7 100644 --- a/src/mux_h2.c +++ b/src/mux_h2.c @@ -56,7 +56,7 @@ struct h2c { uint32_t rcvd_c; /* newly received data to ACK for the connection */ uint32_t rcvd_s; /* newly received data for the current stream (dsi) or zero */ uint32_t wu_s; /* amount of data to write in the next WU frame for dsi, or zero */ - /* 32-bit hole here */ + uint32_t receiving_streams; /* number of streams currently receiving data */ /* states for the demux direction */ struct hpack_dht *ddht; /* demux dynamic header table */ @@ -1136,6 +1136,7 @@ static int h2_init(struct connection *conn, struct proxy *prx, struct session *s h2c->nb_sc = 0; h2c->nb_reserved = 0; h2c->stream_cnt = 0; + h2c->receiving_streams = 0; h2c->glitches = 0; h2c->dbuf = *input; @@ -1556,6 +1557,29 @@ static int h2_fragment_headers(struct buffer *b, uint32_t mfs) return 1; } +/* marks the h2s as receiving data. This will allow to update the number of + * receiving streams in the connection. + */ +static inline void h2s_count_as_receiving(struct h2s *h2s) +{ + if (!(h2s->flags & H2_SF_EXPECT_RXDATA)) { + TRACE_STATE("counting H2 stream as receiving data", H2_EV_H2S_RECV, h2s->h2c->conn, h2s); + h2s->flags |= H2_SF_EXPECT_RXDATA; + h2s->h2c->receiving_streams++; + } +} + +/* marks the h2s as no longer receiving data. This will allow to + * update the number of receiving streams in the connection. + */ +static inline void h2s_no_longer_receiving(struct h2s *h2s) +{ + if (h2s->flags & H2_SF_EXPECT_RXDATA) { + TRACE_STATE("counting H2 stream as not receiving data", H2_EV_H2S_RECV, h2s->h2c->conn, h2s); + h2s->flags &= ~H2_SF_EXPECT_RXDATA; + h2s->h2c->receiving_streams--; + } +} /* marks stream as CLOSED and decrement the number of active streams for * its connection if the stream was not yet closed. Please use this exclusively @@ -1580,6 +1604,7 @@ static inline void h2s_close(struct h2s *h2s) TRACE_LEAVE(H2_EV_H2S_END, h2s->h2c->conn, h2s); } h2s->st = H2_SS_CLOSED; + h2s_no_longer_receiving(h2s); } /* Check h2c and h2s flags to evaluate if EOI/EOS/ERR_PENDING/ERROR flags must @@ -2324,6 +2349,7 @@ static void h2s_wake_one_stream(struct h2s *h2s) } if (h2c_read0_pending(h2s->h2c)) { + h2s_no_longer_receiving(h2s); if (h2s->st == H2_SS_OPEN) h2s->st = H2_SS_HREM; else if (h2s->st == H2_SS_HLOC) @@ -3105,11 +3131,15 @@ static struct h2s *h2c_frt_handle_headers(struct h2c *h2c, struct h2s *h2s) done: if (h2s->flags & H2_SF_ES_RCVD) { + h2s_no_longer_receiving(h2s); if (h2s->st == H2_SS_OPEN) h2s->st = H2_SS_HREM; else h2s_close(h2s); } + else + h2s_count_as_receiving(h2s); + TRACE_LEAVE(H2_EV_RX_FRAME|H2_EV_RX_HDR, h2c->conn, h2s); goto leave; @@ -3228,11 +3258,14 @@ static struct h2s *h2c_bck_handle_headers(struct h2c *h2c, struct h2s *h2s) if (se_fl_test(h2s->sd, SE_FL_ERROR) && h2s->st < H2_SS_ERROR) h2s->st = H2_SS_ERROR; else if (h2s->flags & H2_SF_ES_RCVD) { + h2s_no_longer_receiving(h2s); if (h2s->st == H2_SS_OPEN) h2s->st = H2_SS_HREM; else if (h2s->st == H2_SS_HLOC) h2s_close(h2s); } + else + h2s_count_as_receiving(h2s); /* Unblock busy server h2s waiting for the response headers to validate * the tunnel establishment or the end of the response of an oborted @@ -3357,6 +3390,7 @@ static int h2c_handle_data(struct h2c *h2c, struct h2s *h2s) /* last frame */ if (h2c->dff & H2_F_DATA_END_STREAM) { h2s->flags |= H2_SF_ES_RCVD; + h2s_no_longer_receiving(h2s); if (h2s->st == H2_SS_OPEN) h2s->st = H2_SS_HREM; else @@ -7602,9 +7636,9 @@ static int h2_dump_h2c_info(struct buffer *msg, struct h2c *h2c, const char *pfx hmbuf = br_head(h2c->mbuf); tmbuf = br_tail(h2c->mbuf); chunk_appendf(msg, " h2c.st0=%s .err=%d .maxid=%d .lastid=%d .flg=0x%04x" - " .nbst=%u .nbsc=%u, .glitches=%d", + " .nbst=%u .nbsc=%u .nbrcv=%u .glitches=%d", h2c_st_to_str(h2c->st0), h2c->errcode, h2c->max_id, h2c->last_sid, h2c->flags, - h2c->nb_streams, h2c->nb_sc, h2c->glitches); + h2c->nb_streams, h2c->nb_sc, h2c->receiving_streams, h2c->glitches); if (pfx) chunk_appendf(msg, "\n%s", pfx);