]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: connections: Introduce an unsubscribe method.
authorOlivier Houchard <ohouchard@haproxy.com>
Fri, 28 Sep 2018 15:57:58 +0000 (17:57 +0200)
committerWilly Tarreau <w@1wt.eu>
Thu, 11 Oct 2018 13:34:21 +0000 (15:34 +0200)
As we don't know how subscriptions are handled, we can't just assume we can
use LIST_DEL() to unsubscribe, so introduce a new method to mux and connections
to do so.

include/proto/connection.h
include/types/connection.h
src/connection.c
src/mux_h2.c
src/mux_pt.c
src/raw_sock.c
src/ssl_sock.c

index 31f82b8b18aa29442131dabf6aeffa65558d9ebb..028dc232c13a1a7412a46db8ee53a6ef65eb4c73 100644 (file)
@@ -51,6 +51,8 @@ int make_proxy_line_v1(char *buf, int buf_len, struct sockaddr_storage *src, str
 int make_proxy_line_v2(char *buf, int buf_len, struct server *srv, struct connection *remote);
 
 int conn_subscribe(struct connection *conn, int event_type, void *param);
+int conn_unsubscribe(struct connection *conn, int event_type, void *param);
+
 /* receive a NetScaler Client IP insertion header over a connection */
 int conn_recv_netscaler_cip(struct connection *conn, int flag);
 
index 27ae76e21c661ee4e9f15984f57abab376665d35..26f9bed5889136cb8e111b27a5759a940199f1a0 100644 (file)
@@ -301,6 +301,7 @@ struct xprt_ops {
        int  (*get_alpn)(const struct connection *conn, const char **str, int *len); /* get application layer name */
        char name[8];                               /* transport layer name, zero-terminated */
        int (*subscribe)(struct connection *conn, int event_type, void *param); /* Subscribe to events, such as "being able to send" */
+       int (*unsubscribe)(struct connection *conn, int event_type, void *param); /* Unsubscribe to events */
 };
 
 /* mux_ops describes the mux operations, which are to be performed at the
@@ -325,6 +326,7 @@ struct mux_ops {
        void (*detach)(struct conn_stream *); /* Detach a conn_stream from an outgoing connection, when the request is done */
        void (*show_fd)(struct buffer *, struct connection *); /* append some data about connection into chunk for "show fd" */
        int (*subscribe)(struct conn_stream *cs, int event_type, void *param); /* Subscribe to events, such as "being able to send" */
+       int (*unsubscribe)(struct conn_stream *cs, int event_type, void *param); /* Unsubscribe to events */
        unsigned int flags;                           /* some flags characterizing the mux's capabilities (MX_FL_*) */
        char name[8];                                 /* mux layer name, zero-terminated */
 };
@@ -338,7 +340,6 @@ struct mux_ops {
  */
 struct data_cb {
        int  (*wake)(struct conn_stream *cs);  /* data-layer callback to report activity */
-       int (*subscribe)(struct conn_stream *cs, int event_type, void *param); /* Subscribe to events, such as "being able to send" */
        char name[8];                           /* data layer name, zero-terminated */
 };
 
index c0da874bd48c926e361a26df630fc0362c76ad29..c8f1df11630c811805da321cceb29d5da5886eaf 100644 (file)
@@ -358,12 +358,38 @@ int conn_sock_send(struct connection *conn, const void *buf, int len, int flags)
        return ret;
 }
 
+int conn_unsubscribe(struct connection *conn, int event_type, void *param)
+{
+       struct wait_list *sw;
+
+       if (event_type & SUB_CAN_RECV) {
+               sw = param;
+               if (sw->wait_reason & SUB_CAN_RECV) {
+                       LIST_DEL(&sw->list);
+                       LIST_INIT(&sw->list);
+                       sw->wait_reason &= ~SUB_CAN_RECV;
+                       if (sw->wait_reason & SUB_CAN_SEND)
+                               LIST_ADDQ(&conn->send_wait_list, &sw->list);
+               }
+       }
+       if (event_type & SUB_CAN_SEND) {
+               sw = param;
+               if (sw->wait_reason & SUB_CAN_SEND) {
+                       LIST_DEL(&sw->list);
+                       LIST_INIT(&sw->list);
+                       sw->wait_reason &= ~SUB_CAN_SEND;
+                       if (sw->wait_reason & SUB_CAN_RECV)
+                               LIST_ADDQ(&conn->recv_wait_list, &sw->list);
+               }
+       }
+       return 0;
+}
+
 int conn_subscribe(struct connection *conn, int event_type, void *param)
 {
        struct wait_list *sw;
 
-       switch (event_type) {
-       case SUB_CAN_RECV:
+       if (event_type & SUB_CAN_RECV) {
                sw = param;
                if (!(sw->wait_reason & SUB_CAN_RECV)) {
                        sw->wait_reason |= SUB_CAN_RECV;
@@ -377,8 +403,9 @@ int conn_subscribe(struct connection *conn, int event_type, void *param)
                        } else
                                LIST_ADDQ(&conn->recv_wait_list, &sw->list);
                }
-               return 0;
-       case SUB_CAN_SEND:
+               event_type &= ~SUB_CAN_RECV;
+       }
+       if (event_type & SUB_CAN_SEND) {
                sw = param;
                if (!(sw->wait_reason & SUB_CAN_SEND)) {
                        sw->wait_reason |= SUB_CAN_SEND;
@@ -392,11 +419,11 @@ int conn_subscribe(struct connection *conn, int event_type, void *param)
                        } else
                                LIST_ADDQ(&conn->send_wait_list, &sw->list);
                }
-               return 0;
-       default:
-               break;
+               event_type &= ~SUB_CAN_SEND;
        }
