From: Christopher Faulet Date: Fri, 17 Dec 2021 16:28:35 +0000 (+0100) Subject: MEDIUM: conn-stream: No longer access connection field directly X-Git-Tag: v2.6-dev2~50 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=897d612d684b2d63e9e972d4198262f1af0d20fb;p=thirdparty%2Fhaproxy.git MEDIUM: conn-stream: No longer access connection field directly To be able to handle applets as a conn-stream endpoint, we must be prepared to handle different types of endpoints. First of all, the conn-strream's connection must no longer be used directly. --- diff --git a/include/haproxy/conn_stream.h b/include/haproxy/conn_stream.h index cd833b2f05..bceb7a6fda 100644 --- a/include/haproxy/conn_stream.h +++ b/include/haproxy/conn_stream.h @@ -30,7 +30,7 @@ extern struct pool_head *pool_head_connstream; -#define IS_HTX_CS(cs) (IS_HTX_CONN((cs)->conn)) +#define IS_HTX_CS(cs) (cs_conn(cs) && IS_HTX_CONN(cs_conn(cs))) struct conn_stream *cs_new(struct connection *conn, void *target); void cs_free(struct conn_stream *cs); @@ -52,6 +52,16 @@ static inline struct connection *cs_conn(const struct conn_stream *cs) return cs ? cs->conn : NULL; } +/* Returns the mux of the connection from a cs if the endpoint is a + * connection. Otherwise NULL is returned. + */ +static inline const struct mux_ops *cs_conn_mux(const struct conn_stream *cs) +{ + const struct connection *conn = cs_conn(cs); + + return (conn ? conn->mux : NULL); +} + /* Attaches a conn_stream to a data layer and sets the relevant callbacks */ static inline void cs_attach(struct conn_stream *cs, void *data, const struct data_cb *data_cb) { @@ -66,15 +76,15 @@ static inline void cs_attach(struct conn_stream *cs, void *data, const struct da */ static inline void cs_detach(struct conn_stream *cs) { - if (cs_conn(cs)) { - if (cs->conn->mux) - cs->conn->mux->detach(cs); + struct connection *conn; + + if ((conn = cs_conn(cs))) { + if (conn->mux) + conn->mux->detach(cs); else { /* It's too early to have a mux, let's just destroy * the connection */ - struct connection *conn = cs->conn; - conn_stop_tracking(conn); conn_full_close(conn); if (conn->destroy_cb) @@ -102,24 +112,30 @@ static inline const char *cs_get_data_name(const struct conn_stream *cs) /* shut read */ static inline void cs_shutr(struct conn_stream *cs, enum cs_shr_mode mode) { + const struct mux_ops *mux; + if (!cs_conn(cs) || cs->flags & CS_FL_SHR) return; /* clean data-layer shutdown */ - if (cs->conn->mux && cs->conn->mux->shutr) - cs->conn->mux->shutr(cs, mode); + mux = cs_conn_mux(cs); + if (mux && mux->shutr) + mux->shutr(cs, mode); cs->flags |= (mode == CS_SHR_DRAIN) ? CS_FL_SHRD : CS_FL_SHRR; } /* shut write */ static inline void cs_shutw(struct conn_stream *cs, enum cs_shw_mode mode) { + const struct mux_ops *mux; + if (!cs_conn(cs) || cs->flags & CS_FL_SHW) return; /* clean data-layer shutdown */ - if (cs->conn->mux && cs->conn->mux->shutw) - cs->conn->mux->shutw(cs, mode); + mux = cs_conn_mux(cs); + if (mux && mux->shutw) + mux->shutw(cs, mode); cs->flags |= (mode == CS_SHW_NORMAL) ? CS_FL_SHWN : CS_FL_SHWS; } diff --git a/include/haproxy/stream_interface.h b/include/haproxy/stream_interface.h index 2a373d2917..15acc39bd4 100644 --- a/include/haproxy/stream_interface.h +++ b/include/haproxy/stream_interface.h @@ -175,6 +175,7 @@ static inline enum obj_type *si_detach_endpoint(struct stream_interface *si) static inline void si_reset_endpoint(struct stream_interface *si) { struct conn_stream *cs; + struct connection *conn; struct appctx *appctx; if (!si->end) @@ -187,9 +188,9 @@ static inline void si_reset_endpoint(struct stream_interface *si) si_detach_endpoint(si); } else if ((cs = objt_cs(si->end))) { - if (cs_conn(cs) && si->wait_event.events != 0) - cs->conn->mux->unsubscribe(cs, si->wait_event.events, - &si->wait_event); + if ((conn = cs_conn(cs)) && si->wait_event.events != 0) + conn->mux->unsubscribe(cs, si->wait_event.events, + &si->wait_event); cs_detach(cs); si->ops = &si_embedded_ops; } @@ -201,6 +202,7 @@ static inline void si_reset_endpoint(struct stream_interface *si) static inline void si_release_endpoint(struct stream_interface *si) { struct conn_stream *cs; + struct connection *conn; struct appctx *appctx; if (!si->end) @@ -212,9 +214,9 @@ static inline void si_release_endpoint(struct stream_interface *si) appctx_free(appctx); } else if ((cs = objt_cs(si->end))) { - if (cs_conn(cs) && si->wait_event.events != 0) - cs->conn->mux->unsubscribe(cs, si->wait_event.events, - &si->wait_event); + if ((conn = cs_conn(cs)) && si->wait_event.events != 0) + conn->mux->unsubscribe(cs, si->wait_event.events, + &si->wait_event); cs_destroy(cs); } si_detach_endpoint(si); @@ -527,7 +529,7 @@ static inline int si_sync_recv(struct stream_interface *si) return 0; cs = objt_cs(si->end); - if (!cs_conn(cs) || !cs->conn->mux) + if (!cs_conn_mux(cs)) return 0; // only conn_streams are supported if (si->wait_event.events & SUB_RETRY_RECV) @@ -622,10 +624,10 @@ static inline const struct sockaddr_storage *si_src(struct stream_interface *si) if (!(si->flags & SI_FL_ISBACK)) return sess_src(strm_sess(si_strm(si))); else { - struct conn_stream *cs = objt_cs(si->end); + struct connection *conn = cs_conn(objt_cs(si->end)); - if (cs_conn(cs)) - return conn_src(cs->conn); + if (conn) + return conn_src(conn); } return NULL; } @@ -642,10 +644,10 @@ static inline const struct sockaddr_storage *si_dst(struct stream_interface *si) if (!(si->flags & SI_FL_ISBACK)) return sess_dst(strm_sess(si_strm(si))); else { - struct conn_stream *cs = objt_cs(si->end); + struct connection *conn = cs_conn(objt_cs(si->end)); - if (cs_conn(cs)) - return conn_dst(cs->conn); + if (conn) + return conn_dst(conn); } return NULL; } @@ -666,10 +668,10 @@ static inline int si_get_src(struct stream_interface *si) if (!(si->flags & SI_FL_ISBACK)) src = sess_src(strm_sess(si_strm(si))); else { - struct conn_stream *cs = objt_cs(si->end); + struct connection *conn = cs_conn(objt_cs(si->end)); - if (cs_conn(cs)) - src = conn_src(cs->conn); + if (conn) + src = conn_src(conn); } if (!src) return 0; @@ -697,10 +699,10 @@ static inline int si_get_dst(struct stream_interface *si) if (!(si->flags & SI_FL_ISBACK)) dst = sess_dst(strm_sess(si_strm(si))); else { - struct conn_stream *cs = objt_cs(si->end); + struct connection *conn = cs_conn(objt_cs(si->end)); - if (cs_conn(cs)) - dst = conn_dst(cs->conn); + if (conn) + dst = conn_dst(conn); } if (!dst) return 0; diff --git a/src/mux_fcgi.c b/src/mux_fcgi.c index 374d26e46e..3a4f7c0397 100644 --- a/src/mux_fcgi.c +++ b/src/mux_fcgi.c @@ -3609,7 +3609,7 @@ static void fcgi_detach(struct conn_stream *cs) /* this stream may be blocked waiting for some data to leave, so orphan * it in this case. */ - if (!(cs->conn->flags & CO_FL_ERROR) && + if (!(fconn->conn->flags & CO_FL_ERROR) && (fconn->state != FCGI_CS_CLOSED) && (fstrm->flags & (FCGI_SF_BLK_MBUSY|FCGI_SF_BLK_MROOM)) && (fstrm->subs || (fstrm->flags & (FCGI_SF_WANT_SHUTR|FCGI_SF_WANT_SHUTW)))) { diff --git a/src/mux_h1.c b/src/mux_h1.c index 2e1090a775..81b5cce055 100644 --- a/src/mux_h1.c +++ b/src/mux_h1.c @@ -3419,9 +3419,9 @@ static void h1_shutr(struct conn_stream *cs, enum cs_shr_mode mode) /* NOTE: Be sure to handle abort (cf. h2_shutr) */ if (cs->flags & CS_FL_SHR) goto end; - if (conn_xprt_ready(cs->conn) && cs->conn->xprt->shutr) - cs->conn->xprt->shutr(cs->conn, cs->conn->xprt_ctx, - (mode == CS_SHR_DRAIN)); + + if (conn_xprt_ready(h1c->conn) && h1c->conn->xprt->shutr) + h1c->conn->xprt->shutr(h1c->conn, h1c->conn->xprt_ctx, (mode == CS_SHR_DRAIN)); end: TRACE_LEAVE(H1_EV_STRM_SHUT, h1c->conn, h1s); } @@ -3464,7 +3464,7 @@ static void h1_shutw(struct conn_stream *cs, enum cs_shw_mode mode) h1c->flags |= H1C_F_ST_SILENT_SHUT; if (!b_data(&h1c->obuf)) - h1_shutw_conn(cs->conn); + h1_shutw_conn(h1c->conn); end: TRACE_LEAVE(H1_EV_STRM_SHUT, h1c->conn, h1s); } @@ -3671,28 +3671,28 @@ static int h1_rcv_pipe(struct conn_stream *cs, struct pipe *pipe, unsigned int c struct h1m *h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->req : &h1s->res); int ret = 0; - TRACE_ENTER(H1_EV_STRM_RECV, cs->conn, h1s, 0, (size_t[]){count}); + TRACE_ENTER(H1_EV_STRM_RECV, h1c->conn, h1s, 0, (size_t[]){count}); if ((h1m->flags & H1_MF_CHNK) || (h1m->state != H1_MSG_DATA && h1m->state != H1_MSG_TUNNEL)) { h1c->flags &= ~H1C_F_WANT_SPLICE; - TRACE_STATE("Allow xprt rcv_buf on !(msg_data|msg_tunnel)", H1_EV_STRM_RECV, cs->conn, h1s); + TRACE_STATE("Allow xprt rcv_buf on !(msg_data|msg_tunnel)", H1_EV_STRM_RECV, h1c->conn, h1s); goto end; } h1c->flags |= H1C_F_WANT_SPLICE; if (h1s_data_pending(h1s)) { - TRACE_STATE("flush input buffer before splicing", H1_EV_STRM_RECV, cs->conn, h1s); + TRACE_STATE("flush input buffer before splicing", H1_EV_STRM_RECV, h1c->conn, h1s); goto end; } if (!h1_recv_allowed(h1c)) { - TRACE_DEVEL("leaving on !recv_allowed", H1_EV_STRM_RECV, cs->conn, h1s); + TRACE_DEVEL("leaving on !recv_allowed", H1_EV_STRM_RECV, h1c->conn, h1s); goto end; } if (h1m->state == H1_MSG_DATA && (h1m->flags & H1_MF_CLEN) && count > h1m->curr_len) count = h1m->curr_len; - ret = cs->conn->xprt->rcv_pipe(cs->conn, cs->conn->xprt_ctx, pipe, count); + ret = h1c->conn->xprt->rcv_pipe(h1c->conn, h1c->conn->xprt_ctx, pipe, count); if (ret >= 0) { if (h1m->state == H1_MSG_DATA && (h1m->flags & H1_MF_CLEN)) { if (ret > h1m->curr_len) { @@ -3700,14 +3700,14 @@ static int h1_rcv_pipe(struct conn_stream *cs, struct pipe *pipe, unsigned int c h1c->flags |= H1C_F_ST_ERROR; cs->flags |= CS_FL_ERROR; TRACE_ERROR("too much payload, more than announced", - H1_EV_RX_DATA|H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, cs->conn, h1s); + H1_EV_RX_DATA|H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, h1c->conn, h1s); goto end; } h1m->curr_len -= ret; if (!h1m->curr_len) { h1m->state = H1_MSG_DONE; h1c->flags &= ~H1C_F_WANT_SPLICE; - TRACE_STATE("payload fully received", H1_EV_STRM_RECV, cs->conn, h1s); + TRACE_STATE("payload fully received", H1_EV_STRM_RECV, h1c->conn, h1s); } } HA_ATOMIC_ADD(&h1c->px_counters->bytes_in, ret); @@ -3715,22 +3715,22 @@ static int h1_rcv_pipe(struct conn_stream *cs, struct pipe *pipe, unsigned int c } end: - if (conn_xprt_read0_pending(cs->conn)) { + if (conn_xprt_read0_pending(h1c->conn)) { h1s->flags |= H1S_F_REOS; h1c->flags &= ~H1C_F_WANT_SPLICE; - TRACE_STATE("Allow xprt rcv_buf on read0", H1_EV_STRM_RECV, cs->conn, h1s); + TRACE_STATE("Allow xprt rcv_buf on read0", H1_EV_STRM_RECV, h1c->conn, h1s); } if (!(h1c->flags & H1C_F_WANT_SPLICE)) { TRACE_STATE("notify the mux can't use splicing anymore", H1_EV_STRM_RECV, h1c->conn, h1s); cs->flags &= ~CS_FL_MAY_SPLICE; if (!(h1c->wait_event.events & SUB_RETRY_RECV)) { - TRACE_STATE("restart receiving data, subscribing", H1_EV_STRM_RECV, cs->conn, h1s); - cs->conn->xprt->subscribe(cs->conn, cs->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event); + TRACE_STATE("restart receiving data, subscribing", H1_EV_STRM_RECV, h1c->conn, h1s); + h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event); } } - TRACE_LEAVE(H1_EV_STRM_RECV, cs->conn, h1s, 0, (size_t[]){ret}); + TRACE_LEAVE(H1_EV_STRM_RECV, h1c->conn, h1s, 0, (size_t[]){ret}); return ret; } @@ -3741,37 +3741,37 @@ static int h1_snd_pipe(struct conn_stream *cs, struct pipe *pipe) struct h1m *h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->res : &h1s->req); int ret = 0; - TRACE_ENTER(H1_EV_STRM_SEND, cs->conn, h1s, 0, (size_t[]){pipe->data}); + TRACE_ENTER(H1_EV_STRM_SEND, h1c->conn, h1s, 0, (size_t[]){pipe->data}); if (b_data(&h1c->obuf)) { if (!(h1c->wait_event.events & SUB_RETRY_SEND)) { - TRACE_STATE("more data to send, subscribing", H1_EV_STRM_SEND, cs->conn, h1s); - cs->conn->xprt->subscribe(cs->conn, cs->conn->xprt_ctx, SUB_RETRY_SEND, &h1c->wait_event); + TRACE_STATE("more data to send, subscribing", H1_EV_STRM_SEND, h1c->conn, h1s); + h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_SEND, &h1c->wait_event); } goto end; } - ret = cs->conn->xprt->snd_pipe(cs->conn, cs->conn->xprt_ctx, pipe); + ret = h1c->conn->xprt->snd_pipe(h1c->conn, h1c->conn->xprt_ctx, pipe); if (h1m->state == H1_MSG_DATA && (h1m->flags & H1_MF_CLEN)) { if (ret > h1m->curr_len) { h1s->flags |= H1S_F_PROCESSING_ERROR; h1c->flags |= H1C_F_ST_ERROR; cs->flags |= CS_FL_ERROR; TRACE_ERROR("too much payload, more than announced", - H1_EV_TX_DATA|H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, cs->conn, h1s); + H1_EV_TX_DATA|H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, h1c->conn, h1s); goto end; } h1m->curr_len -= ret; if (!h1m->curr_len) { h1m->state = H1_MSG_DONE; - TRACE_STATE("payload fully xferred", H1_EV_TX_DATA|H1_EV_TX_BODY, cs->conn, h1s); + TRACE_STATE("payload fully xferred", H1_EV_TX_DATA|H1_EV_TX_BODY, h1c->conn, h1s); } } HA_ATOMIC_ADD(&h1c->px_counters->bytes_out, ret); HA_ATOMIC_ADD(&h1c->px_counters->spliced_bytes_out, ret); end: - TRACE_LEAVE(H1_EV_STRM_SEND, cs->conn, h1s, 0, (size_t[]){ret}); + TRACE_LEAVE(H1_EV_STRM_SEND, h1c->conn, h1s, 0, (size_t[]){ret}); return ret; } #endif diff --git a/src/mux_h2.c b/src/mux_h2.c index 1389f84877..c46633208f 100644 --- a/src/mux_h2.c +++ b/src/mux_h2.c @@ -4289,7 +4289,7 @@ static void h2_detach(struct conn_stream *cs) /* this stream may be blocked waiting for some data to leave (possibly * an ES or RST frame), so orphan it in this case. */ - if (!(cs->conn->flags & CO_FL_ERROR) && + if (!(h2c->conn->flags & CO_FL_ERROR) && (h2c->st0 < H2_CS_ERROR) && (h2s->flags & (H2_SF_BLK_MBUSY | H2_SF_BLK_MROOM | H2_SF_BLK_MFCTL)) && ((h2s->flags & (H2_SF_WANT_SHUTR | H2_SF_WANT_SHUTW)) || h2s->subs)) { diff --git a/src/mux_pt.c b/src/mux_pt.c index 8530564c2c..1f8b4dc2ad 100644 --- a/src/mux_pt.c +++ b/src/mux_pt.c @@ -405,8 +405,8 @@ static void mux_pt_destroy_meth(void *ctx) */ static void mux_pt_detach(struct conn_stream *cs) { - struct connection *conn = cs->conn; - struct mux_pt_ctx *ctx = cs->conn->ctx; + struct connection *conn = cs_conn(cs); + struct mux_pt_ctx *ctx = conn->ctx; TRACE_ENTER(PT_EV_STRM_END, conn, cs); @@ -440,37 +440,41 @@ static int mux_pt_avail_streams(struct connection *conn) static void mux_pt_shutr(struct conn_stream *cs, enum cs_shr_mode mode) { - TRACE_ENTER(PT_EV_STRM_SHUT, cs->conn, cs); + struct connection *conn = cs_conn(cs); + + TRACE_ENTER(PT_EV_STRM_SHUT, conn, cs); if (cs->flags & CS_FL_SHR) return; cs->flags &= ~(CS_FL_RCV_MORE | CS_FL_WANT_ROOM); - if (conn_xprt_ready(cs->conn) && cs->conn->xprt->shutr) - cs->conn->xprt->shutr(cs->conn, cs->conn->xprt_ctx, + if (conn_xprt_ready(conn) && conn->xprt->shutr) + conn->xprt->shutr(conn, conn->xprt_ctx, (mode == CS_SHR_DRAIN)); else if (mode == CS_SHR_DRAIN) - conn_ctrl_drain(cs->conn); + conn_ctrl_drain(conn); if (cs->flags & CS_FL_SHW) - conn_full_close(cs->conn); + conn_full_close(conn); - TRACE_LEAVE(PT_EV_STRM_SHUT, cs->conn, cs); + TRACE_LEAVE(PT_EV_STRM_SHUT, conn, cs); } static void mux_pt_shutw(struct conn_stream *cs, enum cs_shw_mode mode) { - TRACE_ENTER(PT_EV_STRM_SHUT, cs->conn, cs); + struct connection *conn = cs_conn(cs); + + TRACE_ENTER(PT_EV_STRM_SHUT, conn, cs); if (cs->flags & CS_FL_SHW) return; - if (conn_xprt_ready(cs->conn) && cs->conn->xprt->shutw) - cs->conn->xprt->shutw(cs->conn, cs->conn->xprt_ctx, + if (conn_xprt_ready(conn) && conn->xprt->shutw) + conn->xprt->shutw(conn, conn->xprt_ctx, (mode == CS_SHW_NORMAL)); if (!(cs->flags & CS_FL_SHR)) - conn_sock_shutw(cs->conn, (mode == CS_SHW_NORMAL)); + conn_sock_shutw(conn, (mode == CS_SHW_NORMAL)); else - conn_full_close(cs->conn); + conn_full_close(conn); - TRACE_LEAVE(PT_EV_STRM_SHUT, cs->conn, cs); + TRACE_LEAVE(PT_EV_STRM_SHUT, conn, cs); } /* @@ -488,44 +492,46 @@ static void mux_pt_shutw(struct conn_stream *cs, enum cs_shw_mode mode) */ static size_t mux_pt_rcv_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags) { + struct connection *conn = cs_conn(cs); size_t ret = 0; - TRACE_ENTER(PT_EV_RX_DATA, cs->conn, cs, buf, (size_t[]){count}); + TRACE_ENTER(PT_EV_RX_DATA, conn, cs, buf, (size_t[]){count}); if (!count) { cs->flags |= (CS_FL_RCV_MORE | CS_FL_WANT_ROOM); goto end; } b_realign_if_empty(buf); - ret = cs->conn->xprt->rcv_buf(cs->conn, cs->conn->xprt_ctx, buf, count, flags); - if (conn_xprt_read0_pending(cs->conn)) { + ret = conn->xprt->rcv_buf(conn, conn->xprt_ctx, buf, count, flags); + if (conn_xprt_read0_pending(conn)) { cs->flags &= ~(CS_FL_RCV_MORE | CS_FL_WANT_ROOM); cs->flags |= CS_FL_EOS; - TRACE_DEVEL("read0 on connection", PT_EV_RX_DATA, cs->conn, cs); + TRACE_DEVEL("read0 on connection", PT_EV_RX_DATA, conn, cs); } - if (cs->conn->flags & CO_FL_ERROR) { + if (conn->flags & CO_FL_ERROR) { cs->flags &= ~(CS_FL_RCV_MORE | CS_FL_WANT_ROOM); cs->flags |= CS_FL_ERROR; - TRACE_DEVEL("error on connection", PT_EV_RX_DATA|PT_EV_CONN_ERR, cs->conn, cs); + TRACE_DEVEL("error on connection", PT_EV_RX_DATA|PT_EV_CONN_ERR, conn, cs); } end: - TRACE_LEAVE(PT_EV_RX_DATA, cs->conn, cs, buf, (size_t[]){ret}); + TRACE_LEAVE(PT_EV_RX_DATA, conn, cs, buf, (size_t[]){ret}); return ret; } /* Called from the upper layer, to send data */ static size_t mux_pt_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags) { + struct connection *conn = cs_conn(cs); size_t ret; - TRACE_ENTER(PT_EV_TX_DATA, cs->conn, cs, buf, (size_t[]){count}); + TRACE_ENTER(PT_EV_TX_DATA, conn, cs, buf, (size_t[]){count}); - ret = cs->conn->xprt->snd_buf(cs->conn, cs->conn->xprt_ctx, buf, count, flags); + ret = conn->xprt->snd_buf(conn, conn->xprt_ctx, buf, count, flags); if (ret > 0) b_del(buf, ret); - TRACE_LEAVE(PT_EV_TX_DATA, cs->conn, cs, buf, (size_t[]){ret}); + TRACE_LEAVE(PT_EV_TX_DATA, conn, cs, buf, (size_t[]){ret}); return ret; } @@ -536,8 +542,10 @@ static size_t mux_pt_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t */ static int mux_pt_subscribe(struct conn_stream *cs, int event_type, struct wait_event *es) { - TRACE_POINT(PT_EV_RX_DATA|PT_EV_TX_DATA, cs->conn, cs, 0, (size_t[]){event_type}); - return cs->conn->xprt->subscribe(cs->conn, cs->conn->xprt_ctx, event_type, es); + struct connection *conn = cs_conn(cs); + + TRACE_POINT(PT_EV_RX_DATA|PT_EV_TX_DATA, conn, cs, 0, (size_t[]){event_type}); + return conn->xprt->subscribe(conn, conn->xprt_ctx, event_type, es); } /* Called from the upper layer, to unsubscribe from events . @@ -546,41 +554,45 @@ static int mux_pt_subscribe(struct conn_stream *cs, int event_type, struct wait_ */ static int mux_pt_unsubscribe(struct conn_stream *cs, int event_type, struct wait_event *es) { - TRACE_POINT(PT_EV_RX_DATA|PT_EV_TX_DATA, cs->conn, cs, 0, (size_t[]){event_type}); - return cs->conn->xprt->unsubscribe(cs->conn, cs->conn->xprt_ctx, event_type, es); + struct connection *conn = cs_conn(cs); + + TRACE_POINT(PT_EV_RX_DATA|PT_EV_TX_DATA, conn, cs, 0, (size_t[]){event_type}); + return conn->xprt->unsubscribe(conn, conn->xprt_ctx, event_type, es); } #if defined(USE_LINUX_SPLICE) /* Send and get, using splicing */ static int mux_pt_rcv_pipe(struct conn_stream *cs, struct pipe *pipe, unsigned int count) { + struct connection *conn = cs_conn(cs); int ret; - TRACE_ENTER(PT_EV_RX_DATA, cs->conn, cs, 0, (size_t[]){count}); + TRACE_ENTER(PT_EV_RX_DATA, conn, cs, 0, (size_t[]){count}); - ret = cs->conn->xprt->rcv_pipe(cs->conn, cs->conn->xprt_ctx, pipe, count); - if (conn_xprt_read0_pending(cs->conn)) { + ret = conn->xprt->rcv_pipe(conn, conn->xprt_ctx, pipe, count); + if (conn_xprt_read0_pending(conn)) { cs->flags |= CS_FL_EOS; - TRACE_DEVEL("read0 on connection", PT_EV_RX_DATA, cs->conn, cs); + TRACE_DEVEL("read0 on connection", PT_EV_RX_DATA, conn, cs); } - if (cs->conn->flags & CO_FL_ERROR) { + if (conn->flags & CO_FL_ERROR) { cs->flags |= CS_FL_ERROR; - TRACE_DEVEL("error on connection", PT_EV_RX_DATA|PT_EV_CONN_ERR, cs->conn, cs); + TRACE_DEVEL("error on connection", PT_EV_RX_DATA|PT_EV_CONN_ERR, conn, cs); } - TRACE_LEAVE(PT_EV_RX_DATA, cs->conn, cs, 0, (size_t[]){ret}); + TRACE_LEAVE(PT_EV_RX_DATA, conn, cs, 0, (size_t[]){ret}); return (ret); } static int mux_pt_snd_pipe(struct conn_stream *cs, struct pipe *pipe) { + struct connection *conn = cs_conn(cs); int ret; - TRACE_ENTER(PT_EV_TX_DATA, cs->conn, cs, 0, (size_t[]){pipe->data}); + TRACE_ENTER(PT_EV_TX_DATA, conn, cs, 0, (size_t[]){pipe->data}); - ret = cs->conn->xprt->snd_pipe(cs->conn, cs->conn->xprt_ctx, pipe); + ret = conn->xprt->snd_pipe(conn, conn->xprt_ctx, pipe); - TRACE_LEAVE(PT_EV_TX_DATA, cs->conn, cs, 0, (size_t[]){ret}); + TRACE_LEAVE(PT_EV_TX_DATA, conn, cs, 0, (size_t[]){ret}); return ret; } #endif diff --git a/src/stream.c b/src/stream.c index 619d04eda6..10cac1e316 100644 --- a/src/stream.c +++ b/src/stream.c @@ -275,9 +275,13 @@ static void strm_trace(enum trace_level level, uint64_t mask, const struct trace */ int stream_create_from_cs(struct conn_stream *cs, struct buffer *input) { + struct connection *conn = cs_conn(cs); struct stream *strm; - strm = stream_new(cs->conn->owner, &cs->obj_type, input); + if (!conn) + return -1; + + strm = stream_new(conn->owner, &cs->obj_type, input); if (strm == NULL) return -1; @@ -294,10 +298,14 @@ int stream_create_from_cs(struct conn_stream *cs, struct buffer *input) */ int stream_upgrade_from_cs(struct conn_stream *cs, struct buffer *input) { + struct connection *conn = cs_conn(cs); struct stream_interface *si = cs->data; struct stream *s = si_strm(si); - if (cs->conn->mux->flags & MX_FL_HTX) + if (!conn) + return -1; + + if (conn->mux->flags & MX_FL_HTX) s->flags |= SF_HTX; if (!b_is_null(input)) { @@ -467,10 +475,12 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin, struct bu if (appctx) si_attach_appctx(&s->si[0], appctx); else if (cs) { - if (cs_conn(cs) && cs->conn->mux) { - if (cs->conn->mux->flags & MX_FL_CLEAN_ABRT) + const struct mux_ops *mux = cs_conn_mux(cs); + + if (mux) { + if (mux->flags & MX_FL_CLEAN_ABRT) s->si[0].flags |= SI_FL_CLEAN_ABRT; - if (cs->conn->mux->flags & MX_FL_HTX) + if (mux->flags & MX_FL_HTX) s->flags |= SF_HTX; if (cs->flags & CS_FL_WEBSOCKET) @@ -2170,10 +2180,10 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) if (!(req->flags & (CF_KERN_SPLICING|CF_SHUTR)) && req->to_forward && (global.tune.options & GTUNE_USE_SPLICE) && - (cs_conn(objt_cs(si_f->end)) && __objt_cs(si_f->end)->conn->xprt && __objt_cs(si_f->end)->conn->xprt->rcv_pipe && - __objt_cs(si_f->end)->conn->mux && __objt_cs(si_f->end)->conn->mux->rcv_pipe) && - (cs_conn(objt_cs(si_b->end)) && __objt_cs(si_b->end)->conn->xprt && __objt_cs(si_b->end)->conn->xprt->snd_pipe && - __objt_cs(si_b->end)->conn->mux && __objt_cs(si_b->end)->conn->mux->snd_pipe) && + (cs_conn(objt_cs(si_f->end)) && cs_conn(__objt_cs(si_f->end))->xprt && cs_conn(__objt_cs(si_f->end))->xprt->rcv_pipe && + cs_conn(__objt_cs(si_f->end))->mux && cs_conn(__objt_cs(si_f->end))->mux->rcv_pipe) && + (cs_conn(objt_cs(si_b->end)) && cs_conn(__objt_cs(si_b->end))->xprt && cs_conn(__objt_cs(si_b->end))->xprt->snd_pipe && + cs_conn(__objt_cs(si_b->end))->mux && cs_conn(__objt_cs(si_b->end))->mux->snd_pipe) && (pipes_used < global.maxpipes) && (((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_REQ) || (((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_AUT) && @@ -2363,10 +2373,10 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) if (!(res->flags & (CF_KERN_SPLICING|CF_SHUTR)) && res->to_forward && (global.tune.options & GTUNE_USE_SPLICE) && - (cs_conn(objt_cs(si_f->end)) && __objt_cs(si_f->end)->conn->xprt && __objt_cs(si_f->end)->conn->xprt->snd_pipe && - __objt_cs(si_f->end)->conn->mux && __objt_cs(si_f->end)->conn->mux->snd_pipe) && - (cs_conn(objt_cs(si_b->end)) && __objt_cs(si_b->end)->conn->xprt && __objt_cs(si_b->end)->conn->xprt->rcv_pipe && - __objt_cs(si_b->end)->conn->mux && __objt_cs(si_b->end)->conn->mux->rcv_pipe) && + (cs_conn(objt_cs(si_f->end)) && cs_conn(__objt_cs(si_f->end))->xprt && cs_conn(__objt_cs(si_f->end))->xprt->snd_pipe && + cs_conn(__objt_cs(si_f->end))->mux && cs_conn(__objt_cs(si_f->end))->mux->snd_pipe) && + (cs_conn(objt_cs(si_b->end)) && cs_conn(__objt_cs(si_b->end))->xprt && cs_conn(__objt_cs(si_b->end))->xprt->rcv_pipe && + cs_conn(__objt_cs(si_b->end))->mux && cs_conn(__objt_cs(si_b->end))->mux->rcv_pipe) && (pipes_used < global.maxpipes) && (((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_RTR) || (((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_AUT) && @@ -2443,8 +2453,8 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) si_b->prev_state == SI_ST_EST) { chunk_printf(&trash, "%08x:%s.srvcls[%04x:%04x]\n", s->uniq_id, s->be->id, - cs_conn(objt_cs(si_f->end)) ? (unsigned short)__objt_cs(si_f->end)->conn->handle.fd : -1, - cs_conn(objt_cs(si_b->end)) ? (unsigned short)__objt_cs(si_b->end)->conn->handle.fd : -1); + cs_conn(objt_cs(si_f->end)) ? (unsigned short)cs_conn(__objt_cs(si_f->end))->handle.fd : -1, + cs_conn(objt_cs(si_b->end)) ? (unsigned short)cs_conn(__objt_cs(si_b->end))->handle.fd : -1); DISGUISE(write(1, trash.area, trash.data)); } @@ -2452,8 +2462,8 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) si_f->prev_state == SI_ST_EST) { chunk_printf(&trash, "%08x:%s.clicls[%04x:%04x]\n", s->uniq_id, s->be->id, - cs_conn(objt_cs(si_f->end)) ? (unsigned short)__objt_cs(si_f->end)->conn->handle.fd : -1, - cs_conn(objt_cs(si_b->end)) ? (unsigned short)__objt_cs(si_b->end)->conn->handle.fd : -1); + cs_conn(objt_cs(si_f->end)) ? (unsigned short)cs_conn(__objt_cs(si_f->end))->handle.fd : -1, + cs_conn(objt_cs(si_b->end)) ? (unsigned short)cs_conn(__objt_cs(si_b->end))->handle.fd : -1); DISGUISE(write(1, trash.area, trash.data)); } } @@ -2520,8 +2530,8 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) (!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)))) { chunk_printf(&trash, "%08x:%s.closed[%04x:%04x]\n", s->uniq_id, s->be->id, - cs_conn(objt_cs(si_f->end)) ? (unsigned short)__objt_cs(si_f->end)->conn->handle.fd : -1, - cs_conn(objt_cs(si_b->end)) ? (unsigned short)__objt_cs(si_b->end)->conn->handle.fd : -1); + cs_conn(objt_cs(si_f->end)) ? (unsigned short)cs_conn(__objt_cs(si_f->end))->handle.fd : -1, + cs_conn(objt_cs(si_b->end)) ? (unsigned short)cs_conn(__objt_cs(si_b->end))->handle.fd : -1); DISGUISE(write(1, trash.area, trash.data)); } @@ -3299,7 +3309,7 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st if (cs_conn(objt_cs(strm->si[0].end)) != NULL) { cs = __objt_cs(strm->si[0].end); - conn = cs->conn; + conn = cs_conn(cs); chunk_appendf(&trash, " co0=%p ctrl=%s xprt=%s mux=%s data=%s target=%s:%p\n", @@ -3336,7 +3346,7 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st if (cs_conn(objt_cs(strm->si[1].end)) != NULL) { cs = __objt_cs(strm->si[1].end); - conn = cs->conn; + conn = cs_conn(cs); chunk_appendf(&trash, " co1=%p ctrl=%s xprt=%s mux=%s data=%s target=%s:%p\n", diff --git a/src/stream_interface.c b/src/stream_interface.c index e6254de379..2edef2b450 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -562,11 +562,13 @@ static void stream_int_notify(struct stream_interface *si) */ static int si_cs_process(struct conn_stream *cs) { - struct connection *conn = cs->conn; + struct connection *conn = cs_conn(cs); struct stream_interface *si = cs->data; struct channel *ic = si_ic(si); struct channel *oc = si_oc(si); + BUG_ON(!conn); + /* If we have data to send, try it now */ if (!channel_is_empty(oc) && !(si->wait_event.events & SUB_RETRY_SEND)) si_cs_send(cs); @@ -648,12 +650,14 @@ static int si_cs_process(struct conn_stream *cs) */ int si_cs_send(struct conn_stream *cs) { - struct connection *conn = cs->conn; + struct connection *conn = cs_conn(cs); struct stream_interface *si = cs->data; struct channel *oc = si_oc(si); int ret; int did_send = 0; + BUG_ON(!conn); + if (conn->flags & CO_FL_ERROR || cs->flags & (CS_FL_ERROR|CS_FL_ERR_PENDING)) { /* We're probably there because the tasklet was woken up, * but process_stream() ran before, detected there were an @@ -752,7 +756,7 @@ int si_cs_send(struct conn_stream *cs) } } - ret = cs->conn->mux->snd_buf(cs, &oc->buf, co_data(oc), send_flag); + ret = conn->mux->snd_buf(cs, &oc->buf, co_data(oc), send_flag); if (ret > 0) { did_send = 1; co_set_data(oc, co_data(oc) - ret); @@ -926,7 +930,7 @@ void si_sync_send(struct stream_interface *si) return; cs = objt_cs(si->end); - if (!cs_conn(cs) || !cs->conn->mux) + if (!cs_conn_mux(cs)) return; si_cs_send(cs); @@ -1117,6 +1121,9 @@ static void stream_int_chk_snd_conn(struct stream_interface *si) { struct channel *oc = si_oc(si); struct conn_stream *cs = __objt_cs(si->end); + struct connection *conn = cs_conn(cs); + + BUG_ON(!conn); if (unlikely(!si_state_in(si->state, SI_SB_CON|SI_SB_RDY|SI_SB_EST) || (oc->flags & CF_SHUTW))) @@ -1132,7 +1139,7 @@ static void stream_int_chk_snd_conn(struct stream_interface *si) if (!(si->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(si_oc(si))) si_cs_send(cs); - if (cs->flags & (CS_FL_ERROR|CS_FL_ERR_PENDING) || cs->conn->flags & CO_FL_ERROR) { + if (cs->flags & (CS_FL_ERROR|CS_FL_ERR_PENDING) || conn->flags & CO_FL_ERROR) { /* Write error on the file descriptor */ if (si->state >= SI_ST_CON) si->flags |= SI_FL_ERR; @@ -1209,13 +1216,15 @@ static void stream_int_chk_snd_conn(struct stream_interface *si) */ int si_cs_recv(struct conn_stream *cs) { - struct connection *conn = cs->conn; + struct connection *conn = cs_conn(cs); struct stream_interface *si = cs->data; struct channel *ic = si_ic(si); int ret, max, cur_read = 0; int read_poll = MAX_READ_POLL_LOOPS; int flags = 0; + BUG_ON(!conn); + /* If not established yet, do nothing. */ if (si->state != SI_ST_EST) return 0; @@ -1373,7 +1382,7 @@ int si_cs_recv(struct conn_stream *cs) * CS_FL_RCV_MORE on the CS if more space is needed. */ max = channel_recv_max(ic); - ret = cs->conn->mux->rcv_buf(cs, &ic->buf, max, cur_flags); + ret = conn->mux->rcv_buf(cs, &ic->buf, max, cur_flags); if (cs->flags & CS_FL_WANT_ROOM) { /* CS_FL_WANT_ROOM must not be reported if the channel's diff --git a/src/tcpcheck.c b/src/tcpcheck.c index 9376d3244a..bf7e5c91ae 100644 --- a/src/tcpcheck.c +++ b/src/tcpcheck.c @@ -1218,7 +1218,7 @@ enum tcpcheck_eval_ret tcpcheck_eval_connect(struct check *check, struct tcpchec if (conn_ctrl_ready(conn) && (connect->options & TCPCHK_OPT_LINGER)) { /* Some servers don't like reset on close */ - HA_ATOMIC_AND(&fdtab[cs->conn->handle.fd].state, ~FD_LINGER_RISK); + HA_ATOMIC_AND(&fdtab[conn->handle.fd].state, ~FD_LINGER_RISK); } if (conn_ctrl_ready(conn) && (conn->flags & (CO_FL_SEND_PROXY | CO_FL_SOCKS4))) { @@ -1495,7 +1495,7 @@ enum tcpcheck_eval_ret tcpcheck_eval_send(struct check *check, struct tcpcheck_r } } if ((IS_HTX_CONN(conn) && !htx_is_empty(htxbuf(&check->bo))) || (!IS_HTX_CONN(conn) && b_data(&check->bo))) { - cs->conn->mux->subscribe(cs, SUB_RETRY_SEND, &check->wait_list); + conn->mux->subscribe(cs, SUB_RETRY_SEND, &check->wait_list); ret = TCPCHK_EVAL_WAIT; TRACE_DEVEL("data not fully sent, wait", CHK_EV_TCPCHK_SND|CHK_EV_TX_DATA, check); goto out;