]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: mux-h2: count within a connection, how many streams are receiving data
authorWilly Tarreau <w@1wt.eu>
Wed, 28 Aug 2024 09:38:32 +0000 (11:38 +0200)
committerWilly Tarreau <w@1wt.eu>
Sat, 12 Oct 2024 14:29:16 +0000 (16:29 +0200)
A stream is receiving data from after the HEADERS frame missing END_STREAM,
to the end of the stream or HREM (the presence of END_STREAM). We're now
adding a flag to the stream that indicates this state, as well as a counter
in the connection of streams currently receiving data. The purpose will be
to gauge at any instant the number of streams that might have to share the
available bandwidth and buffers count in order not to allocate too much flow
control to any single stream. For now the counter is kept up to date, and is
reported in "show fd".

include/haproxy/mux_h2-t.h
src/mux_h2.c

index ccb40b2e24825ddcc5efb0a69319095f86a84887..2b9f151be7baf1e4518306523094ffa1831bc71d 100644 (file)
@@ -130,6 +130,8 @@ static forceinline char *h2c_show_flags(char *buf, size_t len, const char *delim
 
 #define H2_SF_TUNNEL_ABRT       0x00100000  // A tunnel attempt was aborted
 #define H2_SF_MORE_HTX_DATA     0x00200000  // more data expected from HTX
+#define H2_SF_EXPECT_RXDATA     0x00400000  // more data expected from the peer
+
 
 /* This function is used to report flags in debugging tools. Please reflect
  * below any single-bit flag addition above in the same order via the
index 744dea5ea2860de267fe3f2779d4b4e472b447d2..b662222fc7aa5f5e384892d3c2b3139864cfb474 100644 (file)
@@ -56,7 +56,7 @@ struct h2c {
        uint32_t rcvd_c; /* newly received data to ACK for the connection */
        uint32_t rcvd_s; /* newly received data for the current stream (dsi) or zero */
        uint32_t wu_s;   /* amount of data to write in the next WU frame for dsi, or zero */
-       /* 32-bit hole here */
+       uint32_t receiving_streams; /* number of streams currently receiving data */
 
        /* states for the demux direction */
        struct hpack_dht *ddht; /* demux dynamic header table */
@@ -1136,6 +1136,7 @@ static int h2_init(struct connection *conn, struct proxy *prx, struct session *s
        h2c->nb_sc = 0;
        h2c->nb_reserved = 0;
        h2c->stream_cnt = 0;
+       h2c->receiving_streams = 0;
        h2c->glitches = 0;
 
        h2c->dbuf = *input;
@@ -1556,6 +1557,29 @@ static int h2_fragment_headers(struct buffer *b, uint32_t mfs)
        return 1;
 }
 
+/* marks the h2s as receiving data. This will allow to update the number of
+ * receiving streams in the connection.
+ */
+static inline void h2s_count_as_receiving(struct h2s *h2s)
+{
+       if (!(h2s->flags & H2_SF_EXPECT_RXDATA)) {
+               TRACE_STATE("counting H2 stream as receiving data", H2_EV_H2S_RECV, h2s->h2c->conn, h2s);
+               h2s->flags |= H2_SF_EXPECT_RXDATA;
+               h2s->h2c->receiving_streams++;
+       }
+}
+
+/* marks the h2s as no longer receiving data. This will allow to
+ * update the number of receiving streams in the connection.
+ */
+static inline void h2s_no_longer_receiving(struct h2s *h2s)
+{
+       if (h2s->flags & H2_SF_EXPECT_RXDATA) {
+               TRACE_STATE("counting H2 stream as not receiving data", H2_EV_H2S_RECV, h2s->h2c->conn, h2s);
+               h2s->flags &= ~H2_SF_EXPECT_RXDATA;
+               h2s->h2c->receiving_streams--;
+       }
+}
 
 /* marks stream <h2s> as CLOSED and decrement the number of active streams for
  * its connection if the stream was not yet closed. Please use this exclusively
@@ -1580,6 +1604,7 @@ static inline void h2s_close(struct h2s *h2s)
                TRACE_LEAVE(H2_EV_H2S_END, h2s->h2c->conn, h2s);
        }
        h2s->st = H2_SS_CLOSED;
+       h2s_no_longer_receiving(h2s);
 }
 
 /* Check h2c and h2s flags to evaluate if EOI/EOS/ERR_PENDING/ERROR flags must
@@ -2324,6 +2349,7 @@ static void h2s_wake_one_stream(struct h2s *h2s)
        }
 
        if (h2c_read0_pending(h2s->h2c)) {
+               h2s_no_longer_receiving(h2s);
                if (h2s->st == H2_SS_OPEN)
                        h2s->st = H2_SS_HREM;
                else if (h2s->st == H2_SS_HLOC)
@@ -3105,11 +3131,15 @@ static struct h2s *h2c_frt_handle_headers(struct h2c *h2c, struct h2s *h2s)
 
  done:
        if (h2s->flags & H2_SF_ES_RCVD) {
+               h2s_no_longer_receiving(h2s);
                if (h2s->st == H2_SS_OPEN)
                        h2s->st = H2_SS_HREM;
                else
                        h2s_close(h2s);
        }
+       else
+               h2s_count_as_receiving(h2s);
+
        TRACE_LEAVE(H2_EV_RX_FRAME|H2_EV_RX_HDR, h2c->conn, h2s);
        goto leave;
 
@@ -3228,11 +3258,14 @@ static struct h2s *h2c_bck_handle_headers(struct h2c *h2c, struct h2s *h2s)
        if (se_fl_test(h2s->sd, SE_FL_ERROR) && h2s->st < H2_SS_ERROR)
                h2s->st = H2_SS_ERROR;
        else if (h2s->flags & H2_SF_ES_RCVD) {
+               h2s_no_longer_receiving(h2s);
                if (h2s->st == H2_SS_OPEN)
                        h2s->st = H2_SS_HREM;
                else if (h2s->st == H2_SS_HLOC)
                        h2s_close(h2s);
        }
+       else
+               h2s_count_as_receiving(h2s);
 
        /* Unblock busy server h2s waiting for the response headers to validate
         * the tunnel establishment or the end of the response of an oborted
@@ -3357,6 +3390,7 @@ static int h2c_handle_data(struct h2c *h2c, struct h2s *h2s)
        /* last frame */
        if (h2c->dff & H2_F_DATA_END_STREAM) {
                h2s->flags |= H2_SF_ES_RCVD;
+               h2s_no_longer_receiving(h2s);
                if (h2s->st == H2_SS_OPEN)
                        h2s->st = H2_SS_HREM;
                else
@@ -7602,9 +7636,9 @@ static int h2_dump_h2c_info(struct buffer *msg, struct h2c *h2c, const char *pfx
        hmbuf = br_head(h2c->mbuf);
        tmbuf = br_tail(h2c->mbuf);
        chunk_appendf(msg, " h2c.st0=%s .err=%d .maxid=%d .lastid=%d .flg=0x%04x"
-                     " .nbst=%u .nbsc=%u, .glitches=%d",
+                     " .nbst=%u .nbsc=%u .nbrcv=%u .glitches=%d",
                      h2c_st_to_str(h2c->st0), h2c->errcode, h2c->max_id, h2c->last_sid, h2c->flags,
-                     h2c->nb_streams, h2c->nb_sc, h2c->glitches);
+                     h2c->nb_streams, h2c->nb_sc, h2c->receiving_streams, h2c->glitches);
 
        if (pfx)
                chunk_appendf(msg, "\n%s", pfx);