]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: mux: Teach the mux_pt how to deal with idle connections.
authorOlivier Houchard <ohouchard@haproxy.com>
Mon, 5 Nov 2018 17:28:43 +0000 (18:28 +0100)
committerWilly Tarreau <w@1wt.eu>
Sun, 18 Nov 2018 20:44:03 +0000 (21:44 +0100)
In order to make the mux_pt able to handle idle connections, give it its
own context, where it'll stores the connection, the current conn_stream if
any, and a wait_event, so that it can subscribe to I/O events.
Add a new parameter to the detach() method, that gives the mux a hint
if it should destroy the connection or not when detaching a conn_stream.
If 1, then the mux_pt immediately destroys the connecion, if 0, then it
just subscribes to any read event. If a read happens, it will call
conn_sock_drain(), and if there's a connection error, it'll free the
connection, after removing it from the idle list.

src/mux_pt.c

index dd052c5fda27f683e7d80fd42583290289f8670f..25b1cfe0df0f72d469fd0893424e4d6209447dea 100644 (file)
 #include <common/config.h>
 #include <proto/connection.h>
 #include <proto/stream.h>
+#include <proto/task.h>
+
+static struct pool_head *pool_head_pt_ctx;
+
+struct mux_pt_ctx {
+       struct conn_stream *cs;
+       struct connection *conn;
+       struct wait_event wait_event;
+};
+
+static void mux_pt_destroy(struct mux_pt_ctx *ctx)
+{
+       struct connection *conn = ctx->conn;
+
+       LIST_DEL(&conn->list);
+       conn_stop_tracking(conn);
+       conn_full_close(conn);
+       if (conn->destroy_cb)
+               conn->destroy_cb(conn);
+       /* We don't bother unsubscribing here, as we're about to destroy
+        * both the connection and the mux_pt_ctx
+        */
+       conn_free(conn);
+       pool_free(pool_head_pt_ctx, ctx);
+}
+
+/* Callback, used when we get I/Os while in idle mode */
+static struct task *mux_pt_io_cb(struct task *t, void *tctx, unsigned short status)
+{
+       struct mux_pt_ctx *ctx = tctx;
+
+       conn_sock_drain(ctx->conn);
+       if (ctx->conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH))
+               mux_pt_destroy(ctx);
+       else
+               ctx->conn->xprt->subscribe(ctx->conn, SUB_CAN_RECV,
+                   &ctx->wait_event);
+
+       return NULL;
+}
 
 /* Initialize the mux once it's attached. It is expected that conn->mux_ctx
  * points to the existing conn_stream (for outgoing connections) or NULL (for
 static int mux_pt_init(struct connection *conn, struct proxy *prx)
 {
        struct conn_stream *cs = conn->mux_ctx;
+       struct mux_pt_ctx *ctx = pool_alloc(pool_head_pt_ctx);
+
+       if (!ctx)
+               goto fail;
+
+       ctx->wait_event.task = tasklet_new();
+       if (!ctx->wait_event.task)
+               goto fail_free_ctx;
+       ctx->wait_event.task->context = ctx;
+       ctx->wait_event.task->process = mux_pt_io_cb;
+       ctx->wait_event.wait_reason = 0;
+       ctx->conn = conn;
 
        if (!cs) {
                cs = cs_new(conn);
                if (!cs)
-                       goto fail;
+                       goto fail_free_ctx;
 
                if (stream_create_from_cs(cs) < 0)
                        goto fail_free;
 
-               conn->mux_ctx = cs;
        }
+       conn->mux_ctx = ctx;
+       ctx->cs = cs;
        return 0;
 
  fail_free:
        cs_free(cs);
+fail_free_ctx:
+       if (ctx->wait_event.task)
+               tasklet_free(ctx->wait_event.task);
+       pool_free(pool_head_pt_ctx, ctx);
  fail:
        return -1;
 }
@@ -46,13 +103,22 @@ static int mux_pt_init(struct connection *conn, struct proxy *prx)
  */
 static int mux_pt_wake(struct connection *conn)
 {
-       struct conn_stream *cs = conn->mux_ctx;
-       int ret;
+       struct mux_pt_ctx *ctx = conn->mux_ctx;
+       struct conn_stream *cs = ctx->cs;
+       int ret = 0;
 
-       ret = cs->data_cb->wake ? cs->data_cb->wake(cs) : 0;
+       if (cs) {
+               ret = cs->data_cb->wake ? cs->data_cb->wake(cs) : 0;
 
-       if (ret < 0)
-               return ret;
+               if (ret < 0)
+                       return ret;
+       } else {
+               conn_sock_drain(conn);
+               if (conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH)) {
+                       mux_pt_destroy(ctx);
+                       return -1;
+               }
+       }
 
        /* If we had early data, and we're done with the handshake
         * then whe know the data are safe, and we can remove the flag.
@@ -77,7 +143,8 @@ static struct conn_stream *mux_pt_attach(struct connection *conn)
  */
 static const struct conn_stream *mux_pt_get_first_cs(const struct connection *conn)
 {
-       struct conn_stream *cs = conn->mux_ctx;
+       struct mux_pt_ctx *ctx = conn->mux_ctx;
+       struct conn_stream *cs = ctx->cs;
 
        return cs;
 }
@@ -88,13 +155,12 @@ static const struct conn_stream *mux_pt_get_first_cs(const struct connection *co
 static void mux_pt_detach(struct conn_stream *cs)
 {
        struct connection *conn = cs->conn;
+       struct mux_pt_ctx *ctx = cs->conn->mux_ctx;
 
-       LIST_DEL(&conn->list);
-       conn_stop_tracking(conn);
-       conn_full_close(conn);
-       if (conn->destroy_cb)
-               conn->destroy_cb(conn);
-       conn_free(conn);
+       /* Subscribe, to know if we got disconnected */
+       conn->xprt->subscribe(conn, SUB_CAN_RECV, &ctx->wait_event);
+       ctx->cs = NULL;
+       mux_pt_destroy(ctx);
 }
 
 static void mux_pt_shutr(struct conn_stream *cs, enum cs_shr_mode mode)
@@ -209,4 +275,6 @@ __attribute__((constructor))
 static void __mux_pt_init(void)
 {
        register_mux_proto(&mux_proto_pt);
+       pool_head_pt_ctx = create_pool("mux_pt", sizeof(struct mux_pt_ctx),
+           MEM_F_SHARED);
 }