]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: stream_interfaces: Starts receiving from the upper layers.
authorOlivier Houchard <ohouchard@haproxy.com>
Tue, 11 Sep 2018 16:27:21 +0000 (18:27 +0200)
committerWilly Tarreau <w@1wt.eu>
Wed, 12 Sep 2018 15:37:55 +0000 (17:37 +0200)
Instead of waiting for the connection layer to let us know we can read,
attempt to receive as soon as process_stream() is called, and subscribe
to receive events if we can't receive yet.

Now, except for idle connections, the recv(), send() and wake() methods are
no more, all the lower layers do is waking tasklet for anybody waiting
for I/O events.

src/mux_h2.c
src/stream.c
src/stream_interface.c

index 64cd59682411cd78b81eaf1c962f829024f380f9..a0fadcd53ae26247561852d340738d3180a1e568 100644 (file)
@@ -488,6 +488,8 @@ static void h2_release(struct connection *conn)
                }
                if (h2c->wait_list.task)
                        tasklet_free(h2c->wait_list.task);
+               LIST_DEL(&h2c->wait_list.list);
+               LIST_INIT(&h2c->wait_list.list);
 
                pool_free(pool_head_h2c, h2c);
        }
@@ -652,6 +654,8 @@ static void h2s_destroy(struct h2s *h2s)
                b_free(&h2s->rxbuf);
                offer_buffers(NULL, tasks_run_queue);
        }
+       LIST_DEL(&h2s->wait_list.list);
+       LIST_INIT(&h2s->wait_list.list);
        tasklet_free(h2s->wait_list.task);
        pool_free(pool_head_h2s, h2s);
 }
@@ -1112,7 +1116,12 @@ static void h2_wake_some_streams(struct h2c *h2c, int last, uint32_t flags)
                }
 
                h2s->cs->flags |= flags;
-               h2s->cs->data_cb->wake(h2s->cs);
+               if (h2s->recv_wait_list) {
+                       struct wait_list *sw = h2s->recv_wait_list;
+                       sw->wait_reason &= ~SUB_CAN_RECV;
+                       tasklet_wakeup(sw->task);
+                       h2s->recv_wait_list = NULL;
+               }
 
                if (flags & CS_FL_ERROR && h2s->st < H2_SS_ERROR)
                        h2s->st = H2_SS_ERROR;
@@ -1584,7 +1593,13 @@ static int h2c_handle_rst_stream(struct h2c *h2c, struct h2s *h2s)
 
        if (h2s->cs) {
                h2s->cs->flags |= CS_FL_REOS | CS_FL_ERROR;
-               h2s->cs->data_cb->wake(h2s->cs);
+               if (h2s->recv_wait_list) {
+                       struct wait_list *sw = h2s->recv_wait_list;
+
+                       sw->wait_reason &= ~SUB_CAN_RECV;
+                       tasklet_wakeup(sw->task);
+                       h2s->recv_wait_list = NULL;
+               }
        }
 
        h2s->flags |= H2_SF_RST_RCVD;
