From: Olivier Houchard Date: Tue, 17 Jul 2018 16:46:31 +0000 (+0200) Subject: MINOR: connections/mux: Add a new "subscribe" method. X-Git-Tag: v1.9-dev1~68 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=6ff2039d13a3acaa18887650f9d56a9e5c848379;p=thirdparty%2Fhaproxy.git MINOR: connections/mux: Add a new "subscribe" method. Add a new "subscribe" method for connection, conn_stream and mux, so that upper layer can subscribe to them, to be called when the event happens. Right now, the only event implemented is "SUB_CAN_SEND", where the upper layer can register to be called back when it is possible to send data. The connection and conn_stream got a new "send_wait_list" entry, which required to move a few struct members around to maintain an efficient cache alignment (and actually this slightly improved performance). --- diff --git a/include/proto/connection.h b/include/proto/connection.h index 8566736fd6..16103b7db5 100644 --- a/include/proto/connection.h +++ b/include/proto/connection.h @@ -29,6 +29,7 @@ #include #include #include +#include extern struct pool_head *pool_head_connection; extern struct pool_head *pool_head_connstream; @@ -49,6 +50,7 @@ int make_proxy_line(char *buf, int buf_len, struct server *srv, struct connectio int make_proxy_line_v1(char *buf, int buf_len, struct sockaddr_storage *src, struct sockaddr_storage *dst); int make_proxy_line_v2(char *buf, int buf_len, struct server *srv, struct connection *remote); +int conn_subscribe(struct connection *conn, int event_type, void *param); /* receive a NetScaler Client IP insertion header over a connection */ int conn_recv_netscaler_cip(struct connection *conn, int flag); @@ -596,6 +598,7 @@ static inline void cs_init(struct conn_stream *cs, struct connection *conn) { cs->obj_type = OBJ_TYPE_CS; cs->flags = CS_FL_NONE; + LIST_INIT(&cs->send_wait_list); cs->conn = conn; } @@ -621,6 +624,7 @@ static inline void conn_init(struct connection *conn) conn->destroy_cb = NULL; conn->proxy_netns = NULL; LIST_INIT(&conn->list); + LIST_INIT(&conn->send_wait_list); } /* sets as the connection's owner */ diff --git a/include/types/connection.h b/include/types/connection.h index ad406acd34..de0c32a224 100644 --- a/include/types/connection.h +++ b/include/types/connection.h @@ -44,6 +44,10 @@ struct buffer; struct server; struct pipe; +struct wait_list { + struct tasklet *task; + struct list list; +}; /* A connection handle is how we differenciate two connections on the lower * layers. It usually is a file descriptor but can be a connection id. @@ -85,6 +89,9 @@ enum cs_shw_mode { CS_SHW_SILENT = 1, /* imminent close, don't notify peer */ }; +enum sub_event_type { + SUB_CAN_SEND = 0x00000001, /* Schedule the tasklet when we can send more */ +}; /* For each direction, we have a CO_FL_{SOCK,DATA}__ENA flag, which * indicates if read or write is desired in that direction for the respective * layers. The current status corresponding to the current layer being used is @@ -287,6 +294,7 @@ struct xprt_ops { void (*destroy_srv)(struct server *srv); /* destroy a server context */ int (*get_alpn)(const struct connection *conn, const char **str, int *len); /* get application layer name */ char name[8]; /* transport layer name, zero-terminated */ + int (*subscribe)(struct connection *conn, int event_type, void *param); /* Subscribe to events, such as "being able to send" */ }; /* mux_ops describes the mux operations, which are to be performed at the @@ -312,6 +320,7 @@ struct mux_ops { struct conn_stream *(*attach)(struct connection *); /* Create and attach a conn_stream to an outgoing connection */ void (*detach)(struct conn_stream *); /* Detach a conn_stream from an outgoing connection, when the request is done */ void (*show_fd)(struct buffer *, struct connection *); /* append some data about connection into chunk for "show fd" */ + int (*subscribe)(struct conn_stream *cs, int event_type, void *param); /* Subscribe to events, such as "being able to send" */ unsigned int flags; /* some flags characterizing the mux's capabilities (MX_FL_*) */ char name[8]; /* mux layer name, zero-terminated */ }; @@ -327,6 +336,7 @@ struct data_cb { void (*recv)(struct conn_stream *cs); /* data-layer recv callback */ void (*send)(struct conn_stream *cs); /* data-layer send callback */ int (*wake)(struct conn_stream *cs); /* data-layer callback to report activity */ + int (*subscribe)(struct conn_stream *cs, int event_type, void *param); /* Subscribe to events, such as "being able to send" */ char name[8]; /* data layer name, zero-terminated */ }; @@ -358,8 +368,9 @@ struct conn_src { */ struct conn_stream { enum obj_type obj_type; /* differentiates connection from applet context */ + unsigned int flags; /* CS_FL_* */ struct connection *conn; /* xprt-level connection */ - unsigned int flags; /* CS_FL_* */ + struct list send_wait_list; /* list of tasks to wake when we're ready to send */ void *data; /* pointer to upper layer's entity (eg: stream interface) */ const struct data_cb *data_cb; /* data layer callbacks. Must be set before xprt->init() */ void *ctx; /* mux-specific context */ @@ -376,6 +387,7 @@ struct conn_stream { * connection being instanciated. It must be removed once done. */ struct connection { + /* first cache line */ enum obj_type obj_type; /* differentiates connection from applet context */ unsigned char err_code; /* CO_ER_* */ signed short send_proxy_ofs; /* <0 = offset to (re)send from the end, >0 = send all */ @@ -386,15 +398,20 @@ struct connection { void *xprt_ctx; /* general purpose pointer, initialized to NULL */ void *mux_ctx; /* mux-specific context, initialized to NULL */ void *owner; /* pointer to the owner session for incoming connections, or NULL */ + enum obj_type *target; /* the target to connect to (server, proxy, applet, ...) */ + + /* second cache line */ + struct list send_wait_list; /* list of tasks to wake when we're ready to send */ + struct list list; /* attach point to various connection lists (idle, ...) */ int xprt_st; /* transport layer state, initialized to zero */ int tmp_early_data; /* 1st byte of early data, if any */ int sent_early_data; /* Amount of early data we sent so far */ union conn_handle handle; /* connection handle at the socket layer */ - enum obj_type *target; /* the target to connect to (server, proxy, applet, ...) */ - struct list list; /* attach point to various connection lists (idle, ...) */ + const struct netns_entry *proxy_netns; int (*xprt_done_cb)(struct connection *conn); /* callback to notify of end of handshake */ + + /* third cache line and beyond */ void (*destroy_cb)(struct connection *conn); /* callback to notify of imminent death of the connection */ - const struct netns_entry *proxy_netns; struct { struct sockaddr_storage from; /* client address, or address to spoof when connecting to the server */ struct sockaddr_storage to; /* address reached by the client, or address to connect to */ diff --git a/src/connection.c b/src/connection.c index db869fb0f0..94e7209b39 100644 --- a/src/connection.c +++ b/src/connection.c @@ -128,6 +128,13 @@ void conn_fd_handler(int fd) */ flags = 0; conn->mux->send(conn); + while (!LIST_ISEMPTY(&conn->send_wait_list)) { + struct wait_list *sw = LIST_ELEM(conn->send_wait_list.n, + struct wait_list *, list); + LIST_DEL(&sw->list); + LIST_INIT(&sw->list); + tasklet_wakeup(sw->task); + } } /* The data transfer starts here and stops on error and handshakes. Note @@ -323,6 +330,22 @@ int conn_sock_send(struct connection *conn, const void *buf, int len, int flags) return ret; } +int conn_subscribe(struct connection *conn, int event_type, void *param) +{ + struct wait_list *sw; + + switch (event_type) { + case SUB_CAN_SEND: + sw = param; + if (LIST_ISEMPTY(&sw->list)) + LIST_ADDQ(&conn->send_wait_list, &sw->list); + return 0; + default: + break; + } + return (-1); +} + /* Drains possibly pending incoming data on the file descriptor attached to the * connection and update the connection's flags accordingly. This is used to * know whether we need to disable lingering on close. Returns non-zero if it diff --git a/src/mux_h2.c b/src/mux_h2.c index e25208360a..ba6bd8d8ac 100644 --- a/src/mux_h2.c +++ b/src/mux_h2.c @@ -120,6 +120,7 @@ struct h2c { struct list send_list; /* list of blocked streams requesting to send */ struct list fctl_list; /* list of streams blocked by connection's fctl */ struct buffer_wait buf_wait; /* wait list for buffer allocations */ + struct list send_wait_list; /* list of tasks to wake when we're ready to send */ }; /* H2 stream state, in h2s->st */ @@ -379,6 +380,7 @@ static int h2c_frt_init(struct connection *conn) if (t) task_queue(t); conn_xprt_want_recv(conn); + LIST_INIT(&h2c->send_wait_list); /* mux->wake will be called soon to complete the operation */ return 0; @@ -2228,6 +2230,19 @@ static void h2_send(struct connection *conn) /* output closed, nothing to send, clear the buffer to release it */ b_reset(&h2c->mbuf); } + /* We're not full anymore, so we can wake any task that are waiting + * for us. + */ + if (!(h2c->flags & (H2_CF_MUX_MFULL | H2_CF_DEM_MROOM))) { + while (!LIST_ISEMPTY(&h2c->send_wait_list)) { + struct wait_list *sw = LIST_ELEM(h2c->send_wait_list.n, + struct wait_list *, list); + LIST_DEL(&sw->list); + LIST_INIT(&sw->list); + tasklet_wakeup(sw->task); + } + + } } /* callback called on any event by the connection handler. @@ -3369,6 +3384,26 @@ static size_t h2s_frt_make_resp_data(struct h2s *h2s, const struct buffer *buf, return total; } +/* Called from the upper layer, to subscribe to events, such as being able to send */ +static int h2_subscribe(struct conn_stream *cs, int event_type, void *param) +{ + struct wait_list *sw; + struct h2s *h2s = cs->ctx; + + switch (event_type) { + case SUB_CAN_SEND: + sw = param; + if (LIST_ISEMPTY(&h2s->list) && LIST_ISEMPTY(&sw->list)) + LIST_ADDQ(&h2s->h2c->send_wait_list, &sw->list); + return 0; + default: + break; + } + return -1; + + +} + /* Called from the upper layer, to send data */ static size_t h2_snd_buf(struct conn_stream *cs, const struct buffer *buf, size_t count, int flags) { @@ -3545,6 +3580,7 @@ const struct mux_ops h2_ops = { .update_poll = h2_update_poll, .rcv_buf = h2_rcv_buf, .snd_buf = h2_snd_buf, + .subscribe = h2_subscribe, .attach = h2_attach, .detach = h2_detach, .shutr = h2_shutr, diff --git a/src/mux_pt.c b/src/mux_pt.c index b6d0b1aadf..059e4995cd 100644 --- a/src/mux_pt.c +++ b/src/mux_pt.c @@ -177,6 +177,12 @@ static size_t mux_pt_snd_buf(struct conn_stream *cs, const struct buffer *buf, s return cs->conn->xprt->snd_buf(cs->conn, buf, count, flags); } +/* Called from the upper layer, to subscribe to events */ +static int mux_pt_subscribe(struct conn_stream *cs, int event_type, void *param) +{ + return (cs->conn->xprt->subscribe(cs->conn, event_type, param)); +} + #if defined(CONFIG_HAP_LINUX_SPLICE) /* Send and get, using splicing */ static int mux_pt_rcv_pipe(struct conn_stream *cs, struct pipe *pipe, unsigned int count) @@ -206,6 +212,7 @@ const struct mux_ops mux_pt_ops = { .update_poll = mux_pt_update_poll, .rcv_buf = mux_pt_rcv_buf, .snd_buf = mux_pt_snd_buf, + .subscribe = mux_pt_subscribe, #if defined(CONFIG_HAP_LINUX_SPLICE) .rcv_pipe = mux_pt_rcv_pipe, .snd_pipe = mux_pt_snd_pipe, diff --git a/src/raw_sock.c b/src/raw_sock.c index 375c453c5d..c108a42bda 100644 --- a/src/raw_sock.c +++ b/src/raw_sock.c @@ -424,6 +424,7 @@ static size_t raw_sock_from_buf(struct connection *conn, const struct buffer *bu static struct xprt_ops raw_sock = { .snd_buf = raw_sock_from_buf, .rcv_buf = raw_sock_to_buf, + .subscribe = conn_subscribe, #if defined(CONFIG_HAP_LINUX_SPLICE) .rcv_pipe = raw_sock_to_pipe, .snd_pipe = raw_sock_from_pipe, diff --git a/src/ssl_sock.c b/src/ssl_sock.c index 56898209a6..7e8739a633 100644 --- a/src/ssl_sock.c +++ b/src/ssl_sock.c @@ -8895,6 +8895,7 @@ static struct cfg_kw_list cfg_kws = {ILH, { static struct xprt_ops ssl_sock = { .snd_buf = ssl_sock_from_buf, .rcv_buf = ssl_sock_to_buf, + .subscribe = conn_subscribe, .rcv_pipe = NULL, .snd_pipe = NULL, .shutr = NULL,