]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: connections/mux: Add a new "subscribe" method.
authorOlivier Houchard <ohouchard@haproxy.com>
Tue, 17 Jul 2018 16:46:31 +0000 (18:46 +0200)
committerWilly Tarreau <w@1wt.eu>
Thu, 19 Jul 2018 14:23:43 +0000 (16:23 +0200)
Add a new "subscribe" method for connection, conn_stream and mux, so that
upper layer can subscribe to them, to be called when the event happens.
Right now, the only event implemented is "SUB_CAN_SEND", where the upper
layer can register to be called back when it is possible to send data.

The connection and conn_stream got a new "send_wait_list" entry, which
required to move a few struct members around to maintain an efficient
cache alignment (and actually this slightly improved performance).

include/proto/connection.h
include/types/connection.h
src/connection.c
src/mux_h2.c
src/mux_pt.c
src/raw_sock.c
src/ssl_sock.c

index 8566736fd6a330e92631e3f9064ab701e70866fb..16103b7db5cc669fcf02034ef1178377c733a4e8 100644 (file)
@@ -29,6 +29,7 @@
 #include <types/listener.h>
 #include <proto/fd.h>
 #include <proto/obj_type.h>
+#include <proto/task.h>
 
 extern struct pool_head *pool_head_connection;
 extern struct pool_head *pool_head_connstream;
@@ -49,6 +50,7 @@ int make_proxy_line(char *buf, int buf_len, struct server *srv, struct connectio
 int make_proxy_line_v1(char *buf, int buf_len, struct sockaddr_storage *src, struct sockaddr_storage *dst);
 int make_proxy_line_v2(char *buf, int buf_len, struct server *srv, struct connection *remote);
 
+int conn_subscribe(struct connection *conn, int event_type, void *param);
 /* receive a NetScaler Client IP insertion header over a connection */
 int conn_recv_netscaler_cip(struct connection *conn, int flag);
 
@@ -596,6 +598,7 @@ static inline void cs_init(struct conn_stream *cs, struct connection *conn)
 {
        cs->obj_type = OBJ_TYPE_CS;
        cs->flags = CS_FL_NONE;
+       LIST_INIT(&cs->send_wait_list);
        cs->conn = conn;
 }
 
@@ -621,6 +624,7 @@ static inline void conn_init(struct connection *conn)
        conn->destroy_cb = NULL;
        conn->proxy_netns = NULL;
        LIST_INIT(&conn->list);
+       LIST_INIT(&conn->send_wait_list);
 }
 
 /* sets <owner> as the connection's owner */
index ad406acd34a8228d8c68eb41dc595c0313365105..de0c32a22423e09ba00e3d2f8c46e64cd6289313 100644 (file)
@@ -44,6 +44,10 @@ struct buffer;
 struct server;
 struct pipe;
 
+struct wait_list {
+       struct tasklet *task;
+       struct list list;
+};
 
 /* A connection handle is how we differenciate two connections on the lower
  * layers. It usually is a file descriptor but can be a connection id.
@@ -85,6 +89,9 @@ enum cs_shw_mode {
        CS_SHW_SILENT       = 1,           /* imminent close, don't notify peer */
 };
 
+enum sub_event_type {
+       SUB_CAN_SEND        = 0x00000001,  /* Schedule the tasklet when we can send more */
+};
 /* For each direction, we have a CO_FL_{SOCK,DATA}_<DIR>_ENA flag, which
  * indicates if read or write is desired in that direction for the respective
  * layers. The current status corresponding to the current layer being used is
@@ -287,6 +294,7 @@ struct xprt_ops {
        void (*destroy_srv)(struct server *srv);    /* destroy a server context */
        int  (*get_alpn)(const struct connection *conn, const char **str, int *len); /* get application layer name */
        char name[8];                               /* transport layer name, zero-terminated */
+       int (*subscribe)(struct connection *conn, int event_type, void *param); /* Subscribe to events, such as "being able to send" */
 };
 
 /* mux_ops describes the mux operations, which are to be performed at the
@@ -312,6 +320,7 @@ struct mux_ops {
        struct conn_stream *(*attach)(struct connection *); /* Create and attach a conn_stream to an outgoing connection */
        void (*detach)(struct conn_stream *); /* Detach a conn_stream from an outgoing connection, when the request is done */
        void (*show_fd)(struct buffer *, struct connection *); /* append some data about connection into chunk for "show fd" */
+       int (*subscribe)(struct conn_stream *cs, int event_type, void *param); /* Subscribe to events, such as "being able to send" */
        unsigned int flags;                           /* some flags characterizing the mux's capabilities (MX_FL_*) */
        char name[8];                                 /* mux layer name, zero-terminated */
 };