@@ -1869,12 +1884,11 @@ static void h2_process_demux(struct h2c *h2c)
                if (tmp_h2s != h2s && h2s && h2s->cs && b_data(&h2s->rxbuf)) {
                        /* we may have to signal the upper layers */
                        h2s->cs->flags |= CS_FL_RCV_MORE;
-                       if (h2s->cs->data_cb->wake(h2s->cs) < 0) {
-                               /* cs has just been destroyed, we have to kill h2s. */
-                               h2s_error(h2s, H2_ERR_STREAM_CLOSED);
-                               goto strm_err;
+                       if (h2s->recv_wait_list) {
+                               h2s->recv_wait_list->wait_reason &= ~SUB_CAN_RECV;
+                               tasklet_wakeup(h2s->recv_wait_list->task);
+                               h2s->recv_wait_list = NULL;
                        }
-
                        if (h2c->st0 >= H2_CS_ERROR)
                                goto strm_err;
 
@@ -2114,10 +2128,10 @@ static void h2_process_demux(struct h2c *h2c)
        if (h2s && h2s->cs && b_data(&h2s->rxbuf)) {
                /* we may have to signal the upper layers */
                h2s->cs->flags |= CS_FL_RCV_MORE;
-               if (h2s->cs->data_cb->wake(h2s->cs) < 0) {
-                       /* cs has just been destroyed, we have to kill h2s. */
-                       h2s_error(h2s, H2_ERR_STREAM_CLOSED);
-                       h2c_send_rst_stream(h2c, h2s);
+               if (h2s->recv_wait_list) {
+                               h2s->recv_wait_list->wait_reason &= ~SUB_CAN_RECV;
+                               tasklet_wakeup(h2s->recv_wait_list->task);
+                               h2s->recv_wait_list = NULL;
                }
        }
        return;
@@ -2393,8 +2407,13 @@ static int h2_process(struct h2c *h2c)
 
                while (node) {
                        h2s = container_of(node, struct h2s, by_id);
-                       if (h2s->cs->flags & CS_FL_WAIT_FOR_HS)
-                               h2s->cs->data_cb->wake(h2s->cs);
+                       if ((h2s->cs->flags & CS_FL_WAIT_FOR_HS) &&
+                           h2s->recv_wait_list) {
+                               struct wait_list *sw = h2s->recv_wait_list;
+                               sw->wait_reason &= ~SUB_CAN_RECV;
+                               tasklet_wakeup(sw->task);
+                               h2s->recv_wait_list = NULL;
+                       }
                        node = eb32_next(node);
                }
        }
index 667eb6a4c15bcaf3af886e54838d428f116260ee..cc307453aaadbfed4757649af6659d3435c7dc3c 100644 (file)
@@ -1655,6 +1655,10 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
        si_f = &s->si[0];
        si_b = &s->si[1];
 
+       /* First, attempd to do I/Os */
+       si_cs_io_cb(NULL, si_f, 0);
+       si_cs_io_cb(NULL, si_b, 0);
+
        //DPRINTF(stderr, "%s:%d: cs=%d ss=%d(%d) rqf=0x%08x rpf=0x%08x\n", __FUNCTION__, __LINE__,
        //        si_f->state, si_b->state, si_b->err_type, req->flags, res->flags);
 
@@ -2484,6 +2488,9 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
 #endif
                s->pending_events &= ~(TASK_WOKEN_TIMER | TASK_WOKEN_RES);
                stream_release_buffers(s);
+               /* We may have free'd some space in buffers, or have more to send/recv, try again */
+               si_cs_io_cb(NULL, si_f, 0);
+               si_cs_io_cb(NULL, si_b, 0);
                return t; /* nothing more to do */
        }
 
index e5ddee68f19611b6457d165e0825c4effefa431e..13f7aa309299e4f5b2057dff7c2d8717c08c5af7 100644 (file)
@@ -52,7 +52,7 @@ static void stream_int_shutw_applet(struct stream_interface *si);
 static void stream_int_chk_rcv_applet(struct stream_interface *si);
 static void stream_int_chk_snd_applet(struct stream_interface *si);
 static int si_cs_recv(struct conn_stream *cs);
-static int si_cs_wake_cb(struct conn_stream *cs);
+static int si_cs_process(struct conn_stream *cs);
 static int si_idle_conn_wake_cb(struct conn_stream *cs);
 static int si_cs_send(struct conn_stream *cs);
 
@@ -83,7 +83,6 @@ struct si_ops si_applet_ops = {
 };
 
 struct data_cb si_conn_cb = {
-       .wake    = si_cs_wake_cb,
        .name    = "STRM",
 };
 
@@ -554,27 +553,19 @@ void stream_int_notify(struct stream_interface *si)
 }
 
 
-/* Callback to be used by connection I/O handlers upon completion. It propagates
+/* Called by I/O handlers after completion.. It propagates
  * connection flags to the stream interface, updates the stream (which may or
  * may not take this opportunity to try to forward data), then update the
  * connection's polling based on the channels and stream interface's final
  * states. The function always returns 0.
  */
