]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MAJOR: conn-stream: Share endpoint struct between the CS and the mux/applet
authorChristopher Faulet <cfaulet@haproxy.com>
Wed, 23 Mar 2022 14:15:29 +0000 (15:15 +0100)
committerChristopher Faulet <cfaulet@haproxy.com>
Wed, 13 Apr 2022 13:10:14 +0000 (15:10 +0200)
The conn-stream endpoint is now shared between the conn-stream and the
applet or the multiplexer. If the mux or the applet is created first, it is
responsible to also create the endpoint and share it with the conn-stream.
If the conn-stream is created first, it is the opposite.

When the endpoint is only owned by an applet or a mux, it is called an
orphan endpoint (there is no conn-stream). When it is only owned by a
conn-stream, it is called a detached endpoint (there is no mux/applet).

The last entity that owns an endpoint is responsible to release it. When a
mux or an applet is detached from a conn-stream, the conn-stream
relinquishes the endpoint to recreate a new one. This way, the endpoint
state is never lost for the mux or the applet.

26 files changed:
dev/flags/flags.c
include/haproxy/applet-t.h
include/haproxy/applet.h
include/haproxy/conn_stream-t.h
include/haproxy/conn_stream.h
include/haproxy/mux_quic-t.h
include/haproxy/mux_quic.h
src/applet.c
src/backend.c
src/check.c
src/cli.c
src/conn_stream.c
src/dns.c
src/flt_spoe.c
src/h3.c
src/hlua.c
src/http_ana.c
src/http_client.c
src/mux_fcgi.c
src/mux_h1.c
src/mux_h2.c
src/mux_pt.c
src/mux_quic.c
src/peers.c
src/sink.c
src/stream_interface.c

index eb98b6856320211ebd61f81b1f9a90ac492e8039..08cb2e89f31247090236bf313fe8cb546e4596a5 100644 (file)
@@ -196,6 +196,8 @@ void show_endp_flags(unsigned int f)
        SHOW_FLAG(f, CS_EP_SHWN);
        SHOW_FLAG(f, CS_EP_SHRR);
        SHOW_FLAG(f, CS_EP_SHRD);
+       SHOW_FLAG(f, CS_EP_ORPHAN);
+       SHOW_FLAG(f, CS_EP_DETACHED);
        SHOW_FLAG(f, CS_EP_T_APPLET);
        SHOW_FLAG(f, CS_EP_T_MUX);
 
index 8c565dce734cf327fbe64ca2f5765fc37c775df6..bbc6b02de56de3c2628f4e079a4c594a1c27bf41 100644 (file)
@@ -58,7 +58,8 @@ struct appctx {
        struct buffer *chunk;       /* used to store unfinished commands */
        unsigned int st2;          /* output state for stats, unused by peers  */
        struct applet *applet;     /* applet this context refers to */
-       void *owner;               /* pointer to upper layer's entity (eg: conn_stream) */
+       struct conn_stream *owner;
+       struct cs_endpoint *endp;
        struct act_rule *rule;     /* rule associated with the applet. */
        int (*io_handler)(struct appctx *appctx);  /* used within the cli_io_handler when st0 = CLI_ST_CALLBACK */
        void (*io_release)(struct appctx *appctx);  /* used within the cli_io_handler when st0 = CLI_ST_CALLBACK,
index aa9124b3b3eb491cec6333c67238baed5744d224..d63b78785791a63f19465c39727eea5bd9778e62 100644 (file)
@@ -26,6 +26,7 @@
 
 #include <haproxy/api.h>
 #include <haproxy/applet-t.h>
+#include <haproxy/conn_stream.h>
 #include <haproxy/list.h>
 #include <haproxy/pool.h>
 #include <haproxy/task.h>
@@ -36,7 +37,7 @@ extern struct pool_head *pool_head_appctx;
 struct task *task_run_applet(struct task *t, void *context, unsigned int state);
 int appctx_buf_available(void *arg);
 
-struct appctx *appctx_new(struct applet *applet);
+struct appctx *appctx_new(struct applet *applet, struct cs_endpoint *endp);
 
 /* Releases an appctx previously allocated by appctx_new(). */
 static inline void __appctx_free(struct appctx *appctx)
@@ -45,6 +46,8 @@ static inline void __appctx_free(struct appctx *appctx)
        if (LIST_INLIST(&appctx->buffer_wait.list))
                LIST_DEL_INIT(&appctx->buffer_wait.list);
 
+       BUG_ON(appctx->endp && !(appctx->endp->flags & CS_EP_ORPHAN));
+       cs_endpoint_free(appctx->endp);
        pool_free(pool_head_appctx, appctx);
        _HA_ATOMIC_DEC(&nb_applets);
 }
index e76a8c3391dd90d7c97b05ca158c33ad34fdf137..842ce844379e4cb32fc4ddc617cf5d51a164e325 100644 (file)
@@ -35,7 +35,13 @@ struct stream_interface;
         CS_EP_T_MUX      = 0x00000001, /* The endpoint is a mux (the target may be NULL before the mux init) */
         CS_EP_T_APPLET   = 0x00000002, /* The endpoint is an applet */
 
-        /* unused: 0x00000004 .. 0x00000080 */
+        /* unused: 0x00000004 .. 0x00000008 */
+
+        /* Endpoint states: none == attached to a mux with a conn-stream */
+        CS_EP_DETACHED   = 0x00000010, /* The endpoint is detached (no mux/no applet) */
+        CS_EP_ORPHAN     = 0x00000020, /* The endpoint is orphan (no conn-stream) */
+
+        /* unused: 0x00000040 .. 0x00000080 */
 
         CS_EP_SHRD       = 0x00000100,  /* read shut, draining extra data */
         CS_EP_SHRR       = 0x00000200,  /* read shut, resetting extra data */
index e0c3b50bc3c902293730275c0ca0d41be236120a..ca5a99c93fc564a0bb7568a7cd35ea7d1c835878 100644 (file)
@@ -50,6 +50,7 @@ void cs_attach_mux(struct conn_stream *cs, void *target, void *ctx);
 void cs_attach_applet(struct conn_stream *cs, void *target, void *ctx);
 int cs_attach_strm(struct conn_stream *cs, struct stream *strm);
 
+int cs_reset_endp(struct conn_stream *cs);
 void cs_detach_endp(struct conn_stream *cs);
 void cs_detach_app(struct conn_stream *cs);
 
index 2a1cd862d8b29672bf9e5354fcce981bcfcca276..6fa952244ded1dde41065d23a9a2f2dbc516e14e 100644 (file)
@@ -11,6 +11,7 @@
 #include <haproxy/buf-t.h>
 #include <haproxy/connection-t.h>
 #include <haproxy/xprt_quic-t.h>
