]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: h2: start to consider the H2_CF_{MUX,DEM}_* flags for polling
authorWilly Tarreau <w@1wt.eu>
Mon, 9 Oct 2017 13:14:19 +0000 (15:14 +0200)
committerWilly Tarreau <w@1wt.eu>
Tue, 31 Oct 2017 17:16:17 +0000 (18:16 +0100)
Now we start to set the flags to indicate that the response buffer is
being awaited or that it is full, it makes it possible to centralize a
little bit the polling management into the wake() callback.

In case of error, we wake all the streams up so that they are aware of
the nature of the event and are able to detach if needed.

src/mux_h2.c

index 759c6a15dae502c67f350b36139cf8cdb7208ed6..5457bc2c7eec9f7095cb51d0c60a2339fe2dab34 100644 (file)
@@ -589,7 +589,10 @@ static void h2_recv(struct connection *conn)
        int max;
 
        if (conn->flags & CO_FL_ERROR)
-               goto error;
+               return;
+
+       if (h2c->flags & H2_CF_DEM_BLOCK_ANY)
+               return;
 
        buf = h2_get_dbuf(h2c);
        if (!buf) {
@@ -600,32 +603,28 @@ static void h2_recv(struct connection *conn)
        /* note: buf->o == 0 */
        max = buf->size - buf->i;
        if (!max) {
-               /* FIXME: buffer full, add a flag, stop polling and wait */
-               __conn_xprt_stop_recv(conn);
+               h2c->flags |= H2_CF_DEM_DFULL;
                return;
        }
 
        conn->xprt->rcv_buf(conn, buf, max);
        if (conn->flags & CO_FL_ERROR)
-               goto error;
+               return;
 
-       if (!buf->i)
+       if (!buf->i) {
                h2_release_dbuf(h2c);
-
-       if (buf->i == buf->size) {
-               /* buffer now full */
-               __conn_xprt_stop_recv(conn);
                return;
        }
 
+       if (buf->i == buf->size)
+               h2c->flags |= H2_CF_DEM_DFULL;
+
        /* FIXME: should we try to process streams here instead of doing it in ->wake ? */
 
-       if (conn_xprt_read0_pending(conn))
-               __conn_xprt_stop_recv(conn);
+       /* after streams have been processed, we should have made some room */
+       if (buf->i != buf->size)
+               h2c->flags &= ~H2_CF_DEM_DFULL;
        return;
-
- error:
-       __conn_xprt_stop_recv(conn);
 }
 
 /* callback called on send event by the connection handler */
@@ -636,22 +635,16 @@ static void h2_send(struct connection *conn)
        /* FIXME: should we try to process pending streams here instead of doing it in ->wake ? */
 
        if (conn->flags & CO_FL_ERROR)
-               goto error;
+               return;
 
        if (conn->flags & (CO_FL_HANDSHAKE|CO_FL_WAIT_L4_CONN|CO_FL_WAIT_L6_CONN)) {
                /* a handshake was requested */
                return;
        }
 
-       if (!h2c->mbuf->o) {
-               /* nothing to send */
-               goto done;
-       }
-
        if (conn->flags & CO_FL_SOCK_WR_SH) {
                /* output closed, nothing to send, clear the buffer to release it */
                h2c->mbuf->o = 0;
-               goto done;
        }
 
        /* pending response data, we need to try to send or subscribe to
@@ -667,33 +660,39 @@ static void h2_send(struct connection *conn)
         * problematic for ACKs. The latter should possibly not be set
         * for now.
         */
-       conn->xprt->snd_buf(conn, h2c->mbuf, 0);
+       if (conn->xprt->snd_buf(conn, h2c->mbuf, 0) > 0)
+               h2c->flags &= ~(H2_CF_MUX_MFULL | H2_CF_DEM_MROOM);
 
        if (conn->flags & CO_FL_ERROR)
-               goto error;
-
-       if (!h2c->mbuf->o)
-               h2_release_mbuf(h2c);
-
-       if (h2c->mbuf->o) {
-               /* incomplete send, the snd_buf callback has already updated
-                * the connection flags.
-                *
-                * FIXME: we should arm a send timeout here
-                */
-               __conn_xprt_want_send(conn);
                return;
-       }
-
- done:
-       /* FIXME: release the output buffer when empty or do it in ->wake() ? */
-       __conn_xprt_stop_send(conn);
-       return;
+}
 
- error:
-       /* FIXME: report an error somewhere in the mux */
-       __conn_xprt_stop_send(conn);
-       return;
+/* call the wake up function of all streams attached to the connection */
+static void h2_wake_all_streams(struct h2c *h2c)
+{
+       struct eb32_node *node;
+       struct h2s *h2s;
+       unsigned int flags = 0;
+
+       if (h2c->st0 >= H2_CS_ERROR || h2c->conn->flags & CO_FL_ERROR)
+               flags |= CS_FL_ERROR;
+
+       if (conn_xprt_read0_pending(h2c->conn))
+               flags |= CS_FL_EOS;
+
+       node = eb32_first(&h2c->streams_by_id);
+       while (node) {
+               h2s = container_of(node, struct h2s, by_id);
+               node = eb32_next(node);
+               if (h2s->cs) {
+                       h2s->cs->flags |= flags;
+                       /* recv is used to force to detect CS_FL_EOS that wake()
+                        * doesn't handle in the stream int code.
+                        */
+                       h2s->cs->data_cb->recv(h2s->cs);
+                       h2s->cs->data_cb->wake(h2s->cs);
+               }
+       }
 }
 
 /* callback called on any event by the connection handler.
@@ -704,9 +703,48 @@ static int h2_wake(struct connection *conn)
 {
        struct h2c *h2c = conn->mux_ctx;
 
-       if ((conn->flags & CO_FL_ERROR) && eb_is_empty(&h2c->streams_by_id)) {
-               h2_release(conn);
-               return -1;
+       if (conn->flags & CO_FL_ERROR || h2c->st0 == H2_CS_ERROR2) {
+               h2_wake_all_streams(h2c);
+
+               if (eb_is_empty(&h2c->streams_by_id)) {
+                       /* no more stream, kill the connection now */
+                       h2_release(conn);
+                       return -1;
+               }
+               else {
+                       /* some streams still there, we need to signal them all and
+                        * wait for their departure.
+                        */
+                       __conn_xprt_stop_recv(conn);
+                       __conn_xprt_stop_send(conn);
+                       return 0;
+               }
+       }
+
+       if (!h2c->dbuf->i)
+               h2_release_dbuf(h2c);
+
+       /* stop being notified of incoming data if we can't process them */
+       if (h2c->st0 >= H2_CS_ERROR ||
+           (h2c->flags & H2_CF_DEM_BLOCK_ANY) || conn_xprt_read0_pending(conn)) {
+               /* FIXME: we should clear a read timeout here */
+               __conn_xprt_stop_recv(conn);
+       }
+       else {
+               /* FIXME: we should (re-)arm a read timeout here */
+               __conn_xprt_want_recv(conn);
+       }
+
+       /* adjust output polling */
+       if ((h2c->st0 == H2_CS_ERROR || h2c->mbuf->o) &&
+           !(conn->flags & CO_FL_SOCK_WR_SH)) {
+               /* FIXME: we should (re-)arm a send timeout here */
+               __conn_xprt_want_send(conn);
+       }
+       else {
+               /* FIXME: we should clear a send timeout here */
+               h2_release_mbuf(h2c);
+               __conn_xprt_stop_send(conn);
        }
 
        return 0;