-static int si_cs_wake_cb(struct conn_stream *cs)
+static int si_cs_process(struct conn_stream *cs)
 {
        struct connection *conn = cs->conn;
        struct stream_interface *si = cs->data;
        struct channel *ic = si_ic(si);
        struct channel *oc = si_oc(si);
 
-       /* if the CS's input buffer already has data available, let's try to
-        * receive now. The new muxes do this. The CS_FL_REOS is another cause
-        * for recv() (received only an empty response).
-        */
-       if (!(cs->flags & CS_FL_EOS) &&
-           (cs->flags & (CS_FL_DATA_RD_ENA)))
-               si_cs_recv(cs);
-
        /* If we have data to send, try it now */
        if (!channel_is_empty(oc) && objt_cs(si->end))
                si_cs_send(objt_cs(si->end));
@@ -644,7 +635,7 @@ static int si_cs_send(struct conn_stream *cs)
                return 0;
 
        if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
-               return 0;
+               return 1;
 
        if (conn->flags & CO_FL_HANDSHAKE) {
                /* a handshake was requested */
@@ -655,7 +646,7 @@ static int si_cs_send(struct conn_stream *cs)
 
        /* we might have been called just after an asynchronous shutw */
        if (si_oc(si)->flags & CF_SHUTW)
-               return 0;
+               return 1;
 
        /* ensure it's only set if a write attempt has succeeded */
        oc->flags &= ~CF_WRITE_PARTIAL;
@@ -728,8 +719,10 @@ static int si_cs_send(struct conn_stream *cs)
                }
        }
        /* We couldn't send all of our data, let the mux know we'd like to send more */
-       if (co_data(oc))
+       if (co_data(oc)) {
+               cs_want_send(cs);
                conn->mux->subscribe(cs, SUB_CAN_SEND, &si->wait_list);
+       }
 
 wake_others:
        /* Maybe somebody was waiting for this conn_stream, wake them */
@@ -764,12 +757,13 @@ struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned short state)
 
        if (!cs)
                return NULL;
+redo:
        if (!(si->wait_list.wait_reason & SUB_CAN_SEND))
                ret = si_cs_send(cs);
        if (!(si->wait_list.wait_reason & SUB_CAN_RECV))
                ret |= si_cs_recv(cs);
        if (ret != 0)
-               si_cs_wake_cb(cs);
+               si_cs_process(cs);
 
        return (NULL);
 }
@@ -1015,8 +1009,9 @@ static void stream_int_chk_rcv_conn(struct stream_interface *si)
 
        if ((ic->flags & CF_DONT_READ) || !channel_may_recv(ic)) {
                /* stop reading */
-               if (!(ic->flags & CF_DONT_READ)) /* full */
+               if (!(ic->flags & CF_DONT_READ)) /* full */ {
                        si->flags |= SI_FL_WAIT_ROOM;
+               }
                __cs_stop_recv(cs);
        }
        else {
@@ -1157,7 +1152,7 @@ static int si_cs_recv(struct conn_stream *cs)
         * which rejects it before reading it all.
         */
        if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
-               return 0;
+               return 1; // We want to make sure si_cs_wake() is called, so that process_strema is woken up, on failure
 
        /* If another call to si_cs_recv() failed, and we subscribed to
         * recv events already, give up now.
@@ -1167,7 +1162,7 @@ static int si_cs_recv(struct conn_stream *cs)
 
        /* maybe we were called immediately after an asynchronous shutr */
        if (ic->flags & CF_SHUTR)
-               return 0;
+               return 1;
 
        /* stop here if we reached the end of data */
        if (cs->flags & CS_FL_EOS)
@@ -1226,7 +1221,7 @@ static int si_cs_recv(struct conn_stream *cs)
                        goto out_shutdown_r;
 
                if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
-                       return cur_read != 0;
+                       return 1;
 
                if (conn->flags & CO_FL_WAIT_ROOM) {
                        /* the pipe is full or we have read enough data that it
@@ -1385,7 +1380,7 @@ static int si_cs_recv(struct conn_stream *cs)
 
  end_recv:
        if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
-               return cur_read != 0;
+               return 1;
 
        if (cs->flags & CS_FL_EOS)
                /* connection closed */
@@ -1402,7 +1397,7 @@ static int si_cs_recv(struct conn_stream *cs)
        if (ic->flags & CF_AUTO_CLOSE)
                channel_shutw_now(ic);
        stream_sock_read0(si);
-       return cur_read != 0;
+       return 1;
 }
 
 /*