]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: stream_interface: Make recv() subscribe when more data is needed.
authorOlivier Houchard <ohouchard@haproxy.com>
Fri, 31 Aug 2018 15:29:12 +0000 (17:29 +0200)
committerWilly Tarreau <w@1wt.eu>
Wed, 12 Sep 2018 15:37:55 +0000 (17:37 +0200)
Refactor the code so that si_cs_recv() subscribes to receive events.

src/stream_interface.c

index 46e57d453a367b8d7c104e8b4686e0b34d3d9c02..e5ddee68f19611b6457d165e0825c4effefa431e 100644 (file)
@@ -51,10 +51,10 @@ static void stream_int_shutr_applet(struct stream_interface *si);
 static void stream_int_shutw_applet(struct stream_interface *si);
 static void stream_int_chk_rcv_applet(struct stream_interface *si);
 static void stream_int_chk_snd_applet(struct stream_interface *si);
-static void si_cs_recv(struct conn_stream *cs);
+static int si_cs_recv(struct conn_stream *cs);
 static int si_cs_wake_cb(struct conn_stream *cs);
 static int si_idle_conn_wake_cb(struct conn_stream *cs);
-static struct task * si_cs_send(struct conn_stream *cs);
+static int si_cs_send(struct conn_stream *cs);
 
 /* stream-interface operations for embedded tasks */
 struct si_ops si_embedded_ops = {
@@ -631,7 +631,7 @@ static int si_cs_wake_cb(struct conn_stream *cs)
  * caller to commit polling changes. The caller should check conn->flags
  * for errors.
  */
-static struct task * si_cs_send(struct conn_stream *cs)
+static int si_cs_send(struct conn_stream *cs)
 {
        struct connection *conn = cs->conn;
        struct stream_interface *si = cs->data;
@@ -641,21 +641,21 @@ static struct task * si_cs_send(struct conn_stream *cs)
 
        /* We're already waiting to be able to send, give up */
        if (si->wait_list.wait_reason & SUB_CAN_SEND)
-               return NULL;
+               return 0;
 
        if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
-               return NULL;
+               return 0;
 
        if (conn->flags & CO_FL_HANDSHAKE) {
                /* a handshake was requested */
                /* Schedule ourself to be woken up once the handshake is done */
                conn->xprt->subscribe(conn, SUB_CAN_SEND,  &si->wait_list);
-               return NULL;
+               return 0;
        }
 
        /* we might have been called just after an asynchronous shutw */
        if (si_oc(si)->flags & CF_SHUTW)
-               return NULL;
+               return 0;
 
        /* ensure it's only set if a write attempt has succeeded */
        oc->flags &= ~CF_WRITE_PARTIAL;
@@ -673,7 +673,7 @@ static struct task * si_cs_send(struct conn_stream *cs)
                }
 
                if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
-                       return NULL;
+                       return 0;
        }
 
        /* At this point, the pipe is empty, but we may still have data pending
@@ -753,20 +753,24 @@ wake_others:
                }
 
        }
-       return NULL;
+       return did_send;
 }
 
 struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned short state)
 {
        struct stream_interface *si = ctx;
        struct conn_stream *cs = objt_cs(si->end);
+       int ret = 0;
 
        if (!cs)
                return NULL;
-       if (!(si->wait_list.wait_reason & SUB_CAN_SEND)) {
-               si_cs_send(cs);
+       if (!(si->wait_list.wait_reason & SUB_CAN_SEND))
+               ret = si_cs_send(cs);
+       if (!(si->wait_list.wait_reason & SUB_CAN_RECV))
+               ret |= si_cs_recv(cs);
+       if (ret != 0)
                si_cs_wake_cb(cs);
-       }
+
        return (NULL);
 }
 
@@ -1138,12 +1142,12 @@ static void stream_int_chk_snd_conn(struct stream_interface *si)
  * into the buffer from the connection. It iterates over the mux layer's
  * rcv_buf function.
  */
-static void si_cs_recv(struct conn_stream *cs)
+static int si_cs_recv(struct conn_stream *cs)
 {
        struct connection *conn = cs->conn;
        struct stream_interface *si = cs->data;
        struct channel *ic = si_ic(si);
-       int ret, max, cur_read;
+       int ret, max, cur_read = 0;
        int read_poll = MAX_READ_POLL_LOOPS;
 
        /* stop immediately on errors. Note that we DON'T want to stop on
@@ -1153,18 +1157,22 @@ static void si_cs_recv(struct conn_stream *cs)
         * which rejects it before reading it all.
         */
        if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
-               return;
+               return 0;
+
+       /* If another call to si_cs_recv() failed, and we subscribed to
+        * recv events already, give up now.
+        */
+       if (si->wait_list.wait_reason & SUB_CAN_RECV)
+               return 0;
 
        /* maybe we were called immediately after an asynchronous shutr */
        if (ic->flags & CF_SHUTR)
-               return;
+               return 0;
 
        /* stop here if we reached the end of data */
        if (cs->flags & CS_FL_EOS)
                goto out_shutdown_r;
 
-       cur_read = 0;
-
        if ((ic->flags & (CF_STREAMER | CF_STREAMER_FAST)) && !co_data(ic) &&
            global.tune.idle_timer &&
            (unsigned short)(now_ms - ic->last_read) >= global.tune.idle_timer) {
@@ -1218,7 +1226,7 @@ static void si_cs_recv(struct conn_stream *cs)
                        goto out_shutdown_r;
 
                if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
-                       return;
+                       return cur_read != 0;
 
                if (conn->flags & CO_FL_WAIT_ROOM) {
                        /* the pipe is full or we have read enough data that it
@@ -1377,13 +1385,16 @@ static void si_cs_recv(struct conn_stream *cs)
 
  end_recv:
        if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
-               return;
+               return cur_read != 0;
 
        if (cs->flags & CS_FL_EOS)
                /* connection closed */
                goto out_shutdown_r;
 
-       return;
+       /* Subscribe to receive events */
+       conn->mux->subscribe(cs, SUB_CAN_RECV, &si->wait_list);
+
+       return cur_read != 0;
 
  out_shutdown_r:
        /* we received a shutdown */
@@ -1391,7 +1402,7 @@ static void si_cs_recv(struct conn_stream *cs)
        if (ic->flags & CF_AUTO_CLOSE)
                channel_shutw_now(ic);
        stream_sock_read0(si);
-       return;
+       return cur_read != 0;
 }
 
 /*