]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: mux-h1: Revamp the way subscriptions are handled.
authorOlivier Houchard <ohouchard@haproxy.com>
Mon, 3 Dec 2018 17:46:09 +0000 (18:46 +0100)
committerChristopher Faulet <cfaulet@haproxy.com>
Tue, 4 Dec 2018 15:43:30 +0000 (16:43 +0100)
Don't always wake the tasklets subscribed to recv or send events as soon as
we had any I/O event, and don't call the wake() method if there were no
subscription, instead, wake the recv tasklet if we received data in h2_recv(),
and wake the send tasklet if we were able to send data in h2_send(), and the
buffer is not full anymore.
Only call the data_cb->wake() method if we get an error/a read 0, just in
case the stream was not subscribed to receive events.

src/mux_h1.c

index ff54adc561fc0ba3992b08ccda69eb0a1155de37..6b00320bdea43257e90eeacdfd50582bb2a095d1 100644 (file)
@@ -1463,14 +1463,16 @@ static int h1_recv(struct h1c *h1c)
 {
        struct connection *conn = h1c->conn;
                struct h1s *h1s = h1c->h1s;
-       size_t ret, max;
+       size_t ret = 0, max;
        int rcvd = 0;
 
        if (h1c->wait_event.wait_reason & SUB_CAN_RECV)
                return 0;
 
-       if (!h1_recv_allowed(h1c))
+       if (!h1_recv_allowed(h1c)) {
+               rcvd = 1;
                goto end;
+       }
 
        if (h1s && (h1s->flags & (H1S_F_BUF_FLUSH|H1S_F_SPLICED_DATA))) {
                rcvd = 1;
@@ -1482,7 +1484,6 @@ static int h1_recv(struct h1c *h1c)
                goto end;
        }
 
-       ret = 0;
        max = b_room(&h1c->ibuf);
        if (max) {
                h1c->flags &= ~H1C_F_IN_FULL;
@@ -1503,6 +1504,13 @@ static int h1_recv(struct h1c *h1c)
                rcvd = 1;
 
   end:
+       if ((ret > 0 || (conn->flags & CO_FL_ERROR) ||
+           conn_xprt_read0_pending(conn)) && h1s && h1s->recv_wait) {
+               h1s->recv_wait->wait_reason &= ~SUB_CAN_RECV;
+               tasklet_wakeup(h1s->recv_wait->task);
+               h1s->recv_wait = NULL;
+
+       }
        if (!b_data(&h1c->ibuf))
                h1_release_buf(h1c, &h1c->ibuf);
        else if (b_full(&h1c->ibuf))
@@ -1544,6 +1552,13 @@ static int h1_send(struct h1c *h1c)
        }
 
   end:
+       if (!(h1c->flags & H1C_F_OUT_FULL) && h1c->h1s && h1c->h1s->send_wait) {
+               struct h1s *h1s = h1c->h1s;
+
+               h1s->send_wait->wait_reason &= ~SUB_CAN_SEND;
+               tasklet_wakeup(h1s->send_wait->task);
+               h1s->send_wait = NULL;
+       }
        /* We're done, no more to send */
        if (!b_data(&h1c->obuf)) {
                h1_release_buf(h1c, &h1c->obuf);
@@ -1558,38 +1573,6 @@ static int h1_send(struct h1c *h1c)
 }
 
 
-static void h1_wake_stream(struct h1c *h1c)
-{
-       struct connection *conn = h1c->conn;
-       struct h1s *h1s = h1c->h1s;
-       uint32_t flags = 0;
-       int dont_wake = 0;
-
-       if (!h1s || !h1s->cs)
-               return;
-
-       if ((h1c->flags & H1C_F_CS_ERROR) || (conn->flags & CO_FL_ERROR))
-               flags |= CS_FL_ERROR;
-       if (conn_xprt_read0_pending(conn))
-               flags |= CS_FL_REOS;
-
-       h1s->cs->flags |= flags;
-       if (h1s->recv_wait) {
-               h1s->recv_wait->wait_reason &= ~SUB_CAN_RECV;
-               tasklet_wakeup(h1s->recv_wait->task);
-               h1s->recv_wait = NULL;
-               dont_wake = 1;
-       }
-       if (h1s->send_wait) {
-               h1s->send_wait->wait_reason &= ~SUB_CAN_SEND;
-               tasklet_wakeup(h1s->send_wait->task);
-               h1s->send_wait = NULL;
-               dont_wake = 1;
-       }
-       if (!dont_wake && h1s->cs->data_cb->wake)
-               h1s->cs->data_cb->wake(h1s->cs);
-}
-
 /* callback called on any event by the connection handler.
  * It applies changes and returns zero, or < 0 if it wants immediate
  * destruction of the connection.
@@ -1623,7 +1606,18 @@ static int h1_process(struct h1c * h1c)
        if (b_data(&h1c->ibuf) && h1s->csinfo.t_idle == -1)
                h1s->csinfo.t_idle = tv_ms_elapsed(&h1s->csinfo.tv_create, &now) - h1s->csinfo.t_handshake;
 
-       h1_wake_stream(h1c);
+       if (!b_data(&h1c->ibuf) && h1s && h1s->cs && h1s->cs->data_cb->wake &&
+           (conn_xprt_read0_pending(conn) || h1c->flags & H1C_F_CS_ERROR ||
+           conn->flags & CO_FL_ERROR)) {
+               int flags = 0;
+
+               if (h1c->flags & H1C_F_CS_ERROR || conn->flags & CO_FL_ERROR)
+                       flags |= CS_FL_ERROR;
+               if (conn_xprt_read0_pending(conn))
+                       flags |= CS_FL_REOS;
+               h1s->cs->flags |= flags;
+               h1s->cs->data_cb->wake(h1s->cs);
+       }
   end:
        return 0;
 
@@ -1650,9 +1644,17 @@ static struct task *h1_io_cb(struct task *t, void *ctx, unsigned short status)
 static int h1_wake(struct connection *conn)
 {
        struct h1c *h1c = conn->mux_ctx;
+       int ret;
 
        h1_send(h1c);
-       return h1_process(h1c);
+       ret = h1_process(h1c);
+       if (ret == 0) {
+               struct h1s *h1s = h1c->h1s;
+
+               if (h1s && h1s->cs && h1s->cs->data_cb->wake)
+                       ret = h1s->cs->data_cb->wake(h1s->cs);
+       }
+       return ret;
 }
 
 /*******************************************/
@@ -1921,10 +1923,6 @@ static size_t h1_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t coun
         */
        if (!b_data(buf))
                total = count;
-       else if (total != count) {
-               if (!(h1c->wait_event.wait_reason & SUB_CAN_SEND))
-                       cs->conn->xprt->subscribe(cs->conn, SUB_CAN_SEND, &h1c->wait_event);
-       }
        return total;
 }