From: Christopher Faulet Date: Wed, 23 Mar 2022 10:01:09 +0000 (+0100) Subject: MEDIUM: conn-stream: Pre-allocate endpoint to create CS from muxes and applets X-Git-Tag: v2.6-dev6~101 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=a9e8b3979df11eb9957a0da545dbc75a409eb548;p=thirdparty%2Fhaproxy.git MEDIUM: conn-stream: Pre-allocate endpoint to create CS from muxes and applets It is a transient commit to prepare next changes. Now, when a conn-stream is created from an applet or a multiplexer, an endpoint is always provided. In addition, the API to create a conn-stream was specialized to have one function per type. The next step will be to share the endpoint structure. --- diff --git a/include/haproxy/applet.h b/include/haproxy/applet.h index 97164ac30e..97b9c347b4 100644 --- a/include/haproxy/applet.h +++ b/include/haproxy/applet.h @@ -59,7 +59,7 @@ static inline void appctx_init(struct appctx *appctx) * appctx_free(). is assigned as the applet, but it can be NULL. The * applet's task is always created on the current thread. */ -static inline struct appctx *appctx_new(struct applet *applet, void *owner) +static inline struct appctx *appctx_new(struct applet *applet) { struct appctx *appctx; @@ -67,7 +67,6 @@ static inline struct appctx *appctx_new(struct applet *applet, void *owner) if (likely(appctx != NULL)) { appctx->obj_type = OBJ_TYPE_APPCTX; appctx->applet = applet; - appctx->owner = owner; appctx_init(appctx); appctx->t = task_new_here(); if (unlikely(appctx->t == NULL)) { diff --git a/include/haproxy/conn_stream.h b/include/haproxy/conn_stream.h index 638b0cf2d6..e0c3b50bc3 100644 --- a/include/haproxy/conn_stream.h +++ b/include/haproxy/conn_stream.h @@ -23,11 +23,13 @@ #define _HAPROXY_CONN_STREAM_H #include -#include #include #include #include +struct buffer; +struct session; +struct appctx; struct stream; struct stream_interface; struct check; @@ -38,10 +40,16 @@ struct cs_endpoint *cs_endpoint_new(); void cs_endpoint_free(struct cs_endpoint *endp); struct conn_stream *cs_new(struct cs_endpoint *endp); +struct conn_stream *cs_new_from_mux(struct cs_endpoint *endp, struct session *sess, struct buffer *input); +struct conn_stream *cs_new_from_applet(struct cs_endpoint *endp, struct session *sess, struct buffer *input); +struct conn_stream *cs_new_from_strm(struct stream *strm, unsigned int flags); +struct conn_stream *cs_new_from_check(struct check *check, unsigned int flags); void cs_free(struct conn_stream *cs); -void cs_attach_endp_mux(struct conn_stream *cs, void *endp, void *ctx); -void cs_attach_endp_app(struct conn_stream *cs, void *endp, void *ctx); -int cs_attach_app(struct conn_stream *cs, enum obj_type *app); + +void cs_attach_mux(struct conn_stream *cs, void *target, void *ctx); +void cs_attach_applet(struct conn_stream *cs, void *target, void *ctx); +int cs_attach_strm(struct conn_stream *cs, struct stream *strm); + void cs_detach_endp(struct conn_stream *cs); void cs_detach_app(struct conn_stream *cs); diff --git a/include/haproxy/mux_quic.h b/include/haproxy/mux_quic.h index aab468752f..6c97b7fda9 100644 --- a/include/haproxy/mux_quic.h +++ b/include/haproxy/mux_quic.h @@ -115,15 +115,13 @@ static inline struct conn_stream *qc_attach_cs(struct qcs *qcs, struct buffer *b return NULL; endp->target = qcs; endp->ctx = qcs->qcc->conn; - cs = cs_new(endp); + endp->flags |= CS_EP_T_MUX; + cs = cs_new_from_mux(endp, qcs->qcc->conn->owner, buf); if (!cs) { cs_endpoint_free(endp); return NULL; } - cs_attach_endp_mux(cs, qcs, qcs->qcc->conn); qcs->cs = cs; - stream_new(qcs->qcc->conn->owner, cs, buf); - ++qcs->qcc->nb_cs; return cs; diff --git a/src/backend.c b/src/backend.c index 2f33f37c22..f8c13b4f47 100644 --- a/src/backend.c +++ b/src/backend.c @@ -1570,7 +1570,9 @@ skip_reuse: return SF_ERR_INTERNAL; /* how did we get there ? */ } - cs_attach_endp_mux(s->csb, NULL, srv_conn); + cs_attach_mux(s->csb, NULL, srv_conn); + srv_conn->ctx = s->csb; + #if defined(USE_OPENSSL) && defined(TLSEXT_TYPE_application_layer_protocol_negotiation) if (!srv || (srv->use_ssl != 1 || (!(srv->ssl_ctx.alpn_str) && !(srv->ssl_ctx.npn_str)) || diff --git a/src/check.c b/src/check.c index b26e7b9b27..44583c243d 100644 --- a/src/check.c +++ b/src/check.c @@ -1391,11 +1391,9 @@ int start_check_task(struct check *check, int mininter, if (check->type == PR_O2_EXT_CHK) t = task_new_on(0); else { - check->cs = cs_new(NULL); + check->cs = cs_new_from_check(check, CS_FL_NONE); if (!check->cs) goto fail_alloc_cs; - if (cs_attach_app(check->cs, &check->obj_type) < 0) - goto fail_attach_cs; t = task_new_anywhere(); } @@ -1420,7 +1418,6 @@ int start_check_task(struct check *check, int mininter, return 1; fail_alloc_task: - fail_attach_cs: cs_free(check->cs); fail_alloc_cs: ha_alert("Starting [%s:%s] check: out of memory.\n", diff --git a/src/conn_stream.c b/src/conn_stream.c index 5b8177484b..24a22b5219 100644 --- a/src/conn_stream.c +++ b/src/conn_stream.c @@ -75,6 +75,68 @@ struct conn_stream *cs_new(struct cs_endpoint *endp) return NULL; } +struct conn_stream *cs_new_from_mux(struct cs_endpoint *endp, struct session *sess, struct buffer *input) +{ + struct conn_stream *cs; + + cs = cs_new(endp); + if (unlikely(!cs)) + return NULL; + if (unlikely(!stream_new(sess, cs, input))) { + pool_free(pool_head_connstream, cs); + cs = NULL; + } + return cs; +} + +struct conn_stream *cs_new_from_applet(struct cs_endpoint *endp, struct session *sess, struct buffer *input) +{ + struct conn_stream *cs; + struct appctx *appctx = endp->ctx; + + cs = cs_new(endp); + if (unlikely(!cs)) + return NULL; + appctx->owner = cs; + if (unlikely(!stream_new(sess, cs, input))) { + pool_free(pool_head_connstream, cs); + cs = NULL; + } + return cs; +} + +struct conn_stream *cs_new_from_strm(struct stream *strm, unsigned int flags) +{ + struct conn_stream *cs; + + cs = cs_new(NULL); + if (unlikely(!cs)) + return NULL; + cs->flags |= flags; + cs->si = si_new(cs); + if (unlikely(!cs->si)) { + cs_free(cs); + return NULL; + } + cs->app = &strm->obj_type; + cs->si->ops = &si_embedded_ops; + cs->data_cb = NULL; + return cs; +} + +struct conn_stream *cs_new_from_check(struct check *check, unsigned int flags) +{ + struct conn_stream *cs; + + cs = cs_new(NULL); + if (unlikely(!cs)) + return NULL; + cs->flags |= flags; + cs->app = &check->obj_type; + cs->data_cb = &check_conn_cb; + return cs; +} + /* Releases a conn_stream previously allocated by cs_new(), as well as any * buffer it would still hold. */ @@ -89,11 +151,11 @@ void cs_free(struct conn_stream *cs) /* Attaches a conn_stream to an mux endpoint and sets the endpoint ctx */ -void cs_attach_endp_mux(struct conn_stream *cs, void *endp, void *ctx) +void cs_attach_mux(struct conn_stream *cs, void *target, void *ctx) { struct connection *conn = ctx; - cs->endp->target = endp; + cs->endp->target = target; cs->endp->ctx = ctx; cs->endp->flags |= CS_EP_T_MUX; if (!conn->ctx) @@ -107,11 +169,11 @@ void cs_attach_endp_mux(struct conn_stream *cs, void *endp, void *ctx) } /* Attaches a conn_stream to an applet endpoint and sets the endpoint ctx */ -void cs_attach_endp_app(struct conn_stream *cs, void *endp, void *ctx) +void cs_attach_applet(struct conn_stream *cs, void *target, void *ctx) { - struct appctx *appctx = endp; + struct appctx *appctx = target; - cs->endp->target = endp; + cs->endp->target = target; cs->endp->ctx = ctx; cs->endp->flags |= CS_EP_T_APPLET; appctx->owner = cs; @@ -122,31 +184,26 @@ void cs_attach_endp_app(struct conn_stream *cs, void *endp, void *ctx) } /* Attaches a conn_stream to a app layer and sets the relevant callbacks */ -int cs_attach_app(struct conn_stream *cs, enum obj_type *app) +int cs_attach_strm(struct conn_stream *cs, struct stream *strm) { - cs->app = app; + cs->app = &strm->obj_type; - if (objt_stream(app)) { - if (!cs->si) - cs->si = si_new(cs); - if (unlikely(!cs->si)) - return -1; + cs->si = si_new(cs); + if (unlikely(!cs->si)) + return -1; - if (cs->endp->flags & CS_EP_T_MUX) { - cs->si->ops = &si_conn_ops; - cs->data_cb = &si_conn_cb; - } - else if (cs->endp->flags & CS_EP_T_APPLET) { - cs->si->ops = &si_applet_ops; - cs->data_cb = NULL; - } - else { - cs->si->ops = &si_embedded_ops; - cs->data_cb = NULL; - } + if (cs->endp->flags & CS_EP_T_MUX) { + cs->si->ops = &si_conn_ops; + cs->data_cb = &si_conn_cb; + } + else if (cs->endp->flags & CS_EP_T_APPLET) { + cs->si->ops = &si_applet_ops; + cs->data_cb = NULL; + } + else { + cs->si->ops = &si_embedded_ops; + cs->data_cb = NULL; } - else if (objt_check(app)) - cs->data_cb = &check_conn_cb; return 0; } diff --git a/src/dns.c b/src/dns.c index 461e3c7232..e6f70935c7 100644 --- a/src/dns.c +++ b/src/dns.c @@ -887,19 +887,15 @@ static struct appctx *dns_session_create(struct dns_session *ds) { struct appctx *appctx; struct session *sess; + struct cs_endpoint *endp; struct conn_stream *cs; struct stream *s; struct applet *applet = &dns_session_applet; + struct sockaddr_storage *addr = NULL; - cs = cs_new(NULL); - if (!cs) { - ha_alert("out of memory in dns_session_create().\n"); - goto out_close; - } - - appctx = appctx_new(applet, cs); + appctx = appctx_new(applet); if (!appctx) - goto out_free_cs; + goto out_close; appctx->ctx.sft.ptr = (void *)ds; sess = session_new(ds->dss->srv->proxy, NULL, &appctx->obj_type); @@ -908,18 +904,28 @@ static struct appctx *dns_session_create(struct dns_session *ds) goto out_free_appctx; } - if ((s = stream_new(sess, cs, &BUF_NULL)) == NULL) { - ha_alert("Failed to initialize stream in dns_session_create().\n"); + if (!sockaddr_alloc(&addr, &ds->dss->srv->addr, sizeof(ds->dss->srv->addr))) goto out_free_sess; + + endp = cs_endpoint_new(); + if (!endp) + goto out_free_addr; + endp->target = appctx; + endp->ctx = appctx; + endp->flags |= CS_EP_T_APPLET; + + cs = cs_new_from_applet(endp, sess, &BUF_NULL); + if (!cs) { + ha_alert("Failed to initialize stream in dns_session_create().\n"); + cs_endpoint_free(endp); + goto out_free_addr; } + s = DISGUISE(cs_strm(cs)); + cs_si(s->csb)->dst = addr; + cs_si(s->csb)->flags |= SI_FL_NOLINGER; s->target = &ds->dss->srv->obj_type; - if (!sockaddr_alloc(&cs_si(s->csb)->dst, &ds->dss->srv->addr, sizeof(ds->dss->srv->addr))) - goto out_free_strm; - - cs_attach_endp_app(cs, appctx, appctx); s->flags = SF_ASSIGNED|SF_ADDR_SET; - cs_si(s->csb)->flags |= SI_FL_NOLINGER; s->do_log = NULL; s->uniq_id = 0; @@ -934,15 +940,12 @@ static struct appctx *dns_session_create(struct dns_session *ds) return appctx; /* Error unrolling */ - out_free_strm: - LIST_DELETE(&s->list); - pool_free(pool_head_stream, s); + out_free_addr: + sockaddr_free(&addr); out_free_sess: session_free(sess); out_free_appctx: appctx_free(appctx); - out_free_cs: - cs_free(cs); out_close: return NULL; } diff --git a/src/flt_spoe.c b/src/flt_spoe.c index e8daf1be18..86fb646f1c 100644 --- a/src/flt_spoe.c +++ b/src/flt_spoe.c @@ -1988,16 +1988,13 @@ spoe_create_appctx(struct spoe_config *conf) { struct appctx *appctx; struct session *sess; + struct cs_endpoint *endp; struct conn_stream *cs; struct stream *strm; - cs = cs_new(NULL); - if (!cs) + if ((appctx = appctx_new(&spoe_applet)) == NULL) goto out_error; - if ((appctx = appctx_new(&spoe_applet, cs)) == NULL) - goto out_free_cs; - appctx->ctx.spoe.ptr = pool_zalloc(pool_head_spoe_appctx); if (SPOE_APPCTX(appctx) == NULL) goto out_free_appctx; @@ -2028,14 +2025,24 @@ spoe_create_appctx(struct spoe_config *conf) if (!sess) goto out_free_spoe; - if ((strm = stream_new(sess, cs, &BUF_NULL)) == NULL) + endp = cs_endpoint_new(); + if (!endp) + goto out_free_sess; + endp->target = appctx; + endp->ctx = appctx; + endp->flags |= CS_EP_T_APPLET; + + cs = cs_new_from_applet(endp, sess, &BUF_NULL); + if (!cs) { + cs_endpoint_free(endp); goto out_free_sess; + } - cs_attach_endp_app(cs, appctx, appctx); + strm = DISGUISE(cs_strm(cs)); stream_set_backend(strm, conf->agent->b.be); /* applet is waiting for data */ - si_cant_get(strm->csf->si); + si_cant_get(cs_si(strm->csf)); appctx_wakeup(appctx); strm->do_log = NULL; @@ -2058,8 +2065,6 @@ spoe_create_appctx(struct spoe_config *conf) pool_free(pool_head_spoe_appctx, SPOE_APPCTX(appctx)); out_free_appctx: appctx_free(appctx); - out_free_cs: - cs_free(cs); out_error: return NULL; } diff --git a/src/hlua.c b/src/hlua.c index 3f9b123b2d..705c15c19a 100644 --- a/src/hlua.c +++ b/src/hlua.c @@ -2918,8 +2918,9 @@ __LJMP static int hlua_socket_new(lua_State *L) struct hlua_socket *socket; struct appctx *appctx; struct session *sess; + struct cs_endpoint *endp; struct conn_stream *cs; - struct stream *strm; + struct stream *s; /* Check stack size. */ if (!lua_checkstack(L, 3)) { @@ -2944,17 +2945,11 @@ __LJMP static int hlua_socket_new(lua_State *L) lua_rawgeti(L, LUA_REGISTRYINDEX, class_socket_ref); lua_setmetatable(L, -2); - cs = cs_new(NULL); - if (!cs) { - hlua_pusherror(L, "socket: out of memory"); - goto out_fail_conf; - } - /* Create the applet context */ - appctx = appctx_new(&update_applet, cs); + appctx = appctx_new(&update_applet); if (!appctx) { hlua_pusherror(L, "socket: out of memory"); - goto out_fail_cs; + goto out_fail_conf; } appctx->ctx.hlua_cosocket.connected = 0; @@ -2969,13 +2964,21 @@ __LJMP static int hlua_socket_new(lua_State *L) goto out_fail_appctx; } - strm = stream_new(sess, cs, &BUF_NULL); - if (!strm) { + endp = cs_endpoint_new(); + if (!endp) + goto out_fail_sess; + endp->target = appctx; + endp->ctx = appctx; + endp->flags |= CS_EP_T_APPLET; + + cs = cs_new_from_applet(endp, sess, &BUF_NULL); + if (!cs) { hlua_pusherror(L, "socket: out of memory"); + cs_endpoint_free(endp); goto out_fail_sess; } - cs_attach_endp_app(cs, appctx, appctx); + s = DISGUISE(cs_strm(cs)); /* Initialise cross reference between stream and Lua socket object. */ xref_create(&socket->xref, &appctx->ctx.hlua_cosocket.xref); @@ -2984,11 +2987,11 @@ __LJMP static int hlua_socket_new(lua_State *L) * and retrieve data from the server. The connection is initialized * with the "struct server". */ - si_set_state(strm->csb->si, SI_ST_ASS); + si_set_state(cs_si(s->csb), SI_ST_ASS); /* Force destination server. */ - strm->flags |= SF_DIRECT | SF_ASSIGNED | SF_BE_ASSIGNED; - strm->target = &socket_tcp->obj_type; + s->flags |= SF_DIRECT | SF_ASSIGNED | SF_BE_ASSIGNED; + s->target = &socket_tcp->obj_type; return 1; @@ -2996,8 +2999,6 @@ __LJMP static int hlua_socket_new(lua_State *L) session_free(sess); out_fail_appctx: appctx_free(appctx); - out_fail_cs: - cs_free(cs); out_fail_conf: WILL_LJMP(lua_error(L)); return 0; diff --git a/src/http_client.c b/src/http_client.c index dd27482073..55701e5eab 100644 --- a/src/http_client.c +++ b/src/http_client.c @@ -455,8 +455,10 @@ struct appctx *httpclient_start(struct httpclient *hc) struct applet *applet = &httpclient_applet; struct appctx *appctx; struct session *sess; + struct cs_endpoint *endp; struct conn_stream *cs; struct stream *s; + struct sockaddr_storage *addr = NULL; int len; struct sockaddr_storage ss_url; struct sockaddr_storage* ss_dst; @@ -476,17 +478,11 @@ struct appctx *httpclient_start(struct httpclient *hc) goto out; } - cs = cs_new(NULL); - if (!cs) { - ha_alert("httpclient: out of memory in %s:%d.\n", __FUNCTION__, __LINE__); - goto out; - } - /* The HTTP client will be created in the same thread as the caller, * avoiding threading issues */ - appctx = appctx_new(applet, cs); + appctx = appctx_new(applet); if (!appctx) - goto out_free_cs; + goto out; sess = session_new(httpclient_proxy, NULL, &appctx->obj_type); if (!sess) { @@ -494,25 +490,33 @@ struct appctx *httpclient_start(struct httpclient *hc) goto out_free_appctx; } - if ((s = stream_new(sess, cs, &hc->req.buf)) == NULL) { - ha_alert("httpclient: Failed to initialize stream %s:%d.\n", __FUNCTION__, __LINE__); - goto out_free_sess; - } - - /* set the "timeout server" */ - s->req.wto = hc->timeout_server; - s->res.rto = hc->timeout_server; - /* if httpclient_set_dst() was used, sets the alternative address */ if (hc->dst) ss_dst = hc->dst; else ss_dst = &ss_url; - if (!sockaddr_alloc(&cs_si(s->csb)->dst, ss_dst, sizeof(*hc->dst))) { - ha_alert("httpclient: Failed to initialize stream in %s:%d.\n", __FUNCTION__, __LINE__); - goto out_free_stream; + if (!sockaddr_alloc(&addr, ss_dst, sizeof(*hc->dst))) + goto out_free_sess; + + endp = cs_endpoint_new(); + if (!endp) + goto out_free_addr; + endp->target = appctx; + endp->ctx = appctx; + endp->flags |= CS_EP_T_APPLET; + + cs = cs_new_from_applet(endp, sess, &hc->req.buf); + if (!cs) { + ha_alert("httpclient: Failed to initialize stream %s:%d.\n", __FUNCTION__, __LINE__); + cs_endpoint_free(endp); + goto out_free_addr; } + s = DISGUISE(cs_strm(cs)); + + /* set the "timeout server" */ + s->req.wto = hc->timeout_server; + s->res.rto = hc->timeout_server; /* choose the SSL server or not */ switch (out.scheme) { @@ -529,9 +533,9 @@ struct appctx *httpclient_start(struct httpclient *hc) break; } - cs_attach_endp_app(cs, appctx, appctx); - s->flags |= SF_ASSIGNED|SF_ADDR_SET; + cs_si(s->csb)->dst = addr; cs_si(s->csb)->flags |= SI_FL_NOLINGER; + s->flags |= SF_ASSIGNED|SF_ADDR_SET; s->res.flags |= CF_READ_DONTWAIT; /* applet is waiting for data */ @@ -550,14 +554,16 @@ struct appctx *httpclient_start(struct httpclient *hc) return appctx; out_free_stream: + cs_detach_app(cs); LIST_DELETE(&s->list); pool_free(pool_head_stream, s); + cs_free(cs); +out_free_addr: + sockaddr_free(&addr); out_free_sess: session_free(sess); out_free_appctx: appctx_free(appctx); -out_free_cs: - cs_free(cs); out: return NULL; diff --git a/src/mux_fcgi.c b/src/mux_fcgi.c index ea956b3341..e18d9abcf6 100644 --- a/src/mux_fcgi.c +++ b/src/mux_fcgi.c @@ -1130,7 +1130,7 @@ static struct fcgi_strm *fcgi_conn_stream_new(struct fcgi_conn *fconn, struct co TRACE_ERROR("fstream allocation failure", FCGI_EV_FSTRM_NEW|FCGI_EV_FSTRM_END|FCGI_EV_FSTRM_ERR, fconn->conn); goto out; } - cs_attach_endp_mux(cs, fstrm, fconn->conn); + cs_attach_mux(cs, fstrm, fconn->conn); fstrm->cs = cs; fstrm->sess = sess; fconn->nb_cs++; diff --git a/src/mux_h1.c b/src/mux_h1.c index 669b411051..9ebd0ffb1d 100644 --- a/src/mux_h1.c +++ b/src/mux_h1.c @@ -717,9 +717,9 @@ static struct conn_stream *h1s_new_cs(struct h1s *h1s, struct buffer *input) { struct h1c *h1c = h1s->h1c; struct cs_endpoint *endp; - struct conn_stream *cs; TRACE_ENTER(H1_EV_STRM_NEW, h1c->conn, h1s); + endp = cs_endpoint_new(); if (!endp) { TRACE_ERROR("CS endp allocation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, h1c->conn, h1s); @@ -727,36 +727,27 @@ static struct conn_stream *h1s_new_cs(struct h1s *h1s, struct buffer *input) } endp->target = h1s; endp->ctx = h1c->conn; + endp->flags |= CS_EP_T_MUX; if (h1s->flags & H1S_F_NOT_FIRST) endp->flags |= CS_EP_NOT_FIRST; if (h1s->req.flags & H1_MF_UPG_WEBSOCKET) endp->flags |= CS_EP_WEBSOCKET; - cs = cs_new(endp); - if (!cs) { + h1s->cs = cs_new_from_mux(endp, h1c->conn->owner, input); + if (!h1s->cs) { TRACE_ERROR("CS allocation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, h1c->conn, h1s); cs_endpoint_free(endp); goto err; } - cs_attach_endp_mux(cs, h1s, h1c->conn); - h1s->cs = cs; - - if (!stream_new(h1c->conn->owner, cs, input)) { - TRACE_DEVEL("leaving on stream creation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, h1c->conn, h1s); - goto err_cs; - } HA_ATOMIC_INC(&h1c->px_counters->open_streams); HA_ATOMIC_INC(&h1c->px_counters->total_streams); h1c->flags = (h1c->flags & ~H1C_F_ST_EMBRYONIC) | H1C_F_ST_ATTACHED | H1C_F_ST_READY; TRACE_LEAVE(H1_EV_STRM_NEW, h1c->conn, h1s); - return cs; + return h1s->cs; - err_cs: - cs_free(cs); err: - h1s->cs = NULL; TRACE_DEVEL("leaving on error", H1_EV_STRM_NEW|H1_EV_STRM_ERR, h1c->conn, h1s); return NULL; } @@ -856,7 +847,7 @@ static struct h1s *h1c_bck_stream_new(struct h1c *h1c, struct conn_stream *cs, s if (!h1s) goto fail; - cs_attach_endp_mux(cs, h1s, h1c->conn); + cs_attach_mux(cs, h1s, h1c->conn); h1s->flags |= H1S_F_RX_BLK; h1s->cs = cs; h1s->sess = sess; @@ -1004,7 +995,7 @@ static int h1_init(struct connection *conn, struct proxy *proxy, struct session if (!h1c_frt_stream_new(h1c)) goto fail; h1c->h1s->cs = cs; - cs_attach_endp_mux(cs, h1c->h1s, conn); + cs_attach_mux(cs, h1c->h1s, conn); /* Attach the CS but Not ready yet */ h1c->flags = (h1c->flags & ~H1C_F_ST_EMBRYONIC) | H1C_F_ST_ATTACHED; diff --git a/src/mux_h2.c b/src/mux_h2.c index 98a837d42b..39e5ece76a 100644 --- a/src/mux_h2.c +++ b/src/mux_h2.c @@ -1591,7 +1591,6 @@ static struct h2s *h2c_frt_stream_new(struct h2c *h2c, int id, struct buffer *in { struct session *sess = h2c->conn->owner; struct cs_endpoint *endp; - struct conn_stream *cs; struct h2s *h2s; TRACE_ENTER(H2_EV_H2S_NEW, h2c->conn); @@ -1608,30 +1607,26 @@ static struct h2s *h2c_frt_stream_new(struct h2c *h2c, int id, struct buffer *in goto out_close; endp->target = h2s; endp->ctx = h2c->conn; - endp->flags |= CS_EP_NOT_FIRST; + endp->flags |= (CS_EP_T_MUX|CS_EP_NOT_FIRST); /* FIXME wrong analogy between ext-connect and websocket, this need to * be refine. */ if (flags & H2_SF_EXT_CONNECT_RCVD) endp->flags |= CS_EP_WEBSOCKET; - cs = cs_new(endp); - if (!cs) { - cs_endpoint_free(endp); - goto out_close; - } - cs_attach_endp_mux(cs, h2s, h2c->conn); - h2s->cs = cs; - h2c->nb_cs++; - /* The stream will record the request's accept date (which is either the * end of the connection's or the date immediately after the previous * request) and the idle time, which is the delay since the previous * request. We can set the value now, it will be copied by stream_new(). */ sess->t_idle = tv_ms_elapsed(&sess->tv_accept, &now) - sess->t_handshake; - if (!stream_new(h2c->conn->owner, cs, input)) - goto out_free_cs; + + h2s->cs = cs_new_from_mux(endp, sess, input); + if (!h2s->cs) { + cs_endpoint_free(endp); + goto out_close; + } + h2c->nb_cs++; /* We want the accept date presented to the next stream to be the one * we have now, the handshake time to be null (since the next stream @@ -1649,12 +1644,6 @@ static struct h2s *h2c_frt_stream_new(struct h2c *h2c, int id, struct buffer *in TRACE_LEAVE(H2_EV_H2S_NEW, h2c->conn); return h2s; - out_free_cs: - h2c->nb_cs--; - if (!h2c->nb_cs) - h2c->idle_start = now_ms; - cs_free(cs); - h2s->cs = NULL; out_close: h2s_destroy(h2s); out: @@ -1684,7 +1673,7 @@ static struct h2s *h2c_bck_stream_new(struct h2c *h2c, struct conn_stream *cs, s if (!h2s) goto out; - cs_attach_endp_mux(cs, h2s, h2c->conn); + cs_attach_mux(cs, h2s, h2c->conn); h2s->cs = cs; h2s->sess = sess; h2c->nb_cs++; diff --git a/src/mux_pt.c b/src/mux_pt.c index e49898e2eb..5bd71932e3 100644 --- a/src/mux_pt.c +++ b/src/mux_pt.c @@ -297,18 +297,14 @@ static int mux_pt_init(struct connection *conn, struct proxy *prx, struct sessio goto fail_free_ctx; endp->target = ctx; endp->ctx = conn; - cs = cs_new(endp); + endp->flags |= CS_EP_T_MUX; + + cs = cs_new_from_mux(endp, sess, input); if (!cs) { TRACE_ERROR("CS allocation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn); cs_endpoint_free(endp); goto fail_free_ctx; } - cs_attach_endp_mux(cs, ctx, conn); - - if (!stream_new(conn->owner, cs, &BUF_NULL)) { - TRACE_ERROR("stream creation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn, cs); - goto fail_free; - } TRACE_POINT(PT_EV_STRM_NEW, conn, cs); } conn->ctx = ctx; @@ -320,9 +316,7 @@ static int mux_pt_init(struct connection *conn, struct proxy *prx, struct sessio TRACE_LEAVE(PT_EV_CONN_NEW, conn, cs); return 0; - fail_free: - cs_free(cs); -fail_free_ctx: + fail_free_ctx: if (ctx->wait_event.tasklet) tasklet_free(ctx->wait_event.tasklet); pool_free(pool_head_pt_ctx, ctx); @@ -379,7 +373,7 @@ static int mux_pt_attach(struct connection *conn, struct conn_stream *cs, struct TRACE_ENTER(PT_EV_STRM_NEW, conn); if (ctx->wait_event.events) conn->xprt->unsubscribe(ctx->conn, conn->xprt_ctx, SUB_RETRY_RECV, &ctx->wait_event); - cs_attach_endp_mux(cs, ctx, conn); + cs_attach_mux(cs, ctx, conn); ctx->cs = cs; cs->flags |= CS_FL_RCV_MORE; diff --git a/src/peers.c b/src/peers.c index b88f4ec05b..9701963182 100644 --- a/src/peers.c +++ b/src/peers.c @@ -3181,8 +3181,10 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer struct proxy *p = peers->peers_fe; /* attached frontend */ struct appctx *appctx; struct session *sess; + struct cs_endpoint *endp; struct conn_stream *cs; struct stream *s; + struct sockaddr_storage *addr = NULL; peer->new_conn++; peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT)); @@ -3191,15 +3193,9 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer peer->last_hdshk = now_ms; s = NULL; - cs = cs_new(NULL); - if (!cs) { - ha_alert("out of memory in peer_session_create().\n"); - goto out_close; - } - - appctx = appctx_new(&peer_applet, cs); + appctx = appctx_new(&peer_applet); if (!appctx) - goto out_free_cs; + goto out_close; appctx->st0 = PEER_SESS_ST_CONNECT; appctx->ctx.peers.ptr = (void *)peer; @@ -3210,23 +3206,34 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer goto out_free_appctx; } - if ((s = stream_new(sess, cs, &BUF_NULL)) == NULL) { - ha_alert("Failed to initialize stream in peer_session_create().\n"); + if (!sockaddr_alloc(&addr, &peer->addr, sizeof(peer->addr))) goto out_free_sess; + + endp = cs_endpoint_new(); + if (!endp) + goto out_free_addr; + endp->target = appctx; + endp->ctx = appctx; + endp->flags |= CS_EP_T_APPLET; + + cs = cs_new_from_applet(endp, sess, &BUF_NULL); + if (!cs) { + ha_alert("Failed to initialize stream in peer_session_create().\n"); + cs_endpoint_free(endp); + goto out_free_addr; } + s = DISGUISE(cs_strm(cs)); + /* applet is waiting for data */ si_cant_get(cs_si(s->csf)); appctx_wakeup(appctx); /* initiate an outgoing connection */ - s->target = peer_session_target(peer, s); - if (!sockaddr_alloc(&(cs_si(s->csb)->dst), &peer->addr, sizeof(peer->addr))) - goto out_free_strm; - - cs_attach_endp_app(cs, appctx, appctx); - s->flags = SF_ASSIGNED|SF_ADDR_SET; + cs_si(s->csb)->dst = addr; cs_si(s->csb)->flags |= SI_FL_NOLINGER; + s->flags = SF_ASSIGNED|SF_ADDR_SET; + s->target = peer_session_target(peer, s); s->do_log = NULL; s->uniq_id = 0; @@ -3238,15 +3245,12 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer return appctx; /* Error unrolling */ - out_free_strm: - LIST_DELETE(&s->list); - pool_free(pool_head_stream, s); + out_free_addr: + sockaddr_free(&addr); out_free_sess: session_free(sess); out_free_appctx: appctx_free(appctx); - out_free_cs: - cs_free(cs); out_close: return NULL; } diff --git a/src/sink.c b/src/sink.c index d704042d1b..0f016890df 100644 --- a/src/sink.c +++ b/src/sink.c @@ -636,22 +636,18 @@ static struct appctx *sink_forward_session_create(struct sink *sink, struct sink struct proxy *p = sink->forward_px; struct appctx *appctx; struct session *sess; + struct cs_endpoint *endp; struct conn_stream *cs; struct stream *s; struct applet *applet = &sink_forward_applet; + struct sockaddr_storage *addr = NULL; if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING) applet = &sink_forward_oc_applet; - cs = cs_new(NULL); - if (!cs) { - ha_alert("out of memory in sink_forward_session_create"); - goto out_close; - } - - appctx = appctx_new(applet, cs); + appctx = appctx_new(applet); if (!appctx) - goto out_free_cs; + goto out_close; appctx->ctx.sft.ptr = (void *)sft; @@ -661,19 +657,29 @@ static struct appctx *sink_forward_session_create(struct sink *sink, struct sink goto out_free_appctx; } - if ((s = stream_new(sess, cs, &BUF_NULL)) == NULL) { - ha_alert("Failed to initialize stream in sink_forward_session_create().\n"); + if (!sockaddr_alloc(&addr, &sft->srv->addr, sizeof(sft->srv->addr))) goto out_free_sess; + + endp = cs_endpoint_new(); + if (!endp) + goto out_free_addr; + endp->target = appctx; + endp->ctx = appctx; + endp->flags |= CS_EP_T_APPLET; + + cs = cs_new_from_applet(endp, sess, &BUF_NULL); + if (!cs) { + ha_alert("Failed to initialize stream in sink_forward_session_create().\n"); + cs_endpoint_free(endp); + goto out_free_addr; } + s = DISGUISE(cs_strm(cs)); + cs_si(s->csb)->dst = addr; + cs_si(s->csb)->flags |= SI_FL_NOLINGER; s->target = &sft->srv->obj_type; - if (!sockaddr_alloc(&cs_si(s->csb)->dst, &sft->srv->addr, sizeof(sft->srv->addr))) - goto out_free_strm; - - cs_attach_endp_app(cs, appctx, appctx); s->flags = SF_ASSIGNED|SF_ADDR_SET; - cs_si(s->csb)->flags |= SI_FL_NOLINGER; s->do_log = NULL; s->uniq_id = 0; @@ -688,15 +694,12 @@ static struct appctx *sink_forward_session_create(struct sink *sink, struct sink return appctx; /* Error unrolling */ - out_free_strm: - LIST_DELETE(&s->list); - pool_free(pool_head_stream, s); + out_free_addr: + sockaddr_free(&addr); out_free_sess: session_free(sess); out_free_appctx: appctx_free(appctx); - out_free_cs: - cs_free(cs); out_close: return NULL; } diff --git a/src/stream.c b/src/stream.c index e30027c522..b07de9bc07 100644 --- a/src/stream.c +++ b/src/stream.c @@ -443,15 +443,13 @@ struct stream *stream_new(struct session *sess, struct conn_stream *cs, struct b s->flags |= SF_HTX; s->csf = cs; - s->csb = cs_new(NULL); + if (cs_attach_strm(s->csf, s) < 0) + goto out_fail_attach_csf; + + s->csb = cs_new_from_strm(s, CS_FL_NONE); if (!s->csb) goto out_fail_alloc_csb; - if (cs_attach_app(s->csf, &s->obj_type) < 0) - goto out_fail_attach_csf; - if (cs_attach_app(s->csb, &s->obj_type) < 0) - goto out_fail_attach_csb; - si_set_state(cs_si(s->csf), SI_ST_EST); cs_si(s->csf)->hcto = sess->fe->timeout.clientfin; diff --git a/src/stream_interface.c b/src/stream_interface.c index 4e2f7bb2f3..3609647564 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -339,10 +339,11 @@ struct appctx *si_register_handler(struct stream_interface *si, struct applet *a DPRINTF(stderr, "registering handler %p for si %p (was %p)\n", app, si, si_task(si)); - appctx = appctx_new(app, si->cs); + appctx = appctx_new(app); if (!appctx) return NULL; - cs_attach_endp_app(si->cs, appctx, appctx); + cs_attach_applet(si->cs, appctx, appctx); + appctx->owner = si->cs; appctx->t->nice = si_strm(si)->task->nice; si_cant_get(si); appctx_wakeup(appctx); diff --git a/src/tcpcheck.c b/src/tcpcheck.c index ee439daeb2..53ee771f46 100644 --- a/src/tcpcheck.c +++ b/src/tcpcheck.c @@ -1101,7 +1101,8 @@ enum tcpcheck_eval_ret tcpcheck_eval_connect(struct check *check, struct tcpchec TRACE_ERROR("conn-stream allocation error", CHK_EV_TCPCHK_CONN|CHK_EV_TCPCHK_ERR, check); goto out; } - cs_attach_endp_mux(check->cs, NULL, conn); + cs_attach_mux(check->cs, NULL, conn); + conn->ctx = check->cs; tasklet_set_tid(check->wait_list.tasklet, tid); conn_set_owner(conn, check->sess, NULL);