+#include <haproxy/conn_stream-t.h>
 
 /* Stream types */
 enum qcs_type {
@@ -88,6 +89,7 @@ struct qcc {
 struct qcs {
        struct qcc *qcc;
        struct conn_stream *cs;
+       struct cs_endpoint *endp;
        uint32_t flags;      /* QC_SF_* */
 
        struct {
index 6c97b7fda9e8b03a30392ab58017048096b45bfd..cd0a31825b1ca6f5ab7e2289505c92a56be4c2d7 100644 (file)
@@ -107,20 +107,11 @@ static inline struct qc_stream_desc *qcc_get_stream(struct qcc *qcc, uint64_t id
 
 static inline struct conn_stream *qc_attach_cs(struct qcs *qcs, struct buffer *buf)
 {
-       struct cs_endpoint *endp;
        struct conn_stream *cs;
 
-       endp = cs_endpoint_new();
-       if (!endp)
+       cs = cs_new_from_mux(qcs->endp, qcs->qcc->conn->owner, buf);
+       if (!cs)
                return NULL;
-       endp->target = qcs;
-       endp->ctx = qcs->qcc->conn;
-       endp->flags |= CS_EP_T_MUX;
-       cs = cs_new_from_mux(endp, qcs->qcc->conn->owner, buf);
-       if (!cs) {
-               cs_endpoint_free(endp);
-               return NULL;
-       }
        qcs->cs = cs;
        ++qcs->qcc->nb_cs;
 
index f6fea7423231251ef27e06ac2817e58a48ddcfee..663a9df54f00d6baa1fdcbf4ac30695294d18c0c 100644 (file)
@@ -47,28 +47,47 @@ static inline void appctx_init(struct appctx *appctx)
  * appctx_free(). <applet> is assigned as the applet, but it can be NULL. The
  * applet's task is always created on the current thread.
  */
-struct appctx *appctx_new(struct applet *applet)
+struct appctx *appctx_new(struct applet *applet, struct cs_endpoint *endp)
 {
        struct appctx *appctx;
 
        appctx = pool_alloc(pool_head_appctx);
-       if (likely(appctx != NULL)) {
-               appctx->obj_type = OBJ_TYPE_APPCTX;
-               appctx->applet = applet;
-               appctx_init(appctx);
-               appctx->t = task_new_here();
-               if (unlikely(appctx->t == NULL)) {
-                       pool_free(pool_head_appctx, appctx);
-                       return NULL;
-               }
-               appctx->t->process = task_run_applet;
-               appctx->t->context = appctx;
-               LIST_INIT(&appctx->buffer_wait.list);
-               appctx->buffer_wait.target = appctx;
-               appctx->buffer_wait.wakeup_cb = appctx_buf_available;
-               _HA_ATOMIC_INC(&nb_applets);
+       if (unlikely(!appctx))
+               goto fail_appctx;
+
+       appctx_init(appctx);
+       appctx->obj_type = OBJ_TYPE_APPCTX;
+       appctx->applet = applet;
+
+       if (!endp) {
+               endp = cs_endpoint_new();
+               if (!endp)
+                       goto fail_endp;
+               endp->target = appctx;
+               endp->ctx = appctx;
+               endp->flags |= (CS_EP_T_APPLET|CS_EP_ORPHAN);
        }
+       appctx->endp = endp;
+
+       appctx->t = task_new_here();
+       if (unlikely(!appctx->t))
+               goto fail_task;
+       appctx->t->process = task_run_applet;
+       appctx->t->context = appctx;
+
+       LIST_INIT(&appctx->buffer_wait.list);
+       appctx->buffer_wait.target = appctx;
+       appctx->buffer_wait.wakeup_cb = appctx_buf_available;
+
+       _HA_ATOMIC_INC(&nb_applets);
        return appctx;
+
+  fail_task:
+       cs_endpoint_free(appctx->endp);
+  fail_endp:
+       pool_free(pool_head_appctx, appctx);
+  fail_appctx:
+       return NULL;
 }
 
 /* Callback used to wake up an applet when a buffer is available. The applet
index f8c13b4f4775414a15cb894603e05eb076556e0f..f57d05d84b68c4025c0230b7a6d5d789b4ef97b1 100644 (file)
@@ -1496,8 +1496,9 @@ static int connect_server(struct stream *s)
 
                        if (avail >= 1) {
                                if (srv_conn->mux->attach(srv_conn, s->csb, s->sess) == -1) {
-                                       cs_detach_endp(s->csb);
                                        srv_conn = NULL;
+                                       if (cs_reset_endp(s->csb) < 0)
+                                               return SF_ERR_INTERNAL;
                                }
                        }
                        else
@@ -2290,7 +2291,29 @@ void back_handle_st_cer(struct stream *s)
         * Note: the stream-interface will be switched to ST_REQ, ST_ASS or
         * ST_TAR and SI_FL_ERR and SI_FL_EXP flags will be unset.
         */
-       cs_detach_endp(s->csb);
+       if (cs_reset_endp(s->csb) < 0) {
+               if (!si->err_type)
+                       si->err_type = SI_ET_CONN_OTHER;
+
+               if (objt_server(s->target))
+                       _HA_ATOMIC_INC(&objt_server(s->target)->counters.internal_errors);
+               _HA_ATOMIC_INC(&s->be->be_counters.internal_errors);
+               sess_change_server(s, NULL);
+               if (may_dequeue_tasks(objt_server(s->target), s->be))
+                       process_srv_queue(objt_server(s->target));
+
+               /* shutw is enough so stop a connecting socket */
+               si_shutw(si);
+               s->req.flags |= CF_WRITE_ERROR;
+               s->res.flags |= CF_READ_ERROR;
+
+               si->state = SI_ST_CLO;
+               if (s->srv_error)
+                       s->srv_error(s, si);
+
+               DBG_TRACE_STATE("error resetting endpoint", STRM_EV_STRM_PROC|STRM_EV_SI_ST|STRM_EV_STRM_ERR, s);
+               goto end;
+       }
 
        stream_choose_redispatch(s);
 
index 44583c243d8df3645cf7def476fc7d71b42e663d..47a53db3dc6aa718e0eef666072e60ccb05615ae 100644 (file)
@@ -1134,6 +1134,15 @@ struct task *process_chk_conn(struct task *t, void *context, unsigned int state)
                task_set_affinity(t, tid_bit);
 
                check->current_step = NULL;
+
+               if (check->cs->flags & CS_FL_ERROR) {
+                       check->cs->flags &= ~CS_FL_ERROR;
+                       check->cs->endp = cs_endpoint_new();
+                       if (!check->cs->endp)
+                               check->cs->flags |= CS_FL_ERROR;
+                       else
+                               check->cs->endp->flags |=  CS_EP_DETACHED;
+               }
                tcpcheck_main(check);
                expired = 0;
        }
@@ -1155,9 +1164,10 @@ struct task *process_chk_conn(struct task *t, void *context, unsigned int state)
                else {
                        if (check->state & CHK_ST_CLOSE_CONN) {
                                TRACE_DEVEL("closing current connection", CHK_EV_TASK_WAKE|CHK_EV_HCHK_RUN, check);
-                               cs_detach_endp(check->cs);
-                               conn = NULL;
                                check->state &= ~CHK_ST_CLOSE_CONN;
+                               if (cs_reset_endp(check->cs) < 0)
+                                       check->cs->flags |= CS_FL_ERROR;
+                               conn = NULL;
                                tcpcheck_main(check);
                        }
                        if (check->result == CHK_RES_UNKNOWN) {
@@ -1190,7 +1200,13 @@ struct task *process_chk_conn(struct task *t, void *context, unsigned int state)
         * the tasklet
         */
        tasklet_remove_from_tasklet_list(check->wait_list.tasklet);
-       cs_detach_endp(check->cs);
+
+       if (cs_reset_endp(check->cs) < 0) {
+               /* If an error occurred at this stage, it will be fixed by the
+                * next check
+                */
+               check->cs->flags |= CS_FL_ERROR;
+       }
 
        if (check->sess != NULL) {
                vars_prune(&check->vars, check->sess, NULL);
index 25d28f14d1f54d07abd6a8408cc73dc2b4fe837a..665631fe887c87484f49c892a4116501e57265cd 100644 (file)
--- a/src/cli.c
+++ b/src/cli.c
@@ -2753,8 +2753,14 @@ int pcli_wait_for_response(struct stream *s, struct channel *rep, int an_bit)
                 * connection.
                 */
                if (!si_conn_ready(cs_si(s->csb))) {
-                       cs_detach_endp(s->csb);
                        s->srv_conn = NULL;
+                       if (cs_reset_endp(s->csb) < 0) {
+                               if (!cs_si(s->csb)->err_type)
+                                       cs_si(s->csb)->err_type = SI_ET_CONN_OTHER;
+                               if (s->srv_error)
+                                       s->srv_error(s, cs_si(s->csb));
+                               return 1;
+                       }
                }
 
                sockaddr_free(&(cs_si(s->csb)->dst));
index 24a22b5219f68a872b71a425cf8472edcc19dba7..d45eba63c11e1f687dc6ff09abfcee02c8af61fe 100644 (file)
@@ -86,6 +86,7 @@ struct conn_stream *cs_new_from_mux(struct cs_endpoint *endp, struct session *se
                pool_free(pool_head_connstream, cs);
                cs = NULL;
        }
+       endp->flags &= ~CS_EP_ORPHAN;
        return cs;
 }
 
@@ -102,6 +103,7 @@ struct conn_stream *cs_new_from_applet(struct cs_endpoint *endp, struct session
                pool_free(pool_head_connstream, cs);
                cs = NULL;
        }
+       endp->flags &= ~CS_EP_ORPHAN;
        return cs;
 }
 
@@ -113,6 +115,7 @@ struct conn_stream *cs_new_from_strm(struct stream *strm, unsigned int flags)
        if (unlikely(!cs))
                return NULL;
        cs->flags |= flags;
+       cs->endp->flags |=  CS_EP_DETACHED;
        cs->si = si_new(cs);
        if (unlikely(!cs->si)) {
                cs_free(cs);
@@ -132,6 +135,7 @@ struct conn_stream *cs_new_from_check(struct check *check, unsigned int flags)
        if (unlikely(!cs))
                return NULL;
        cs->flags |= flags;
+       cs->endp->flags |=  CS_EP_DETACHED;
        cs->app = &check->obj_type;
        cs->data_cb = &check_conn_cb;
        return cs;
@@ -144,6 +148,7 @@ void cs_free(struct conn_stream *cs)
 {
        si_free(cs->si);
        if (cs->endp) {
+               BUG_ON(!(cs->endp->flags & CS_EP_DETACHED));
                cs_endpoint_free(cs->endp);
        }
        pool_free(pool_head_connstream, cs);
@@ -158,6 +163,7 @@ void cs_attach_mux(struct conn_stream *cs, void *target, void *ctx)
        cs->endp->target = target;
        cs->endp->ctx = ctx;
        cs->endp->flags |= CS_EP_T_MUX;
+       cs->endp->flags &= ~CS_EP_DETACHED;
        if (!conn->ctx)
                conn->ctx = cs;
        if (cs_strm(cs)) {
@@ -176,8 +182,9 @@ void cs_attach_applet(struct conn_stream *cs, void *target, void *ctx)
        cs->endp->target = target;
        cs->endp->ctx = ctx;
        cs->endp->flags |= CS_EP_T_APPLET;
+       cs->endp->flags &= ~CS_EP_DETACHED;
        appctx->owner = cs;
-       if (cs->si) {
+       if (cs_strm(cs)) {
                cs->si->ops = &si_applet_ops;
                cs->data_cb = NULL;
        }
@@ -191,7 +198,7 @@ int cs_attach_strm(struct conn_stream *cs, struct stream *strm)
        cs->si = si_new(cs);
        if (unlikely(!cs->si))
                return -1;
-
+       cs->endp->flags &= ~CS_EP_ORPHAN;
        if (cs->endp->flags & CS_EP_T_MUX) {
                cs->si->ops = &si_conn_ops;
                cs->data_cb = &si_conn_cb;
@@ -220,9 +227,11 @@ void cs_detach_endp(struct conn_stream *cs)
 
                if (conn->mux) {
                        /* TODO: handle unsubscribe for healthchecks too */
+                       cs->endp->flags |= CS_EP_ORPHAN;
                        if (cs->si && cs->si->wait_event.events != 0)
                                conn->mux->unsubscribe(cs, cs->si->wait_event.events, &cs->si->wait_event);
                        conn->mux->detach(cs);
+                       cs->endp = NULL;
                }
                else {
                        /* It's too early to have a mux, let's just destroy
@@ -238,13 +247,17 @@ void cs_detach_endp(struct conn_stream *cs)
        else if (cs->endp->flags & CS_EP_T_APPLET) {
                struct appctx *appctx = cs_appctx(cs);
 
+               cs->endp->flags |= CS_EP_ORPHAN;
                if (cs->si)
                        si_applet_release(cs->si);
                appctx_free(appctx);
+               cs->endp = NULL;
        }
 
        if (cs->endp) {
+               /* the cs is the only one one the endpoint */
                cs_endpoint_init(cs->endp);
+               cs->endp->flags |= CS_EP_DETACHED;
        }
 
        /* FIXME: Rest CS for now but must be reviewed. CS flags are only
@@ -266,6 +279,21 @@ void cs_detach_app(struct conn_stream *cs)
        cs->si  = NULL;
        cs->data_cb = NULL;
 
-       if (!cs->endp || !cs->endp->target)
+       if (!cs->endp || (cs->endp->flags & CS_EP_DETACHED))
                cs_free(cs);
 }
+
+int cs_reset_endp(struct conn_stream *cs)
+{
+       BUG_ON(!cs->app);
+       cs_detach_endp(cs);
+       if (!cs->endp) {
+               cs->endp = cs_endpoint_new();
+               if (!cs->endp) {
+                       cs->flags |= CS_FL_ERROR;
+                       return -1;
+               }
+               cs->endp->flags |= CS_EP_DETACHED;
+       }
+       return 0;
+}
index e6f70935c722f18dda003677e7471341aee88632..7228581e449e5d51c20d5c9b9c46f9e285be51dc 100644 (file)
--- a/src/dns.c
+++ b/src/dns.c
@@ -887,13 +887,12 @@ static struct appctx *dns_session_create(struct dns_session *ds)
 {
        struct appctx *appctx;
        struct session *sess;
-       struct cs_endpoint *endp;
        struct conn_stream *cs;
        struct stream *s;
        struct applet *applet = &dns_session_applet;
        struct sockaddr_storage *addr = NULL;
 
-       appctx = appctx_new(applet);
+       appctx = appctx_new(applet, NULL);
        if (!appctx)
                goto out_close;
        appctx->ctx.sft.ptr = (void *)ds;
@@ -907,17 +906,9 @@ static struct appctx *dns_session_create(struct dns_session *ds)
        if (!sockaddr_alloc(&addr, &ds->dss->srv->addr, sizeof(ds->dss->srv->addr)))
                goto out_free_sess;
 
-       endp = cs_endpoint_new();
-       if (!endp)
-               goto out_free_addr;
-       endp->target = appctx;
-       endp->ctx = appctx;
-       endp->flags |= CS_EP_T_APPLET;
-
-       cs = cs_new_from_applet(endp, sess, &BUF_NULL);
+       cs = cs_new_from_applet(appctx->endp, sess, &BUF_NULL);
        if (!cs) {
                ha_alert("Failed to initialize stream in dns_session_create().\n");
-               cs_endpoint_free(endp);
                goto out_free_addr;
        }
 
index 86fb646f1c058b366159eea5419f81f40220f766..1e731b123f3f7b120e3e80eaf945edd51050cc26 100644 (file)
@@ -1988,11 +1988,10 @@ spoe_create_appctx(struct spoe_config *conf)
 {
        struct appctx      *appctx;
        struct session     *sess;
-       struct cs_endpoint *endp;
        struct conn_stream *cs;
        struct stream      *strm;
 
-       if ((appctx = appctx_new(&spoe_applet)) == NULL)
+       if ((appctx = appctx_new(&spoe_applet, NULL)) == NULL)
                goto out_error;
 
        appctx->ctx.spoe.ptr = pool_zalloc(pool_head_spoe_appctx);
@@ -2025,18 +2024,9 @@ spoe_create_appctx(struct spoe_config *conf)
        if (!sess)
                goto out_free_spoe;
 
-       endp = cs_endpoint_new();
-       if (!endp)
+       cs = cs_new_from_applet(appctx->endp, sess, &BUF_NULL);
+       if (!cs)
                goto out_free_sess;
-       endp->target = appctx;
-       endp->ctx = appctx;
-       endp->flags |= CS_EP_T_APPLET;
-
-       cs = cs_new_from_applet(endp, sess, &BUF_NULL);
-       if (!cs) {
-               cs_endpoint_free(endp);
-               goto out_free_sess;
-       }
 
        strm = DISGUISE(cs_strm(cs));
        stream_set_backend(strm, conf->agent->b.be);
index ec219a5c0d3d8a738c5c5778270785149e3d2f73..0fce7b0faa091e702e561e1345a2ed970b47a7cc 100644 (file)
--- a/src/h3.c
+++ b/src/h3.c
@@ -176,7 +176,6 @@ static int h3_headers_to_htx(struct qcs *qcs, struct buffer *buf, uint64_t len,
        cs = qc_attach_cs(qcs, &htx_buf);
        if (!cs)
                return 1;
-       cs->endp->flags |= CS_EP_NOT_FIRST;
 
        /* buffer is transferred to conn_stream and set to NULL
         * except on stream creation error.
index 705c15c19a2b5edaa29d7a56f84270a457e5b2c9..af26aae03c7dc480e4a630d93eaeeec6ae608e30 100644 (file)
@@ -2918,7 +2918,6 @@ __LJMP static int hlua_socket_new(lua_State *L)
        struct hlua_socket *socket;
        struct appctx *appctx;
        struct session *sess;
-       struct cs_endpoint *endp;
        struct conn_stream *cs;
        struct stream *s;
 
@@ -2946,7 +2945,7 @@ __LJMP static int hlua_socket_new(lua_State *L)
        lua_setmetatable(L, -2);
 
        /* Create the applet context */
-       appctx = appctx_new(&update_applet);
+       appctx = appctx_new(&update_applet, NULL);
        if (!appctx) {
                hlua_pusherror(L, "socket: out of memory");
                goto out_fail_conf;
@@ -2964,17 +2963,9 @@ __LJMP static int hlua_socket_new(lua_State *L)
                goto out_fail_appctx;
        }
 
-       endp = cs_endpoint_new();
-       if (!endp)
-               goto out_fail_sess;
-       endp->target = appctx;
-       endp->ctx = appctx;
-       endp->flags |= CS_EP_T_APPLET;
-
-       cs = cs_new_from_applet(endp, sess, &BUF_NULL);
+       cs = cs_new_from_applet(appctx->endp, sess, &BUF_NULL);
        if (!cs) {
                hlua_pusherror(L, "socket: out of memory");
-               cs_endpoint_free(endp);
                goto out_fail_sess;
        }
 
index 8c477708fe69a40d0066249d55722b851453916e..57bb3f1cd38720a6e3fff12faebb794d5f70a794 100644 (file)
@@ -1254,7 +1254,13 @@ static __inline int do_l7_retry(struct stream *s, struct stream_interface *si)
        res->to_forward = 0;
        res->analyse_exp = TICK_ETERNITY;
        res->total = 0;
-       cs_detach_endp(s->csb);
+
+       if (cs_reset_endp(s->csb) < 0) {
+               s->csb->flags |= CS_FL_ERROR;
+               if (!(s->flags & SF_ERR_MASK))
+                       s->flags |= SF_ERR_INTERNAL;
+               return -1;
+       }
 
        b_free(&req->buf);
        /* Swap the L7 buffer with the channel buffer */
index 55701e5eabd7ecb458b2fbbb002efe3be0508e7c..1e87a5033bc86726e12fd7a88400e5f40bf2d8a4 100644 (file)
@@ -455,7 +455,6 @@ struct appctx *httpclient_start(struct httpclient *hc)
        struct applet *applet = &httpclient_applet;
        struct appctx *appctx;
        struct session *sess;
-       struct cs_endpoint *endp;
        struct conn_stream *cs;
        struct stream *s;
        struct sockaddr_storage *addr = NULL;
@@ -480,7 +479,7 @@ struct appctx *httpclient_start(struct httpclient *hc)
 
        /* The HTTP client will be created in the same thread as the caller,
         * avoiding threading issues */
-       appctx = appctx_new(applet);
+       appctx = appctx_new(applet, NULL);
        if (!appctx)
                goto out;
 
@@ -499,17 +498,9 @@ struct appctx *httpclient_start(struct httpclient *hc)
        if (!sockaddr_alloc(&addr, ss_dst, sizeof(*hc->dst)))
                goto out_free_sess;
 
-       endp = cs_endpoint_new();
-       if (!endp)
-               goto out_free_addr;
-       endp->target = appctx;
-       endp->ctx = appctx;
-       endp->flags |= CS_EP_T_APPLET;
-
-       cs = cs_new_from_applet(endp, sess, &hc->req.buf);
+       cs = cs_new_from_applet(appctx->endp, sess, &hc->req.buf);
        if (!cs) {
                ha_alert("httpclient: Failed to initialize stream %s:%d.\n", __FUNCTION__, __LINE__);
-               cs_endpoint_free(endp);
                goto out_free_addr;
        }
        s = DISGUISE(cs_strm(cs));
index e18d9abcf678c078fdf19a15388ea8f19ad8e307..a74554e5534ff79985d7a7277b33ccd91922d15b 100644 (file)
@@ -155,6 +155,7 @@ enum fcgi_strm_st {
 /* FCGI stream descriptor */
 struct fcgi_strm {
        struct conn_stream *cs;
+       struct cs_endpoint *endp;
        struct session *sess;
        struct fcgi_conn *fconn;
 
@@ -1042,6 +1043,8 @@ static void fcgi_strm_destroy(struct fcgi_strm *fstrm)
         */
        LIST_DEL_INIT(&fstrm->send_list);
        tasklet_free(fstrm->shut_tl);
+       BUG_ON(fstrm->endp && !(fstrm->endp->flags & CS_EP_ORPHAN));
+       cs_endpoint_free(fstrm->endp);
        pool_free(pool_head_fcgi_strm, fstrm);
 
        TRACE_LEAVE(FCGI_EV_FSTRM_END, conn);
@@ -1077,6 +1080,7 @@ static struct fcgi_strm *fcgi_strm_new(struct fcgi_conn *fconn, int id)
        LIST_INIT(&fstrm->send_list);
        fstrm->fconn = fconn;
        fstrm->cs = NULL;
+       fstrm->endp = NULL;
        fstrm->flags = FCGI_SF_NONE;
        fstrm->proto_status = 0;
        fstrm->state = FCGI_SS_IDLE;
@@ -1132,6 +1136,7 @@ static struct fcgi_strm *fcgi_conn_stream_new(struct fcgi_conn *fconn, struct co
        }
        cs_attach_mux(cs, fstrm, fconn->conn);
        fstrm->cs = cs;
+       fstrm->endp = cs->endp;
        fstrm->sess = sess;
        fconn->nb_cs++;
 
@@ -3117,7 +3122,7 @@ static int fcgi_process(struct fcgi_conn *fconn)
 
                while (node) {
                        fstrm = container_of(node, struct fcgi_strm, by_id);
-                       if (fstrm->cs && fstrm->cs->endp->flags & CS_EP_WAIT_FOR_HS)
+                       if (fstrm->cs && fstrm->endp->flags & CS_EP_WAIT_FOR_HS)
                                fcgi_strm_notify_recv(fstrm);
                        node = eb32_next(node);
                }
index 9ebd0ffb1d45c55cddf92de5e6e7d85cab5cd69c..c504c0142d9884d7d9d4c3bc025f01e069927042 100644 (file)
@@ -119,6 +119,7 @@ struct h1c {
 struct h1s {
        struct h1c *h1c;
        struct conn_stream *cs;
+       struct cs_endpoint *endp;
        uint32_t flags;                /* Connection flags: H1S_F_* */
 
        struct wait_event *subs;      /* Address of the wait_event the conn_stream associated is waiting on */
@@ -716,27 +717,17 @@ static inline size_t h1s_data_pending(const struct h1s *h1s)
 static struct conn_stream *h1s_new_cs(struct h1s *h1s, struct buffer *input)
 {
        struct h1c *h1c = h1s->h1c;
-       struct cs_endpoint *endp;
 
        TRACE_ENTER(H1_EV_STRM_NEW, h1c->conn, h1s);
 
-       endp = cs_endpoint_new();
-       if (!endp) {
-               TRACE_ERROR("CS endp allocation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, h1c->conn, h1s);
-               goto err;
-       }
-       endp->target = h1s;
-       endp->ctx = h1c->conn;
-       endp->flags |= CS_EP_T_MUX;
        if (h1s->flags & H1S_F_NOT_FIRST)
-               endp->flags |= CS_EP_NOT_FIRST;
+               h1s->endp->flags |= CS_EP_NOT_FIRST;
        if (h1s->req.flags & H1_MF_UPG_WEBSOCKET)
-               endp->flags |= CS_EP_WEBSOCKET;
+               h1s->endp->flags |= CS_EP_WEBSOCKET;
 
-       h1s->cs = cs_new_from_mux(endp, h1c->conn->owner, input);
+       h1s->cs = cs_new_from_mux(h1s->endp, h1c->conn->owner, input);
        if (!h1s->cs) {
                TRACE_ERROR("CS allocation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, h1c->conn, h1s);
-               cs_endpoint_free(endp);
                goto err;
        }
 
@@ -785,6 +776,7 @@ static struct h1s *h1s_new(struct h1c *h1c)
        h1c->h1s = h1s;
        h1s->sess = NULL;
        h1s->cs = NULL;
+       h1s->endp = NULL;
        h1s->flags = H1S_F_WANT_KAL;
        h1s->subs = NULL;
        h1s->rxbuf = BUF_NULL;
@@ -811,9 +803,8 @@ static struct h1s *h1s_new(struct h1c *h1c)
        return NULL;
 }
 
-static struct h1s *h1c_frt_stream_new(struct h1c *h1c)
+static struct h1s *h1c_frt_stream_new(struct h1c *h1c, struct conn_stream *cs, struct session *sess)
 {
-       struct session *sess = h1c->conn->owner;
        struct h1s *h1s;
 
        TRACE_ENTER(H1_EV_H1S_NEW, h1c->conn);
@@ -822,6 +813,20 @@ static struct h1s *h1c_frt_stream_new(struct h1c *h1c)
        if (!h1s)
                goto fail;
 
+       if (cs) {
+               cs_attach_mux(cs, h1s, h1c->conn);
+               h1s->cs = cs;
+               h1s->endp = cs->endp;
+       }
+       else {
+               h1s->endp = cs_endpoint_new();
+               if (!h1s->endp)
+                       goto fail;
+               h1s->endp->target = h1s;
+               h1s->endp->ctx = h1c->conn;
+               h1s->endp->flags |= (CS_EP_T_MUX|CS_EP_ORPHAN);
+       }
+
        h1s->sess = sess;
 
        if (h1c->px->options2 & PR_O2_REQBUG_OK)
@@ -834,6 +839,7 @@ static struct h1s *h1c_frt_stream_new(struct h1c *h1c)
 
   fail:
        TRACE_DEVEL("leaving on error", H1_EV_STRM_NEW|H1_EV_STRM_ERR, h1c->conn);
+       pool_free(pool_head_h1s, h1s);
        return NULL;
 }
 
@@ -850,6 +856,7 @@ static struct h1s *h1c_bck_stream_new(struct h1c *h1c, struct conn_stream *cs, s
        cs_attach_mux(cs, h1s, h1c->conn);
        h1s->flags |= H1S_F_RX_BLK;
        h1s->cs = cs;
+       h1s->endp = cs->endp;
        h1s->sess = sess;
 
        h1c->flags = (h1c->flags & ~H1C_F_ST_EMBRYONIC) | H1C_F_ST_ATTACHED | H1C_F_ST_READY;
@@ -903,6 +910,8 @@ static void h1s_destroy(struct h1s *h1s)
                }
 
                HA_ATOMIC_DEC(&h1c->px_counters->open_streams);
+               BUG_ON(h1s->endp && !(h1s->endp->flags & CS_EP_ORPHAN));
+               cs_endpoint_free(h1s->endp);
                pool_free(pool_head_h1s, h1s);
        }
 }
@@ -990,12 +999,8 @@ static int h1_init(struct connection *conn, struct proxy *proxy, struct session
        }
        else if (conn_ctx) {
                /* Upgraded frontend connection (from TCP) */
-               struct conn_stream *cs = conn_ctx;
-
-               if (!h1c_frt_stream_new(h1c))
+               if (!h1c_frt_stream_new(h1c, conn_ctx, h1c->conn->owner))
                        goto fail;
-               h1c->h1s->cs = cs;
-               cs_attach_mux(cs, h1c->h1s, conn);
 
                /* Attach the CS but Not ready yet */
                h1c->flags = (h1c->flags & ~H1C_F_ST_EMBRYONIC) | H1C_F_ST_ATTACHED;
@@ -1879,11 +1884,11 @@ static size_t h1_process_demux(struct h1c *h1c, struct buffer *buf, size_t count
        /* Here h1s->cs is always defined */
        if (!(h1m->flags & H1_MF_CHNK) && (h1m->state == H1_MSG_DATA || (h1m->state == H1_MSG_TUNNEL))) {
                TRACE_STATE("notify the mux can use splicing", H1_EV_RX_DATA|H1_EV_RX_BODY, h1c->conn, h1s);
-               h1s->cs->endp->flags |= CS_EP_MAY_SPLICE;
+               h1s->endp->flags |= CS_EP_MAY_SPLICE;
        }
        else {
                TRACE_STATE("notify the mux can't use splicing anymore", H1_EV_RX_DATA|H1_EV_RX_BODY, h1c->conn, h1s);
-               h1s->cs->endp->flags &= ~CS_EP_MAY_SPLICE;
+               h1s->endp->flags &= ~CS_EP_MAY_SPLICE;
        }
 
        /* Set EOI on conn-stream in DONE state iff:
@@ -2948,7 +2953,7 @@ static int h1_process(struct h1c * h1c)
 
                /* Create the H1 stream if not already there */
                if (!h1s) {
-                       h1s = h1c_frt_stream_new(h1c);
+                       h1s = h1c_frt_stream_new(h1c, NULL, h1c->conn->owner);
                        if (!h1s) {
                                b_reset(&h1c->ibuf);
                                h1c->flags = (h1c->flags & ~(H1C_F_ST_IDLE|H1C_F_WAIT_NEXT_REQ)) | H1C_F_ST_ERROR;
index 39e5ece76af58f48a3ff3f037cdb2cce99a8de1a..d4f5320ecca630280d9ad5e72b1d7b5800d9f168 100644 (file)
@@ -214,6 +214,7 @@ enum h2_ss {
  */
 struct h2s {
        struct conn_stream *cs;
+       struct cs_endpoint *endp;
        struct session *sess;
        struct h2c *h2c;
        struct eb32_node by_id; /* place in h2c's streams_by_id */
@@ -1520,6 +1521,8 @@ static void h2s_destroy(struct h2s *h2s)
 
        /* ditto, calling tasklet_free() here should be ok */
        tasklet_free(h2s->shut_tl);
+       BUG_ON(h2s->endp && !(h2s->endp->flags & CS_EP_ORPHAN));
+       cs_endpoint_free(h2s->endp);
        pool_free(pool_head_h2s, h2s);
 
        TRACE_LEAVE(H2_EV_H2S_END, conn);
@@ -1552,6 +1555,7 @@ static struct h2s *h2s_new(struct h2c *h2c, int id)
        LIST_INIT(&h2s->list);
        h2s->h2c       = h2c;
        h2s->cs        = NULL;
+       h2s->endp      = NULL;
        h2s->sws       = 0;
        h2s->flags     = H2_SF_NONE;
        h2s->errcode   = H2_ERR_NO_ERROR;
@@ -1590,7 +1594,6 @@ static struct h2s *h2s_new(struct h2c *h2c, int id)
 static struct h2s *h2c_frt_stream_new(struct h2c *h2c, int id, struct buffer *input, uint32_t flags)
 {
        struct session *sess = h2c->conn->owner;
-       struct cs_endpoint *endp;
        struct h2s *h2s;
 
        TRACE_ENTER(H2_EV_H2S_NEW, h2c->conn);
@@ -1602,17 +1605,18 @@ static struct h2s *h2c_frt_stream_new(struct h2c *h2c, int id, struct buffer *in
        if (!h2s)
                goto out;
 
-       endp = cs_endpoint_new();
-       if (!endp)
+       h2s->endp = cs_endpoint_new();
+       if (!h2s->endp)
                goto out_close;
-       endp->target = h2s;
-       endp->ctx = h2c->conn;
-       endp->flags |= (CS_EP_T_MUX|CS_EP_NOT_FIRST);
+       h2s->endp->target = h2s;
+       h2s->endp->ctx = h2c->conn;
+       h2s->endp->flags |= (CS_EP_T_MUX|CS_EP_ORPHAN|CS_EP_NOT_FIRST);
+
        /* FIXME wrong analogy between ext-connect and websocket, this need to
         * be refine.
         */
        if (flags & H2_SF_EXT_CONNECT_RCVD)
-               endp->flags |= CS_EP_WEBSOCKET;
+               h2s->endp->flags |= CS_EP_WEBSOCKET;
 
        /* The stream will record the request's accept date (which is either the
         * end of the connection's or the date immediately after the previous
@@ -1621,9 +1625,8 @@ static struct h2s *h2c_frt_stream_new(struct h2c *h2c, int id, struct buffer *in
         */
        sess->t_idle = tv_ms_elapsed(&sess->tv_accept, &now) - sess->t_handshake;
 
-       h2s->cs = cs_new_from_mux(endp, sess, input);
+       h2s->cs = cs_new_from_mux(h2s->endp, sess, input);
        if (!h2s->cs) {
-               cs_endpoint_free(endp);
                goto out_close;
        }
        h2c->nb_cs++;
@@ -1675,6 +1678,7 @@ static struct h2s *h2c_bck_stream_new(struct h2c *h2c, struct conn_stream *cs, s
 
        cs_attach_mux(cs, h2s, h2c->conn);
        h2s->cs = cs;
+       h2s->endp = cs->endp;
        h2s->sess = sess;
        h2c->nb_cs++;
 
@@ -4080,7 +4084,7 @@ static int h2_process(struct h2c *h2c)
 
                while (node) {
                        h2s = container_of(node, struct h2s, by_id);
-                       if (h2s->cs && h2s->cs->endp->flags & CS_EP_WAIT_FOR_HS)
+                       if (h2s->cs && h2s->endp->flags & CS_EP_WAIT_FOR_HS)
                                h2s_notify_recv(h2s);
                        node = eb32_next(node);
                }
@@ -5322,7 +5326,7 @@ static size_t h2s_frt_make_resp_headers(struct h2s *h2s, struct htx *htx)
                        break;
        }
 
-       if (!h2s->cs || h2s->cs->endp->flags & CS_EP_SHW) {
+       if (!h2s->cs || h2s->endp->flags & CS_EP_SHW) {
                /* Response already closed: add END_STREAM */
                es_now = 1;
        }
@@ -5742,7 +5746,7 @@ static size_t h2s_bck_make_req_headers(struct h2s *h2s, struct htx *htx)
                        break;
        }
 
-       if (!h2s->cs || h2s->cs->endp->flags & CS_EP_SHW) {
+       if (!h2s->cs || h2s->endp->flags & CS_EP_SHW) {
                /* Request already closed: add END_STREAM */
                es_now = 1;
        }
index 5bd71932e324ead35e83fd237797f16c1ef6d543..00cf542eda033e4a0f5b6deaae68aa06ae5e639e 100644 (file)
@@ -21,6 +21,7 @@
 
 struct mux_pt_ctx {
        struct conn_stream *cs;
+       struct cs_endpoint *endp;
        struct connection *conn;
        struct wait_event wait_event;
 };
@@ -207,6 +208,8 @@ static void mux_pt_destroy(struct mux_pt_ctx *ctx)
                if (conn && ctx->wait_event.events != 0)
                        conn->xprt->unsubscribe(conn, conn->xprt_ctx, ctx->wait_event.events,
                                                &ctx->wait_event);
+               BUG_ON(ctx->endp && !(ctx->endp->flags & CS_EP_ORPHAN));
+               cs_endpoint_free(ctx->endp);
                pool_free(pool_head_pt_ctx, ctx);
        }
 
@@ -272,7 +275,6 @@ struct task *mux_pt_io_cb(struct task *t, void *tctx, unsigned int status)
 static int mux_pt_init(struct connection *conn, struct proxy *prx, struct session *sess,
                       struct buffer *input)
 {
-       struct cs_endpoint *endp;
        struct conn_stream *cs = conn->ctx;
        struct mux_pt_ctx *ctx = pool_alloc(pool_head_pt_ctx);
 
@@ -292,21 +294,27 @@ static int mux_pt_init(struct connection *conn, struct proxy *prx, struct sessio
        ctx->conn = conn;
 
        if (!cs) {
-               endp = cs_endpoint_new();
-               if (!endp)
+               ctx->endp = cs_endpoint_new();
+               if (!ctx->endp) {
+                       TRACE_ERROR("CS allocation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn);
                        goto fail_free_ctx;
-               endp->target = ctx;
-               endp->ctx = conn;
-               endp->flags |= CS_EP_T_MUX;
+               }
+               ctx->endp->target = ctx;
+               ctx->endp->ctx = conn;
+               ctx->endp->flags |= (CS_EP_T_MUX|CS_EP_ORPHAN);
 
-               cs = cs_new_from_mux(endp, sess, input);
+               cs = cs_new_from_mux(ctx->endp, sess, input);
                if (!cs) {
                        TRACE_ERROR("CS allocation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn);
-                       cs_endpoint_free(endp);
-                       goto fail_free_ctx;
+                       goto fail_free_endp;
                }
                TRACE_POINT(PT_EV_STRM_NEW, conn, cs);
        }
+       else {
+               cs_attach_mux(cs, ctx, conn);
+               ctx->cs = cs;
+               ctx->endp = cs->endp;
+       }
        conn->ctx = ctx;
        ctx->cs = cs;
        cs->flags |= CS_FL_RCV_MORE;
@@ -316,6 +324,8 @@ static int mux_pt_init(struct connection *conn, struct proxy *prx, struct sessio
        TRACE_LEAVE(PT_EV_CONN_NEW, conn, cs);
        return 0;
 
+ fail_free_endp:
+       cs_endpoint_free(ctx->endp);
  fail_free_ctx:
        if (ctx->wait_event.tasklet)
                tasklet_free(ctx->wait_event.tasklet);
@@ -399,8 +409,11 @@ static void mux_pt_destroy_meth(void *ctx)
        struct mux_pt_ctx *pt = ctx;
 
        TRACE_POINT(PT_EV_CONN_END, pt->conn, pt->cs);
-       if (!(pt->cs) || !(pt->conn) || pt->conn->ctx != pt)
+       if (!(pt->cs) || !(pt->conn) || pt->conn->ctx != pt) {
+               if (pt->conn->ctx != pt)
+                       pt->endp = NULL;
                mux_pt_destroy(pt);
+       }
 }
 
 /*
@@ -411,15 +424,14 @@ static void mux_pt_detach(struct conn_stream *cs)
        struct connection *conn = __cs_conn(cs);
        struct mux_pt_ctx *ctx;
 
-       ALREADY_CHECKED(conn);
-       ctx = conn->ctx;
-
        TRACE_ENTER(PT_EV_STRM_END, conn, cs);
 
+       ctx = conn->ctx;
+       ctx->cs = NULL;
+
        /* Subscribe, to know if we got disconnected */
        if (!conn_is_back(conn) && conn->owner != NULL &&
            !(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_SOCK_WR_SH))) {
-               ctx->cs = NULL;
                conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_RECV, &ctx->wait_event);
        } else {
                /* There's no session attached to that connection, destroy it */
index f61602f7bbfd3822cd99a923e112c00b07a7763b..c64f8d0f8e0b3bbee362bdbe4a594d2978902408 100644 (file)
@@ -117,9 +117,19 @@ struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
        qcs->cs = NULL;
        qcs->flags = QC_SF_NONE;
 
+       qcs->endp = cs_endpoint_new();
+       if (!qcs->endp) {
+               pool_free(pool_head_qcs, qcs);
+               return NULL;
+       }
+       qcs->endp->target = qcs;
+       qcs->endp->ctx = qcc->conn;
+       qcs->endp->flags |= (CS_EP_T_MUX|CS_EP_ORPHAN|CS_EP_NOT_FIRST);
+
        qcs->id = id;
        /* store transport layer stream descriptor in qcc tree */
        eb64_insert(&qcc->streams_by_id, &stream->by_id);
+
        qcc->strms[type].nb_streams++;
 
        /* If stream is local, use peer remote-limit, or else the opposite. */
@@ -160,6 +170,8 @@ void qcs_free(struct qcs *qcs)
        /* stream desc must be removed from MUX tree before release it */
        eb64_delete(&qcs->stream->by_id);
        qc_stream_desc_release(qcs->stream, qcs->qcc->conn->handle.qc);
+       BUG_ON(qcs->endp && !(qcs->endp->flags & CS_EP_ORPHAN));
+       cs_endpoint_free(qcs->endp);
        pool_free(pool_head_qcs, qcs);
 }
 
index 9701963182f456b490e8b499cced76b8d2dc4b8e..c31aa818fbfa0a3d0960042f9372f9794887d659 100644 (file)
@@ -3181,7 +3181,6 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
        struct proxy *p = peers->peers_fe; /* attached frontend */
        struct appctx *appctx;
        struct session *sess;
-       struct cs_endpoint *endp;
        struct conn_stream *cs;
        struct stream *s;
        struct sockaddr_storage *addr = NULL;
@@ -3193,7 +3192,7 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
        peer->last_hdshk = now_ms;
        s = NULL;
 
-       appctx = appctx_new(&peer_applet);
+       appctx = appctx_new(&peer_applet, NULL);
        if (!appctx)
                goto out_close;
 
@@ -3209,17 +3208,9 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
        if (!sockaddr_alloc(&addr, &peer->addr, sizeof(peer->addr)))
                goto out_free_sess;
 
-       endp = cs_endpoint_new();
-       if (!endp)
-               goto out_free_addr;
-       endp->target = appctx;
-       endp->ctx = appctx;
-       endp->flags |= CS_EP_T_APPLET;
-
-       cs = cs_new_from_applet(endp, sess, &BUF_NULL);
+       cs = cs_new_from_applet(appctx->endp, sess, &BUF_NULL);
        if (!cs) {
                ha_alert("Failed to initialize stream in peer_session_create().\n");
-               cs_endpoint_free(endp);
                goto out_free_addr;
        }
 
index 0f016890dfc71f00730284fc02c54d6ad0df45c8..c5cd1d7ef6a0777e9d8b586744868b5f854f0650 100644 (file)
@@ -636,7 +636,6 @@ static struct appctx *sink_forward_session_create(struct sink *sink, struct sink
        struct proxy *p = sink->forward_px;
        struct appctx *appctx;
        struct session *sess;
-       struct cs_endpoint *endp;
        struct conn_stream *cs;
        struct stream *s;
        struct applet *applet = &sink_forward_applet;
@@ -645,7 +644,7 @@ static struct appctx *sink_forward_session_create(struct sink *sink, struct sink
        if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING)
                applet = &sink_forward_oc_applet;
 
-       appctx = appctx_new(applet);
+       appctx = appctx_new(applet, NULL);
        if (!appctx)
                goto out_close;
 
@@ -660,17 +659,9 @@ static struct appctx *sink_forward_session_create(struct sink *sink, struct sink
        if (!sockaddr_alloc(&addr, &sft->srv->addr, sizeof(sft->srv->addr)))
                goto out_free_sess;
 
-       endp = cs_endpoint_new();
-       if (!endp)
-               goto out_free_addr;
-       endp->target = appctx;
-       endp->ctx = appctx;
-       endp->flags |= CS_EP_T_APPLET;
-
-       cs = cs_new_from_applet(endp, sess, &BUF_NULL);
+       cs = cs_new_from_applet(appctx->endp, sess, &BUF_NULL);
        if (!cs) {
                ha_alert("Failed to initialize stream in sink_forward_session_create().\n");
-               cs_endpoint_free(endp);
                goto out_free_addr;
        }
        s = DISGUISE(cs_strm(cs));
index 3609647564b2e85fc37118b12b74f466771dcf42..134dfcc3d5e13299a325fa043604b64e25962d8e 100644 (file)
@@ -339,7 +339,7 @@ struct appctx *si_register_handler(struct stream_interface *si, struct applet *a
 
        DPRINTF(stderr, "registering handler %p for si %p (was %p)\n", app, si, si_task(si));
 
-       appctx = appctx_new(app);
+       appctx = appctx_new(app, si->cs->endp);
        if (!appctx)
                return NULL;
        cs_attach_applet(si->cs, appctx, appctx);