]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: connections/mux: Add a recv and a send+recv wait list.
authorOlivier Houchard <ohouchard@haproxy.com>
Thu, 2 Aug 2018 17:23:05 +0000 (19:23 +0200)
committerWilly Tarreau <w@1wt.eu>
Wed, 12 Sep 2018 15:37:55 +0000 (17:37 +0200)
For struct connection, struct conn_stream, and for the h2 mux, add 2 new
lists, one that handles waiters for recv, and one that handles waiters for
recv and send. That way we can ask to subscribe for either recv or send.

include/proto/connection.h
include/types/connection.h
src/connection.c
src/mux_h2.c
src/stream_interface.c

index ea6b17b67c986ab18837b736e48b95597bc8c0b9..c7f25613c7a951b32d48b72a576e905645fd9c09 100644 (file)
@@ -602,6 +602,8 @@ static inline void cs_init(struct conn_stream *cs, struct connection *conn)
        cs->flags = CS_FL_NONE;
        LIST_INIT(&cs->wait_list.list);
        LIST_INIT(&cs->send_wait_list);
+       LIST_INIT(&cs->recv_wait_list);
+       LIST_INIT(&cs->sendrecv_wait_list);
        cs->conn = conn;
        cs->wait_list.wait_reason = 0;
 }
@@ -629,6 +631,8 @@ static inline void conn_init(struct connection *conn)
        conn->proxy_netns = NULL;
        LIST_INIT(&conn->list);
        LIST_INIT(&conn->send_wait_list);
+       LIST_INIT(&conn->recv_wait_list);
+       LIST_INIT(&conn->sendrecv_wait_list);
 }
 
 /* sets <owner> as the connection's owner */
@@ -711,8 +715,19 @@ static inline struct conn_stream *cs_new(struct connection *conn)
 /* Releases a connection previously allocated by conn_new() */
 static inline void conn_free(struct connection *conn)
 {
-       LIST_DEL(&conn->send_wait_list);
-       LIST_INIT(&conn->send_wait_list);
+       struct wait_list *sw, *sw_back;
+       list_for_each_entry_safe(sw, sw_back, &conn->recv_wait_list, list) {
+               LIST_DEL(&sw->list);
+               LIST_INIT(&sw->list);
+       }
+       list_for_each_entry_safe(sw, sw_back, &conn->send_wait_list, list) {
+               LIST_DEL(&sw->list);
+               LIST_INIT(&sw->list);
+       }
+       list_for_each_entry_safe(sw, sw_back, &conn->sendrecv_wait_list, list) {
+               LIST_DEL(&sw->list);
+               LIST_INIT(&sw->list);
+       }
        pool_free(pool_head_connection, conn);
 }
 
index 9a1ba9667ec2610fd15afa2b912fabe78239d2a5..421df3c02838f81ec59ae8e4e08db2c5ffdc3f13 100644 (file)
@@ -375,6 +375,8 @@ struct conn_stream {
        struct connection *conn;             /* xprt-level connection */
        struct wait_list wait_list;          /* We're in a wait list for send */
        struct list send_wait_list;          /* list of tasks to wake when we're ready to send */
+       struct list recv_wait_list;          /* list of tasks to wake when we're ready to recv */
+       struct list sendrecv_wait_list;      /* list of tasks to wake when we're ready to either send or recv */
        void *data;                          /* pointer to upper layer's entity (eg: stream interface) */
        const struct data_cb *data_cb;       /* data layer callbacks. Must be set before xprt->init() */
        void *ctx;                           /* mux-specific context */
@@ -406,6 +408,8 @@ struct connection {
 
        /* second cache line */
        struct list send_wait_list;   /* list of tasks to wake when we're ready to send */
+       struct list recv_wait_list;          /* list of tasks to wake when we're ready to recv */
+       struct list sendrecv_wait_list;      /* list of tasks to wake when we're ready to either send or recv */
        struct list list;             /* attach point to various connection lists (idle, ...) */
        int xprt_st;                  /* transport layer state, initialized to zero */
        int tmp_early_data;           /* 1st byte of early data, if any */
index e303f2c3b798e9136fedce13f12706d664ba4198..005e0e7410414a82cd4b147e2516cf6ad2a6f50d 100644 (file)
@@ -137,6 +137,15 @@ void conn_fd_handler(int fd)
                        sw->wait_reason &= ~SUB_CAN_SEND;
                        tasklet_wakeup(sw->task);
                }
+               while (!(LIST_ISEMPTY(&conn->sendrecv_wait_list))) {
+                       struct wait_list *sw = LIST_ELEM(conn->send_wait_list.n,
+                           struct wait_list *, list);
+                       LIST_DEL(&sw->list);
+                       LIST_INIT(&sw->list);
+                       LIST_ADDQ(&conn->recv_wait_list, &sw->list);
+                       sw->wait_reason &= ~SUB_CAN_SEND;
+                       tasklet_wakeup(sw->task);
+               }
        }
 
        /* The data transfer starts here and stops on error and handshakes. Note
@@ -334,11 +343,34 @@ int conn_subscribe(struct connection *conn, int event_type, void *param)
        struct wait_list *sw;
 
        switch (event_type) {
+       case SUB_CAN_RECV:
+               sw = param;
+               if (!(sw->wait_reason & SUB_CAN_RECV)) {
+                       sw->wait_reason |= SUB_CAN_RECV;
+                       /* If we're already subscribed for send(), move it
+                        * to the send+recv list
+                        */
+                       if (sw->wait_reason & SUB_CAN_SEND) {
+                               LIST_DEL(&sw->list);
+                               LIST_INIT(&sw->list);
+                               LIST_ADDQ(&conn->sendrecv_wait_list, &sw->list);
+                       } else
+                               LIST_ADDQ(&conn->recv_wait_list, &sw->list);
+               }
+               return 0;
        case SUB_CAN_SEND:
                sw = param;
                if (!(sw->wait_reason & SUB_CAN_SEND)) {
                        sw->wait_reason |= SUB_CAN_SEND;
-                       LIST_ADDQ(&conn->send_wait_list, &sw->list);
+                       /* If we're already subscribed for recv(), move it
+                        * to the send+recv list
+                        */
+                       if (sw->wait_reason & SUB_CAN_RECV) {
+                               LIST_DEL(&sw->list);
+                               LIST_INIT(&sw->list);
+                               LIST_ADDQ(&conn->sendrecv_wait_list, &sw->list);
+                       } else
+                               LIST_ADDQ(&conn->send_wait_list, &sw->list);
                }
                return 0;
        default:
