]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: conn-stream: Pre-allocate endpoint to create CS from muxes and applets
authorChristopher Faulet <cfaulet@haproxy.com>
Wed, 23 Mar 2022 10:01:09 +0000 (11:01 +0100)
committerChristopher Faulet <cfaulet@haproxy.com>
Wed, 13 Apr 2022 13:10:14 +0000 (15:10 +0200)
It is a transient commit to prepare next changes. Now, when a conn-stream is
created from an applet or a multiplexer, an endpoint is always provided. In
addition, the API to create a conn-stream was specialized to have one
function per type.

The next step will be to share the endpoint structure.

19 files changed:
include/haproxy/applet.h
include/haproxy/conn_stream.h
include/haproxy/mux_quic.h
src/backend.c
src/check.c
src/conn_stream.c
src/dns.c
src/flt_spoe.c
src/hlua.c
src/http_client.c
src/mux_fcgi.c
src/mux_h1.c
src/mux_h2.c
src/mux_pt.c
src/peers.c
src/sink.c
src/stream.c
src/stream_interface.c
src/tcpcheck.c

index 97164ac30e9b46787dc89d65307bad286dd8eeb9..97b9c347b4fe55530a2c92d46ccf0f6b31a51484 100644 (file)
@@ -59,7 +59,7 @@ static inline void appctx_init(struct appctx *appctx)
  * appctx_free(). <applet> is assigned as the applet, but it can be NULL. The
  * applet's task is always created on the current thread.
  */