@@ -327,6 +336,7 @@ struct data_cb {
        void (*recv)(struct conn_stream *cs);  /* data-layer recv callback */
        void (*send)(struct conn_stream *cs);  /* data-layer send callback */
        int  (*wake)(struct conn_stream *cs);  /* data-layer callback to report activity */
+       int (*subscribe)(struct conn_stream *cs, int event_type, void *param); /* Subscribe to events, such as "being able to send" */
        char name[8];                           /* data layer name, zero-terminated */
 };
 
@@ -358,8 +368,9 @@ struct conn_src {
  */
 struct conn_stream {
        enum obj_type obj_type;              /* differentiates connection from applet context */
+       unsigned int flags;                  /* CS_FL_* */
        struct connection *conn;             /* xprt-level connection */
-       unsigned int flags;                    /* CS_FL_* */
+       struct list send_wait_list;          /* list of tasks to wake when we're ready to send */
        void *data;                          /* pointer to upper layer's entity (eg: stream interface) */
        const struct data_cb *data_cb;       /* data layer callbacks. Must be set before xprt->init() */
        void *ctx;                           /* mux-specific context */
@@ -376,6 +387,7 @@ struct conn_stream {
  * connection being instanciated. It must be removed once done.
  */
 struct connection {
+       /* first cache line */
        enum obj_type obj_type;       /* differentiates connection from applet context */
        unsigned char err_code;       /* CO_ER_* */
        signed short send_proxy_ofs;  /* <0 = offset to (re)send from the end, >0 = send all */
@@ -386,15 +398,20 @@ struct connection {
        void *xprt_ctx;               /* general purpose pointer, initialized to NULL */
        void *mux_ctx;                /* mux-specific context, initialized to NULL */
        void *owner;                  /* pointer to the owner session for incoming connections, or NULL */
+       enum obj_type *target;        /* the target to connect to (server, proxy, applet, ...) */
+
+       /* second cache line */
+       struct list send_wait_list;   /* list of tasks to wake when we're ready to send */
+       struct list list;             /* attach point to various connection lists (idle, ...) */
        int xprt_st;                  /* transport layer state, initialized to zero */
        int tmp_early_data;           /* 1st byte of early data, if any */
        int sent_early_data;          /* Amount of early data we sent so far */
        union conn_handle handle;     /* connection handle at the socket layer */
-       enum obj_type *target;        /* the target to connect to (server, proxy, applet, ...) */
-       struct list list;             /* attach point to various connection lists (idle, ...) */
+       const struct netns_entry *proxy_netns;
        int (*xprt_done_cb)(struct connection *conn);  /* callback to notify of end of handshake */
+
+       /* third cache line and beyond */
        void (*destroy_cb)(struct connection *conn);  /* callback to notify of imminent death of the connection */
-       const struct netns_entry *proxy_netns;
        struct {
                struct sockaddr_storage from;   /* client address, or address to spoof when connecting to the server */
                struct sockaddr_storage to;     /* address reached by the client, or address to connect to */
index db869fb0f0e020da67d5c8fa9f0f2887745d3a1e..94e7209b3999a51d305b33f526c5430a2a99b46b 100644 (file)
@@ -128,6 +128,13 @@ void conn_fd_handler(int fd)
                 */
                flags = 0;
                conn->mux->send(conn);
+               while (!LIST_ISEMPTY(&conn->send_wait_list)) {
+                       struct wait_list *sw = LIST_ELEM(conn->send_wait_list.n,
+                           struct wait_list *, list);
+                       LIST_DEL(&sw->list);
+                       LIST_INIT(&sw->list);
+                       tasklet_wakeup(sw->task);
+               }
        }
 
        /* The data transfer starts here and stops on error and handshakes. Note
@@ -323,6 +330,22 @@ int conn_sock_send(struct connection *conn, const void *buf, int len, int flags)
        return ret;
 }
 
+int conn_subscribe(struct connection *conn, int event_type, void *param)
+{
+       struct wait_list *sw;
+
+       switch (event_type) {
+       case SUB_CAN_SEND:
+               sw = param;
+               if (LIST_ISEMPTY(&sw->list))
+                       LIST_ADDQ(&conn->send_wait_list, &sw->list);
+               return 0;
+       default:
+               break;
+       }
+       return (-1);
+}
+
 /* Drains possibly pending incoming data on the file descriptor attached to the
  * connection and update the connection's flags accordingly. This is used to
  * know whether we need to disable lingering on close. Returns non-zero if it
index e25208360ae7c4cda6212df9c05e2ea1bc6526ad..ba6bd8d8ac2139c0723fba0023ee62f930041714 100644 (file)
@@ -120,6 +120,7 @@ struct h2c {
        struct list send_list; /* list of blocked streams requesting to send */
        struct list fctl_list; /* list of streams blocked by connection's fctl */
        struct buffer_wait buf_wait; /* wait list for buffer allocations */
+       struct list send_wait_list;  /* list of tasks to wake when we're ready to send */
 };
 
 /* H2 stream state, in h2s->st */
@@ -379,6 +380,7 @@ static int h2c_frt_init(struct connection *conn)
        if (t)
                task_queue(t);
        conn_xprt_want_recv(conn);
+       LIST_INIT(&h2c->send_wait_list);
 
        /* mux->wake will be called soon to complete the operation */
        return 0;
@@ -2228,6 +2230,19 @@ static void h2_send(struct connection *conn)
                /* output closed, nothing to send, clear the buffer to release it */
                b_reset(&h2c->mbuf);
        }
+       /* We're not full anymore, so we can wake any task that are waiting
+        * for us.
+        */
+       if (!(h2c->flags & (H2_CF_MUX_MFULL | H2_CF_DEM_MROOM))) {
+               while (!LIST_ISEMPTY(&h2c->send_wait_list)) {
+                       struct wait_list *sw = LIST_ELEM(h2c->send_wait_list.n,
+                           struct wait_list *, list);
+                       LIST_DEL(&sw->list);
+                       LIST_INIT(&sw->list);
+                       tasklet_wakeup(sw->task);
+               }
+
+       }
 }
 
 /* callback called on any event by the connection handler.
@@ -3369,6 +3384,26 @@ static size_t h2s_frt_make_resp_data(struct h2s *h2s, const struct buffer *buf,
        return total;
 }
 
+/* Called from the upper layer, to subscribe to events, such as being able to send */
+static int h2_subscribe(struct conn_stream *cs, int event_type, void *param)
+{
+       struct wait_list *sw;
+       struct h2s *h2s = cs->ctx;
+
+       switch (event_type) {
+       case SUB_CAN_SEND:
+               sw = param;
+               if (LIST_ISEMPTY(&h2s->list) && LIST_ISEMPTY(&sw->list))
+                       LIST_ADDQ(&h2s->h2c->send_wait_list, &sw->list);
+               return 0;
+       default:
+               break;
+       }
+       return -1;
+
+
+}
+
 /* Called from the upper layer, to send data */
 static size_t h2_snd_buf(struct conn_stream *cs, const struct buffer *buf, size_t count, int flags)
 {
@@ -3545,6 +3580,7 @@ const struct mux_ops h2_ops = {
        .update_poll = h2_update_poll,
        .rcv_buf = h2_rcv_buf,
        .snd_buf = h2_snd_buf,
+       .subscribe = h2_subscribe,
        .attach = h2_attach,
        .detach = h2_detach,
        .shutr = h2_shutr,
index b6d0b1aadfbb0ed3226c8a292c27e24d9a485552..059e4995cd76a243fb990c13ed76e1efd53ac0e3 100644 (file)
@@ -177,6 +177,12 @@ static size_t mux_pt_snd_buf(struct conn_stream *cs, const struct buffer *buf, s
        return cs->conn->xprt->snd_buf(cs->conn, buf, count, flags);
 }
 
+/* Called from the upper layer, to subscribe to events */
+static int mux_pt_subscribe(struct conn_stream *cs, int event_type, void *param)
+{
+       return (cs->conn->xprt->subscribe(cs->conn, event_type, param));
+}
+
 #if defined(CONFIG_HAP_LINUX_SPLICE)
 /* Send and get, using splicing */
 static int mux_pt_rcv_pipe(struct conn_stream *cs, struct pipe *pipe, unsigned int count)
@@ -206,6 +212,7 @@ const struct mux_ops mux_pt_ops = {
        .update_poll = mux_pt_update_poll,
        .rcv_buf = mux_pt_rcv_buf,
        .snd_buf = mux_pt_snd_buf,
+       .subscribe = mux_pt_subscribe,
 #if defined(CONFIG_HAP_LINUX_SPLICE)
        .rcv_pipe = mux_pt_rcv_pipe,
        .snd_pipe = mux_pt_snd_pipe,
index 375c453c5d429dc536d352ac4cc4e36a1540e058..c108a42bdab01c83eaf4fc4b781267024dad5ae0 100644 (file)
@@ -424,6 +424,7 @@ static size_t raw_sock_from_buf(struct connection *conn, const struct buffer *bu
 static struct xprt_ops raw_sock = {
        .snd_buf  = raw_sock_from_buf,
        .rcv_buf  = raw_sock_to_buf,
+       .subscribe = conn_subscribe,
 #if defined(CONFIG_HAP_LINUX_SPLICE)
        .rcv_pipe = raw_sock_to_pipe,
        .snd_pipe = raw_sock_from_pipe,
index 56898209a616ef12361f513e0452dd9d7a0fe91c..7e8739a633b9a492dab66986fcb62d2dc85d7400 100644 (file)
@@ -8895,6 +8895,7 @@ static struct cfg_kw_list cfg_kws = {ILH, {
 static struct xprt_ops ssl_sock = {
        .snd_buf  = ssl_sock_from_buf,
        .rcv_buf  = ssl_sock_to_buf,
+       .subscribe = conn_subscribe,
        .rcv_pipe = NULL,
        .snd_pipe = NULL,
        .shutr    = NULL,