From 13a35e57525675c6e422e881b0f88269c5a4a93d Mon Sep 17 00:00:00 2001 From: Christopher Faulet Date: Mon, 20 Dec 2021 15:34:16 +0100 Subject: [PATCH] MAJOR: conn_stream/stream-int: move the appctx to the conn-stream Thanks to previous changes, it is now possible to set an appctx as endpoint for a conn-stream. This means the appctx is no longer linked to the stream-interface but to the conn-stream. Thus, a pointer to the conn-stream is explicitly stored in the stream-interface. The endpoint (connection or appctx) can be retrieved via the conn-stream. --- include/haproxy/conn_stream.h | 14 +- include/haproxy/stream.h | 4 +- include/haproxy/stream_interface-t.h | 5 +- include/haproxy/stream_interface.h | 195 ++++++++------------------- src/backend.c | 33 ++--- src/cache.c | 2 +- src/cli.c | 2 +- src/connection.c | 6 +- src/dns.c | 16 ++- src/flt_spoe.c | 10 +- src/frontend.c | 2 +- src/hlua.c | 21 ++- src/http_ana.c | 12 +- src/http_client.c | 13 +- src/log.c | 2 +- src/mux_h1.c | 2 +- src/mux_h2.c | 2 +- src/mux_pt.c | 2 +- src/peers.c | 15 ++- src/proxy.c | 2 +- src/resolvers.c | 4 +- src/sink.c | 16 ++- src/ssl_sample.c | 52 +++---- src/ssl_sock.c | 2 +- src/stats.c | 28 ++-- src/stream.c | 177 ++++++++++-------------- src/stream_interface.c | 49 ++++--- src/tcp_sample.c | 10 +- 28 files changed, 318 insertions(+), 380 deletions(-) diff --git a/include/haproxy/conn_stream.h b/include/haproxy/conn_stream.h index dd5281193b..723265a596 100644 --- a/include/haproxy/conn_stream.h +++ b/include/haproxy/conn_stream.h @@ -41,11 +41,21 @@ void cs_free(struct conn_stream *cs); */ static inline void cs_init(struct conn_stream *cs, enum obj_type *endp) { + struct connection *conn = objt_conn(endp); + struct appctx *appctx = objt_appctx(endp); + cs->obj_type = OBJ_TYPE_CS; cs->flags = CS_FL_NONE; cs->end = endp; - if (objt_conn(endp)) - cs->ctx = endp; + if (conn) { + cs->ctx = conn; + if (!conn->ctx) + conn->ctx = cs; + } + else if (appctx) { + cs->ctx = appctx; + /* appctx->owner must be set by the caller for now */ + } cs->data = NULL; cs->data_cb = NULL; } diff --git a/include/haproxy/stream.h b/include/haproxy/stream.h index a60e73f93e..fc7eaa2bc2 100644 --- a/include/haproxy/stream.h +++ b/include/haproxy/stream.h @@ -24,6 +24,7 @@ #include #include +#include #include #include #include @@ -58,8 +59,7 @@ extern struct pool_head *pool_head_uniqueid; extern struct data_cb sess_conn_cb; -struct stream *stream_new(struct session *sess, enum obj_type *origin, struct buffer *input); -int stream_create_from_cs(struct conn_stream *cs, struct buffer *input); +struct stream *stream_new(struct session *sess, struct conn_stream *cs, struct buffer *input); int stream_upgrade_from_cs(struct conn_stream *cs, struct buffer *input); int stream_set_http_mode(struct stream *s, const struct mux_proto_list *mux_proto); diff --git a/include/haproxy/stream_interface-t.h b/include/haproxy/stream_interface-t.h index f271557d0b..a6e7983600 100644 --- a/include/haproxy/stream_interface-t.h +++ b/include/haproxy/stream_interface-t.h @@ -25,7 +25,8 @@ #include #include #include -#include + +struct conn_stream; /* A stream interface must have its own errors independently of the buffer's, * so that applications can rely on what the buffer reports while the stream @@ -127,7 +128,7 @@ struct stream_interface { enum si_state prev_state;/* SI_ST*, copy of previous state */ /* 16-bit hole here */ unsigned int flags; /* SI_FL_* */ - enum obj_type *end; /* points to the end point (connection or appctx) */ + struct conn_stream *cs; /* points to the conn-streams that owns the endpoint (connection or applet) */ struct si_ops *ops; /* general operations at the stream interface layer */ struct sockaddr_storage *src; /* source address (pool), when known, otherwise NULL */ struct sockaddr_storage *dst; /* destination address (pool), when known, otherwise NULL */ diff --git a/include/haproxy/stream_interface.h b/include/haproxy/stream_interface.h index 8228badd57..81089660d5 100644 --- a/include/haproxy/stream_interface.h +++ b/include/haproxy/stream_interface.h @@ -28,9 +28,6 @@ #include #include #include -#include -#include - extern struct si_ops si_embedded_ops; extern struct si_ops si_conn_ops; @@ -120,7 +117,7 @@ static inline int si_reset(struct stream_interface *si) si->conn_retries = 0; /* used for logging too */ si->exp = TICK_ETERNITY; si->flags &= SI_FL_ISBACK; - si->end = NULL; + si->cs = NULL; si->state = si->prev_state = SI_ST_INI; si->ops = &si_embedded_ops; si->wait_event.tasklet = tasklet_new(); @@ -155,45 +152,19 @@ static inline int si_state_in(enum si_state state, enum si_state_bit mask) return !!(si_state_bit(state) & mask); } -/* only detaches the endpoint from the SI, which means that it's set to - * NULL and that ->ops is mapped to si_embedded_ops. The previous endpoint - * is returned. - */ -static inline enum obj_type *si_detach_endpoint(struct stream_interface *si) -{ - enum obj_type *prev = si->end; - - si->end = NULL; - si->ops = &si_embedded_ops; - return prev; -} - -/* Reset the endpoint if it's a connection or an applet, For an applet, it is - * for now the same than si_release_endpoint(), the appctx is freed. But for a - * connection, the conn-stream is only detached. +/* Reset the endpoint detaching it from the conn-stream. For a connection + * attached to a mux, it is unsubscribe from any event. */ static inline void si_reset_endpoint(struct stream_interface *si) { - struct conn_stream *cs; - struct connection *conn; - struct appctx *appctx; - - if (!si->end) + if (!si->cs) return; - if ((appctx = objt_appctx(si->end))) { - if (appctx->applet->release && !si_state_in(si->state, SI_SB_DIS|SI_SB_CLO)) - appctx->applet->release(appctx); - appctx_free(appctx); - si_detach_endpoint(si); - } - else if ((cs = objt_cs(si->end))) { - if ((conn = cs_conn(cs)) && si->wait_event.events != 0) - conn->mux->unsubscribe(cs, si->wait_event.events, - &si->wait_event); - cs_detach(cs); - si->ops = &si_embedded_ops; - } + if (cs_conn_mux(si->cs) && si->wait_event.events != 0) + (cs_conn_mux(si->cs))->unsubscribe(si->cs, si->wait_event.events, &si->wait_event); + + cs_detach(si->cs); + si->ops = &si_embedded_ops; } /* Release the endpoint if it's a connection or an applet, then nullify it. @@ -201,63 +172,46 @@ static inline void si_reset_endpoint(struct stream_interface *si) */ static inline void si_release_endpoint(struct stream_interface *si) { - struct conn_stream *cs; - struct connection *conn; - struct appctx *appctx; - - if (!si->end) + if (!si->cs) return; + si_reset_endpoint(si); + cs_free(si->cs); + si->cs = NULL; + si->ops = &si_embedded_ops; - if ((appctx = objt_appctx(si->end))) { - if (appctx->applet->release && !si_state_in(si->state, SI_SB_DIS|SI_SB_CLO)) - appctx->applet->release(appctx); - appctx_free(appctx); - } - else if ((cs = objt_cs(si->end))) { - if ((conn = cs_conn(cs)) && si->wait_event.events != 0) - conn->mux->unsubscribe(cs, si->wait_event.events, - &si->wait_event); - cs_destroy(cs); - } - si_detach_endpoint(si); } -/* Attach conn_stream to the stream interface . The stream interface - * is configured to work with a connection and the connection it configured - * with a stream interface data layer. - */ +/* Attach conn_stream to the stream interface . */ static inline void si_attach_cs(struct stream_interface *si, struct conn_stream *cs) { - si->ops = &si_conn_ops; - si->end = &cs->obj_type; - cs_attach(cs, si, &si_conn_cb); -} - -static inline struct conn_stream *si_attach_conn(struct stream_interface *si, struct connection *conn) -{ - struct conn_stream *cs; + si->cs = cs; + if (cs_conn(cs)) { + si->ops = &si_conn_ops; + cs_attach(cs, si, &si_conn_cb); + } + else if (cs_appctx(cs)) { + struct appctx *appctx = cs_appctx(cs); - si_reset_endpoint(si); - cs = objt_cs(si->end); - if (!cs) - cs = cs_new(&conn->obj_type); - if (cs) { - cs_init(cs, &conn->obj_type); - if (!conn->ctx) - conn->ctx = cs; - si_attach_cs(si, cs); + si->ops = &si_applet_ops; + appctx->owner = si; + cs_attach(cs, si, NULL); + } + else { + si->ops = &si_embedded_ops; + cs_attach(cs, si, NULL); } - return cs; } -/* Returns true if a connection is attached to the stream interface and - * if this connection is ready. +/* Attach connection to the stream interface . The stream interface + * is configured to work with a connection context. */ -static inline int si_conn_ready(struct stream_interface *si) +static inline void si_attach_conn(struct stream_interface *si, struct connection *conn) { - struct connection *conn = cs_conn(objt_cs(si->end)); - - return conn && conn_ctrl_ready(conn) && conn_xprt_ready(conn); + si_reset_endpoint(si); + cs_init(si->cs, &conn->obj_type); + if (!conn->ctx) + conn->ctx = si->cs; + si_attach_cs(si, si->cs); } /* Attach appctx to the stream interface . The stream interface @@ -265,15 +219,10 @@ static inline int si_conn_ready(struct stream_interface *si) */ static inline void si_attach_appctx(struct stream_interface *si, struct appctx *appctx) { - si->ops = &si_applet_ops; - si->end = &appctx->obj_type; + si_reset_endpoint(si); + cs_init(si->cs, &appctx->obj_type); appctx->owner = si; -} - -/* returns a pointer to the appctx being run in the SI, which must be valid */ -static inline struct appctx *si_appctx(struct stream_interface *si) -{ - return __objt_appctx(si->end); + si_attach_cs(si, si->cs); } /* call the applet's release function if any. Needs to be called upon close() */ @@ -281,11 +230,21 @@ static inline void si_applet_release(struct stream_interface *si) { struct appctx *appctx; - appctx = objt_appctx(si->end); + appctx = cs_appctx(si->cs); if (appctx && appctx->applet->release && !si_state_in(si->state, SI_SB_DIS|SI_SB_CLO)) appctx->applet->release(appctx); } +/* Returns true if a connection is attached to the stream interface and + * if this connection is ready. + */ +static inline int si_conn_ready(struct stream_interface *si) +{ + struct connection *conn = cs_conn(si->cs); + + return conn && conn_ctrl_ready(conn) && conn_xprt_ready(conn); +} + /* Returns non-zero if the stream interface's Rx path is blocked */ static inline int si_rx_blocked(const struct stream_interface *si) { @@ -420,24 +379,6 @@ static inline void si_done_get(struct stream_interface *si) si->flags &= ~(SI_FL_WANT_GET | SI_FL_WAIT_DATA); } -/* Try to allocate a new conn_stream and assign it to the interface. If - * an endpoint was previously allocated, it is released first. The newly - * allocated conn_stream is initialized, assigned to the stream interface, - * and returned. - */ -static inline struct conn_stream *si_alloc_cs(struct stream_interface *si, struct connection *conn) -{ - struct conn_stream *cs; - - si_release_endpoint(si); - - cs = cs_new(&conn->obj_type); - if (cs) - si_attach_cs(si, cs); - - return cs; -} - /* Try to allocate a buffer for the stream-int's input channel. It relies on * channel_alloc_buffer() for this so it abides by its rules. It returns 0 on * failure, non-zero otherwise. If no buffer is available, the requester, @@ -456,25 +397,6 @@ static inline int si_alloc_ibuf(struct stream_interface *si, struct buffer_wait return ret; } -/* Release the interface's existing endpoint (connection or appctx) and - * allocate then initialize a new appctx which is assigned to the interface - * and returned. NULL may be returned upon memory shortage. Applet - * is assigned to the appctx, but it may be NULL. - */ -static inline struct appctx *si_alloc_appctx(struct stream_interface *si, struct applet *applet) -{ - struct appctx *appctx; - - si_release_endpoint(si); - appctx = appctx_new(applet); - if (appctx) { - si_attach_appctx(si, appctx); - appctx->t->nice = si_strm(si)->task->nice; - } - - return appctx; -} - /* Sends a shutr to the connection using the data layer */ static inline void si_shutr(struct stream_interface *si) { @@ -523,13 +445,10 @@ static inline void si_chk_rcv(struct stream_interface *si) */ static inline int si_sync_recv(struct stream_interface *si) { - struct conn_stream *cs; - if (!si_state_in(si->state, SI_SB_RDY|SI_SB_EST)) return 0; - cs = objt_cs(si->end); - if (!cs_conn_mux(cs)) + if (!cs_conn_mux(si->cs)) return 0; // only conn_streams are supported if (si->wait_event.events & SUB_RETRY_RECV) @@ -538,7 +457,7 @@ static inline int si_sync_recv(struct stream_interface *si) if (!si_rx_endp_ready(si) || si_rx_blocked(si)) return 0; // already failed - return si_cs_recv(cs); + return si_cs_recv(si->cs); } /* Calls chk_snd on the connection using the data layer */ @@ -624,7 +543,7 @@ static inline const struct sockaddr_storage *si_src(struct stream_interface *si) if (!(si->flags & SI_FL_ISBACK)) return sess_src(strm_sess(si_strm(si))); else { - struct connection *conn = cs_conn(objt_cs(si->end)); + struct connection *conn = cs_conn(si->cs); if (conn) return conn_src(conn); @@ -644,7 +563,7 @@ static inline const struct sockaddr_storage *si_dst(struct stream_interface *si) if (!(si->flags & SI_FL_ISBACK)) return sess_dst(strm_sess(si_strm(si))); else { - struct connection *conn = cs_conn(objt_cs(si->end)); + struct connection *conn = cs_conn(si->cs); if (conn) return conn_dst(conn); @@ -668,7 +587,7 @@ static inline int si_get_src(struct stream_interface *si) if (!(si->flags & SI_FL_ISBACK)) src = sess_src(strm_sess(si_strm(si))); else { - struct connection *conn = cs_conn(objt_cs(si->end)); + struct connection *conn = cs_conn(si->cs); if (conn) src = conn_src(conn); @@ -699,7 +618,7 @@ static inline int si_get_dst(struct stream_interface *si) if (!(si->flags & SI_FL_ISBACK)) dst = sess_dst(strm_sess(si_strm(si))); else { - struct connection *conn = cs_conn(objt_cs(si->end)); + struct connection *conn = cs_conn(si->cs); if (conn) dst = conn_dst(conn); diff --git a/src/backend.c b/src/backend.c index f289337c9e..8367a8ed9e 100644 --- a/src/backend.c +++ b/src/backend.c @@ -1251,7 +1251,6 @@ int connect_server(struct stream *s) { struct connection *cli_conn = objt_conn(strm_orig(s)); struct connection *srv_conn = NULL; - struct conn_stream *srv_cs = NULL; struct server *srv; const int reuse_mode = s->be->options & PR_O_REUSE_MASK; int reuse = 0; @@ -1496,15 +1495,11 @@ int connect_server(struct stream *s) } if (avail >= 1) { - srv_cs = si_attach_conn(&s->si[1], srv_conn); - if (srv_cs) { - if (srv_conn->mux->attach(srv_conn, srv_cs, s->sess) == -1) { - srv_conn = NULL; - cs_init(srv_cs, NULL); - } - } - else + si_attach_conn(&s->si[1], srv_conn); + if (srv_conn->mux->attach(srv_conn, s->si[1].cs, s->sess) == -1) { + si_reset_endpoint(&s->si[1]); srv_conn = NULL; + } } else srv_conn = NULL; @@ -1518,8 +1513,6 @@ skip_reuse: /* no reuse or failed to reuse the connection above, pick a new one */ if (!srv_conn) { srv_conn = conn_new(s->target); - srv_cs = NULL; - if (srv_conn) { DBG_TRACE_STATE("alloc new be connection", STRM_EV_STRM_PROC|STRM_EV_SI_ST, s); srv_conn->owner = s->sess; @@ -1578,11 +1571,7 @@ skip_reuse: return SF_ERR_INTERNAL; /* how did we get there ? */ } - srv_cs = si_attach_conn(&s->si[1], srv_conn); - if (!srv_cs) { - conn_free(srv_conn); - return SF_ERR_RESOURCE; - } + si_attach_conn(&s->si[1], srv_conn); #if defined(USE_OPENSSL) && defined(TLSEXT_TYPE_application_layer_protocol_negotiation) if (!srv || (srv->use_ssl != 1 || (!(srv->ssl_ctx.alpn_str) && !(srv->ssl_ctx.npn_str)) || @@ -1690,7 +1679,7 @@ skip_reuse: if (init_mux) { const struct mux_ops *alt_mux = likely(!(s->flags & SF_WEBSOCKET)) ? NULL : srv_get_ws_proto(srv); - if (conn_install_mux_be(srv_conn, srv_cs, s->sess, alt_mux) < 0) { + if (conn_install_mux_be(srv_conn, s->si[1].cs, s->sess, alt_mux) < 0) { conn_full_close(srv_conn); return SF_ERR_INTERNAL; } @@ -1752,7 +1741,7 @@ skip_reuse: * sockets, socket pairs, and occasionally TCP connections on the * loopback on a heavily loaded system. */ - if ((srv_conn->flags & CO_FL_ERROR || srv_cs->flags & CS_FL_ERROR)) + if ((srv_conn->flags & CO_FL_ERROR || (s->si[1].cs)->flags & CS_FL_ERROR)) s->si[1].flags |= SI_FL_ERR; /* If we had early data, and the handshake ended, then @@ -1761,7 +1750,7 @@ skip_reuse: * the handshake. */ if (!(srv_conn->flags & (CO_FL_WAIT_XPRT | CO_FL_EARLY_SSL_HS))) - srv_cs->flags &= ~CS_FL_WAIT_FOR_HS; + (s->si[1].cs)->flags &= ~CS_FL_WAIT_FOR_HS; if (!si_state_in(s->si[1].state, SI_SB_EST|SI_SB_DIS|SI_SB_CLO) && (srv_conn->flags & CO_FL_WAIT_XPRT) == 0) { @@ -1778,7 +1767,7 @@ skip_reuse: * wake callback. Otherwise si_cs_recv()/si_cs_send() already take * care of it. */ - if ((srv_cs->flags & CS_FL_EOI) && !(si_ic(&s->si[1])->flags & CF_EOI)) + if (((s->si[1].cs)->flags & CS_FL_EOI) && !(si_ic(&s->si[1])->flags & CF_EOI)) si_ic(&s->si[1])->flags |= (CF_EOI|CF_READ_PARTIAL); /* catch all sync connect while the mux is not already installed */ @@ -2096,7 +2085,7 @@ void back_handle_st_req(struct stream *s) if (unlikely(obj_type(s->target) == OBJ_TYPE_APPLET)) { /* the applet directly goes to the EST state */ - struct appctx *appctx = objt_appctx(si->end); + struct appctx *appctx = cs_appctx(si->cs); if (!appctx || appctx->applet != __objt_applet(s->target)) appctx = si_register_handler(si, objt_applet(s->target)); @@ -2231,7 +2220,7 @@ void back_handle_st_cer(struct stream *s) /* we probably have to release last stream from the server */ if (objt_server(s->target)) { - struct connection *conn = cs_conn(objt_cs(si->end)); + struct connection *conn = cs_conn(si->cs); health_adjust(__objt_server(s->target), HANA_STATUS_L4_ERR); diff --git a/src/cache.c b/src/cache.c index 04e7327663..d73696912e 100644 --- a/src/cache.c +++ b/src/cache.c @@ -2649,7 +2649,7 @@ smp_fetch_res_cache_name(const struct arg *args, struct sample *smp, return 0; /* Get appctx from the stream_interface. */ - appctx = si_appctx(&smp->strm->si[1]); + appctx = cs_appctx(smp->strm->si[1].cs); if (appctx && appctx->rule) { cconf = appctx->rule->arg.act.p[0]; if (cconf) { diff --git a/src/cli.c b/src/cli.c index 541bad8f3a..c8891129e7 100644 --- a/src/cli.c +++ b/src/cli.c @@ -1920,7 +1920,7 @@ static int _getsocks(char **args, char *payload, struct appctx *appctx, void *pr struct cmsghdr *cmsg; struct stream_interface *si = appctx->owner; struct stream *s = si_strm(si); - struct connection *remote = cs_conn(objt_cs(si_opposite(si)->end)); + struct connection *remote = cs_conn(si_opposite(si)->cs); struct msghdr msghdr; struct iovec iov; struct timeval tv = { .tv_sec = 1, .tv_usec = 0 }; diff --git a/src/connection.c b/src/connection.c index 9d125627d0..ee135baf4a 100644 --- a/src/connection.c +++ b/src/connection.c @@ -2018,7 +2018,7 @@ smp_fetch_fc_http_major(const struct arg *args, struct sample *smp, const char * conn = (kw[0] == 'b') ? cs_conn(__objt_check(smp->sess->origin)->cs) : NULL; else conn = (kw[0] != 'b') ? objt_conn(smp->sess->origin) : - smp->strm ? cs_conn(objt_cs(smp->strm->si[1].end)) : NULL; + smp->strm ? cs_conn(smp->strm->si[1].cs) : NULL; /* No connection or a connection with a RAW muxx */ if (!conn || (conn->mux && !(conn->mux->flags & MX_FL_HTX))) @@ -2115,7 +2115,7 @@ int smp_fetch_fc_err(const struct arg *args, struct sample *smp, const char *kw, conn = (kw[0] == 'b') ? cs_conn(__objt_check(smp->sess->origin)->cs) : NULL; else conn = (kw[0] != 'b') ? objt_conn(smp->sess->origin) : - smp->strm ? cs_conn(objt_cs(smp->strm->si[1].end)) : NULL; + smp->strm ? cs_conn(smp->strm->si[1].cs) : NULL; if (!conn) return 0; @@ -2142,7 +2142,7 @@ int smp_fetch_fc_err_str(const struct arg *args, struct sample *smp, const char conn = (kw[0] == 'b') ? cs_conn(__objt_check(smp->sess->origin)->cs) : NULL; else conn = (kw[0] != 'b') ? objt_conn(smp->sess->origin) : - smp->strm ? cs_conn(objt_cs(smp->strm->si[1].end)) : NULL; + smp->strm ? cs_conn(smp->strm->si[1].cs) : NULL; if (!conn) return 0; diff --git a/src/dns.c b/src/dns.c index 5a0724b656..2caa5ad27b 100644 --- a/src/dns.c +++ b/src/dns.c @@ -887,6 +887,7 @@ static struct appctx *dns_session_create(struct dns_session *ds) { struct appctx *appctx; struct session *sess; + struct conn_stream *cs; struct stream *s; struct applet *applet = &dns_session_applet; @@ -898,15 +899,21 @@ static struct appctx *dns_session_create(struct dns_session *ds) sess = session_new(ds->dss->srv->proxy, NULL, &appctx->obj_type); if (!sess) { - ha_alert("out of memory in peer_session_create().\n"); + ha_alert("out of memory in dns_session_create().\n"); goto out_free_appctx; } - if ((s = stream_new(sess, &appctx->obj_type, &BUF_NULL)) == NULL) { - ha_alert("Failed to initialize stream in peer_session_create().\n"); + cs = cs_new(&appctx->obj_type); + if (!cs) { + ha_alert("out of memory in dns_session_create().\n"); goto out_free_sess; } + if ((s = stream_new(sess, cs, &BUF_NULL)) == NULL) { + ha_alert("Failed to initialize stream in dns_session_create().\n"); + goto out_free_cs; + } + s->target = &ds->dss->srv->obj_type; if (!sockaddr_alloc(&s->si[1].dst, &ds->dss->srv->addr, sizeof(ds->dss->srv->addr))) @@ -924,13 +931,14 @@ static struct appctx *dns_session_create(struct dns_session *ds) s->res.rto = TICK_ETERNITY; s->res.rex = TICK_ETERNITY; ds->appctx = appctx; - task_wakeup(s->task, TASK_WOKEN_INIT); return appctx; /* Error unrolling */ out_free_strm: LIST_DELETE(&s->list); pool_free(pool_head_stream, s); + out_free_cs: + cs_free(cs); out_free_sess: session_free(sess); out_free_appctx: diff --git a/src/flt_spoe.c b/src/flt_spoe.c index 0badee3f8c..8b2de432c6 100644 --- a/src/flt_spoe.c +++ b/src/flt_spoe.c @@ -1988,6 +1988,7 @@ spoe_create_appctx(struct spoe_config *conf) { struct appctx *appctx; struct session *sess; + struct conn_stream *cs; struct stream *strm; if ((appctx = appctx_new(&spoe_applet)) == NULL) @@ -2023,9 +2024,13 @@ spoe_create_appctx(struct spoe_config *conf) if (!sess) goto out_free_spoe; - if ((strm = stream_new(sess, &appctx->obj_type, &BUF_NULL)) == NULL) + cs = cs_new(&appctx->obj_type); + if (!cs) goto out_free_sess; + if ((strm = stream_new(sess, cs, &BUF_NULL)) == NULL) + goto out_free_cs; + stream_set_backend(strm, conf->agent->b.be); /* applet is waiting for data */ @@ -2041,10 +2046,11 @@ spoe_create_appctx(struct spoe_config *conf) _HA_ATOMIC_INC(&conf->agent->counters.applets); task_wakeup(SPOE_APPCTX(appctx)->task, TASK_WOKEN_INIT); - task_wakeup(strm->task, TASK_WOKEN_INIT); return appctx; /* Error unrolling */ + out_free_cs: + cs_free(cs); out_free_sess: session_free(sess); out_free_spoe: diff --git a/src/frontend.c b/src/frontend.c index c8ea0d43a2..56371da84a 100644 --- a/src/frontend.c +++ b/src/frontend.c @@ -105,7 +105,7 @@ int frontend_accept(struct stream *s) int alpn_len; /* try to report the ALPN value when available (also works for NPN) */ - if (conn == cs_conn(objt_cs(s->si[0].end))) { + if (conn == cs_conn(s->si[0].cs)) { if (conn_get_alpn(conn, &alpn_str, &alpn_len) && alpn_str) { int len = MIN(alpn_len, sizeof(alpn) - 1); memcpy(alpn, alpn_str, len); diff --git a/src/hlua.c b/src/hlua.c index 8c7dc61c99..c61e666678 100644 --- a/src/hlua.c +++ b/src/hlua.c @@ -2617,7 +2617,7 @@ static int hlua_socket_getsockname(struct lua_State *L) si = appctx->owner; s = si_strm(si); - conn = cs_conn(objt_cs(s->si[1].end)); + conn = cs_conn(s->si[1].cs); if (!conn || !conn_get_src(conn)) { xref_unlock(&socket->xref, peer); lua_pushnil(L); @@ -2684,7 +2684,7 @@ __LJMP static int hlua_socket_connect_yield(struct lua_State *L, int status, lua return 2; } - appctx = __objt_appctx(s->si[0].end); + appctx = cs_appctx(s->si[0].cs); /* Check for connection established. */ if (appctx->ctx.hlua_cosocket.connected) { @@ -2916,6 +2916,7 @@ __LJMP static int hlua_socket_new(lua_State *L) struct hlua_socket *socket; struct appctx *appctx; struct session *sess; + struct conn_stream *cs; struct stream *strm; /* Check stack size. */ @@ -2956,14 +2957,20 @@ __LJMP static int hlua_socket_new(lua_State *L) /* Now create a session, task and stream for this applet */ sess = session_new(socket_proxy, NULL, &appctx->obj_type); if (!sess) { + hlua_pusherror(L, "socket: out of memory"); + goto out_fail_appctx; + } + + cs = cs_new(&appctx->obj_type); + if (!cs) { hlua_pusherror(L, "socket: out of memory"); goto out_fail_sess; } - strm = stream_new(sess, &appctx->obj_type, &BUF_NULL); + strm = stream_new(sess, cs, &BUF_NULL); if (!strm) { hlua_pusherror(L, "socket: out of memory"); - goto out_fail_stream; + goto out_fail_cs; } /* Initialise cross reference between stream and Lua socket object. */ @@ -2981,9 +2988,11 @@ __LJMP static int hlua_socket_new(lua_State *L) return 1; - out_fail_stream: - session_free(sess); + out_fail_cs: + cs_free(cs); out_fail_sess: + session_free(sess); + out_fail_appctx: appctx_free(appctx); out_fail_conf: WILL_LJMP(lua_error(L)); diff --git a/src/http_ana.c b/src/http_ana.c index 848750c930..f0f49a70b8 100644 --- a/src/http_ana.c +++ b/src/http_ana.c @@ -1325,7 +1325,7 @@ int http_wait_for_response(struct stream *s, struct channel *rep, int an_bit) if (unlikely(htx_is_empty(htx) || htx->first == -1)) { /* 1: have we encountered a read error ? */ if (rep->flags & CF_READ_ERROR) { - struct connection *conn = cs_conn(objt_cs(s->si[1].end)); + struct connection *conn = cs_conn(s->si[1].cs); /* Perform a L7 retry because server refuses the early data. */ if ((si_b->flags & SI_FL_L7_RETRY) && @@ -1656,7 +1656,7 @@ int http_wait_for_response(struct stream *s, struct channel *rep, int an_bit) /* check for NTML authentication headers in 401 (WWW-Authenticate) and * 407 (Proxy-Authenticate) responses and set the connection to private */ - srv_conn = cs_conn(objt_cs(s->si[1].end)); + srv_conn = cs_conn(s->si[1].cs); if (srv_conn) { struct ist hdr; struct http_hdr_ctx ctx; @@ -3883,7 +3883,7 @@ static int http_handle_stats(struct stream *s, struct channel *req) struct htx *htx; struct htx_sl *sl; - appctx = si_appctx(si); + appctx = cs_appctx(si->cs); memset(&appctx->ctx.stats, 0, sizeof(appctx->ctx.stats)); appctx->st1 = appctx->st2 = 0; appctx->ctx.stats.st_code = STAT_STATUS_INIT; @@ -5004,7 +5004,7 @@ static void http_debug_stline(const char *dir, struct stream *s, const struct ht chunk_printf(&trash, "%08x:%s.%s[%04x:%04x]: ", s->uniq_id, s->be->id, dir, objt_conn(sess->origin) ? (unsigned short)__objt_conn(sess->origin)->handle.fd : -1, - cs_conn(objt_cs(s->si[1].end)) ? (unsigned short)(cs_conn(__objt_cs(s->si[1].end)))->handle.fd : -1); + cs_conn(s->si[1].cs) ? (unsigned short)(cs_conn(s->si[1].cs))->handle.fd : -1); max = HTX_SL_P1_LEN(sl); UBOUND(max, trash.size - trash.data - 3); @@ -5035,7 +5035,7 @@ static void http_debug_hdr(const char *dir, struct stream *s, const struct ist n chunk_printf(&trash, "%08x:%s.%s[%04x:%04x]: ", s->uniq_id, s->be->id, dir, objt_conn(sess->origin) ? (unsigned short)__objt_conn(sess->origin)->handle.fd : -1, - cs_conn(objt_cs(s->si[1].end)) ? (unsigned short)(cs_conn(__objt_cs(s->si[1].end)))->handle.fd : -1); + cs_conn(s->si[1].cs) ? (unsigned short)(cs_conn(s->si[1].cs))->handle.fd : -1); max = n.len; UBOUND(max, trash.size - trash.data - 3); @@ -5091,7 +5091,7 @@ void http_txn_reset_res(struct http_txn *txn) struct http_txn *http_create_txn(struct stream *s) { struct http_txn *txn; - struct conn_stream *cs = objt_cs(s->si[0].end); + struct conn_stream *cs = s->si[0].cs; txn = pool_alloc(pool_head_http_txn); if (!txn) diff --git a/src/http_client.c b/src/http_client.c index 1877a0f732..64e5190bec 100644 --- a/src/http_client.c +++ b/src/http_client.c @@ -454,6 +454,7 @@ struct appctx *httpclient_start(struct httpclient *hc) struct applet *applet = &httpclient_applet; struct appctx *appctx; struct session *sess; + struct conn_stream *cs; struct stream *s; int len; struct sockaddr_storage ss_url; @@ -485,9 +486,14 @@ struct appctx *httpclient_start(struct httpclient *hc) ha_alert("httpclient: out of memory in %s:%d.\n", __FUNCTION__, __LINE__); goto out_free_appctx; } - if ((s = stream_new(sess, &appctx->obj_type, &hc->req.buf)) == NULL) { + cs = cs_new(&appctx->obj_type); + if (!cs) { + ha_alert("httpclient: out of memory in %s:%d.\n", __FUNCTION__, __LINE__); + goto out_free_sess; + } + if ((s = stream_new(sess, cs, &hc->req.buf)) == NULL) { ha_alert("httpclient: Failed to initialize stream %s:%d.\n", __FUNCTION__, __LINE__); - goto out_free_appctx; + goto out_free_cs; } /* set the "timeout server" */ @@ -528,7 +534,6 @@ struct appctx *httpclient_start(struct httpclient *hc) si_cant_get(&s->si[0]); appctx_wakeup(appctx); - task_wakeup(s->task, TASK_WOKEN_INIT); hc->appctx = appctx; hc->flags |= HTTPCLIENT_FS_STARTED; appctx->ctx.httpclient.ptr = hc; @@ -543,6 +548,8 @@ struct appctx *httpclient_start(struct httpclient *hc) out_free_stream: LIST_DELETE(&s->list); pool_free(pool_head_stream, s); +out_free_cs: + cs_free(cs); out_free_sess: session_free(sess); out_free_appctx: diff --git a/src/log.c b/src/log.c index 92eef6ebeb..0213435c23 100644 --- a/src/log.c +++ b/src/log.c @@ -1970,7 +1970,7 @@ int sess_build_logline(struct session *sess, struct stream *s, char *dst, size_t if (likely(s)) { be = s->be; txn = s->txn; - be_conn = cs_conn(objt_cs(s->si[1].end)); + be_conn = cs_conn(s->si[1].cs); status = (txn ? txn->status : 0); s_flags = s->flags; uniq_id = s->uniq_id; diff --git a/src/mux_h1.c b/src/mux_h1.c index cc5336498f..5fdcd468d4 100644 --- a/src/mux_h1.c +++ b/src/mux_h1.c @@ -696,7 +696,7 @@ static struct conn_stream *h1s_new_cs(struct h1s *h1s, struct buffer *input) if (h1s->req.flags & H1_MF_UPG_WEBSOCKET) cs->flags |= CS_FL_WEBSOCKET; - if (stream_create_from_cs(cs, input) < 0) { + if (!stream_new(h1c->conn->owner, cs, input)) { TRACE_DEVEL("leaving on stream creation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, h1c->conn, h1s); goto err; } diff --git a/src/mux_h2.c b/src/mux_h2.c index d14279310e..3a98e5d2c0 100644 --- a/src/mux_h2.c +++ b/src/mux_h2.c @@ -1550,7 +1550,7 @@ static struct h2s *h2c_frt_stream_new(struct h2c *h2c, int id, struct buffer *in * request. We can set the value now, it will be copied by stream_new(). */ sess->t_idle = tv_ms_elapsed(&sess->tv_accept, &now) - sess->t_handshake; - if (stream_create_from_cs(cs, input) < 0) + if (!stream_new(h2c->conn->owner, cs, input)) goto out_free_cs; /* We want the accept date presented to the next stream to be the one diff --git a/src/mux_pt.c b/src/mux_pt.c index e048a49b87..13e60d84da 100644 --- a/src/mux_pt.c +++ b/src/mux_pt.c @@ -297,7 +297,7 @@ static int mux_pt_init(struct connection *conn, struct proxy *prx, struct sessio goto fail_free_ctx; } - if (stream_create_from_cs(cs, &BUF_NULL) < 0) { + if (!stream_new(conn->owner, cs, &BUF_NULL)) { TRACE_ERROR("stream creation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn, cs); goto fail_free; } diff --git a/src/peers.c b/src/peers.c index deb5a89f39..df3fd9eb6c 100644 --- a/src/peers.c +++ b/src/peers.c @@ -3181,6 +3181,7 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer struct proxy *p = peers->peers_fe; /* attached frontend */ struct appctx *appctx; struct session *sess; + struct conn_stream *cs; struct stream *s; peer->new_conn++; @@ -3203,11 +3204,17 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer goto out_free_appctx; } - if ((s = stream_new(sess, &appctx->obj_type, &BUF_NULL)) == NULL) { - ha_alert("Failed to initialize stream in peer_session_create().\n"); + cs = cs_new(&appctx->obj_type); + if (!cs) { + ha_alert("out of memory in peer_session_create().\n"); goto out_free_sess; } + if ((s = stream_new(sess, cs, &BUF_NULL)) == NULL) { + ha_alert("Failed to initialize stream in peer_session_create().\n"); + goto out_free_cs; + } + /* applet is waiting for data */ si_cant_get(&s->si[0]); appctx_wakeup(appctx); @@ -3225,7 +3232,6 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer s->res.flags |= CF_READ_DONTWAIT; peer->appctx = appctx; - task_wakeup(s->task, TASK_WOKEN_INIT); _HA_ATOMIC_INC(&active_peers); return appctx; @@ -3233,6 +3239,8 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer out_free_strm: LIST_DELETE(&s->list); pool_free(pool_head_stream, s); + out_free_cs: + cs_free(cs); out_free_sess: session_free(sess); out_free_appctx: @@ -4019,4 +4027,3 @@ static struct cli_kw_list cli_kws = {{ }, { /* Register cli keywords */ INITCALL1(STG_REGISTER, cli_register_kw, &cli_kws); - diff --git a/src/proxy.c b/src/proxy.c index b874cb1edd..a97b3cd80e 100644 --- a/src/proxy.c +++ b/src/proxy.c @@ -2612,7 +2612,7 @@ static void dump_server_addr(const struct sockaddr_storage *addr, char *addr_str */ static int dump_servers_state(struct stream_interface *si) { - struct appctx *appctx = __objt_appctx(si->end); + struct appctx *appctx = cs_appctx(si->cs); struct proxy *px = appctx->ctx.cli.p0; struct server *srv; char srv_addr[INET6_ADDRSTRLEN + 1]; diff --git a/src/resolvers.c b/src/resolvers.c index 097d007913..43e755b2c9 100644 --- a/src/resolvers.c +++ b/src/resolvers.c @@ -2586,7 +2586,7 @@ static int stats_dump_resolv_to_buffer(struct stream_interface *si, struct field *stats, size_t stats_count, struct list *stat_modules) { - struct appctx *appctx = __objt_appctx(si->end); + struct appctx *appctx = cs_appctx(si->cs); struct channel *rep = si_ic(si); struct stats_module *mod; size_t idx = 0; @@ -2620,7 +2620,7 @@ int stats_dump_resolvers(struct stream_interface *si, struct field *stats, size_t stats_count, struct list *stat_modules) { - struct appctx *appctx = __objt_appctx(si->end); + struct appctx *appctx = cs_appctx(si->cs); struct channel *rep = si_ic(si); struct resolvers *resolver = appctx->ctx.stats.obj1; struct dns_nameserver *ns = appctx->ctx.stats.obj2; diff --git a/src/sink.c b/src/sink.c index c1da18c8d4..a3464abbad 100644 --- a/src/sink.c +++ b/src/sink.c @@ -636,6 +636,7 @@ static struct appctx *sink_forward_session_create(struct sink *sink, struct sink struct proxy *p = sink->forward_px; struct appctx *appctx; struct session *sess; + struct conn_stream *cs; struct stream *s; struct applet *applet = &sink_forward_applet; @@ -650,15 +651,21 @@ static struct appctx *sink_forward_session_create(struct sink *sink, struct sink sess = session_new(p, NULL, &appctx->obj_type); if (!sess) { - ha_alert("out of memory in peer_session_create().\n"); + ha_alert("out of memory in sink_forward_session_create().\n"); goto out_free_appctx; } - if ((s = stream_new(sess, &appctx->obj_type, &BUF_NULL)) == NULL) { - ha_alert("Failed to initialize stream in peer_session_create().\n"); + cs = cs_new(&appctx->obj_type); + if (!cs) { + ha_alert("out of memory in sink_forward_session_create"); goto out_free_sess; } + if ((s = stream_new(sess, cs, &BUF_NULL)) == NULL) { + ha_alert("Failed to initialize stream in sink_forward_session_create().\n"); + goto out_free_cs; + } + s->target = &sft->srv->obj_type; if (!sockaddr_alloc(&s->si[1].dst, &sft->srv->addr, sizeof(sft->srv->addr))) @@ -676,13 +683,14 @@ static struct appctx *sink_forward_session_create(struct sink *sink, struct sink s->res.rto = TICK_ETERNITY; s->res.rex = TICK_ETERNITY; sft->appctx = appctx; - task_wakeup(s->task, TASK_WOKEN_INIT); return appctx; /* Error unrolling */ out_free_strm: LIST_DELETE(&s->list); pool_free(pool_head_stream, s); + out_free_cs: + cs_free(cs); out_free_sess: session_free(sess); out_free_appctx: diff --git a/src/ssl_sample.c b/src/ssl_sample.c index 0ba14e90a5..0a78ecb90a 100644 --- a/src/ssl_sample.c +++ b/src/ssl_sample.c @@ -529,7 +529,7 @@ smp_fetch_ssl_x_der(const struct arg *args, struct sample *smp, const char *kw, SSL *ssl; if (conn_server) - conn = smp->strm ? cs_conn(objt_cs(smp->strm->si[1].end)) : NULL; + conn = smp->strm ? cs_conn(smp->strm->si[1].cs) : NULL; else conn = objt_conn(smp->sess->origin); @@ -584,7 +584,7 @@ smp_fetch_ssl_x_chain_der(const struct arg *args, struct sample *smp, const char int i; if (conn_server) - conn = smp->strm ? cs_conn(objt_cs(smp->strm->si[1].end)) : NULL; + conn = smp->strm ? cs_conn(smp->strm->si[1].cs) : NULL; else conn = objt_conn(smp->sess->origin); @@ -647,7 +647,7 @@ smp_fetch_ssl_x_serial(const struct arg *args, struct sample *smp, const char *k SSL *ssl; if (conn_server) - conn = smp->strm ? cs_conn(objt_cs(smp->strm->si[1].end)) : NULL; + conn = smp->strm ? cs_conn(smp->strm->si[1].cs) : NULL; else conn = objt_conn(smp->sess->origin); ssl = ssl_sock_get_ssl_object(conn); @@ -700,7 +700,7 @@ smp_fetch_ssl_x_sha1(const struct arg *args, struct sample *smp, const char *kw, SSL *ssl; if (conn_server) - conn = smp->strm ? cs_conn(objt_cs(smp->strm->si[1].end)) : NULL; + conn = smp->strm ? cs_conn(smp->strm->si[1].cs) : NULL; else conn = objt_conn(smp->sess->origin); @@ -751,7 +751,7 @@ smp_fetch_ssl_x_notafter(const struct arg *args, struct sample *smp, const char SSL *ssl; if (conn_server) - conn = smp->strm ? cs_conn(objt_cs(smp->strm->si[1].end)) : NULL; + conn = smp->strm ? cs_conn(smp->strm->si[1].cs) : NULL; else conn = objt_conn(smp->sess->origin); @@ -803,7 +803,7 @@ smp_fetch_ssl_x_i_dn(const struct arg *args, struct sample *smp, const char *kw, SSL *ssl; if (conn_server) - conn = smp->strm ? cs_conn(objt_cs(smp->strm->si[1].end)) : NULL; + conn = smp->strm ? cs_conn(smp->strm->si[1].cs) : NULL; else conn = objt_conn(smp->sess->origin); @@ -871,7 +871,7 @@ smp_fetch_ssl_x_notbefore(const struct arg *args, struct sample *smp, const char SSL *ssl; if (conn_server) - conn = smp->strm ? cs_conn(objt_cs(smp->strm->si[1].end)) : NULL; + conn = smp->strm ? cs_conn(smp->strm->si[1].cs) : NULL; else conn = objt_conn(smp->sess->origin); @@ -923,7 +923,7 @@ smp_fetch_ssl_x_s_dn(const struct arg *args, struct sample *smp, const char *kw, SSL *ssl; if (conn_server) - conn = smp->strm ? cs_conn(objt_cs(smp->strm->si[1].end)) : NULL; + conn = smp->strm ? cs_conn(smp->strm->si[1].cs) : NULL; else conn = objt_conn(smp->sess->origin); @@ -1020,7 +1020,7 @@ smp_fetch_ssl_x_version(const struct arg *args, struct sample *smp, const char * SSL *ssl; if (conn_server) - conn = smp->strm ? cs_conn(objt_cs(smp->strm->si[1].end)) : NULL; + conn = smp->strm ? cs_conn(smp->strm->si[1].cs) : NULL; else conn = objt_conn(smp->sess->origin); ssl = ssl_sock_get_ssl_object(conn); @@ -1065,7 +1065,7 @@ smp_fetch_ssl_x_sig_alg(const struct arg *args, struct sample *smp, const char * SSL *ssl; if (conn_server) - conn = smp->strm ? cs_conn(objt_cs(smp->strm->si[1].end)) : NULL; + conn = smp->strm ? cs_conn(smp->strm->si[1].cs) : NULL; else conn = objt_conn(smp->sess->origin); @@ -1122,7 +1122,7 @@ smp_fetch_ssl_x_key_alg(const struct arg *args, struct sample *smp, const char * SSL *ssl; if (conn_server) - conn = smp->strm ? cs_conn(objt_cs(smp->strm->si[1].end)) : NULL; + conn = smp->strm ? cs_conn(smp->strm->si[1].cs) : NULL; else conn = objt_conn(smp->sess->origin); ssl = ssl_sock_get_ssl_object(conn); @@ -1174,7 +1174,7 @@ smp_fetch_ssl_fc(const struct arg *args, struct sample *smp, const char *kw, voi conn = (kw[4] == 'b') ? cs_conn(__objt_check(smp->sess->origin)->cs) : NULL; else conn = (kw[4] != 'b') ? objt_conn(smp->sess->origin) : - smp->strm ? cs_conn(objt_cs(smp->strm->si[1].end)) : NULL; + smp->strm ? cs_conn(smp->strm->si[1].cs) : NULL; smp->data.type = SMP_T_BOOL; smp->data.u.sint = (conn && conn->xprt == &ssl_sock); @@ -1211,7 +1211,7 @@ smp_fetch_ssl_fc_is_resumed(const struct arg *args, struct sample *smp, const ch conn = (kw[4] == 'b') ? cs_conn(__objt_check(smp->sess->origin)->cs) : NULL; else conn = (kw[4] != 'b') ? objt_conn(smp->sess->origin) : - smp->strm ? cs_conn(objt_cs(smp->strm->si[1].end)) : NULL; + smp->strm ? cs_conn(smp->strm->si[1].cs) : NULL; ssl = ssl_sock_get_ssl_object(conn); @@ -1234,7 +1234,7 @@ smp_fetch_ssl_fc_cipher(const struct arg *args, struct sample *smp, const char * conn = (kw[4] == 'b') ? cs_conn(__objt_check(smp->sess->origin)->cs) : NULL; else conn = (kw[4] != 'b') ? objt_conn(smp->sess->origin) : - smp->strm ? cs_conn(objt_cs(smp->strm->si[1].end)) : NULL; + smp->strm ? cs_conn(smp->strm->si[1].cs) : NULL; smp->flags = 0; ssl = ssl_sock_get_ssl_object(conn); @@ -1268,7 +1268,7 @@ smp_fetch_ssl_fc_alg_keysize(const struct arg *args, struct sample *smp, const c conn = (kw[4] == 'b') ? cs_conn(__objt_check(smp->sess->origin)->cs) : NULL; else conn = (kw[4] != 'b') ? objt_conn(smp->sess->origin) : - smp->strm ? cs_conn(objt_cs(smp->strm->si[1].end)) : NULL; + smp->strm ? cs_conn(smp->strm->si[1].cs) : NULL; smp->flags = 0; ssl = ssl_sock_get_ssl_object(conn); @@ -1299,7 +1299,7 @@ smp_fetch_ssl_fc_use_keysize(const struct arg *args, struct sample *smp, const c conn = (kw[4] == 'b') ? cs_conn(__objt_check(smp->sess->origin)->cs) : NULL; else conn = (kw[4] != 'b') ? objt_conn(smp->sess->origin) : - smp->strm ? cs_conn(objt_cs(smp->strm->si[1].end)) : NULL; + smp->strm ? cs_conn(smp->strm->si[1].cs) : NULL; smp->flags = 0; ssl = ssl_sock_get_ssl_object(conn); @@ -1331,7 +1331,7 @@ smp_fetch_ssl_fc_npn(const struct arg *args, struct sample *smp, const char *kw, conn = (kw[4] == 'b') ? cs_conn(__objt_check(smp->sess->origin)->cs) : NULL; else conn = (kw[4] != 'b') ? objt_conn(smp->sess->origin) : - smp->strm ? cs_conn(objt_cs(smp->strm->si[1].end)) : NULL; + smp->strm ? cs_conn(smp->strm->si[1].cs) : NULL; ssl = ssl_sock_get_ssl_object(conn); if (!ssl) @@ -1366,7 +1366,7 @@ smp_fetch_ssl_fc_alpn(const struct arg *args, struct sample *smp, const char *kw conn = (kw[4] == 'b') ? cs_conn(__objt_check(smp->sess->origin)->cs) : NULL; else conn = (kw[4] != 'b') ? objt_conn(smp->sess->origin) : - smp->strm ? cs_conn(objt_cs(smp->strm->si[1].end)) : NULL; + smp->strm ? cs_conn(smp->strm->si[1].cs) : NULL; ssl = ssl_sock_get_ssl_object(conn); if (!ssl) @@ -1399,7 +1399,7 @@ smp_fetch_ssl_fc_protocol(const struct arg *args, struct sample *smp, const char conn = (kw[4] == 'b') ? cs_conn(__objt_check(smp->sess->origin)->cs) : NULL; else conn = (kw[4] != 'b') ? objt_conn(smp->sess->origin) : - smp->strm ? cs_conn(objt_cs(smp->strm->si[1].end)) : NULL; + smp->strm ? cs_conn(smp->strm->si[1].cs) : NULL; smp->flags = 0; ssl = ssl_sock_get_ssl_object(conn); @@ -1437,7 +1437,7 @@ smp_fetch_ssl_fc_session_id(const struct arg *args, struct sample *smp, const ch conn = (kw[4] == 'b') ? cs_conn(__objt_check(smp->sess->origin)->cs) : NULL; else conn = (kw[4] != 'b') ? objt_conn(smp->sess->origin) : - smp->strm ? cs_conn(objt_cs(smp->strm->si[1].end)) : NULL; + smp->strm ? cs_conn(smp->strm->si[1].cs) : NULL; ssl = ssl_sock_get_ssl_object(conn); if (!ssl) @@ -1469,7 +1469,7 @@ smp_fetch_ssl_fc_random(const struct arg *args, struct sample *smp, const char * conn = (kw[4] == 'b') ? cs_conn(__objt_check(smp->sess->origin)->cs) : NULL; else conn = (kw[4] != 'b') ? objt_conn(smp->sess->origin) : - smp->strm ? cs_conn(objt_cs(smp->strm->si[1].end)) : NULL; + smp->strm ? cs_conn(smp->strm->si[1].cs) : NULL; ssl = ssl_sock_get_ssl_object(conn); if (!ssl) @@ -1506,7 +1506,7 @@ smp_fetch_ssl_fc_session_key(const struct arg *args, struct sample *smp, const c conn = (kw[4] == 'b') ? cs_conn(__objt_check(smp->sess->origin)->cs) : NULL; else conn = (kw[4] != 'b') ? objt_conn(smp->sess->origin) : - smp->strm ? cs_conn(objt_cs(smp->strm->si[1].end)) : NULL; + smp->strm ? cs_conn(smp->strm->si[1].cs) : NULL; ssl = ssl_sock_get_ssl_object(conn); if (!ssl) @@ -1655,7 +1655,7 @@ smp_fetch_ssl_fc_err(const struct arg *args, struct sample *smp, const char *kw, conn = (kw[4] == 'b') ? cs_conn(__objt_check(smp->sess->origin)->cs) : NULL; else conn = (kw[4] != 'b') ? objt_conn(smp->sess->origin) : - smp->strm ? cs_conn(objt_cs(smp->strm->si[1].end)) : NULL; + smp->strm ? cs_conn(smp->strm->si[1].cs) : NULL; if (!conn || conn->xprt != &ssl_sock) return 0; @@ -1708,7 +1708,7 @@ smp_fetch_ssl_fc_err_str(const struct arg *args, struct sample *smp, const char conn = (kw[4] == 'b') ? cs_conn(__objt_check(smp->sess->origin)->cs) : NULL; else conn = (kw[4] != 'b') ? objt_conn(smp->sess->origin) : - smp->strm ? cs_conn(objt_cs(smp->strm->si[1].end)) : NULL; + smp->strm ? cs_conn(smp->strm->si[1].cs) : NULL; if (!conn || conn->xprt != &ssl_sock) return 0; @@ -1841,7 +1841,7 @@ static int smp_fetch_ssl_x_keylog(const struct arg *args, struct sample *smp, co const char *sfx; conn = (kw[4] != 'b') ? objt_conn(smp->sess->origin) : - smp->strm ? cs_conn(objt_cs(smp->strm->si[1].end)) : NULL; + smp->strm ? cs_conn(smp->strm->si[1].cs) : NULL; if (!conn) return 0; @@ -1938,7 +1938,7 @@ smp_fetch_ssl_fc_unique_id(const struct arg *args, struct sample *smp, const cha conn = (kw[4] == 'b') ? cs_conn(__objt_check(smp->sess->origin)->cs) : NULL; else conn = (kw[4] != 'b') ? objt_conn(smp->sess->origin) : - smp->strm ? cs_conn(objt_cs(smp->strm->si[1].end)) : NULL; + smp->strm ? cs_conn(smp->strm->si[1].cs) : NULL; smp->flags = 0; ssl = ssl_sock_get_ssl_object(conn); diff --git a/src/ssl_sock.c b/src/ssl_sock.c index 460eb6019e..b79fc71d41 100644 --- a/src/ssl_sock.c +++ b/src/ssl_sock.c @@ -7735,7 +7735,7 @@ enum act_return ssl_action_wait_for_hs(struct act_rule *rule, struct proxy *px, struct conn_stream *cs; conn = objt_conn(sess->origin); - cs = objt_cs(s->si[0].end); + cs = s->si[0].cs; if (conn && cs) { if (conn->flags & (CO_FL_EARLY_SSL_HS | CO_FL_SSL_WAIT_HS)) { diff --git a/src/stats.c b/src/stats.c index bacde28081..1600ef4c83 100644 --- a/src/stats.c +++ b/src/stats.c @@ -1812,7 +1812,7 @@ int stats_fill_fe_stats(struct proxy *px, struct field *stats, int len, */ static int stats_dump_fe_stats(struct stream_interface *si, struct proxy *px) { - struct appctx *appctx = __objt_appctx(si->end); + struct appctx *appctx = cs_appctx(si->cs); struct field *stats = stat_l[STATS_DOMAIN_PROXY]; struct stats_module *mod; size_t stats_count = ST_F_TOTAL_FIELDS; @@ -1979,7 +1979,7 @@ int stats_fill_li_stats(struct proxy *px, struct listener *l, int flags, */ static int stats_dump_li_stats(struct stream_interface *si, struct proxy *px, struct listener *l) { - struct appctx *appctx = __objt_appctx(si->end); + struct appctx *appctx = cs_appctx(si->cs); struct field *stats = stat_l[STATS_DOMAIN_PROXY]; struct stats_module *mod; size_t stats_count = ST_F_TOTAL_FIELDS; @@ -2490,7 +2490,7 @@ int stats_fill_sv_stats(struct proxy *px, struct server *sv, int flags, */ static int stats_dump_sv_stats(struct stream_interface *si, struct proxy *px, struct server *sv) { - struct appctx *appctx = __objt_appctx(si->end); + struct appctx *appctx = cs_appctx(si->cs); struct stats_module *mod; struct field *stats = stat_l[STATS_DOMAIN_PROXY]; size_t stats_count = ST_F_TOTAL_FIELDS; @@ -2815,7 +2815,7 @@ int stats_fill_be_stats(struct proxy *px, int flags, struct field *stats, int le */ static int stats_dump_be_stats(struct stream_interface *si, struct proxy *px) { - struct appctx *appctx = __objt_appctx(si->end); + struct appctx *appctx = cs_appctx(si->cs); struct field *stats = stat_l[STATS_DOMAIN_PROXY]; struct stats_module *mod; size_t stats_count = ST_F_TOTAL_FIELDS; @@ -2856,7 +2856,7 @@ static int stats_dump_be_stats(struct stream_interface *si, struct proxy *px) */ static void stats_dump_html_px_hdr(struct stream_interface *si, struct proxy *px) { - struct appctx *appctx = __objt_appctx(si->end); + struct appctx *appctx = cs_appctx(si->cs); char scope_txt[STAT_SCOPE_TXT_MAXLEN + sizeof STAT_SCOPE_PATTERN]; struct stats_module *mod; int stats_module_len = 0; @@ -2965,7 +2965,7 @@ static void stats_dump_html_px_hdr(struct stream_interface *si, struct proxy *px */ static void stats_dump_html_px_end(struct stream_interface *si, struct proxy *px) { - struct appctx *appctx = __objt_appctx(si->end); + struct appctx *appctx = cs_appctx(si->cs); chunk_appendf(&trash, ""); if ((px->cap & PR_CAP_BE) && px->srv && (appctx->ctx.stats.flags & STAT_ADMIN)) { @@ -3007,7 +3007,7 @@ static void stats_dump_html_px_end(struct stream_interface *si, struct proxy *px int stats_dump_proxy_to_buffer(struct stream_interface *si, struct htx *htx, struct proxy *px, struct uri_auth *uri) { - struct appctx *appctx = __objt_appctx(si->end); + struct appctx *appctx = cs_appctx(si->cs); struct stream *s = si_strm(si); struct channel *rep = si_ic(si); struct server *sv, *svs; /* server and server-state, server-state=server or server->track */ @@ -3380,7 +3380,7 @@ static void stats_dump_html_head(struct appctx *appctx, struct uri_auth *uri) */ static void stats_dump_html_info(struct stream_interface *si, struct uri_auth *uri) { - struct appctx *appctx = __objt_appctx(si->end); + struct appctx *appctx = cs_appctx(si->cs); unsigned int up = (now.tv_sec - start_date.tv_sec); char scope_txt[STAT_SCOPE_TXT_MAXLEN + sizeof STAT_SCOPE_PATTERN]; const char *scope_ptr = stats_scope_ptr(appctx, si); @@ -3662,7 +3662,7 @@ static int stats_dump_proxies(struct stream_interface *si, struct htx *htx, struct uri_auth *uri) { - struct appctx *appctx = __objt_appctx(si->end); + struct appctx *appctx = cs_appctx(si->cs); struct channel *rep = si_ic(si); struct proxy *px; @@ -3709,7 +3709,7 @@ static int stats_dump_proxies(struct stream_interface *si, static int stats_dump_stat_to_buffer(struct stream_interface *si, struct htx *htx, struct uri_auth *uri) { - struct appctx *appctx = __objt_appctx(si->end); + struct appctx *appctx = cs_appctx(si->cs); struct channel *rep = si_ic(si); enum stats_domain domain = appctx->ctx.stats.domain; @@ -3811,7 +3811,7 @@ static int stats_dump_stat_to_buffer(struct stream_interface *si, struct htx *ht static int stats_process_http_post(struct stream_interface *si) { struct stream *s = si_strm(si); - struct appctx *appctx = objt_appctx(si->end); + struct appctx *appctx = cs_appctx(si->cs); struct proxy *px = NULL; struct server *sv = NULL; @@ -4147,7 +4147,7 @@ static int stats_send_http_headers(struct stream_interface *si, struct htx *htx) { struct stream *s = si_strm(si); struct uri_auth *uri = s->be->uri_auth; - struct appctx *appctx = __objt_appctx(si->end); + struct appctx *appctx = cs_appctx(si->cs); struct htx_sl *sl; unsigned int flags; @@ -4201,7 +4201,7 @@ static int stats_send_http_redirect(struct stream_interface *si, struct htx *htx char scope_txt[STAT_SCOPE_TXT_MAXLEN + sizeof STAT_SCOPE_PATTERN]; struct stream *s = si_strm(si); struct uri_auth *uri = s->be->uri_auth; - struct appctx *appctx = __objt_appctx(si->end); + struct appctx *appctx = cs_appctx(si->cs); struct htx_sl *sl; unsigned int flags; @@ -4516,7 +4516,7 @@ int stats_fill_info(struct field *info, int len, uint flags) */ static int stats_dump_info_to_buffer(struct stream_interface *si) { - struct appctx *appctx = __objt_appctx(si->end); + struct appctx *appctx = cs_appctx(si->cs); if (!stats_fill_info(info, INF_TOTAL_FIELDS, appctx->ctx.stats.flags)) return 0; diff --git a/src/stream.c b/src/stream.c index 10cac1e316..622d6b82f5 100644 --- a/src/stream.c +++ b/src/stream.c @@ -266,47 +266,24 @@ static void strm_trace(enum trace_level level, uint64_t mask, const struct trace } } -/* Create a new stream for connection . Return < 0 on error. This is only - * valid right after the handshake, before the connection's data layer is - * initialized, because it relies on the session to be in conn->owner. On +/* Upgrade an existing stream for conn-stream . Return < 0 on error. This + * is only valid right after a TCP to H1 upgrade. The stream should be + * "reativated" by removing SF_IGNORE flag. And the right mode must be set. On * success, buffer is transferred to the stream and thus points to * BUF_NULL. On error, it is unchanged and it is the caller responsibility to - * release it. - */ -int stream_create_from_cs(struct conn_stream *cs, struct buffer *input) -{ - struct connection *conn = cs_conn(cs); - struct stream *strm; - - if (!conn) - return -1; - - strm = stream_new(conn->owner, &cs->obj_type, input); - if (strm == NULL) - return -1; - - task_wakeup(strm->task, TASK_WOKEN_INIT); - return 0; -} - -/* Upgrade an existing TCP stream for connection . Return < 0 on error. - * This is only valid right after a TCP to H1 upgrade. The stream should be - * "reativated" by removing SF_IGNORE flag. And the right mode must be set. - * On success, buffer is transferred to the stream and thus points to - * BUF_NULL. On error, it is unchanged and it is the caller responsibility to * release it (this never happens for now). */ int stream_upgrade_from_cs(struct conn_stream *cs, struct buffer *input) { - struct connection *conn = cs_conn(cs); struct stream_interface *si = cs->data; struct stream *s = si_strm(si); - if (!conn) - return -1; + if (cs_conn_mux(cs)) { + const struct mux_ops *mux = cs_conn_mux(cs); - if (conn->mux->flags & MX_FL_HTX) - s->flags |= SF_HTX; + if (mux->flags & MX_FL_HTX) + s->flags |= SF_HTX; + } if (!b_is_null(input)) { /* Xfer the input buffer to the request channel. will @@ -362,12 +339,10 @@ int stream_buf_available(void *arg) * transfer to the stream and is set to BUF_NULL. On error, * buffer is unchanged and it is the caller responsibility to release it. */ -struct stream *stream_new(struct session *sess, enum obj_type *origin, struct buffer *input) +struct stream *stream_new(struct session *sess, struct conn_stream *cs, struct buffer *input) { struct stream *s; struct task *t; - struct conn_stream *cs = objt_cs(origin); - struct appctx *appctx = objt_appctx(origin); DBG_TRACE_ENTER(STRM_EV_STRM_NEW); if (unlikely((s = pool_alloc(pool_head_stream)) == NULL)) @@ -472,9 +447,10 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin, struct bu if (sess->fe->mode == PR_MODE_HTTP) s->flags |= SF_HTX; - if (appctx) - si_attach_appctx(&s->si[0], appctx); - else if (cs) { + si_attach_cs(&s->si[0], cs); + if (cs->flags & CS_FL_WEBSOCKET) + s->flags |= SF_WEBSOCKET; + if (cs_conn(cs)) { const struct mux_ops *mux = cs_conn_mux(cs); if (mux) { @@ -482,12 +458,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin, struct bu s->si[0].flags |= SI_FL_CLEAN_ABRT; if (mux->flags & MX_FL_HTX) s->flags |= SF_HTX; - - if (cs->flags & CS_FL_WEBSOCKET) - s->flags |= SF_WEBSOCKET; } - /* attach the incoming connection to the stream interface now. */ - si_attach_cs(&s->si[0], cs); } @@ -504,7 +475,8 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin, struct bu if (likely(sess->fe->options2 & PR_O2_INDEPSTR)) s->si[1].flags |= SI_FL_INDEP_STR; - if (!si_alloc_cs(&s->si[1], NULL)) + s->si[1].cs = cs_new(NULL); + if (!s->si[1].cs) goto out_fail_alloc_cs; stream_init_srv_conn(s); @@ -572,7 +544,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin, struct bu s->si[1].l7_buffer = BUF_NULL; /* finish initialization of the accepted file descriptor */ - if (appctx) + if (cs_appctx(cs)) si_want_get(&s->si[0]); if (sess->fe->accept && sess->fe->accept(s) < 0) @@ -596,6 +568,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin, struct bu * the caller must handle the task_wakeup */ DBG_TRACE_LEAVE(STRM_EV_STRM_NEW, s); + task_wakeup(s->task, TASK_WOKEN_INIT); return s; /* Error unrolling */ @@ -622,7 +595,7 @@ static void stream_free(struct stream *s) struct session *sess = strm_sess(s); struct proxy *fe = sess->fe; struct bref *bref, *back; - struct conn_stream *cli_cs = objt_cs(s->si[0].end); + /* struct conn_stream *cli_cs = objt_cs(s->si[0].end); */ int must_free_sess; int i; @@ -683,8 +656,9 @@ static void stream_free(struct stream *s) http_destroy_txn(s); /* ensure the client-side transport layer is destroyed */ - if (cli_cs) - cs_close(cli_cs); + /* Be sure it is useless !! */ + /* if (cli_cs) */ + /* cs_close(cli_cs); */ for (i = 0; i < s->store_count; i++) { if (!s->store[i].ts) @@ -747,7 +721,8 @@ static void stream_free(struct stream *s) LIST_DELETE(&s->list); /* applets do not release session yet */ - must_free_sess = objt_appctx(sess->origin) && sess->origin == s->si[0].end; + /* FIXME: Handle it in appctx_free ??? */ + must_free_sess = objt_appctx(sess->origin) && sess->origin == s->si[0].cs->end; si_release_endpoint(&s->si[1]); @@ -900,7 +875,7 @@ int stream_set_timeout(struct stream *s, enum act_timeout_name name, int timeout static void back_establish(struct stream *s) { struct stream_interface *si = &s->si[1]; - struct connection *conn = cs_conn(objt_cs(si->end)); + struct connection *conn = cs_conn(si->cs); struct channel *req = &s->req; struct channel *rep = &s->res; @@ -1023,12 +998,12 @@ enum act_return process_use_service(struct act_rule *rule, struct proxy *px, return ACT_RET_ERR; /* Initialise the context. */ - appctx = si_appctx(&s->si[1]); + appctx = cs_appctx(s->si[1].cs); memset(&appctx->ctx, 0, sizeof(appctx->ctx)); appctx->rule = rule; } else - appctx = si_appctx(&s->si[1]); + appctx = cs_appctx(s->si[1].cs); /* Stops the applet scheduling, in case of the init function miss * some data. @@ -1501,8 +1476,8 @@ static int process_store_rules(struct stream *s, struct channel *rep, int an_bit */ int stream_set_http_mode(struct stream *s, const struct mux_proto_list *mux_proto) { + struct conn_stream *cs = s->si[0].cs; struct connection *conn; - struct conn_stream *cs; /* Already an HTTP stream */ if (IS_HTX_STRM(s)) @@ -1513,9 +1488,8 @@ int stream_set_http_mode(struct stream *s, const struct mux_proto_list *mux_prot if (unlikely(!s->txn && !http_create_txn(s))) return 0; - conn = objt_conn(strm_sess(s)->origin); - cs = objt_cs(s->si[0].end); - if (conn && cs) { + conn = cs_conn(cs); + if (conn) { si_rx_endp_more(&s->si[0]); /* Make sure we're unsubscribed, the the new * mux will probably want to subscribe to @@ -1541,8 +1515,8 @@ int stream_set_http_mode(struct stream *s, const struct mux_proto_list *mux_prot * silently destroyed. The new mux will create new * streams. */ - cs_free(cs); - si_detach_endpoint(&s->si[0]); + /* FIXME: must be tested */ + /* si_release_endpoint(&s->si[0]); */ s->logs.logwait = 0; s->logs.level = 0; channel_abort(&s->req); @@ -2180,10 +2154,10 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) if (!(req->flags & (CF_KERN_SPLICING|CF_SHUTR)) && req->to_forward && (global.tune.options & GTUNE_USE_SPLICE) && - (cs_conn(objt_cs(si_f->end)) && cs_conn(__objt_cs(si_f->end))->xprt && cs_conn(__objt_cs(si_f->end))->xprt->rcv_pipe && - cs_conn(__objt_cs(si_f->end))->mux && cs_conn(__objt_cs(si_f->end))->mux->rcv_pipe) && - (cs_conn(objt_cs(si_b->end)) && cs_conn(__objt_cs(si_b->end))->xprt && cs_conn(__objt_cs(si_b->end))->xprt->snd_pipe && - cs_conn(__objt_cs(si_b->end))->mux && cs_conn(__objt_cs(si_b->end))->mux->snd_pipe) && + (cs_conn(si_f->cs) && cs_conn(si_f->cs)->xprt && cs_conn(si_f->cs)->xprt->rcv_pipe && + cs_conn(si_f->cs)->mux && cs_conn(si_f->cs)->mux->rcv_pipe) && + (cs_conn(si_b->cs) && cs_conn(si_b->cs)->xprt && cs_conn(si_b->cs)->xprt->snd_pipe && + cs_conn(si_b->cs)->mux && cs_conn(si_b->cs)->mux->snd_pipe) && (pipes_used < global.maxpipes) && (((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_REQ) || (((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_AUT) && @@ -2373,10 +2347,10 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) if (!(res->flags & (CF_KERN_SPLICING|CF_SHUTR)) && res->to_forward && (global.tune.options & GTUNE_USE_SPLICE) && - (cs_conn(objt_cs(si_f->end)) && cs_conn(__objt_cs(si_f->end))->xprt && cs_conn(__objt_cs(si_f->end))->xprt->snd_pipe && - cs_conn(__objt_cs(si_f->end))->mux && cs_conn(__objt_cs(si_f->end))->mux->snd_pipe) && - (cs_conn(objt_cs(si_b->end)) && cs_conn(__objt_cs(si_b->end))->xprt && cs_conn(__objt_cs(si_b->end))->xprt->rcv_pipe && - cs_conn(__objt_cs(si_b->end))->mux && cs_conn(__objt_cs(si_b->end))->mux->rcv_pipe) && + (cs_conn(si_f->cs) && cs_conn(si_f->cs)->xprt && cs_conn(si_f->cs)->xprt->snd_pipe && + cs_conn(si_f->cs)->mux && cs_conn(si_f->cs)->mux->snd_pipe) && + (cs_conn(si_b->cs) && cs_conn(si_b->cs)->xprt && cs_conn(si_b->cs)->xprt->rcv_pipe && + cs_conn(si_b->cs)->mux && cs_conn(si_b->cs)->mux->rcv_pipe) && (pipes_used < global.maxpipes) && (((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_RTR) || (((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_AUT) && @@ -2453,8 +2427,8 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) si_b->prev_state == SI_ST_EST) { chunk_printf(&trash, "%08x:%s.srvcls[%04x:%04x]\n", s->uniq_id, s->be->id, - cs_conn(objt_cs(si_f->end)) ? (unsigned short)cs_conn(__objt_cs(si_f->end))->handle.fd : -1, - cs_conn(objt_cs(si_b->end)) ? (unsigned short)cs_conn(__objt_cs(si_b->end))->handle.fd : -1); + cs_conn(si_f->cs) ? (unsigned short)cs_conn(si_f->cs)->handle.fd : -1, + cs_conn(si_b->cs) ? (unsigned short)cs_conn(si_b->cs)->handle.fd : -1); DISGUISE(write(1, trash.area, trash.data)); } @@ -2462,8 +2436,8 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) si_f->prev_state == SI_ST_EST) { chunk_printf(&trash, "%08x:%s.clicls[%04x:%04x]\n", s->uniq_id, s->be->id, - cs_conn(objt_cs(si_f->end)) ? (unsigned short)cs_conn(__objt_cs(si_f->end))->handle.fd : -1, - cs_conn(objt_cs(si_b->end)) ? (unsigned short)cs_conn(__objt_cs(si_b->end))->handle.fd : -1); + cs_conn(si_f->cs) ? (unsigned short)cs_conn(si_f->cs)->handle.fd : -1, + cs_conn(si_b->cs) ? (unsigned short)cs_conn(si_b->cs)->handle.fd : -1); DISGUISE(write(1, trash.area, trash.data)); } } @@ -2530,8 +2504,8 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) (!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)))) { chunk_printf(&trash, "%08x:%s.closed[%04x:%04x]\n", s->uniq_id, s->be->id, - cs_conn(objt_cs(si_f->end)) ? (unsigned short)cs_conn(__objt_cs(si_f->end))->handle.fd : -1, - cs_conn(objt_cs(si_b->end)) ? (unsigned short)cs_conn(__objt_cs(si_b->end))->handle.fd : -1); + cs_conn(si_f->cs) ? (unsigned short)cs_conn(si_f->cs)->handle.fd : -1, + cs_conn(si_b->cs) ? (unsigned short)cs_conn(si_b->cs)->handle.fd : -1); DISGUISE(write(1, trash.area, trash.data)); } @@ -2759,17 +2733,17 @@ void stream_dump(struct buffer *buf, const struct stream *s, const char *pfx, ch req = &s->req; res = &s->res; - csf = objt_cs(si_f->end); + csf = si_f->cs; cof = cs_conn(csf); - acf = objt_appctx(si_f->end); + acf = cs_appctx(csf); if (cof && cof->src && addr_to_str(cof->src, pn, sizeof(pn)) >= 0) src = pn; else if (acf) src = acf->applet->name; - csb = objt_cs(si_b->end); + csb = si_b->cs; cob = cs_conn(csb); - acb = objt_appctx(si_b->end); + acb = cs_appctx(csb); srv = objt_server(s->target); if (srv) dst = srv->id; @@ -2780,8 +2754,8 @@ void stream_dump(struct buffer *buf, const struct stream *s, const char *pfx, ch "%sstrm=%p,%x src=%s fe=%s be=%s dst=%s%c" "%stxn=%p,%x txn.req=%s,%x txn.rsp=%s,%x%c" "%srqf=%x rqa=%x rpf=%x rpa=%x sif=%s,%x sib=%s,%x%c" - "%saf=%p,%u csf=%p,%x%c" - "%sab=%p,%u csb=%p,%x%c" + "%scsf=%p,%x csb=%p,%x%c" + "%saf=%p,%u sab=%p,%u%c" "%scof=%p,%x:%s(%p)/%s(%p)/%s(%d)%c" "%scob=%p,%x:%s(%p)/%s(%p)/%s(%d)%c" "", @@ -2792,8 +2766,8 @@ void stream_dump(struct buffer *buf, const struct stream *s, const char *pfx, ch pfx, req->flags, req->analysers, res->flags, res->analysers, si_state_str(si_f->state), si_f->flags, si_state_str(si_b->state), si_b->flags, eol, - pfx, acf, acf ? acf->st0 : 0, csf, csf ? csf->flags : 0, eol, - pfx, acb, acb ? acb->st0 : 0, csb, csb ? csb->flags : 0, eol, + pfx, csf, csf ? csf->flags : 0, csb, csb ? csb->flags : 0, eol, + pfx, acf, acf ? acf->st0 : 0, acb, acb ? acb->st0 : 0, eol, pfx, cof, cof ? cof->flags : 0, conn_get_mux_name(cof), cof?cof->ctx:0, conn_get_xprt_name(cof), cof ? cof->xprt_ctx : 0, conn_get_ctrl_name(cof), cof ? cof->handle.fd : 0, eol, pfx, cob, cob ? cob->flags : 0, conn_get_mux_name(cob), cob?cob->ctx:0, conn_get_xprt_name(cob), @@ -3133,7 +3107,7 @@ void list_services(FILE *out) */ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct stream *strm) { - struct appctx *appctx = __objt_appctx(si->end); + struct appctx *appctx = cs_appctx(si->cs); struct tm tm; extern const char *monthname[12]; char pn[INET6_ADDRSTRLEN]; @@ -3217,9 +3191,7 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st else chunk_appendf(&trash, " backend= (id=-1 mode=-)"); - cs = objt_cs(strm->si[1].end); - conn = cs_conn(cs); - + conn = cs_conn(strm->si[1].cs); switch (conn && conn_get_src(conn) ? addr_to_str(conn->src, pn, sizeof(pn)) : AF_UNSPEC) { case AF_INET: case AF_INET6: @@ -3286,8 +3258,8 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st &strm->si[0], si_state_str(strm->si[0].state), strm->si[0].flags, - obj_type_name(strm->si[0].end), - obj_base_ptr(strm->si[0].end), + obj_type_name(strm->si[0].cs->end), + obj_base_ptr(strm->si[0].cs->end), strm->si[0].exp ? tick_is_expired(strm->si[0].exp, now_ms) ? "" : human_time(TICKS_TO_MS(strm->si[0].exp - now_ms), @@ -3299,20 +3271,20 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st &strm->si[1], si_state_str(strm->si[1].state), strm->si[1].flags, - obj_type_name(strm->si[1].end), - obj_base_ptr(strm->si[1].end), + obj_type_name(strm->si[1].cs->end), + obj_base_ptr(strm->si[1].cs->end), strm->si[1].exp ? tick_is_expired(strm->si[1].exp, now_ms) ? "" : human_time(TICKS_TO_MS(strm->si[1].exp - now_ms), TICKS_TO_MS(1000)) : "", strm->si[1].err_type, strm->si[1].wait_event.events); - if (cs_conn(objt_cs(strm->si[0].end)) != NULL) { - cs = __objt_cs(strm->si[0].end); - conn = cs_conn(cs); + cs = strm->si[0].cs; + chunk_appendf(&trash, " cs=%p csf=0x%08x ctx=%p\n", cs, cs->flags, cs->ctx); + if ((conn = cs_conn(cs)) != NULL) { chunk_appendf(&trash, - " co0=%p ctrl=%s xprt=%s mux=%s data=%s target=%s:%p\n", + " co0=%p ctrl=%s xprt=%s mux=%s data=%s target=%s:%p\n", conn, conn_get_ctrl_name(conn), conn_get_xprt_name(conn), @@ -3329,11 +3301,10 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st conn->handle.fd >= 0 ? !!(fdtab[conn->handle.fd].update_mask & tid_bit) : 0, conn->handle.fd >= 0 ? fdtab[conn->handle.fd].thread_mask: 0); - chunk_appendf(&trash, " cs=%p csf=0x%08x ctx=%p\n", cs, cs->flags, cs->ctx); } - else if ((tmpctx = objt_appctx(strm->si[0].end)) != NULL) { + else if ((tmpctx = cs_appctx(cs)) != NULL) { chunk_appendf(&trash, - " app0=%p st0=%d st1=%d st2=%d applet=%s tmask=0x%lx nice=%d calls=%u rate=%u cpu=%llu lat=%llu\n", + " app0=%p st0=%d st1=%d st2=%d applet=%s tmask=0x%lx nice=%d calls=%u rate=%u cpu=%llu lat=%llu\n", tmpctx, tmpctx->st0, tmpctx->st1, @@ -3344,12 +3315,11 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st (unsigned long long)tmpctx->t->cpu_time, (unsigned long long)tmpctx->t->lat_time); } - if (cs_conn(objt_cs(strm->si[1].end)) != NULL) { - cs = __objt_cs(strm->si[1].end); - conn = cs_conn(cs); - + cs = strm->si[1].cs; + chunk_appendf(&trash, " cs=%p csf=0x%08x ctx=%p\n", cs, cs->flags, cs->ctx); + if ((conn = cs_conn(cs)) != NULL) { chunk_appendf(&trash, - " co1=%p ctrl=%s xprt=%s mux=%s data=%s target=%s:%p\n", + " co1=%p ctrl=%s xprt=%s mux=%s data=%s target=%s:%p\n", conn, conn_get_ctrl_name(conn), conn_get_xprt_name(conn), @@ -3366,11 +3336,10 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st conn->handle.fd >= 0 ? !!(fdtab[conn->handle.fd].update_mask & tid_bit) : 0, conn->handle.fd >= 0 ? fdtab[conn->handle.fd].thread_mask: 0); - chunk_appendf(&trash, " cs=%p csf=0x%08x ctx=%p\n", cs, cs->flags, cs->ctx); } - else if ((tmpctx = objt_appctx(strm->si[1].end)) != NULL) { + else if ((tmpctx = cs_appctx(cs)) != NULL) { chunk_appendf(&trash, - " app1=%p st0=%d st1=%d st2=%d applet=%s tmask=0x%lx nice=%d calls=%u rate=%u cpu=%llu lat=%llu\n", + " app1=%p st0=%d st1=%d st2=%d applet=%s tmask=0x%lx nice=%d calls=%u rate=%u cpu=%llu lat=%llu\n", tmpctx, tmpctx->st0, tmpctx->st1, @@ -3680,7 +3649,7 @@ static int cli_io_handler_dump_sess(struct appctx *appctx) human_time(TICKS_TO_MS(curr_strm->res.analyse_exp - now_ms), TICKS_TO_MS(1000)) : ""); - conn = cs_conn(objt_cs(curr_strm->si[0].end)); + conn = cs_conn(curr_strm->si[0].cs); chunk_appendf(&trash, " s0=[%d,%1xh,fd=%d,ex=%s]", curr_strm->si[0].state, @@ -3690,7 +3659,7 @@ static int cli_io_handler_dump_sess(struct appctx *appctx) human_time(TICKS_TO_MS(curr_strm->si[0].exp - now_ms), TICKS_TO_MS(1000)) : ""); - conn = cs_conn(objt_cs(curr_strm->si[1].end)); + conn = cs_conn(curr_strm->si[1].cs); chunk_appendf(&trash, " s1=[%d,%1xh,fd=%d,ex=%s]", curr_strm->si[1].state, diff --git a/src/stream_interface.c b/src/stream_interface.c index 2edef2b450..ae07404fcf 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -306,13 +306,14 @@ struct appctx *si_register_handler(struct stream_interface *si, struct applet *a DPRINTF(stderr, "registering handler %p for si %p (was %p)\n", app, si, si_task(si)); - appctx = si_alloc_appctx(si, app); + appctx = appctx_new(app); if (!appctx) return NULL; - + si_attach_appctx(si, appctx); + appctx->t->nice = si_strm(si)->task->nice; si_cant_get(si); appctx_wakeup(appctx); - return si_appctx(si); + return appctx; } /* This callback is used to send a valid PROXY protocol line to a socket being @@ -358,7 +359,7 @@ int conn_si_send_proxy(struct connection *conn, unsigned int flag) ret = make_proxy_line(trash.area, trash.size, objt_server(conn->target), - cs_conn(objt_cs(si_opposite(si)->end)), + cs_conn(si_opposite(si)->cs), strm); } else { @@ -433,7 +434,7 @@ static void stream_int_notify(struct stream_interface *si) /* process consumer side */ if (channel_is_empty(oc)) { - struct connection *conn = cs_conn(objt_cs(si->end)); + struct connection *conn = cs_conn(si->cs); if (((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW) && (si->state == SI_ST_EST) && (!conn || !(conn->flags & (CO_FL_WAIT_XPRT | CO_FL_EARLY_SSL_HS)))) @@ -800,7 +801,7 @@ int si_cs_send(struct conn_stream *cs) struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned int state) { struct stream_interface *si = ctx; - struct conn_stream *cs = objt_cs(si->end); + struct conn_stream *cs = si->cs; int ret = 0; if (!cs_conn(cs)) @@ -916,7 +917,6 @@ void si_update_tx(struct stream_interface *si) void si_sync_send(struct stream_interface *si) { struct channel *oc = si_oc(si); - struct conn_stream *cs; oc->flags &= ~(CF_WRITE_NULL|CF_WRITE_PARTIAL); @@ -929,11 +929,10 @@ void si_sync_send(struct stream_interface *si) if (!si_state_in(si->state, SI_SB_CON|SI_SB_RDY|SI_SB_EST)) return; - cs = objt_cs(si->end); - if (!cs_conn_mux(cs)) + if (!cs_conn_mux(si->cs)) return; - si_cs_send(cs); + si_cs_send(si->cs); } /* Updates at once the channel flags, and timers of both stream interfaces of a @@ -964,15 +963,15 @@ void si_update_both(struct stream_interface *si_f, struct stream_interface *si_b /* stream ints are processed outside of process_stream() and must be * handled at the latest moment. */ - if (obj_type(si_f->end) == OBJ_TYPE_APPCTX && + if (cs_appctx(si_f->cs) && ((si_rx_endp_ready(si_f) && !si_rx_blocked(si_f)) || (si_tx_endp_ready(si_f) && !si_tx_blocked(si_f)))) - appctx_wakeup(si_appctx(si_f)); + appctx_wakeup(cs_appctx(si_f->cs)); - if (obj_type(si_b->end) == OBJ_TYPE_APPCTX && + if (cs_appctx(si_b->cs) && ((si_rx_endp_ready(si_b) && !si_rx_blocked(si_b)) || (si_tx_endp_ready(si_b) && !si_tx_blocked(si_b)))) - appctx_wakeup(si_appctx(si_b)); + appctx_wakeup(cs_appctx(si_b->cs)); } /* @@ -987,9 +986,11 @@ void si_update_both(struct stream_interface *si_f, struct stream_interface *si_b */ static void stream_int_shutr_conn(struct stream_interface *si) { - struct conn_stream *cs = __objt_cs(si->end); + struct conn_stream *cs = si->cs; struct channel *ic = si_ic(si); + BUG_ON(!cs_conn(cs)); + si_rx_shut_blk(si); if (ic->flags & CF_SHUTR) return; @@ -1023,10 +1024,12 @@ static void stream_int_shutr_conn(struct stream_interface *si) */ static void stream_int_shutw_conn(struct stream_interface *si) { - struct conn_stream *cs = __objt_cs(si->end); + struct conn_stream *cs = si->cs; struct channel *ic = si_ic(si); struct channel *oc = si_oc(si); + BUG_ON(!cs_conn(cs)); + oc->flags &= ~CF_SHUTW_NOW; if (oc->flags & CF_SHUTW) return; @@ -1120,7 +1123,7 @@ static void stream_int_chk_rcv_conn(struct stream_interface *si) static void stream_int_chk_snd_conn(struct stream_interface *si) { struct channel *oc = si_oc(si); - struct conn_stream *cs = __objt_cs(si->end); + struct conn_stream *cs = si->cs; struct connection *conn = cs_conn(cs); BUG_ON(!conn); @@ -1551,10 +1554,12 @@ int si_cs_recv(struct conn_stream *cs) */ static void stream_int_read0(struct stream_interface *si) { - struct conn_stream *cs = __objt_cs(si->end); + struct conn_stream *cs = si->cs; struct channel *ic = si_ic(si); struct channel *oc = si_oc(si); + BUG_ON(!cs_conn(cs)); + si_rx_shut_blk(si); if (ic->flags & CF_SHUTR) return; @@ -1624,7 +1629,7 @@ void si_applet_wake_cb(struct stream_interface *si) */ if ((si_rx_endp_ready(si) && !si_rx_blocked(si)) || (si_tx_endp_ready(si) && !si_tx_blocked(si))) - appctx_wakeup(si_appctx(si)); + appctx_wakeup(cs_appctx(si->cs)); } /* @@ -1686,7 +1691,7 @@ static void stream_int_shutw_applet(struct stream_interface *si) } /* on shutw we always wake the applet up */ - appctx_wakeup(si_appctx(si)); + appctx_wakeup(cs_appctx(si->cs)); switch (si->state) { case SI_ST_RDY: @@ -1730,7 +1735,7 @@ static void stream_int_chk_rcv_applet(struct stream_interface *si) if (!ic->pipe) { /* (re)start reading */ - appctx_wakeup(si_appctx(si)); + appctx_wakeup(cs_appctx(si->cs)); } } @@ -1756,7 +1761,7 @@ static void stream_int_chk_snd_applet(struct stream_interface *si) if (!channel_is_empty(oc)) { /* (re)start sending */ - appctx_wakeup(si_appctx(si)); + appctx_wakeup(cs_appctx(si->cs)); } } diff --git a/src/tcp_sample.c b/src/tcp_sample.c index 19edcd2438..d98f7c9b39 100644 --- a/src/tcp_sample.c +++ b/src/tcp_sample.c @@ -54,7 +54,7 @@ smp_fetch_src(const struct arg *args, struct sample *smp, const char *kw, void * if (kw[0] == 'b') { /* bc_src */ struct connection *conn = ((obj_type(smp->sess->origin) == OBJ_TYPE_CHECK) ? cs_conn(__objt_check(smp->sess->origin)->cs) - : (smp->strm ? cs_conn(objt_cs(smp->strm->si[1].end)): NULL)); + : (smp->strm ? cs_conn(smp->strm->si[1].cs): NULL)); if (conn && conn_get_src(conn)) src = conn_src(conn); } @@ -98,7 +98,7 @@ smp_fetch_sport(const struct arg *args, struct sample *smp, const char *kw, void if (kw[0] == 'b') { /* bc_src_port */ struct connection *conn = ((obj_type(smp->sess->origin) == OBJ_TYPE_CHECK) ? cs_conn(__objt_check(smp->sess->origin)->cs) - : (smp->strm ? cs_conn(objt_cs(smp->strm->si[1].end)): NULL)); + : (smp->strm ? cs_conn(smp->strm->si[1].cs): NULL)); if (conn && conn_get_src(conn)) src = conn_src(conn); } @@ -133,7 +133,7 @@ smp_fetch_dst(const struct arg *args, struct sample *smp, const char *kw, void * if (kw[0] == 'b') { /* bc_dst */ struct connection *conn = ((obj_type(smp->sess->origin) == OBJ_TYPE_CHECK) ? cs_conn(__objt_check(smp->sess->origin)->cs) - : (smp->strm ? cs_conn(objt_cs(smp->strm->si[1].end)): NULL)); + : (smp->strm ? cs_conn(smp->strm->si[1].cs): NULL)); if (conn && conn_get_dst(conn)) dst = conn_dst(conn); } @@ -229,7 +229,7 @@ smp_fetch_dport(const struct arg *args, struct sample *smp, const char *kw, void if (kw[0] == 'b') { /* bc_dst_port */ struct connection *conn = ((obj_type(smp->sess->origin) == OBJ_TYPE_CHECK) ? cs_conn(__objt_check(smp->sess->origin)->cs) - : (smp->strm ? cs_conn(objt_cs(smp->strm->si[1].end)): NULL)); + : (smp->strm ? cs_conn(smp->strm->si[1].cs): NULL)); if (conn && conn_get_dst(conn)) dst = conn_dst(conn); } @@ -325,7 +325,7 @@ static inline int get_tcp_info(const struct arg *args, struct sample *smp, /* get the object associated with the stream interface.The * object can be other thing than a connection. For example, * it be a appctx. */ - conn = cs_conn(objt_cs(smp->strm->si[dir].end)); + conn = cs_conn(smp->strm->si[dir].cs); if (!conn) return 0; -- 2.39.5