index 946288d3ecc105e234ddd5070cbd6ae1a853e48c..3c873e9ed19f352dc1e0cb74a9d4c8de9e768329 100644 (file)
@@ -121,6 +121,8 @@ struct h2c {
        struct list fctl_list; /* list of streams blocked by connection's fctl */
        struct buffer_wait buf_wait; /* wait list for buffer allocations */
        struct list send_wait_list;  /* list of tasks to wake when we're ready to send */
+       struct list recv_wait_list;          /* list of tasks to wake when we're ready to recv */
+       struct list sendrecv_wait_list;      /* list of tasks to wake when we're ready to either send or recv */
        struct wait_list wait_list;  /* We're in a wait list, to send */
 };
 
@@ -406,6 +408,8 @@ static int h2c_frt_init(struct connection *conn)
                task_queue(t);
        conn_xprt_want_recv(conn);
        LIST_INIT(&h2c->send_wait_list);
+       LIST_INIT(&h2c->recv_wait_list);
+       LIST_INIT(&h2c->sendrecv_wait_list);
        LIST_INIT(&h2c->wait_list.list);
 
        /* mux->wake will be called soon to complete the operation */
@@ -2333,6 +2337,16 @@ static void h2_send(struct h2c *h2c)
                        sw->wait_reason &= ~SUB_CAN_SEND;
                        tasklet_wakeup(sw->task);
                }
+               while (!(LIST_ISEMPTY(&h2c->sendrecv_wait_list))) {
+                       struct wait_list *sw = LIST_ELEM(h2c->send_wait_list.n,
+                           struct wait_list *, list);
+                       LIST_DEL(&sw->list);
+                       LIST_INIT(&sw->list);
+                       LIST_ADDQ(&h2c->recv_wait_list, &sw->list);
+                       sw->wait_reason &= ~SUB_CAN_SEND;
+                       tasklet_wakeup(sw->task);
+               }
+
 
        }
        /* We're done, no more to send */
@@ -3456,14 +3470,37 @@ static int h2_subscribe(struct conn_stream *cs, int event_type, void *param)
 {
        struct wait_list *sw;
        struct h2s *h2s = cs->ctx;
+       struct h2c *h2c = h2s->h2c;
 
        switch (event_type) {
+       case SUB_CAN_RECV:
+               sw = param;
+               if (!(sw->wait_reason & SUB_CAN_RECV)) {
+                       sw->wait_reason |= SUB_CAN_RECV;
+                       /* If we're already subscribed for send(), move it
+                        * to the send+recv list
+                        */
+                       if (sw->wait_reason & SUB_CAN_SEND) {
+                               LIST_DEL(&sw->list);
+                               LIST_INIT(&sw->list);
+                               LIST_ADDQ(&h2c->sendrecv_wait_list, &sw->list);
+                       } else
+                               LIST_ADDQ(&h2c->recv_wait_list, &sw->list);
+               }
+               return 0;
        case SUB_CAN_SEND:
                sw = param;
-               if (LIST_ISEMPTY(&h2s->list) &&
-                   !(sw->wait_reason & SUB_CAN_SEND)) {
-                       LIST_ADDQ(&h2s->h2c->send_wait_list, &sw->list);
+               if (!(sw->wait_reason & SUB_CAN_SEND)) {
                        sw->wait_reason |= SUB_CAN_SEND;
+                       /* If we're already subscribed for recv(), move it
+                        * to the send+recv list
+                        */
+                       if (sw->wait_reason & SUB_CAN_RECV) {
+                               LIST_DEL(&sw->list);
+                               LIST_INIT(&sw->list);
+                               LIST_ADDQ(&h2c->sendrecv_wait_list, &sw->list);
+                       } else
+                               LIST_ADDQ(&h2c->send_wait_list, &sw->list);
                }
                return 0;
        default:
index 72fec21b6dfe919270d35dc9b7fdfb1f131eccd6..cfa613a3c0c157bb0c66adaf46d69770d72e8d67 100644 (file)
@@ -752,6 +752,16 @@ wake_others:
                        sw->wait_reason &= ~SUB_CAN_SEND;
                        tasklet_wakeup(sw->task);
                }
+               while (!(LIST_ISEMPTY(&cs->sendrecv_wait_list))) {
+                       struct wait_list *sw = LIST_ELEM(cs->send_wait_list.n,
+                           struct wait_list *, list);
+                       LIST_DEL(&sw->list);
+                       LIST_INIT(&sw->list);
+                       LIST_ADDQ(&cs->recv_wait_list, &sw->list);
+                       sw->wait_reason &= ~SUB_CAN_SEND;
+                       tasklet_wakeup(sw->task);
+               }
+
        }
        return NULL;
 }