From cda94accb1f0e2615b36688a7271829cb3a3be33 Mon Sep 17 00:00:00 2001 From: Christopher Faulet Date: Thu, 23 Dec 2021 17:28:17 +0100 Subject: [PATCH] MAJOR: stream/conn_stream: Move the stream-interface into the conn-stream Thanks to all previous changes, it is now possible to move the stream-interface into the conn-stream. To do so, some SI functions are removed and their conn-stream counterparts are added. In addition, the conn-stream is now responsible to create and release the stream-interface. While the stream-interfaces were inlined in the stream structure, there is now a pointer in the conn-stream. stream-interfaces are now dynamically allocated. Thus a dedicated pool is added. It is a temporary change because, at the end, the stream-interface structure will most probably disappear. --- include/haproxy/channel.h | 8 +-- include/haproxy/conn_stream-t.h | 4 +- include/haproxy/conn_stream.h | 66 +++--------------- include/haproxy/stream-t.h | 1 - include/haproxy/stream_interface.h | 82 ++-------------------- src/backend.c | 8 +-- src/cli.c | 2 +- src/conn_stream.c | 107 ++++++++++++++++++++++++++-- src/connection.c | 8 +-- src/dns.c | 3 +- src/flt_spoe.c | 5 +- src/h3.c | 3 +- src/hlua.c | 5 +- src/hq_interop.c | 4 +- src/http_ana.c | 2 +- src/http_client.c | 3 +- src/http_fetch.c | 4 +- src/mux_h1.c | 3 +- src/mux_h2.c | 4 +- src/mux_pt.c | 3 +- src/peers.c | 3 +- src/sink.c | 3 +- src/stream.c | 108 +++++++++++++---------------- src/stream_interface.c | 35 +++++++++- src/tcp_act.c | 2 +- src/tcp_sample.c | 12 ++-- src/tcpcheck.c | 15 +++- 27 files changed, 261 insertions(+), 242 deletions(-) diff --git a/include/haproxy/channel.h b/include/haproxy/channel.h index 07127bde92..8de8e17580 100644 --- a/include/haproxy/channel.h +++ b/include/haproxy/channel.h @@ -68,18 +68,18 @@ static inline struct stream *chn_strm(const struct channel *chn) static inline struct stream_interface *chn_prod(const struct channel *chn) { if (chn->flags & CF_ISRESP) - return &LIST_ELEM(chn, struct stream *, res)->si[1]; + return LIST_ELEM(chn, struct stream *, res)->csb->si; else - return &LIST_ELEM(chn, struct stream *, req)->si[0]; + return LIST_ELEM(chn, struct stream *, req)->csf->si; } /* returns a pointer to the stream interface consuming the channel (producer) */ static inline struct stream_interface *chn_cons(const struct channel *chn) { if (chn->flags & CF_ISRESP) - return &LIST_ELEM(chn, struct stream *, res)->si[0]; + return LIST_ELEM(chn, struct stream *, res)->csf->si; else - return &LIST_ELEM(chn, struct stream *, req)->si[1]; + return LIST_ELEM(chn, struct stream *, req)->csb->si; } /* c_orig() : returns the pointer to the channel buffer's origin */ diff --git a/include/haproxy/conn_stream-t.h b/include/haproxy/conn_stream-t.h index 88775e9178..580d104bbb 100644 --- a/include/haproxy/conn_stream-t.h +++ b/include/haproxy/conn_stream-t.h @@ -25,6 +25,8 @@ #include +struct stream_interface; + /* conn_stream flags */ enum { CS_FL_NONE = 0x00000000, /* Just for initialization purposes */ @@ -92,7 +94,7 @@ struct conn_stream { unsigned int flags; /* CS_FL_* */ enum obj_type *end; /* points to the end point (connection or appctx) */ enum obj_type *app; /* points to the applicative point (stream or check) */ - void *data; /* pointer to upper layer's entity (eg: stream interface) */ + struct stream_interface *si; const struct data_cb *data_cb; /* data layer callbacks. Must be set before xprt->init() */ void *ctx; /* mux-specific context */ }; diff --git a/include/haproxy/conn_stream.h b/include/haproxy/conn_stream.h index 730a29bbd4..5be833dbca 100644 --- a/include/haproxy/conn_stream.h +++ b/include/haproxy/conn_stream.h @@ -29,13 +29,16 @@ #include struct stream; +struct stream_interface; struct check; #define IS_HTX_CS(cs) (cs_conn(cs) && IS_HTX_CONN(cs_conn(cs))) -struct conn_stream *cs_new(enum obj_type *endp, void *ctx, enum obj_type *app, void *data, const struct data_cb *data_cb); +struct conn_stream *cs_new(); void cs_free(struct conn_stream *cs); - +void cs_attach_endp(struct conn_stream *cs, enum obj_type *endp, void *ctx); +int cs_attach_app(struct conn_stream *cs, enum obj_type *app); +void cs_detach_endp(struct conn_stream *cs); /* * Initializes all required fields for a new conn_strema. @@ -47,7 +50,7 @@ static inline void cs_init(struct conn_stream *cs) cs->end = NULL; cs->app = NULL; cs->ctx = NULL; - cs->data = NULL; + cs->si = NULL; cs->data_cb = NULL; } @@ -77,11 +80,6 @@ static inline struct appctx *cs_appctx(const struct conn_stream *cs) return (cs ? objt_appctx(cs->end) : NULL); } -static inline struct stream_interface *cs_si(const struct conn_stream *cs) -{ - return (cs ? cs->data : NULL); -} - static inline struct stream *cs_strm(const struct conn_stream *cs) { return (cs ? objt_stream(cs->app) : NULL); @@ -92,57 +90,9 @@ static inline struct check *cs_check(const struct conn_stream *cs) return (cs ? objt_check(cs->app) : NULL); } -/* Attaches a conn_stream to an endpoint and sets the endpoint ctx */ -static inline void cs_attach_endp(struct conn_stream *cs, enum obj_type *endp, void *ctx) -{ - cs->end = endp; - cs->ctx = ctx; -} - -/* Attaches a conn_stream to a app layer and sets the relevant callbacks */ -static inline void cs_attach_app(struct conn_stream *cs, enum obj_type *app, void *data, const struct data_cb *data_cb) -{ - cs->app = app; - cs->data = data; - cs->data_cb = data_cb; -} - -/* Detach the conn_stream from the endpoint, if any. For a connecrion, if a mux - * owns the connection ->detach() callback is called. Otherwise, it means the - * conn-stream owns the connection. In this case the connection is closed and - * released. For an applet, the appctx is released. At the end, the conn-stream - * is not released but some fields a reset. - */ -static inline void cs_detach_endp(struct conn_stream *cs) +static inline struct stream_interface *cs_si(const struct conn_stream *cs) { - struct connection *conn; - struct appctx *appctx; - - if ((conn = cs_conn(cs))) { - if (conn->mux) - conn->mux->detach(cs); - else { - /* It's too early to have a mux, let's just destroy - * the connection - */ - conn_stop_tracking(conn); - conn_full_close(conn); - if (conn->destroy_cb) - conn->destroy_cb(conn); - conn_free(conn); - } - } - else if ((appctx = cs_appctx(cs))) { - if (appctx->applet->release) - appctx->applet->release(appctx); - appctx_free(appctx); - } - - /* Rest CS */ - cs->flags = CS_FL_NONE; - cs->end = NULL; - cs->ctx = NULL; - cs->data_cb = NULL; + return (cs_strm(cs) ? cs->si : NULL); } /* Release a conn_stream */ diff --git a/include/haproxy/stream-t.h b/include/haproxy/stream-t.h index 7e8a257a2a..f75a181406 100644 --- a/include/haproxy/stream-t.h +++ b/include/haproxy/stream-t.h @@ -167,7 +167,6 @@ struct stream { struct conn_stream *csf; /* frontend conn-stream */ struct conn_stream *csb; /* backend conn-stream */ - struct stream_interface si[2]; /* client and server stream interfaces */ struct strm_logs logs; /* logs for this stream */ void (*do_log)(struct stream *s); /* the function to call in order to log (or NULL) */ diff --git a/include/haproxy/stream_interface.h b/include/haproxy/stream_interface.h index 5236ffd07e..a30f345268 100644 --- a/include/haproxy/stream_interface.h +++ b/include/haproxy/stream_interface.h @@ -33,6 +33,10 @@ extern struct si_ops si_embedded_ops; extern struct si_ops si_conn_ops; extern struct si_ops si_applet_ops; extern struct data_cb si_conn_cb; +extern struct data_cb check_conn_cb; + +struct stream_interface *si_new(struct conn_stream *cs); +void si_free(struct stream_interface *si); /* main event functions used to move data between sockets and buffers */ int si_check_timeouts(struct stream_interface *si); @@ -87,7 +91,7 @@ static inline struct task *si_task(struct stream_interface *si) /* returns the stream interface on the other side. Used during forwarding. */ static inline struct stream_interface *si_opposite(struct stream_interface *si) { - return ((si->flags & SI_FL_ISBACK) ? &(cs_strm(si->cs)->si[0]) : &(cs_strm(si->cs)->si[1])); + return ((si->flags & SI_FL_ISBACK) ? cs_strm(si->cs)->csf->si : cs_strm(si->cs)->csb->si); } /* initializes a stream interface in the SI_ST_INI state. It's detached from @@ -105,6 +109,7 @@ static inline int si_reset(struct stream_interface *si) si->cs = NULL; si->state = si->prev_state = SI_ST_INI; si->ops = &si_embedded_ops; + si->l7_buffer = BUF_NULL; si->wait_event.tasklet = tasklet_new(); if (!si->wait_event.tasklet) return -1; @@ -137,81 +142,6 @@ static inline int si_state_in(enum si_state state, enum si_state_bit mask) return !!(si_state_bit(state) & mask); } -/* Reset the endpoint detaching it from the conn-stream. For a connection - * attached to a mux, it is unsubscribe from any event. - */ -static inline void si_reset_endpoint(struct stream_interface *si) -{ - if (!si->cs) - return; - - if (cs_conn_mux(si->cs) && si->wait_event.events != 0) - (cs_conn_mux(si->cs))->unsubscribe(si->cs, si->wait_event.events, &si->wait_event); - - cs_detach_endp(si->cs); - si->ops = &si_embedded_ops; -} - -/* Release the endpoint if it's a connection or an applet, then nullify it. - * Note: released connections are closed then freed. - */ -static inline void si_release_endpoint(struct stream_interface *si) -{ - if (!si->cs) - return; - si_reset_endpoint(si); - cs_free(si->cs); - si->cs = NULL; - si->ops = &si_embedded_ops; - -} - -/* Attach conn_stream to the stream interface . */ -static inline void si_attach_cs(struct stream_interface *si, struct conn_stream *cs) -{ - si->cs = cs; - if (cs_conn(cs)) { - si->ops = &si_conn_ops; - cs_attach_app(cs, &si_strm(si)->obj_type, si, &si_conn_cb); - } - else if (cs_appctx(cs)) { - struct appctx *appctx = cs_appctx(cs); - - si->ops = &si_applet_ops; - appctx->owner = cs; - cs_attach_app(cs, &si_strm(si)->obj_type, si, NULL); - } - else { - si->ops = &si_embedded_ops; - cs_attach_app(cs, &si_strm(si)->obj_type, si, NULL); - } -} - -/* Attach connection to the stream interface . The stream interface - * is configured to work with a connection context. - */ -static inline void si_attach_conn(struct stream_interface *si, struct connection *conn) -{ - si_reset_endpoint(si); - if (!conn->ctx) - conn->ctx = si->cs; - si->ops = &si_conn_ops; - cs_attach_endp(si->cs, &conn->obj_type, conn); - cs_attach_app(si->cs, &si_strm(si)->obj_type, si, &si_conn_cb); -} - -/* Attach appctx to the stream interface . The stream interface - * is configured to work with an applet context. - */ -static inline void si_attach_appctx(struct stream_interface *si, struct appctx *appctx) -{ - si_reset_endpoint(si); - appctx->owner = si->cs; - si->ops = &si_applet_ops; - cs_attach_endp(si->cs, &appctx->obj_type, appctx); - cs_attach_app(si->cs, &si_strm(si)->obj_type, si, NULL); -} - /* call the applet's release function if any. Needs to be called upon close() */ static inline void si_applet_release(struct stream_interface *si) { diff --git a/src/backend.c b/src/backend.c index 3da12f812d..0fed42a461 100644 --- a/src/backend.c +++ b/src/backend.c @@ -1495,9 +1495,9 @@ int connect_server(struct stream *s) } if (avail >= 1) { - si_attach_conn(cs_si(s->csb), srv_conn); + cs_attach_endp(s->csb, &srv_conn->obj_type, srv_conn); if (srv_conn->mux->attach(srv_conn, s->csb, s->sess) == -1) { - si_reset_endpoint(cs_si(s->csb)); + cs_detach_endp(s->csb); srv_conn = NULL; } } @@ -1571,7 +1571,7 @@ skip_reuse: return SF_ERR_INTERNAL; /* how did we get there ? */ } - si_attach_conn(cs_si(s->csb), srv_conn); + cs_attach_endp(s->csb, &srv_conn->obj_type, srv_conn); #if defined(USE_OPENSSL) && defined(TLSEXT_TYPE_application_layer_protocol_negotiation) if (!srv || (srv->use_ssl != 1 || (!(srv->ssl_ctx.alpn_str) && !(srv->ssl_ctx.npn_str)) || @@ -2289,7 +2289,7 @@ void back_handle_st_cer(struct stream *s) * Note: the stream-interface will be switched to ST_REQ, ST_ASS or * ST_TAR and SI_FL_ERR and SI_FL_EXP flags will be unset. */ - si_reset_endpoint(cs_si(s->csb)); + cs_detach_endp(s->csb); stream_choose_redispatch(s); diff --git a/src/cli.c b/src/cli.c index e0f9736b86..63ca4c6d08 100644 --- a/src/cli.c +++ b/src/cli.c @@ -2715,7 +2715,7 @@ int pcli_wait_for_response(struct stream *s, struct channel *rep, int an_bit) * connection. */ if (!si_conn_ready(cs_si(s->csb))) { - si_reset_endpoint(cs_si(s->csb)); + cs_detach_endp(s->csb); s->srv_conn = NULL; } diff --git a/src/conn_stream.c b/src/conn_stream.c index 41d4a2e5c0..0c7bf9b423 100644 --- a/src/conn_stream.c +++ b/src/conn_stream.c @@ -14,7 +14,7 @@ #include #include #include -//#include +#include DECLARE_POOL(pool_head_connstream, "conn_stream", sizeof(struct conn_stream)); @@ -22,7 +22,7 @@ DECLARE_POOL(pool_head_connstream, "conn_stream", sizeof(struct conn_stream)); /* Tries to allocate a new conn_stream and initialize its main fields. On * failure, nothing is allocated and NULL is returned. */ -struct conn_stream *cs_new(enum obj_type *endp, void *ctx, enum obj_type *app, void *data, const struct data_cb *data_cb) +struct conn_stream *cs_new() { struct conn_stream *cs; @@ -30,8 +30,6 @@ struct conn_stream *cs_new(enum obj_type *endp, void *ctx, enum obj_type *app, v if (unlikely(!cs)) return NULL; cs_init(cs); - cs_attach_endp(cs, endp, ctx); - cs_attach_app(cs, app, data, data_cb); return cs; } @@ -40,5 +38,106 @@ struct conn_stream *cs_new(enum obj_type *endp, void *ctx, enum obj_type *app, v */ void cs_free(struct conn_stream *cs) { + si_free(cs->si); pool_free(pool_head_connstream, cs); } + + +/* Attaches a conn_stream to an endpoint and sets the endpoint ctx */ +void cs_attach_endp(struct conn_stream *cs, enum obj_type *endp, void *ctx) +{ + struct connection *conn; + struct appctx *appctx; + + cs->end = endp; + cs->ctx = ctx; + if ((conn = objt_conn(endp)) != NULL) { + if (!conn->ctx) + conn->ctx = cs; + if (cs_strm(cs)) { + cs->si->ops = &si_conn_ops; + cs->data_cb = &si_conn_cb; + } + else if (cs_check(cs)) + cs->data_cb = &check_conn_cb; + } + else if ((appctx = objt_appctx(endp)) != NULL) { + appctx->owner = cs; + if (cs->si) { + cs->si->ops = &si_applet_ops; + cs->data_cb = NULL; + } + } +} + +/* Attaches a conn_stream to a app layer and sets the relevant callbacks */ +int cs_attach_app(struct conn_stream *cs, enum obj_type *app) +{ + cs->app = app; + + if (objt_stream(app)) { + if (!cs->si) + cs->si = si_new(cs); + if (unlikely(!cs->si)) + return -1; + + if (cs_conn(cs)) { + cs->si->ops = &si_conn_ops; + cs->data_cb = &si_conn_cb; + } + else if (cs_appctx(cs)) { + cs->si->ops = &si_applet_ops; + cs->data_cb = NULL; + } + else { + cs->si->ops = &si_embedded_ops; + cs->data_cb = NULL; + } + } + else if (objt_check(app)) + cs->data_cb = &check_conn_cb; + return 0; +} + +/* Detach the conn_stream from the endpoint, if any. For a connecrion, if a mux + * owns the connection ->detach() callback is called. Otherwise, it means the + * conn-stream owns the connection. In this case the connection is closed and + * released. For an applet, the appctx is released. At the end, the conn-stream + * is not released but some fields a reset. + */ +void cs_detach_endp(struct conn_stream *cs) +{ + struct connection *conn; + struct appctx *appctx; + + if ((conn = cs_conn(cs))) { + if (conn->mux) { + if (cs->si && cs->si->wait_event.events != 0) + conn->mux->unsubscribe(cs, cs->si->wait_event.events, &cs->si->wait_event); + conn->mux->detach(cs); + } + else { + /* It's too early to have a mux, let's just destroy + * the connection + */ + conn_stop_tracking(conn); + conn_full_close(conn); + if (conn->destroy_cb) + conn->destroy_cb(conn); + conn_free(conn); + } + } + else if ((appctx = cs_appctx(cs))) { + if (appctx->applet->release) + appctx->applet->release(appctx); + appctx_free(appctx); + } + + /* Rest CS */ + cs->flags = CS_FL_NONE; + cs->end = NULL; + cs->ctx = NULL; + if (cs->si) + cs->si->ops = &si_embedded_ops; + cs->data_cb = NULL; +} diff --git a/src/connection.c b/src/connection.c index 60cbd22210..1d53eed795 100644 --- a/src/connection.c +++ b/src/connection.c @@ -1738,8 +1738,8 @@ static int make_proxy_line_v2(char *buf, int buf_len, struct server *srv, struct memcpy(hdr->sig, pp2_signature, PP2_SIGNATURE_LEN); if (strm) { - src = si_src(&strm->si[0]); - dst = si_dst(&strm->si[0]); + src = si_src(strm->csf->si); + dst = si_dst(strm->csf->si); } else if (remote && conn_get_src(remote) && conn_get_dst(remote)) { src = conn_src(remote); @@ -1937,8 +1937,8 @@ int make_proxy_line(char *buf, int buf_len, struct server *srv, struct connectio const struct sockaddr_storage *dst = NULL; if (strm) { - src = si_src(&strm->si[0]); - dst = si_dst(&strm->si[0]); + src = si_src(strm->csf->si); + dst = si_dst(strm->csf->si); } else if (remote && conn_get_src(remote) && conn_get_dst(remote)) { src = conn_src(remote); diff --git a/src/dns.c b/src/dns.c index 0bcecec2d3..249db228f8 100644 --- a/src/dns.c +++ b/src/dns.c @@ -903,11 +903,12 @@ static struct appctx *dns_session_create(struct dns_session *ds) goto out_free_appctx; } - cs = cs_new(&appctx->obj_type, appctx, NULL, NULL, NULL); + cs = cs_new(); if (!cs) { ha_alert("out of memory in dns_session_create().\n"); goto out_free_sess; } + cs_attach_endp(cs, &appctx->obj_type, appctx); if ((s = stream_new(sess, cs, &BUF_NULL)) == NULL) { ha_alert("Failed to initialize stream in dns_session_create().\n"); diff --git a/src/flt_spoe.c b/src/flt_spoe.c index 6c4c9f8ab9..5d0a535242 100644 --- a/src/flt_spoe.c +++ b/src/flt_spoe.c @@ -2024,9 +2024,10 @@ spoe_create_appctx(struct spoe_config *conf) if (!sess) goto out_free_spoe; - cs = cs_new(&appctx->obj_type, appctx, NULL, NULL, NULL); + cs = cs_new(); if (!cs) goto out_free_sess; + cs_attach_endp(cs, &appctx->obj_type, appctx); if ((strm = stream_new(sess, cs, &BUF_NULL)) == NULL) goto out_free_cs; @@ -2034,7 +2035,7 @@ spoe_create_appctx(struct spoe_config *conf) stream_set_backend(strm, conf->agent->b.be); /* applet is waiting for data */ - si_cant_get(&strm->si[0]); + si_cant_get(strm->csf->si); appctx_wakeup(appctx); strm->do_log = NULL; diff --git a/src/h3.c b/src/h3.c index bda2a46477..b5699bfbfc 100644 --- a/src/h3.c +++ b/src/h3.c @@ -176,9 +176,10 @@ static int h3_headers_to_htx(struct qcs *qcs, struct buffer *buf, uint64_t len, if (fin) htx->flags |= HTX_FL_EOM; - cs = cs_new(qcs->qcc->conn->obj_type); + cs = cs_new(); if (!cs) return 1; + cs_attach_endp(&qcs->qcc->conn->obj_type, qcs); cs->flags |= CS_FL_NOT_FIRST; cs->ctx = qcs; diff --git a/src/hlua.c b/src/hlua.c index a6cc839eb3..3b2bd73efd 100644 --- a/src/hlua.c +++ b/src/hlua.c @@ -2961,11 +2961,12 @@ __LJMP static int hlua_socket_new(lua_State *L) goto out_fail_appctx; } - cs = cs_new(&appctx->obj_type, appctx, NULL, NULL, NULL); + cs = cs_new(); if (!cs) { hlua_pusherror(L, "socket: out of memory"); goto out_fail_sess; } + cs_attach_endp(cs, &appctx->obj_type, appctx); strm = stream_new(sess, cs, &BUF_NULL); if (!strm) { @@ -2980,7 +2981,7 @@ __LJMP static int hlua_socket_new(lua_State *L) * and retrieve data from the server. The connection is initialized * with the "struct server". */ - si_set_state(&strm->si[1], SI_ST_ASS); + si_set_state(strm->csb->si, SI_ST_ASS); /* Force destination server. */ strm->flags |= SF_DIRECT | SF_ASSIGNED | SF_BE_ASSIGNED; diff --git a/src/hq_interop.c b/src/hq_interop.c index 376779d3e6..c9d8c9a3da 100644 --- a/src/hq_interop.c +++ b/src/hq_interop.c @@ -72,10 +72,10 @@ static int hq_interop_decode_qcs(struct qcs *qcs, int fin, void *ctx) htx_add_endof(htx, HTX_BLK_EOH); htx_to_buf(htx, &htx_buf); - cs = cs_new(&qcs->qcc->conn->obj_type); + cs = cs_new(); if (!cs) return -1; - + cs_attach_endp(cs, &qcs->qcc->conn->obj_type, qcs); cs->ctx = qcs; stream_create_from_cs(cs, &htx_buf); diff --git a/src/http_ana.c b/src/http_ana.c index c502d43980..a3fd15cb74 100644 --- a/src/http_ana.c +++ b/src/http_ana.c @@ -1257,7 +1257,7 @@ static __inline int do_l7_retry(struct stream *s, struct stream_interface *si) res->to_forward = 0; res->analyse_exp = TICK_ETERNITY; res->total = 0; - si_reset_endpoint(cs_si(s->csb)); + cs_detach_endp(s->csb); b_free(&req->buf); /* Swap the L7 buffer with the channel buffer */ diff --git a/src/http_client.c b/src/http_client.c index 9cbecf0686..b84e6178ef 100644 --- a/src/http_client.c +++ b/src/http_client.c @@ -486,11 +486,12 @@ struct appctx *httpclient_start(struct httpclient *hc) ha_alert("httpclient: out of memory in %s:%d.\n", __FUNCTION__, __LINE__); goto out_free_appctx; } - cs = cs_new(&appctx->obj_type, appctx, NULL, NULL, NULL); + cs = cs_new(); if (!cs) { ha_alert("httpclient: out of memory in %s:%d.\n", __FUNCTION__, __LINE__); goto out_free_sess; } + cs_attach_endp(cs, &appctx->obj_type, appctx); if ((s = stream_new(sess, cs, &hc->req.buf)) == NULL) { ha_alert("httpclient: Failed to initialize stream %s:%d.\n", __FUNCTION__, __LINE__); goto out_free_cs; diff --git a/src/http_fetch.c b/src/http_fetch.c index 99dc89a51c..22919d3549 100644 --- a/src/http_fetch.c +++ b/src/http_fetch.c @@ -1186,7 +1186,7 @@ static int smp_fetch_base32(const struct arg *args, struct sample *smp, const ch */ static int smp_fetch_base32_src(const struct arg *args, struct sample *smp, const char *kw, void *private) { - const struct sockaddr_storage *src = (smp->strm ? si_src(&smp->strm->si[0]) : NULL); + const struct sockaddr_storage *src = (smp->strm ? si_src(smp->strm->csf->si) : NULL); struct buffer *temp; if (!src) @@ -2053,7 +2053,7 @@ static int smp_fetch_url32(const struct arg *args, struct sample *smp, const cha */ static int smp_fetch_url32_src(const struct arg *args, struct sample *smp, const char *kw, void *private) { - const struct sockaddr_storage *src = (smp->strm ? si_src(&smp->strm->si[0]) : NULL); + const struct sockaddr_storage *src = (smp->strm ? si_src(smp->strm->csf->si) : NULL); struct buffer *temp; if (!src) diff --git a/src/mux_h1.c b/src/mux_h1.c index 48ba0a3229..bfa923a857 100644 --- a/src/mux_h1.c +++ b/src/mux_h1.c @@ -682,11 +682,12 @@ static struct conn_stream *h1s_new_cs(struct h1s *h1s, struct buffer *input) struct conn_stream *cs; TRACE_ENTER(H1_EV_STRM_NEW, h1c->conn, h1s); - cs = cs_new(&h1c->conn->obj_type, h1s, NULL, NULL, NULL); + cs = cs_new(); if (!cs) { TRACE_ERROR("CS allocation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, h1c->conn, h1s); goto err; } + cs_attach_endp(cs, &h1c->conn->obj_type, h1s); h1s->cs = cs; if (h1s->flags & H1S_F_NOT_FIRST) diff --git a/src/mux_h2.c b/src/mux_h2.c index 2049b51052..4fed9b2dfc 100644 --- a/src/mux_h2.c +++ b/src/mux_h2.c @@ -1529,11 +1529,11 @@ static struct h2s *h2c_frt_stream_new(struct h2c *h2c, int id, struct buffer *in if (!h2s) goto out; - cs = cs_new(&h2c->conn->obj_type, h2s, NULL, NULL, NULL); + cs = cs_new(); if (!cs) goto out_close; - cs->flags |= CS_FL_NOT_FIRST; + cs_attach_endp(cs, &h2c->conn->obj_type, h2s); h2s->cs = cs; h2c->nb_cs++; diff --git a/src/mux_pt.c b/src/mux_pt.c index 7ba9da92e5..9f1aaa877e 100644 --- a/src/mux_pt.c +++ b/src/mux_pt.c @@ -291,11 +291,12 @@ static int mux_pt_init(struct connection *conn, struct proxy *prx, struct sessio ctx->conn = conn; if (!cs) { - cs = cs_new(&conn->obj_type, NULL, NULL, NULL, NULL); + cs = cs_new(); if (!cs) { TRACE_ERROR("CS allocation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn); goto fail_free_ctx; } + cs_attach_endp(cs, &conn->obj_type, NULL); if (!stream_new(conn->owner, cs, &BUF_NULL)) { TRACE_ERROR("stream creation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn, cs); diff --git a/src/peers.c b/src/peers.c index 8e7246dc0b..5172214f85 100644 --- a/src/peers.c +++ b/src/peers.c @@ -3204,11 +3204,12 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer goto out_free_appctx; } - cs = cs_new(&appctx->obj_type, appctx, NULL, NULL, NULL); + cs = cs_new(); if (!cs) { ha_alert("out of memory in peer_session_create().\n"); goto out_free_sess; } + cs_attach_endp(cs, &appctx->obj_type, appctx); if ((s = stream_new(sess, cs, &BUF_NULL)) == NULL) { ha_alert("Failed to initialize stream in peer_session_create().\n"); diff --git a/src/sink.c b/src/sink.c index 7a4751b242..aa0ecfa26e 100644 --- a/src/sink.c +++ b/src/sink.c @@ -655,11 +655,12 @@ static struct appctx *sink_forward_session_create(struct sink *sink, struct sink goto out_free_appctx; } - cs = cs_new(&appctx->obj_type, appctx, NULL, NULL, NULL); + cs = cs_new(); if (!cs) { ha_alert("out of memory in sink_forward_session_create"); goto out_free_sess; } + cs_attach_endp(cs, &appctx->obj_type, appctx); if ((s = stream_new(sess, cs, &BUF_NULL)) == NULL) { ha_alert("Failed to initialize stream in sink_forward_session_create().\n"); diff --git a/src/stream.c b/src/stream.c index f7c2f6fbf9..9dbf965374 100644 --- a/src/stream.c +++ b/src/stream.c @@ -438,30 +438,26 @@ struct stream *stream_new(struct session *sess, struct conn_stream *cs, struct b if (sess->fe->mode == PR_MODE_HTTP) s->flags |= SF_HTX; - cs->app = &s->obj_type; s->csf = cs; - s->csb = cs_new(NULL, NULL, &s->obj_type, &s->si[1], NULL); + s->csb = cs_new(); if (!s->csb) - goto out_fail_alloc_cs; + goto out_fail_alloc_csb; - s->si[0].flags = SI_FL_NONE; - if (si_reset(&s->si[0]) < 0) - goto out_fail_reset_si0; - si_attach_cs(&s->si[0], s->csf); - si_set_state(&s->si[0], SI_ST_EST); - s->si[0].hcto = sess->fe->timeout.clientfin; + if (cs_attach_app(s->csf, &s->obj_type) < 0) + goto out_fail_attach_csf; + if (cs_attach_app(s->csb, &s->obj_type) < 0) + goto out_fail_attach_csb; - if (likely(sess->fe->options2 & PR_O2_INDEPSTR)) - s->si[0].flags |= SI_FL_INDEP_STR; + si_set_state(cs_si(s->csf), SI_ST_EST); + cs_si(s->csf)->hcto = sess->fe->timeout.clientfin; - s->si[1].flags = SI_FL_ISBACK; - if (si_reset(&s->si[1]) < 0) - goto out_fail_reset_si1; - si_attach_cs(&s->si[1], s->csb); - s->si[1].hcto = TICK_ETERNITY; + if (likely(sess->fe->options2 & PR_O2_INDEPSTR)) + cs_si(s->csf)->flags |= SI_FL_INDEP_STR; + cs_si(s->csb)->flags = SI_FL_ISBACK; + cs_si(s->csb)->hcto = TICK_ETERNITY; if (likely(sess->fe->options2 & PR_O2_INDEPSTR)) - s->si[1].flags |= SI_FL_INDEP_STR; + cs_si(s->csb)->flags |= SI_FL_INDEP_STR; if (cs->flags & CS_FL_WEBSOCKET) s->flags |= SF_WEBSOCKET; @@ -470,7 +466,7 @@ struct stream *stream_new(struct session *sess, struct conn_stream *cs, struct b if (mux) { if (mux->flags & MX_FL_CLEAN_ABRT) - s->si[0].flags |= SI_FL_CLEAN_ABRT; + cs_si(s->csf)->flags |= SI_FL_CLEAN_ABRT; if (mux->flags & MX_FL_HTX) s->flags |= SF_HTX; } @@ -539,10 +535,9 @@ struct stream *stream_new(struct session *sess, struct conn_stream *cs, struct b if (flt_stream_init(s) < 0 || flt_stream_start(s) < 0) goto out_fail_accept; - s->si[1].l7_buffer = BUF_NULL; /* finish initialization of the accepted file descriptor */ if (cs_appctx(cs)) - si_want_get(&s->si[0]); + si_want_get(cs_si(s->csf)); if (sess->fe->accept && sess->fe->accept(s) < 0) goto out_fail_accept; @@ -571,13 +566,12 @@ struct stream *stream_new(struct session *sess, struct conn_stream *cs, struct b /* Error unrolling */ out_fail_accept: flt_stream_release(s, 0); - tasklet_free(s->si[1].wait_event.tasklet); LIST_DELETE(&s->list); - out_fail_reset_si1: - tasklet_free(s->si[0].wait_event.tasklet); - out_fail_reset_si0: - si_release_endpoint(&s->si[1]); - out_fail_alloc_cs: + out_fail_attach_csb: + si_free(cs_si(s->csf)); + out_fail_attach_csf: + cs_free(s->csb); + out_fail_alloc_csb: task_destroy(t); out_fail_alloc: pool_free(pool_head_stream, s); @@ -722,23 +716,15 @@ static void stream_free(struct stream *s) /* FIXME: Handle it in appctx_free ??? */ must_free_sess = objt_appctx(sess->origin) && sess->origin == s->csf->end; + /* FIXME: ATTENTION, si CSF est librérer avant, ça plante !!!! */ + cs_destroy(s->csb); + cs_destroy(s->csf); - si_release_endpoint(cs_si(s->csb)); - si_release_endpoint(cs_si(s->csf)); - - tasklet_free(s->si[0].wait_event.tasklet); - tasklet_free(s->si[1].wait_event.tasklet); - - b_free(&s->si[1].l7_buffer); if (must_free_sess) { sess->origin = NULL; session_free(sess); } - sockaddr_free(&s->si[0].src); - sockaddr_free(&s->si[0].dst); - sockaddr_free(&s->si[1].src); - sockaddr_free(&s->si[1].dst); pool_free(pool_head_stream, s); /* We may want to free the maximum amount of pools if the proxy is stopping */ @@ -2187,7 +2173,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) } } else { - si_reset_endpoint(si_b); + cs_detach_endp(s->csb); si_b->state = SI_ST_CLO; /* shutw+ini = abort */ channel_shutw_now(req); /* fix buffer flags upon abort */ channel_shutr_now(res); @@ -3157,7 +3143,7 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st chunk_appendf(&trash, " flags=0x%x, conn_retries=%d, srv_conn=%p, pend_pos=%p waiting=%d epoch=%#x\n", - strm->flags, strm->si[1].conn_retries, strm->srv_conn, strm->pend_pos, + strm->flags, strm->csb->si->conn_retries, strm->srv_conn, strm->pend_pos, LIST_INLIST(&strm->buffer_wait.list), strm->stream_epoch); chunk_appendf(&trash, @@ -3253,29 +3239,29 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st chunk_appendf(&trash, " si[0]=%p (state=%s flags=0x%02x endp0=%s:%p exp=%s et=0x%03x sub=%d)\n", - &strm->si[0], - si_state_str(strm->si[0].state), - strm->si[0].flags, + strm->csf->si, + si_state_str(strm->csf->si->state), + strm->csf->si->flags, obj_type_name(strm->csf->end), obj_base_ptr(strm->csf->end), - strm->si[0].exp ? - tick_is_expired(strm->si[0].exp, now_ms) ? "" : - human_time(TICKS_TO_MS(strm->si[0].exp - now_ms), + strm->csf->si->exp ? + tick_is_expired(strm->csf->si->exp, now_ms) ? "" : + human_time(TICKS_TO_MS(strm->csf->si->exp - now_ms), TICKS_TO_MS(1000)) : "", - strm->si[0].err_type, strm->si[0].wait_event.events); + strm->csf->si->err_type, strm->csf->si->wait_event.events); chunk_appendf(&trash, " si[1]=%p (state=%s flags=0x%02x endp1=%s:%p exp=%s et=0x%03x sub=%d)\n", - &strm->si[1], - si_state_str(strm->si[1].state), - strm->si[1].flags, + strm->csb->si, + si_state_str(strm->csb->si->state), + strm->csb->si->flags, obj_type_name(strm->csb->end), obj_base_ptr(strm->csb->end), - strm->si[1].exp ? - tick_is_expired(strm->si[1].exp, now_ms) ? "" : - human_time(TICKS_TO_MS(strm->si[1].exp - now_ms), + strm->csb->si->exp ? + tick_is_expired(strm->csb->si->exp, now_ms) ? "" : + human_time(TICKS_TO_MS(strm->csb->si->exp - now_ms), TICKS_TO_MS(1000)) : "", - strm->si[1].err_type, strm->si[1].wait_event.events); + strm->csb->si->err_type, strm->csb->si->wait_event.events); cs = strm->csf; chunk_appendf(&trash, " cs=%p csf=0x%08x ctx=%p\n", cs, cs->flags, cs->ctx); @@ -3650,21 +3636,21 @@ static int cli_io_handler_dump_sess(struct appctx *appctx) conn = cs_conn(curr_strm->csf); chunk_appendf(&trash, " s0=[%d,%1xh,fd=%d,ex=%s]", - curr_strm->si[0].state, - curr_strm->si[0].flags, + curr_strm->csf->si->state, + curr_strm->csf->si->flags, conn ? conn->handle.fd : -1, - curr_strm->si[0].exp ? - human_time(TICKS_TO_MS(curr_strm->si[0].exp - now_ms), + curr_strm->csf->si->exp ? + human_time(TICKS_TO_MS(curr_strm->csf->si->exp - now_ms), TICKS_TO_MS(1000)) : ""); conn = cs_conn(curr_strm->csb); chunk_appendf(&trash, " s1=[%d,%1xh,fd=%d,ex=%s]", - curr_strm->si[1].state, - curr_strm->si[1].flags, + curr_strm->csb->si->state, + curr_strm->csb->si->flags, conn ? conn->handle.fd : -1, - curr_strm->si[1].exp ? - human_time(TICKS_TO_MS(curr_strm->si[1].exp - now_ms), + curr_strm->csb->si->exp ? + human_time(TICKS_TO_MS(curr_strm->csb->si->exp - now_ms), TICKS_TO_MS(1000)) : ""); chunk_appendf(&trash, diff --git a/src/stream_interface.c b/src/stream_interface.c index 5b86bf8d91..aa3b6d876d 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include @@ -36,6 +37,9 @@ #include +DECLARE_POOL(pool_head_streaminterface, "stream_interface", sizeof(struct stream_interface)); + + /* functions used by default on a detached stream-interface */ static void stream_int_shutr(struct stream_interface *si); static void stream_int_shutw(struct stream_interface *si); @@ -98,6 +102,35 @@ struct data_cb si_conn_cb = { .name = "STRM", }; + +struct stream_interface *si_new(struct conn_stream *cs) +{ + struct stream_interface *si; + + si = pool_alloc(pool_head_streaminterface); + if (unlikely(!si)) + return NULL; + si->flags = SI_FL_NONE; + if (si_reset(si) < 0) { + pool_free(pool_head_streaminterface, si); + return NULL; + } + si->cs = cs; + return si; +} + +void si_free(struct stream_interface *si) +{ + if (!si) + return; + + b_free(&si->l7_buffer); + tasklet_free(si->wait_event.tasklet); + sockaddr_free(&si->src); + sockaddr_free(&si->dst); + pool_free(pool_head_streaminterface, si); +} + /* * This function only has to be called once after a wakeup event in case of * suspected timeout. It controls the stream interface timeouts and sets @@ -309,7 +342,7 @@ struct appctx *si_register_handler(struct stream_interface *si, struct applet *a appctx = appctx_new(app); if (!appctx) return NULL; - si_attach_appctx(si, appctx); + cs_attach_endp(si->cs, &appctx->obj_type, appctx); appctx->t->nice = si_strm(si)->task->nice; si_cant_get(si); appctx_wakeup(appctx); diff --git a/src/tcp_act.c b/src/tcp_act.c index a2ed7d08bd..ecb33d9f3a 100644 --- a/src/tcp_act.c +++ b/src/tcp_act.c @@ -288,7 +288,7 @@ static enum act_return tcp_exec_action_silent_drop(struct act_rule *rule, struct * is present, returning with ERR will cause lingering to be disabled. */ if (strm) - strm->si[0].flags |= SI_FL_NOLINGER; + strm->csf->si->flags |= SI_FL_NOLINGER; /* We're on the client-facing side, we must force to disable lingering to * ensure we will use an RST exclusively and kill any pending data. diff --git a/src/tcp_sample.c b/src/tcp_sample.c index 4a77036fd5..895a1306fd 100644 --- a/src/tcp_sample.c +++ b/src/tcp_sample.c @@ -65,7 +65,7 @@ smp_fetch_src(const struct arg *args, struct sample *smp, const char *kw, void * src = conn_src(conn); } else /* src */ - src = (smp->strm ? si_src(&smp->strm->si[0]) : sess_src(smp->sess)); + src = (smp->strm ? si_src(smp->strm->csf->si) : sess_src(smp->sess)); if (!src) return 0; @@ -109,7 +109,7 @@ smp_fetch_sport(const struct arg *args, struct sample *smp, const char *kw, void src = conn_src(conn); } else /* src_port */ - src = (smp->strm ? si_src(&smp->strm->si[0]) : sess_src(smp->sess)); + src = (smp->strm ? si_src(smp->strm->csf->si) : sess_src(smp->sess)); if (!src) return 0; @@ -144,7 +144,7 @@ smp_fetch_dst(const struct arg *args, struct sample *smp, const char *kw, void * dst = conn_dst(conn); } else /* dst */ - dst = (smp->strm ? si_dst(&smp->strm->si[0]) : sess_dst(smp->sess)); + dst = (smp->strm ? si_dst(smp->strm->csf->si) : sess_dst(smp->sess)); if (!dst) return 0; @@ -181,7 +181,7 @@ int smp_fetch_dst_is_local(const struct arg *args, struct sample *smp, const cha dst = conn_dst(conn); } else /* dst_is_local */ - dst = (smp->strm ? si_dst(&smp->strm->si[0]) : sess_dst(smp->sess)); + dst = (smp->strm ? si_dst(smp->strm->csf->si) : sess_dst(smp->sess)); if (!dst) return 0; @@ -207,7 +207,7 @@ int smp_fetch_src_is_local(const struct arg *args, struct sample *smp, const cha src = conn_src(conn); } else /* src_is_local */ - src = (smp->strm ? si_src(&smp->strm->si[0]) : sess_src(smp->sess)); + src = (smp->strm ? si_src(smp->strm->csf->si) : sess_src(smp->sess)); if (!src) return 0; @@ -240,7 +240,7 @@ smp_fetch_dport(const struct arg *args, struct sample *smp, const char *kw, void dst = conn_dst(conn); } else /* dst_port */ - dst = (smp->strm ? si_dst(&smp->strm->si[0]) : sess_dst(smp->sess)); + dst = (smp->strm ? si_dst(smp->strm->csf->si) : sess_dst(smp->sess)); if (!dst) return 0; diff --git a/src/tcpcheck.c b/src/tcpcheck.c index 3697f41f63..0aab10fed2 100644 --- a/src/tcpcheck.c +++ b/src/tcpcheck.c @@ -1093,7 +1093,7 @@ enum tcpcheck_eval_ret tcpcheck_eval_connect(struct check *check, struct tcpchec /* No connection, prepare a new one */ conn = conn_new((s ? &s->obj_type : &proxy->obj_type)); if (conn) - cs = cs_new(&conn->obj_type, conn, &check->obj_type, NULL, &check_conn_cb); + cs = cs_new(); if (!conn || !cs) { chunk_printf(&trash, "TCPCHK error allocating connection at step %d", tcpcheck_get_step_id(check, rule)); @@ -1106,7 +1106,18 @@ enum tcpcheck_eval_ret tcpcheck_eval_connect(struct check *check, struct tcpchec conn_free(conn); goto out; } - + cs_attach_endp(cs, &conn->obj_type, conn); + if (cs_attach_app(cs, &check->obj_type) < 0) { + chunk_printf(&trash, "TCPCHK error allocating connection at step %d", + tcpcheck_get_step_id(check, rule)); + if (rule->comment) + chunk_appendf(&trash, " comment: '%s'", rule->comment); + set_server_check_status(check, HCHK_STATUS_SOCKERR, trash.area); + ret = TCPCHK_EVAL_STOP; + TRACE_ERROR("conn-stream allocation error", CHK_EV_TCPCHK_CONN|CHK_EV_TCPCHK_ERR, check); + cs_destroy(cs); + goto out; + } tasklet_set_tid(check->wait_list.tasklet, tid); check->cs = cs; -- 2.39.5