-       return (-1);
+       if (event_type != 0)
+               return (-1);
+       return 0;
 }
 
 /* Drains possibly pending incoming data on the file descriptor attached to the
index 558375433c2957fa40c9e5323a83258ad94d31ea..86201514629e1b3a7c8b73d81cba5010bfd31b1a 100644 (file)
@@ -3541,16 +3541,16 @@ static int h2_subscribe(struct conn_stream *cs, int event_type, void *param)
        struct h2s *h2s = cs->ctx;
        struct h2c *h2c = h2s->h2c;
 
-       switch (event_type) {
-       case SUB_CAN_RECV:
+       if (event_type & SUB_CAN_RECV) {
                sw = param;
                if (!(sw->wait_reason & SUB_CAN_RECV)) {
                        sw->wait_reason |= SUB_CAN_RECV;
                        sw->handle = h2s;
                        h2s->recv_wait_list = sw;
                }
-               return 0;
-       case SUB_CAN_SEND:
+               event_type &= ~SUB_CAN_RECV;
+       }
+       if (event_type & SUB_CAN_SEND) {
                sw = param;
                if (!(sw->wait_reason & SUB_CAN_SEND)) {
                        sw->wait_reason |= SUB_CAN_SEND;
@@ -3560,15 +3560,38 @@ static int h2_subscribe(struct conn_stream *cs, int event_type, void *param)
                        else
                                LIST_ADDQ(&h2c->send_list, &sw->list);
                }
-               return 0;
-       default:
-               break;
+               event_type &= ~SUB_CAN_SEND;
        }
-       return -1;
+       if (event_type != 0)
+               return -1;
+       return 0;
 
 
 }
 
+static int h2_unsubscribe(struct conn_stream *cs, int event_type, void *param)
+{
+       struct wait_list *sw;
+       struct h2s *h2s = cs->ctx;
+
+       if (event_type & SUB_CAN_RECV) {
+               sw = param;
+               if (h2s->recv_wait_list == sw) {
+                       sw->wait_reason &= ~SUB_CAN_RECV;
+                       h2s->recv_wait_list = NULL;
+               }
+       }
+       if (event_type & SUB_CAN_SEND) {
+               sw = param;
+               if (sw->wait_reason & SUB_CAN_SEND) {
+                       LIST_DEL(&sw->list);
+                       LIST_INIT(&sw->list);
+               }
+       }
+       return 0;
+}
+
+
 /* Called from the upper layer, to receive data */
 static size_t h2_rcv_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags)
 {
@@ -3767,6 +3790,7 @@ const struct mux_ops h2_ops = {
        .snd_buf = h2_snd_buf,
        .rcv_buf = h2_rcv_buf,
        .subscribe = h2_subscribe,
+       .unsubscribe = h2_unsubscribe,
        .attach = h2_attach,
        .detach = h2_detach,
        .shutr = h2_shutr,
index 5b864199ec02cf4ff209627c5740016cb4c884b8..3a573b5a937acdce814c9c305ec22c33f1795d2d 100644 (file)
@@ -162,6 +162,11 @@ static int mux_pt_subscribe(struct conn_stream *cs, int event_type, void *param)
        return (cs->conn->xprt->subscribe(cs->conn, event_type, param));
 }
 
+static int mux_pt_unsubscribe(struct conn_stream *cs, int event_type, void *param)
+{
+       return (cs->conn->xprt->unsubscribe(cs->conn, event_type, param));
+}
+
 #if defined(CONFIG_HAP_LINUX_SPLICE)
 /* Send and get, using splicing */
 static int mux_pt_rcv_pipe(struct conn_stream *cs, struct pipe *pipe, unsigned int count)
@@ -190,6 +195,7 @@ const struct mux_ops mux_pt_ops = {
        .rcv_buf = mux_pt_rcv_buf,
        .snd_buf = mux_pt_snd_buf,
        .subscribe = mux_pt_subscribe,
+       .unsubscribe = mux_pt_unsubscribe,
 #if defined(CONFIG_HAP_LINUX_SPLICE)
        .rcv_pipe = mux_pt_rcv_pipe,
        .snd_pipe = mux_pt_snd_pipe,
index c108a42bdab01c83eaf4fc4b781267024dad5ae0..df861f48d59f9c7689b213763ec2bf81fd68039b 100644 (file)
@@ -425,6 +425,7 @@ static struct xprt_ops raw_sock = {
        .snd_buf  = raw_sock_from_buf,
        .rcv_buf  = raw_sock_to_buf,
        .subscribe = conn_subscribe,
+       .unsubscribe = conn_unsubscribe,
 #if defined(CONFIG_HAP_LINUX_SPLICE)
        .rcv_pipe = raw_sock_to_pipe,
        .snd_pipe = raw_sock_from_pipe,
index d4827e5393fb5eec36fafad795e9515809c87266..b78dc871b436e18ef758a3cb247dae2402332755 100644 (file)
@@ -9023,6 +9023,7 @@ static struct xprt_ops ssl_sock = {
        .snd_buf  = ssl_sock_from_buf,
        .rcv_buf  = ssl_sock_to_buf,
        .subscribe = conn_subscribe,
+       .unsubscribe = conn_unsubscribe,
        .rcv_pipe = NULL,
        .snd_pipe = NULL,
        .shutr    = NULL,