-static inline struct appctx *appctx_new(struct applet *applet, void *owner)
+static inline struct appctx *appctx_new(struct applet *applet)
 {
        struct appctx *appctx;
 
@@ -67,7 +67,6 @@ static inline struct appctx *appctx_new(struct applet *applet, void *owner)
        if (likely(appctx != NULL)) {
                appctx->obj_type = OBJ_TYPE_APPCTX;
                appctx->applet = applet;
-               appctx->owner = owner;
                appctx_init(appctx);
                appctx->t = task_new_here();
                if (unlikely(appctx->t == NULL)) {
index 638b0cf2d6d2f76ccaed2d5d8e8996a029d2419a..e0c3b50bc3c902293730275c0ca0d41be236120a 100644 (file)
 #define _HAPROXY_CONN_STREAM_H
 
 #include <haproxy/api.h>
-#include <haproxy/applet.h>
 #include <haproxy/connection.h>
 #include <haproxy/conn_stream-t.h>
 #include <haproxy/obj_type.h>
 
+struct buffer;
+struct session;
+struct appctx;
 struct stream;
 struct stream_interface;
 struct check;
@@ -38,10 +40,16 @@ struct cs_endpoint *cs_endpoint_new();
 void cs_endpoint_free(struct cs_endpoint *endp);
 
 struct conn_stream *cs_new(struct cs_endpoint *endp);
+struct conn_stream *cs_new_from_mux(struct cs_endpoint *endp, struct session *sess, struct buffer *input);
+struct conn_stream *cs_new_from_applet(struct cs_endpoint *endp, struct session *sess, struct buffer *input);
+struct conn_stream *cs_new_from_strm(struct stream *strm, unsigned int flags);
+struct conn_stream *cs_new_from_check(struct check *check, unsigned int flags);
 void cs_free(struct conn_stream *cs);
-void cs_attach_endp_mux(struct conn_stream *cs, void *endp, void *ctx);
-void cs_attach_endp_app(struct conn_stream *cs, void *endp, void *ctx);
-int cs_attach_app(struct conn_stream *cs, enum obj_type *app);
+
+void cs_attach_mux(struct conn_stream *cs, void *target, void *ctx);
+void cs_attach_applet(struct conn_stream *cs, void *target, void *ctx);
+int cs_attach_strm(struct conn_stream *cs, struct stream *strm);
+
 void cs_detach_endp(struct conn_stream *cs);
 void cs_detach_app(struct conn_stream *cs);
 
index aab468752f612c4ac4f29300a6709d4e9a2a54ab..6c97b7fda9e8b03a30392ab58017048096b45bfd 100644 (file)
@@ -115,15 +115,13 @@ static inline struct conn_stream *qc_attach_cs(struct qcs *qcs, struct buffer *b
                return NULL;
        endp->target = qcs;
        endp->ctx = qcs->qcc->conn;
-       cs = cs_new(endp);
+       endp->flags |= CS_EP_T_MUX;
+       cs = cs_new_from_mux(endp, qcs->qcc->conn->owner, buf);
        if (!cs) {
                cs_endpoint_free(endp);
                return NULL;
        }
-       cs_attach_endp_mux(cs, qcs, qcs->qcc->conn);
        qcs->cs = cs;
-       stream_new(qcs->qcc->conn->owner, cs, buf);
-
        ++qcs->qcc->nb_cs;
 
        return cs;
index 2f33f37c22f4242c16f738d2c62b949686dc85c7..f8c13b4f4775414a15cb894603e05eb076556e0f 100644 (file)
@@ -1570,7 +1570,9 @@ skip_reuse:
                        return SF_ERR_INTERNAL;  /* how did we get there ? */
                }
 
-               cs_attach_endp_mux(s->csb, NULL, srv_conn);
+               cs_attach_mux(s->csb, NULL, srv_conn);
+               srv_conn->ctx = s->csb;
+
 #if defined(USE_OPENSSL) && defined(TLSEXT_TYPE_application_layer_protocol_negotiation)
                if (!srv ||
                    (srv->use_ssl != 1 || (!(srv->ssl_ctx.alpn_str) && !(srv->ssl_ctx.npn_str)) ||
index b26e7b9b27873fb4361c20bfb69efdbe55a02de6..44583c243d8df3645cf7def476fc7d71b42e663d 100644 (file)
@@ -1391,11 +1391,9 @@ int start_check_task(struct check *check, int mininter,
        if (check->type == PR_O2_EXT_CHK)
                t = task_new_on(0);
        else {
-               check->cs = cs_new(NULL);
+               check->cs = cs_new_from_check(check, CS_FL_NONE);
                if (!check->cs)
                        goto fail_alloc_cs;
-               if (cs_attach_app(check->cs, &check->obj_type) < 0)
-                       goto fail_attach_cs;
                t = task_new_anywhere();
        }
 
@@ -1420,7 +1418,6 @@ int start_check_task(struct check *check, int mininter,
        return 1;
 
   fail_alloc_task:
-  fail_attach_cs:
        cs_free(check->cs);
   fail_alloc_cs:
        ha_alert("Starting [%s:%s] check: out of memory.\n",
index 5b8177484b013456e2087bc183901a00fe72b067..24a22b5219f68a872b71a425cf8472edcc19dba7 100644 (file)
@@ -75,6 +75,68 @@ struct conn_stream *cs_new(struct cs_endpoint *endp)
        return NULL;
 }
 
+struct conn_stream *cs_new_from_mux(struct cs_endpoint *endp, struct session *sess, struct buffer *input)
+{
+       struct conn_stream *cs;
+
+       cs = cs_new(endp);
+       if (unlikely(!cs))
+               return NULL;
+       if (unlikely(!stream_new(sess, cs, input))) {
+               pool_free(pool_head_connstream, cs);
+               cs = NULL;
+       }
+       return cs;
+}
+
+struct conn_stream *cs_new_from_applet(struct cs_endpoint *endp, struct session *sess, struct buffer *input)
+{
+       struct conn_stream *cs;
+       struct appctx *appctx = endp->ctx;
+
+       cs = cs_new(endp);
+       if (unlikely(!cs))
+               return NULL;
+       appctx->owner = cs;
+       if (unlikely(!stream_new(sess, cs, input))) {
+               pool_free(pool_head_connstream, cs);
+               cs = NULL;
+       }
+       return cs;
+}
+
+struct conn_stream *cs_new_from_strm(struct stream *strm, unsigned int flags)
+{
+       struct conn_stream *cs;
+
+       cs = cs_new(NULL);
+       if (unlikely(!cs))
+               return NULL;
+       cs->flags |= flags;
+       cs->si = si_new(cs);
+       if (unlikely(!cs->si)) {
+               cs_free(cs);
+               return NULL;
+       }
+       cs->app = &strm->obj_type;
+       cs->si->ops = &si_embedded_ops;
+       cs->data_cb = NULL;
+       return cs;
+}
+
+struct conn_stream *cs_new_from_check(struct check *check, unsigned int flags)
+{
+       struct conn_stream *cs;
+
+       cs = cs_new(NULL);
+       if (unlikely(!cs))
+               return NULL;
+       cs->flags |= flags;
+       cs->app = &check->obj_type;
+       cs->data_cb = &check_conn_cb;
+       return cs;
+}
+
 /* Releases a conn_stream previously allocated by cs_new(), as well as any
  * buffer it would still hold.
  */
@@ -89,11 +151,11 @@ void cs_free(struct conn_stream *cs)
 
 
 /* Attaches a conn_stream to an mux endpoint and sets the endpoint ctx */
-void cs_attach_endp_mux(struct conn_stream *cs, void *endp, void *ctx)
+void cs_attach_mux(struct conn_stream *cs, void *target, void *ctx)
 {
        struct connection *conn = ctx;
 
-       cs->endp->target = endp;
+       cs->endp->target = target;
        cs->endp->ctx = ctx;
        cs->endp->flags |= CS_EP_T_MUX;
        if (!conn->ctx)
@@ -107,11 +169,11 @@ void cs_attach_endp_mux(struct conn_stream *cs, void *endp, void *ctx)
 }
 
 /* Attaches a conn_stream to an applet endpoint and sets the endpoint ctx */
-void cs_attach_endp_app(struct conn_stream *cs, void *endp, void *ctx)
+void cs_attach_applet(struct conn_stream *cs, void *target, void *ctx)
 {
-       struct appctx *appctx = endp;
+       struct appctx *appctx = target;
 
-       cs->endp->target = endp;
+       cs->endp->target = target;
        cs->endp->ctx = ctx;
        cs->endp->flags |= CS_EP_T_APPLET;
        appctx->owner = cs;
@@ -122,31 +184,26 @@ void cs_attach_endp_app(struct conn_stream *cs, void *endp, void *ctx)
 }
 
 /* Attaches a conn_stream to a app layer and sets the relevant callbacks */
-int cs_attach_app(struct conn_stream *cs, enum obj_type *app)
+int cs_attach_strm(struct conn_stream *cs, struct stream *strm)
 {
-       cs->app = app;
+       cs->app = &strm->obj_type;
 
-       if (objt_stream(app)) {
-               if (!cs->si)
-                       cs->si = si_new(cs);
-               if (unlikely(!cs->si))
-                       return -1;
+       cs->si = si_new(cs);
+       if (unlikely(!cs->si))
+               return -1;
 
-               if (cs->endp->flags & CS_EP_T_MUX) {
-                       cs->si->ops = &si_conn_ops;
-                       cs->data_cb = &si_conn_cb;
-               }
-               else if (cs->endp->flags & CS_EP_T_APPLET) {
-                       cs->si->ops = &si_applet_ops;
-                       cs->data_cb = NULL;
-               }
-               else {
-                       cs->si->ops = &si_embedded_ops;
-                       cs->data_cb = NULL;
-               }
+       if (cs->endp->flags & CS_EP_T_MUX) {
+               cs->si->ops = &si_conn_ops;
+               cs->data_cb = &si_conn_cb;
+       }
+       else if (cs->endp->flags & CS_EP_T_APPLET) {
+               cs->si->ops = &si_applet_ops;
+               cs->data_cb = NULL;
+       }
+       else {
+               cs->si->ops = &si_embedded_ops;
+               cs->data_cb = NULL;
        }
-       else if (objt_check(app))
-               cs->data_cb = &check_conn_cb;
        return 0;
 }
 
index 461e3c723264202e69ff9d50d5f8e77e15356b74..e6f70935c722f18dda003677e7471341aee88632 100644 (file)
--- a/src/dns.c
+++ b/src/dns.c
@@ -887,19 +887,15 @@ static struct appctx *dns_session_create(struct dns_session *ds)
 {
        struct appctx *appctx;
        struct session *sess;
+       struct cs_endpoint *endp;
        struct conn_stream *cs;
        struct stream *s;
        struct applet *applet = &dns_session_applet;
+       struct sockaddr_storage *addr = NULL;
 
-       cs = cs_new(NULL);
-       if (!cs) {
-               ha_alert("out of memory in dns_session_create().\n");
-               goto out_close;
-       }
-
-       appctx = appctx_new(applet, cs);
+       appctx = appctx_new(applet);
        if (!appctx)
-               goto out_free_cs;
+               goto out_close;
        appctx->ctx.sft.ptr = (void *)ds;
 
        sess = session_new(ds->dss->srv->proxy, NULL, &appctx->obj_type);
@@ -908,18 +904,28 @@ static struct appctx *dns_session_create(struct dns_session *ds)
                goto out_free_appctx;
        }
 
-       if ((s = stream_new(sess, cs, &BUF_NULL)) == NULL) {
-               ha_alert("Failed to initialize stream in dns_session_create().\n");
+       if (!sockaddr_alloc(&addr, &ds->dss->srv->addr, sizeof(ds->dss->srv->addr)))
                goto out_free_sess;
+
+       endp = cs_endpoint_new();
+       if (!endp)
+               goto out_free_addr;
+       endp->target = appctx;
+       endp->ctx = appctx;
+       endp->flags |= CS_EP_T_APPLET;
+
+       cs = cs_new_from_applet(endp, sess, &BUF_NULL);
+       if (!cs) {
+               ha_alert("Failed to initialize stream in dns_session_create().\n");
+               cs_endpoint_free(endp);
+               goto out_free_addr;
        }
 
+       s = DISGUISE(cs_strm(cs));
+       cs_si(s->csb)->dst = addr;
+       cs_si(s->csb)->flags |= SI_FL_NOLINGER;
        s->target = &ds->dss->srv->obj_type;
-       if (!sockaddr_alloc(&cs_si(s->csb)->dst, &ds->dss->srv->addr, sizeof(ds->dss->srv->addr)))
-               goto out_free_strm;
-
-       cs_attach_endp_app(cs, appctx, appctx);
        s->flags = SF_ASSIGNED|SF_ADDR_SET;
-       cs_si(s->csb)->flags |= SI_FL_NOLINGER;
 
        s->do_log = NULL;
        s->uniq_id = 0;
@@ -934,15 +940,12 @@ static struct appctx *dns_session_create(struct dns_session *ds)
        return appctx;
 
        /* Error unrolling */
- out_free_strm:
-       LIST_DELETE(&s->list);
-       pool_free(pool_head_stream, s);
+ out_free_addr:
+       sockaddr_free(&addr);
  out_free_sess:
        session_free(sess);
  out_free_appctx:
        appctx_free(appctx);
- out_free_cs:
-       cs_free(cs);
  out_close:
        return NULL;
 }
index e8daf1be18afe59a4401e07b60e34f528b0aac2e..86fb646f1c058b366159eea5419f81f40220f766 100644 (file)
@@ -1988,16 +1988,13 @@ spoe_create_appctx(struct spoe_config *conf)
 {
        struct appctx      *appctx;
        struct session     *sess;
+       struct cs_endpoint *endp;
        struct conn_stream *cs;
        struct stream      *strm;
 
-       cs = cs_new(NULL);
-       if (!cs)
+       if ((appctx = appctx_new(&spoe_applet)) == NULL)
                goto out_error;
 
-       if ((appctx = appctx_new(&spoe_applet, cs)) == NULL)
-               goto out_free_cs;
-
        appctx->ctx.spoe.ptr = pool_zalloc(pool_head_spoe_appctx);
        if (SPOE_APPCTX(appctx) == NULL)
                goto out_free_appctx;
@@ -2028,14 +2025,24 @@ spoe_create_appctx(struct spoe_config *conf)
        if (!sess)
                goto out_free_spoe;
 
-       if ((strm = stream_new(sess, cs, &BUF_NULL)) == NULL)
+       endp = cs_endpoint_new();
+       if (!endp)
+               goto out_free_sess;
+       endp->target = appctx;
+       endp->ctx = appctx;
+       endp->flags |= CS_EP_T_APPLET;
+
+       cs = cs_new_from_applet(endp, sess, &BUF_NULL);
+       if (!cs) {
+               cs_endpoint_free(endp);
                goto out_free_sess;
+       }
 
-       cs_attach_endp_app(cs, appctx, appctx);
+       strm = DISGUISE(cs_strm(cs));
        stream_set_backend(strm, conf->agent->b.be);
 
        /* applet is waiting for data */
-       si_cant_get(strm->csf->si);
+       si_cant_get(cs_si(strm->csf));
        appctx_wakeup(appctx);
 
        strm->do_log = NULL;
@@ -2058,8 +2065,6 @@ spoe_create_appctx(struct spoe_config *conf)
        pool_free(pool_head_spoe_appctx, SPOE_APPCTX(appctx));
  out_free_appctx:
        appctx_free(appctx);
- out_free_cs:
-       cs_free(cs);
  out_error:
        return NULL;
 }
index 3f9b123b2d3bde3e612c7bbfb1248c804a36e8b4..705c15c19a2b5edaa29d7a56f84270a457e5b2c9 100644 (file)
@@ -2918,8 +2918,9 @@ __LJMP static int hlua_socket_new(lua_State *L)
        struct hlua_socket *socket;
        struct appctx *appctx;
        struct session *sess;
+       struct cs_endpoint *endp;
        struct conn_stream *cs;
-       struct stream *strm;
+       struct stream *s;
 
        /* Check stack size. */
        if (!lua_checkstack(L, 3)) {
@@ -2944,17 +2945,11 @@ __LJMP static int hlua_socket_new(lua_State *L)
        lua_rawgeti(L, LUA_REGISTRYINDEX, class_socket_ref);
        lua_setmetatable(L, -2);
 
-       cs = cs_new(NULL);
-       if (!cs) {
-               hlua_pusherror(L, "socket: out of memory");
-               goto out_fail_conf;
-       }
-
        /* Create the applet context */
-       appctx = appctx_new(&update_applet, cs);
+       appctx = appctx_new(&update_applet);
        if (!appctx) {
                hlua_pusherror(L, "socket: out of memory");
-               goto out_fail_cs;
+               goto out_fail_conf;
        }
 
        appctx->ctx.hlua_cosocket.connected = 0;
@@ -2969,13 +2964,21 @@ __LJMP static int hlua_socket_new(lua_State *L)
                goto out_fail_appctx;
        }
 
-       strm = stream_new(sess, cs, &BUF_NULL);
-       if (!strm) {
+       endp = cs_endpoint_new();
+       if (!endp)
+               goto out_fail_sess;
+       endp->target = appctx;
+       endp->ctx = appctx;
+       endp->flags |= CS_EP_T_APPLET;
+
+       cs = cs_new_from_applet(endp, sess, &BUF_NULL);
+       if (!cs) {
                hlua_pusherror(L, "socket: out of memory");
+               cs_endpoint_free(endp);
                goto out_fail_sess;
        }
 
-       cs_attach_endp_app(cs, appctx, appctx);
+       s = DISGUISE(cs_strm(cs));
 
        /* Initialise cross reference between stream and Lua socket object. */
        xref_create(&socket->xref, &appctx->ctx.hlua_cosocket.xref);
@@ -2984,11 +2987,11 @@ __LJMP static int hlua_socket_new(lua_State *L)
         * and retrieve data from the server. The connection is initialized
         * with the "struct server".
         */
-       si_set_state(strm->csb->si, SI_ST_ASS);
+       si_set_state(cs_si(s->csb), SI_ST_ASS);
 
        /* Force destination server. */
-       strm->flags |= SF_DIRECT | SF_ASSIGNED | SF_BE_ASSIGNED;
-       strm->target = &socket_tcp->obj_type;
+       s->flags |= SF_DIRECT | SF_ASSIGNED | SF_BE_ASSIGNED;
+       s->target = &socket_tcp->obj_type;
 
        return 1;
 
@@ -2996,8 +2999,6 @@ __LJMP static int hlua_socket_new(lua_State *L)
        session_free(sess);
  out_fail_appctx:
        appctx_free(appctx);
- out_fail_cs:
-       cs_free(cs);
  out_fail_conf:
        WILL_LJMP(lua_error(L));
        return 0;
index dd274820739373207aa41fe28714987bc4cb7ed3..55701e5eabd7ecb458b2fbbb002efe3be0508e7c 100644 (file)
@@ -455,8 +455,10 @@ struct appctx *httpclient_start(struct httpclient *hc)
        struct applet *applet = &httpclient_applet;
        struct appctx *appctx;
        struct session *sess;
+       struct cs_endpoint *endp;
        struct conn_stream *cs;
        struct stream *s;
+       struct sockaddr_storage *addr = NULL;
        int len;
        struct sockaddr_storage ss_url;
        struct sockaddr_storage* ss_dst;
@@ -476,17 +478,11 @@ struct appctx *httpclient_start(struct httpclient *hc)
                goto out;
        }
 
-       cs = cs_new(NULL);
-       if (!cs) {
-               ha_alert("httpclient: out of memory in %s:%d.\n", __FUNCTION__, __LINE__);
-               goto out;
-       }
-
        /* The HTTP client will be created in the same thread as the caller,
         * avoiding threading issues */
-       appctx = appctx_new(applet, cs);
+       appctx = appctx_new(applet);
        if (!appctx)
-               goto out_free_cs;
+               goto out;
 
        sess = session_new(httpclient_proxy, NULL, &appctx->obj_type);
        if (!sess) {
@@ -494,25 +490,33 @@ struct appctx *httpclient_start(struct httpclient *hc)
                goto out_free_appctx;
        }
 
-       if ((s = stream_new(sess, cs, &hc->req.buf)) == NULL) {
-               ha_alert("httpclient: Failed to initialize stream %s:%d.\n", __FUNCTION__, __LINE__);
-               goto out_free_sess;
-       }
-
-       /* set the "timeout server" */
-       s->req.wto = hc->timeout_server;
-       s->res.rto = hc->timeout_server;
-
        /* if httpclient_set_dst() was used, sets the alternative address */
        if (hc->dst)
                ss_dst = hc->dst;
        else
                ss_dst = &ss_url;
 
-       if (!sockaddr_alloc(&cs_si(s->csb)->dst, ss_dst, sizeof(*hc->dst))) {
-               ha_alert("httpclient: Failed to initialize stream in %s:%d.\n", __FUNCTION__, __LINE__);
-               goto out_free_stream;
+       if (!sockaddr_alloc(&addr, ss_dst, sizeof(*hc->dst)))
+               goto out_free_sess;
+
+       endp = cs_endpoint_new();
+       if (!endp)
+               goto out_free_addr;
+       endp->target = appctx;
+       endp->ctx = appctx;
+       endp->flags |= CS_EP_T_APPLET;
+
+       cs = cs_new_from_applet(endp, sess, &hc->req.buf);
+       if (!cs) {
+               ha_alert("httpclient: Failed to initialize stream %s:%d.\n", __FUNCTION__, __LINE__);
+               cs_endpoint_free(endp);
+               goto out_free_addr;
        }
+       s = DISGUISE(cs_strm(cs));
+
+       /* set the "timeout server" */
+       s->req.wto = hc->timeout_server;
+       s->res.rto = hc->timeout_server;
 
        /* choose the SSL server or not */
        switch (out.scheme) {
@@ -529,9 +533,9 @@ struct appctx *httpclient_start(struct httpclient *hc)
                        break;
        }
 
-       cs_attach_endp_app(cs, appctx, appctx);
-       s->flags |= SF_ASSIGNED|SF_ADDR_SET;
+       cs_si(s->csb)->dst = addr;
        cs_si(s->csb)->flags |= SI_FL_NOLINGER;
+       s->flags |= SF_ASSIGNED|SF_ADDR_SET;
        s->res.flags |= CF_READ_DONTWAIT;
 
        /* applet is waiting for data */
@@ -550,14 +554,16 @@ struct appctx *httpclient_start(struct httpclient *hc)
        return appctx;
 
 out_free_stream:
+       cs_detach_app(cs);
        LIST_DELETE(&s->list);
        pool_free(pool_head_stream, s);
+       cs_free(cs);
+out_free_addr:
+       sockaddr_free(&addr);
 out_free_sess:
        session_free(sess);
 out_free_appctx:
        appctx_free(appctx);
-out_free_cs:
-       cs_free(cs);
 out:
 
        return NULL;
index ea956b334100f92fd7ad5b641029a36d412c38a9..e18d9abcf678c078fdf19a15388ea8f19ad8e307 100644 (file)
@@ -1130,7 +1130,7 @@ static struct fcgi_strm *fcgi_conn_stream_new(struct fcgi_conn *fconn, struct co
                TRACE_ERROR("fstream allocation failure", FCGI_EV_FSTRM_NEW|FCGI_EV_FSTRM_END|FCGI_EV_FSTRM_ERR, fconn->conn);
                goto out;
        }
-       cs_attach_endp_mux(cs, fstrm, fconn->conn);
+       cs_attach_mux(cs, fstrm, fconn->conn);
        fstrm->cs = cs;
        fstrm->sess = sess;
        fconn->nb_cs++;
index 669b411051ed69d393f12bb76eb5912dc58978ab..9ebd0ffb1d45c55cddf92de5e6e7d85cab5cd69c 100644 (file)
@@ -717,9 +717,9 @@ static struct conn_stream *h1s_new_cs(struct h1s *h1s, struct buffer *input)
 {
        struct h1c *h1c = h1s->h1c;
        struct cs_endpoint *endp;
-       struct conn_stream *cs;
 
        TRACE_ENTER(H1_EV_STRM_NEW, h1c->conn, h1s);
+
        endp = cs_endpoint_new();
        if (!endp) {
                TRACE_ERROR("CS endp allocation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, h1c->conn, h1s);
@@ -727,36 +727,27 @@ static struct conn_stream *h1s_new_cs(struct h1s *h1s, struct buffer *input)
        }
        endp->target = h1s;
        endp->ctx = h1c->conn;
+       endp->flags |= CS_EP_T_MUX;
        if (h1s->flags & H1S_F_NOT_FIRST)
                endp->flags |= CS_EP_NOT_FIRST;
        if (h1s->req.flags & H1_MF_UPG_WEBSOCKET)
                endp->flags |= CS_EP_WEBSOCKET;
 
-       cs = cs_new(endp);
-       if (!cs) {
+       h1s->cs = cs_new_from_mux(endp, h1c->conn->owner, input);
+       if (!h1s->cs) {
                TRACE_ERROR("CS allocation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, h1c->conn, h1s);
                cs_endpoint_free(endp);
                goto err;
        }
-       cs_attach_endp_mux(cs, h1s, h1c->conn);
-       h1s->cs = cs;
-
-       if (!stream_new(h1c->conn->owner, cs, input)) {
-               TRACE_DEVEL("leaving on stream creation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, h1c->conn, h1s);
-               goto err_cs;
-       }
 
        HA_ATOMIC_INC(&h1c->px_counters->open_streams);
        HA_ATOMIC_INC(&h1c->px_counters->total_streams);
 
        h1c->flags = (h1c->flags & ~H1C_F_ST_EMBRYONIC) | H1C_F_ST_ATTACHED | H1C_F_ST_READY;
        TRACE_LEAVE(H1_EV_STRM_NEW, h1c->conn, h1s);
-       return cs;
+       return h1s->cs;
 
-  err_cs:
-       cs_free(cs);
   err:
-       h1s->cs = NULL;
        TRACE_DEVEL("leaving on error", H1_EV_STRM_NEW|H1_EV_STRM_ERR, h1c->conn, h1s);
        return NULL;
 }
@@ -856,7 +847,7 @@ static struct h1s *h1c_bck_stream_new(struct h1c *h1c, struct conn_stream *cs, s
        if (!h1s)
                goto fail;
 
-       cs_attach_endp_mux(cs, h1s, h1c->conn);
+       cs_attach_mux(cs, h1s, h1c->conn);
        h1s->flags |= H1S_F_RX_BLK;
        h1s->cs = cs;
        h1s->sess = sess;
@@ -1004,7 +995,7 @@ static int h1_init(struct connection *conn, struct proxy *proxy, struct session
                if (!h1c_frt_stream_new(h1c))
                        goto fail;
                h1c->h1s->cs = cs;
-               cs_attach_endp_mux(cs, h1c->h1s, conn);
+               cs_attach_mux(cs, h1c->h1s, conn);
 
                /* Attach the CS but Not ready yet */
                h1c->flags = (h1c->flags & ~H1C_F_ST_EMBRYONIC) | H1C_F_ST_ATTACHED;
index 98a837d42b79b01766eb74df21ab86cb0aeb21b0..39e5ece76af58f48a3ff3f037cdb2cce99a8de1a 100644 (file)
@@ -1591,7 +1591,6 @@ static struct h2s *h2c_frt_stream_new(struct h2c *h2c, int id, struct buffer *in
 {
        struct session *sess = h2c->conn->owner;
        struct cs_endpoint *endp;
-       struct conn_stream *cs;
        struct h2s *h2s;
 
        TRACE_ENTER(H2_EV_H2S_NEW, h2c->conn);
@@ -1608,30 +1607,26 @@ static struct h2s *h2c_frt_stream_new(struct h2c *h2c, int id, struct buffer *in
                goto out_close;
        endp->target = h2s;
        endp->ctx = h2c->conn;
-       endp->flags |= CS_EP_NOT_FIRST;
+       endp->flags |= (CS_EP_T_MUX|CS_EP_NOT_FIRST);
        /* FIXME wrong analogy between ext-connect and websocket, this need to
         * be refine.
         */
        if (flags & H2_SF_EXT_CONNECT_RCVD)
                endp->flags |= CS_EP_WEBSOCKET;
 
-       cs = cs_new(endp);
-       if (!cs) {
-               cs_endpoint_free(endp);
-               goto out_close;
-       }
-       cs_attach_endp_mux(cs, h2s, h2c->conn);
-       h2s->cs = cs;
-       h2c->nb_cs++;
-
        /* The stream will record the request's accept date (which is either the
         * end of the connection's or the date immediately after the previous
         * request) and the idle time, which is the delay since the previous
         * request. We can set the value now, it will be copied by stream_new().
         */
        sess->t_idle = tv_ms_elapsed(&sess->tv_accept, &now) - sess->t_handshake;
-       if (!stream_new(h2c->conn->owner, cs, input))
-               goto out_free_cs;
+
+       h2s->cs = cs_new_from_mux(endp, sess, input);
+       if (!h2s->cs) {
+               cs_endpoint_free(endp);
+               goto out_close;
+       }
+       h2c->nb_cs++;
 
        /* We want the accept date presented to the next stream to be the one
         * we have now, the handshake time to be null (since the next stream
@@ -1649,12 +1644,6 @@ static struct h2s *h2c_frt_stream_new(struct h2c *h2c, int id, struct buffer *in
        TRACE_LEAVE(H2_EV_H2S_NEW, h2c->conn);
        return h2s;
 
- out_free_cs:
-       h2c->nb_cs--;
-       if (!h2c->nb_cs)
-               h2c->idle_start = now_ms;
-       cs_free(cs);
-       h2s->cs = NULL;
  out_close:
        h2s_destroy(h2s);
  out:
@@ -1684,7 +1673,7 @@ static struct h2s *h2c_bck_stream_new(struct h2c *h2c, struct conn_stream *cs, s
        if (!h2s)
                goto out;
 
-       cs_attach_endp_mux(cs, h2s, h2c->conn);
+       cs_attach_mux(cs, h2s, h2c->conn);
        h2s->cs = cs;
        h2s->sess = sess;
        h2c->nb_cs++;
index e49898e2eb7231058e7534eb260b86a9ca0eab40..5bd71932e324ead35e83fd237797f16c1ef6d543 100644 (file)
@@ -297,18 +297,14 @@ static int mux_pt_init(struct connection *conn, struct proxy *prx, struct sessio
                        goto fail_free_ctx;
                endp->target = ctx;
                endp->ctx = conn;
-               cs = cs_new(endp);
+               endp->flags |= CS_EP_T_MUX;
+
+               cs = cs_new_from_mux(endp, sess, input);
                if (!cs) {
                        TRACE_ERROR("CS allocation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn);
                        cs_endpoint_free(endp);
                        goto fail_free_ctx;
                }
-               cs_attach_endp_mux(cs, ctx, conn);
-
-               if (!stream_new(conn->owner, cs, &BUF_NULL)) {
-                       TRACE_ERROR("stream creation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn, cs);
-                       goto fail_free;
-               }
                TRACE_POINT(PT_EV_STRM_NEW, conn, cs);
        }
        conn->ctx = ctx;
@@ -320,9 +316,7 @@ static int mux_pt_init(struct connection *conn, struct proxy *prx, struct sessio
        TRACE_LEAVE(PT_EV_CONN_NEW, conn, cs);
        return 0;
 
- fail_free:
-       cs_free(cs);
-fail_free_ctx:
+ fail_free_ctx:
        if (ctx->wait_event.tasklet)
                tasklet_free(ctx->wait_event.tasklet);
        pool_free(pool_head_pt_ctx, ctx);
@@ -379,7 +373,7 @@ static int mux_pt_attach(struct connection *conn, struct conn_stream *cs, struct
        TRACE_ENTER(PT_EV_STRM_NEW, conn);
        if (ctx->wait_event.events)
                conn->xprt->unsubscribe(ctx->conn, conn->xprt_ctx, SUB_RETRY_RECV, &ctx->wait_event);
-       cs_attach_endp_mux(cs, ctx, conn);
+       cs_attach_mux(cs, ctx, conn);
        ctx->cs = cs;
        cs->flags |= CS_FL_RCV_MORE;
 
index b88f4ec05b374f3e333d3dc5bb3da31647bdfca0..9701963182f456b490e8b499cced76b8d2dc4b8e 100644 (file)
@@ -3181,8 +3181,10 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
        struct proxy *p = peers->peers_fe; /* attached frontend */
        struct appctx *appctx;
        struct session *sess;
+       struct cs_endpoint *endp;
        struct conn_stream *cs;
        struct stream *s;
+       struct sockaddr_storage *addr = NULL;
 
        peer->new_conn++;
        peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
@@ -3191,15 +3193,9 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
        peer->last_hdshk = now_ms;
        s = NULL;
 
-       cs = cs_new(NULL);
-       if (!cs) {
-               ha_alert("out of memory in peer_session_create().\n");
-               goto out_close;
-       }
-
-       appctx = appctx_new(&peer_applet, cs);
+       appctx = appctx_new(&peer_applet);
        if (!appctx)
-               goto out_free_cs;
+               goto out_close;
 
        appctx->st0 = PEER_SESS_ST_CONNECT;
        appctx->ctx.peers.ptr = (void *)peer;
@@ -3210,23 +3206,34 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
                goto out_free_appctx;
        }
 
-       if ((s = stream_new(sess, cs, &BUF_NULL)) == NULL) {
-               ha_alert("Failed to initialize stream in peer_session_create().\n");
+       if (!sockaddr_alloc(&addr, &peer->addr, sizeof(peer->addr)))
                goto out_free_sess;
+
+       endp = cs_endpoint_new();
+       if (!endp)
+               goto out_free_addr;
+       endp->target = appctx;
+       endp->ctx = appctx;
+       endp->flags |= CS_EP_T_APPLET;
+
+       cs = cs_new_from_applet(endp, sess, &BUF_NULL);
+       if (!cs) {
+               ha_alert("Failed to initialize stream in peer_session_create().\n");
+               cs_endpoint_free(endp);
+               goto out_free_addr;
        }
 
+       s = DISGUISE(cs_strm(cs));
+
        /* applet is waiting for data */
        si_cant_get(cs_si(s->csf));
        appctx_wakeup(appctx);
 
        /* initiate an outgoing connection */
-       s->target = peer_session_target(peer, s);
-       if (!sockaddr_alloc(&(cs_si(s->csb)->dst), &peer->addr, sizeof(peer->addr)))
-               goto out_free_strm;
-
-       cs_attach_endp_app(cs, appctx, appctx);
-       s->flags = SF_ASSIGNED|SF_ADDR_SET;
+       cs_si(s->csb)->dst = addr;
        cs_si(s->csb)->flags |= SI_FL_NOLINGER;
+       s->flags = SF_ASSIGNED|SF_ADDR_SET;
+       s->target = peer_session_target(peer, s);
 
        s->do_log = NULL;
        s->uniq_id = 0;
@@ -3238,15 +3245,12 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
        return appctx;
 
        /* Error unrolling */
- out_free_strm:
-       LIST_DELETE(&s->list);
-       pool_free(pool_head_stream, s);
+ out_free_addr:
+       sockaddr_free(&addr);
  out_free_sess:
        session_free(sess);
  out_free_appctx:
        appctx_free(appctx);
- out_free_cs:
-       cs_free(cs);
  out_close:
        return NULL;
 }
index d704042d1bb4801c43a0e6b0fc27ca2e0fe39b0f..0f016890dfc71f00730284fc02c54d6ad0df45c8 100644 (file)
@@ -636,22 +636,18 @@ static struct appctx *sink_forward_session_create(struct sink *sink, struct sink
        struct proxy *p = sink->forward_px;
        struct appctx *appctx;
        struct session *sess;
+       struct cs_endpoint *endp;
        struct conn_stream *cs;
        struct stream *s;
        struct applet *applet = &sink_forward_applet;
+       struct sockaddr_storage *addr = NULL;
 
        if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING)
                applet = &sink_forward_oc_applet;
 
-       cs = cs_new(NULL);
-       if (!cs) {
-               ha_alert("out of memory in sink_forward_session_create");
-               goto out_close;
-       }
-
-       appctx = appctx_new(applet, cs);
+       appctx = appctx_new(applet);
        if (!appctx)
-               goto out_free_cs;
+               goto out_close;
 
        appctx->ctx.sft.ptr = (void *)sft;
 
@@ -661,19 +657,29 @@ static struct appctx *sink_forward_session_create(struct sink *sink, struct sink
                goto out_free_appctx;
        }
 
-       if ((s = stream_new(sess, cs, &BUF_NULL)) == NULL) {
-               ha_alert("Failed to initialize stream in sink_forward_session_create().\n");
+       if (!sockaddr_alloc(&addr, &sft->srv->addr, sizeof(sft->srv->addr)))
                goto out_free_sess;
+
+       endp = cs_endpoint_new();
+       if (!endp)
+               goto out_free_addr;
+       endp->target = appctx;
+       endp->ctx = appctx;
+       endp->flags |= CS_EP_T_APPLET;
+
+       cs = cs_new_from_applet(endp, sess, &BUF_NULL);
+       if (!cs) {
+               ha_alert("Failed to initialize stream in sink_forward_session_create().\n");
+               cs_endpoint_free(endp);
+               goto out_free_addr;
        }
+       s = DISGUISE(cs_strm(cs));
 
+       cs_si(s->csb)->dst = addr;
+       cs_si(s->csb)->flags |= SI_FL_NOLINGER;
 
        s->target = &sft->srv->obj_type;
-       if (!sockaddr_alloc(&cs_si(s->csb)->dst, &sft->srv->addr, sizeof(sft->srv->addr)))
-               goto out_free_strm;
-
-       cs_attach_endp_app(cs, appctx, appctx);
        s->flags = SF_ASSIGNED|SF_ADDR_SET;
-       cs_si(s->csb)->flags |= SI_FL_NOLINGER;
 
        s->do_log = NULL;
        s->uniq_id = 0;
@@ -688,15 +694,12 @@ static struct appctx *sink_forward_session_create(struct sink *sink, struct sink
        return appctx;
 
        /* Error unrolling */
- out_free_strm:
-       LIST_DELETE(&s->list);
-       pool_free(pool_head_stream, s);
+ out_free_addr:
+       sockaddr_free(&addr);
  out_free_sess:
        session_free(sess);
  out_free_appctx:
        appctx_free(appctx);
- out_free_cs:
-       cs_free(cs);
  out_close:
        return NULL;
 }
index e30027c5228b33758b896f20801fa607f59f90b3..b07de9bc07523477c5ec300c72f45a1565df08e7 100644 (file)
@@ -443,15 +443,13 @@ struct stream *stream_new(struct session *sess, struct conn_stream *cs, struct b
                s->flags |= SF_HTX;
 
        s->csf = cs;
-       s->csb = cs_new(NULL);
+       if (cs_attach_strm(s->csf, s) < 0)
+               goto out_fail_attach_csf;
+
+       s->csb = cs_new_from_strm(s, CS_FL_NONE);
        if (!s->csb)
                goto out_fail_alloc_csb;
 
-       if (cs_attach_app(s->csf, &s->obj_type) < 0)
-               goto out_fail_attach_csf;
-       if (cs_attach_app(s->csb, &s->obj_type) < 0)
-               goto out_fail_attach_csb;
-
        si_set_state(cs_si(s->csf), SI_ST_EST);
        cs_si(s->csf)->hcto = sess->fe->timeout.clientfin;
 
index 4e2f7bb2f30aabae8f21f792341b416428142590..3609647564b2e85fc37118b12b74f466771dcf42 100644 (file)
@@ -339,10 +339,11 @@ struct appctx *si_register_handler(struct stream_interface *si, struct applet *a
 
        DPRINTF(stderr, "registering handler %p for si %p (was %p)\n", app, si, si_task(si));
 
-       appctx = appctx_new(app, si->cs);
+       appctx = appctx_new(app);
        if (!appctx)
                return NULL;
-       cs_attach_endp_app(si->cs, appctx, appctx);
+       cs_attach_applet(si->cs, appctx, appctx);
+       appctx->owner = si->cs;
        appctx->t->nice = si_strm(si)->task->nice;
        si_cant_get(si);
        appctx_wakeup(appctx);
index ee439daeb2ac1db8803e7c32d0ceeecdc601750b..53ee771f4695fca52b3b35235b5d32de4bbb4deb 100644 (file)
@@ -1101,7 +1101,8 @@ enum tcpcheck_eval_ret tcpcheck_eval_connect(struct check *check, struct tcpchec
                TRACE_ERROR("conn-stream allocation error", CHK_EV_TCPCHK_CONN|CHK_EV_TCPCHK_ERR, check);
                goto out;
        }
-       cs_attach_endp_mux(check->cs, NULL, conn);
+       cs_attach_mux(check->cs, NULL, conn);
+       conn->ctx = check->cs;
        tasklet_set_tid(check->wait_list.tasklet, tid);
        conn_set_owner(conn, check->sess, NULL);