]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: h2: Don't run tasks that are waiting to send if mux in full.
authorOlivier Houchard <ohouchard@haproxy.com>
Fri, 19 Oct 2018 15:24:29 +0000 (17:24 +0200)
committerWilly Tarreau <w@1wt.eu>
Sun, 21 Oct 2018 04:00:13 +0000 (06:00 +0200)
We wake up all the streams waiting to send data when we have space available
in the mux buffer. Doing so means we probably wake way too many streams,
because after a few the buffer will probably be full instead. So keep a
list of all the streams that are about to send data, and if we detect that
the buffer is full, unschedule the tasks and put the streams back to the
send_list.

src/mux_h2.c

index 5255ca0a1cc1966b20fc24aaf6df7cbfb4d339be..a5ea6c3ad9558d8d9aa5fd3705abcdd48613fed4 100644 (file)
@@ -121,6 +121,7 @@ struct h2c {
        struct eb_root streams_by_id; /* all active streams by their ID */
        struct list send_list; /* list of blocked streams requesting to send */
        struct list fctl_list; /* list of streams blocked by connection's fctl */
+       struct list sending_list; /* list of h2s scheduled to send data */
        struct buffer_wait buf_wait; /* wait list for buffer allocations */
        struct wait_event wait_event;  /* To be used if we're waiting for I/Os */
 };
@@ -412,6 +413,7 @@ static int h2_init(struct connection *conn, struct proxy *prx)
        h2c->streams_by_id = EB_ROOT_UNIQUE;
        LIST_INIT(&h2c->send_list);
        LIST_INIT(&h2c->fctl_list);
+       LIST_INIT(&h2c->sending_list);
        LIST_INIT(&h2c->buf_wait.list);
        conn->mux_ctx = h2c;
 
@@ -2183,10 +2185,11 @@ static int h2_process_mux(struct h2c *h2c)
 
                h2s->flags &= ~H2_SF_BLK_ANY;
                h2s->send_wait->wait_reason &= ~SUB_CAN_SEND;
+               h2s->send_wait->wait_reason |= SUB_CALL_UNSUBSCRIBE;
                tasklet_wakeup(h2s->send_wait->task);
-               h2s->send_wait = NULL;
                LIST_DEL(&h2s->list);
                LIST_INIT(&h2s->list);
+               LIST_ADDQ(&h2c->sending_list, &h2s->list);
        }
 
        list_for_each_entry_safe(h2s, h2s_back, &h2c->send_list, list) {
@@ -2195,10 +2198,11 @@ static int h2_process_mux(struct h2c *h2c)
 
                h2s->flags &= ~H2_SF_BLK_ANY;
                h2s->send_wait->wait_reason &= ~SUB_CAN_SEND;
+               h2s->send_wait->wait_reason |= SUB_CALL_UNSUBSCRIBE;
                tasklet_wakeup(h2s->send_wait->task);
-               h2s->send_wait = NULL;
                LIST_DEL(&h2s->list);
                LIST_INIT(&h2s->list);
+               LIST_ADDQ(&h2c->sending_list, &h2s->list);
        }
 
  fail:
@@ -2334,9 +2338,10 @@ static int h2_send(struct h2c *h2c)
                            struct h2s *, list);
                        LIST_DEL(&h2s->list);
                        LIST_INIT(&h2s->list);
+                       LIST_ADDQ(&h2c->sending_list, &h2s->list);
                        h2s->send_wait->wait_reason &= ~SUB_CAN_SEND;
+                       h2s->send_wait->wait_reason |= SUB_CALL_UNSUBSCRIBE;
                        tasklet_wakeup(h2s->send_wait->task);
-                       h2s->send_wait = NULL;
                }
        }
        /* We're done, no more to send */
@@ -3521,6 +3526,13 @@ static int h2_unsubscribe(struct conn_stream *cs, int event_type, void *param)
                        h2s->send_wait = NULL;
                }
        }
+       if (event_type & SUB_CALL_UNSUBSCRIBE) {
+               sw = param;
+               if (h2s->send_wait == sw) {
+                       sw->wait_reason &= ~SUB_CALL_UNSUBSCRIBE;
+                       h2s->send_wait = NULL;
+               }
+       }
        return 0;
 }
 
@@ -3549,6 +3561,23 @@ static size_t h2_rcv_buf(struct conn_stream *cs, struct buffer *buf, size_t coun
        return ret;
 }
 
+static void h2_stop_senders(struct h2c *h2c)
+{
+       struct h2s *h2s, *h2s_back;
+
+       list_for_each_entry_safe(h2s, h2s_back, &h2c->sending_list, list) {
+               /* Don't unschedule the stream if the mux is just busy waiting for more data fro mthat stream */
+               if (h2c->msi == h2s_id(h2s))
+                       continue;
+               LIST_DEL(&h2s->list);
+               LIST_INIT(&h2s->list);
+               task_remove_from_task_list((struct task *)h2s->send_wait->task);
+               h2s->send_wait->wait_reason |= SUB_CAN_SEND;
+               h2s->send_wait->wait_reason &= ~SUB_CALL_UNSUBSCRIBE;
+               LIST_ADD(&h2c->send_list, &h2s->list);
+       }
+}
+
 /* Called from the upper layer, to send data */
 static size_t h2_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags)
 {
@@ -3556,6 +3585,12 @@ static size_t h2_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t coun
        size_t total = 0;
        size_t ret;
 
+       if (h2s->send_wait) {
+               h2s->send_wait->wait_reason &= ~SUB_CALL_UNSUBSCRIBE;
+               h2s->send_wait = NULL;
+               LIST_DEL(&h2s->list);
+               LIST_INIT(&h2s->list);
+       }
        if (h2s->h2c->st0 < H2_CS_FRAME_H)
                return 0;
 
@@ -3615,9 +3650,15 @@ static size_t h2_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t coun
        }
 
        b_del(buf, total);
+
+       /* The mux is full, cancel the pending tasks */
+       if ((h2s->h2c->flags & H2_CF_MUX_BLOCK_ANY) ||
+           (h2s->flags & H2_SF_BLK_MBUSY))
+               h2_stop_senders(h2s->h2c);
        if (total > 0) {
                if (!(h2s->h2c->wait_event.wait_reason & SUB_CAN_SEND))
                        tasklet_wakeup(h2s->h2c->wait_event.task);
+
        }
        return total;
 }