From: Christopher Faulet Date: Wed, 15 Dec 2021 08:50:17 +0000 (+0100) Subject: MINOR: connection: Be prepared to handle conn-stream with no connection X-Git-Tag: v2.6-dev2~59 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=0256da14a5a35eb96687912f285cd55318229c60;p=thirdparty%2Fhaproxy.git MINOR: connection: Be prepared to handle conn-stream with no connection The conn-stream will progressively replace the stream-interface. Thus, a stream will have to allocate the backend conn-stream during its creation. This means it will be possible to have a conn-stream with no connection. To prepare this change, we test the conn-stream's connection when we retrieve it. --- diff --git a/include/haproxy/connection.h b/include/haproxy/connection.h index 7cc852fd1e..e0af4c3aa0 100644 --- a/include/haproxy/connection.h +++ b/include/haproxy/connection.h @@ -246,10 +246,16 @@ static inline void conn_xprt_shutw_hard(struct connection *c) c->xprt->shutw(c, c->xprt_ctx, 0); } +/* Returns the conn from a cs. If cs is NULL, returns NULL */ +static inline struct connection *cs_conn(const struct conn_stream *cs) +{ + return cs ? cs->conn : NULL; +} + /* shut read */ static inline void cs_shutr(struct conn_stream *cs, enum cs_shr_mode mode) { - if (cs->flags & CS_FL_SHR) + if (!cs_conn(cs) || cs->flags & CS_FL_SHR) return; /* clean data-layer shutdown */ @@ -261,7 +267,7 @@ static inline void cs_shutr(struct conn_stream *cs, enum cs_shr_mode mode) /* shut write */ static inline void cs_shutw(struct conn_stream *cs, enum cs_shw_mode mode) { - if (cs->flags & CS_FL_SHW) + if (!cs_conn(cs) || cs->flags & CS_FL_SHW) return; /* clean data-layer shutdown */ @@ -387,29 +393,25 @@ static inline void conn_force_unsubscribe(struct connection *conn) /* Release a conn_stream */ static inline void cs_destroy(struct conn_stream *cs) { - if (cs->conn->mux) - cs->conn->mux->detach(cs); - else { - /* It's too early to have a mux, let's just destroy - * the connection - */ - struct connection *conn = cs->conn; - - conn_stop_tracking(conn); - conn_full_close(conn); - if (conn->destroy_cb) - conn->destroy_cb(conn); - conn_free(conn); + if (cs_conn(cs)) { + if (cs->conn->mux) + cs->conn->mux->detach(cs); + else { + /* It's too early to have a mux, let's just destroy + * the connection + */ + struct connection *conn = cs->conn; + + conn_stop_tracking(conn); + conn_full_close(conn); + if (conn->destroy_cb) + conn->destroy_cb(conn); + conn_free(conn); + } } cs_free(cs); } -/* Returns the conn from a cs. If cs is NULL, returns NULL */ -static inline struct connection *cs_conn(const struct conn_stream *cs) -{ - return cs ? cs->conn : NULL; -} - /* Returns the source address of the connection or NULL if not set */ static inline const struct sockaddr_storage *conn_src(struct connection *conn) { diff --git a/include/haproxy/stream_interface.h b/include/haproxy/stream_interface.h index c1c2b03fa8..55c622b961 100644 --- a/include/haproxy/stream_interface.h +++ b/include/haproxy/stream_interface.h @@ -183,9 +183,9 @@ static inline void si_release_endpoint(struct stream_interface *si) appctx_free(appctx); } else if ((cs = objt_cs(si->end))) { - if (si->wait_event.events != 0) + if (cs_conn(cs) && si->wait_event.events != 0) cs->conn->mux->unsubscribe(cs, si->wait_event.events, - &si->wait_event); + &si->wait_event); cs_destroy(cs); } si_detach_endpoint(si); @@ -481,7 +481,7 @@ static inline int si_sync_recv(struct stream_interface *si) return 0; cs = objt_cs(si->end); - if (!cs || !cs->conn->mux) + if (!cs_conn(cs) || !cs->conn->mux) return 0; // only conn_streams are supported if (si->wait_event.events & SUB_RETRY_RECV) @@ -578,7 +578,7 @@ static inline const struct sockaddr_storage *si_src(struct stream_interface *si) else { struct conn_stream *cs = objt_cs(si->end); - if (cs && cs->conn) + if (cs_conn(cs)) return conn_src(cs->conn); } return NULL; @@ -598,7 +598,7 @@ static inline const struct sockaddr_storage *si_dst(struct stream_interface *si) else { struct conn_stream *cs = objt_cs(si->end); - if (cs && cs->conn) + if (cs_conn(cs)) return conn_dst(cs->conn); } return NULL; @@ -622,7 +622,7 @@ static inline int si_get_src(struct stream_interface *si) else { struct conn_stream *cs = objt_cs(si->end); - if (cs && cs->conn) + if (cs_conn(cs)) src = conn_src(cs->conn); } if (!src) @@ -653,7 +653,7 @@ static inline int si_get_dst(struct stream_interface *si) else { struct conn_stream *cs = objt_cs(si->end); - if (cs && cs->conn) + if (cs_conn(cs)) dst = conn_dst(cs->conn); } if (!dst) diff --git a/src/backend.c b/src/backend.c index fb6313181f..6a793a0c7b 100644 --- a/src/backend.c +++ b/src/backend.c @@ -2220,8 +2220,6 @@ void back_handle_st_con(struct stream *s) void back_handle_st_cer(struct stream *s) { struct stream_interface *si = &s->si[1]; - struct conn_stream *cs = objt_cs(si->end); - struct connection *conn = cs_conn(cs); DBG_TRACE_ENTER(STRM_EV_STRM_PROC|STRM_EV_SI_ST, s); @@ -2230,6 +2228,8 @@ void back_handle_st_cer(struct stream *s) /* we probably have to release last stream from the server */ if (objt_server(s->target)) { + struct connection *conn = cs_conn(objt_cs(si->end)); + health_adjust(__objt_server(s->target), HANA_STATUS_L4_ERR); if (s->flags & SF_CURR_SESS) { diff --git a/src/check.c b/src/check.c index cb1be9b91d..97d340a57e 100644 --- a/src/check.c +++ b/src/check.c @@ -233,7 +233,9 @@ static void check_trace(enum trace_level level, uint64_t mask, if (check->cs) { - chunk_appendf(&trace_buf, " - conn=%p(0x%08x)", check->cs->conn, check->cs->conn->flags); + struct connection *conn = cs_conn(check->cs); + + chunk_appendf(&trace_buf, " - conn=%p(0x%08x)", conn, conn ? conn->flags : 0); chunk_appendf(&trace_buf, " cs=%p(0x%08x)", check->cs, check->cs->flags); } @@ -791,7 +793,7 @@ void chk_report_conn_err(struct check *check, int errno_bck, int expired) retrieve_errno_from_socket(conn); if (conn && !(conn->flags & CO_FL_ERROR) && - !(cs->flags & CS_FL_ERROR) && !expired) + cs && !(cs->flags & CS_FL_ERROR) && !expired) return; TRACE_ENTER(CHK_EV_HCHK_END|CHK_EV_HCHK_ERR, check, 0, 0, (size_t[]){expired}); @@ -904,7 +906,7 @@ void chk_report_conn_err(struct check *check, int errno_bck, int expired) set_server_check_status(check, HCHK_STATUS_SOCKERR, err_msg); } - if (!conn || !conn->ctrl) { + if (!cs || !conn || !conn->ctrl) { /* error before any connection attempt (connection allocation error or no control layer) */ set_server_check_status(check, HCHK_STATUS_SOCKERR, err_msg); } @@ -1016,7 +1018,7 @@ int httpchk_build_status_header(struct server *s, struct buffer *buf) */ static int wake_srv_chk(struct conn_stream *cs) { - struct connection *conn = cs->conn; + struct connection *conn; struct check *check = cs->data; struct email_alertq *q = container_of(check, typeof(*q), check); int ret = 0; @@ -1031,9 +1033,9 @@ static int wake_srv_chk(struct conn_stream *cs) ret = tcpcheck_main(check); cs = check->cs; - conn = cs->conn; + conn = cs_conn(cs); - if (unlikely(conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)) { + if (unlikely(!conn || !cs || conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)) { /* We may get error reports bypassing the I/O handlers, typically * the case when sending a pure TCP check which fails, then the I/O * handlers above are not called. This is completely handled by the @@ -1053,7 +1055,7 @@ static int wake_srv_chk(struct conn_stream *cs) ret = -1; if (check->wait_list.events) - cs->conn->mux->unsubscribe(cs, check->wait_list.events, &check->wait_list); + conn->mux->unsubscribe(cs, check->wait_list.events, &check->wait_list); /* We may have been scheduled to run, and the * I/O handler expects to have a cs, so remove @@ -1171,6 +1173,8 @@ struct task *process_chk_conn(struct task *t, void *context, unsigned int state) TRACE_STATE("health-check complete or aborted", CHK_EV_TASK_WAKE|CHK_EV_HCHK_END, check); check->current_step = NULL; + cs = check->cs; + conn = cs_conn(cs); if (conn && conn->xprt) { /* The check was aborted and the connection was not yet closed. @@ -1182,8 +1186,8 @@ struct task *process_chk_conn(struct task *t, void *context, unsigned int state) } if (cs) { - if (check->wait_list.events) - cs->conn->mux->unsubscribe(cs, check->wait_list.events, &check->wait_list); + if (conn && check->wait_list.events) + conn->mux->unsubscribe(cs, check->wait_list.events, &check->wait_list); /* We may have been scheduled to run, and the * I/O handler expects to have a cs, so remove * the tasklet @@ -1352,7 +1356,10 @@ void free_check(struct check *check) check_release_buf(check, &check->bi); check_release_buf(check, &check->bo); if (check->cs) { - ha_free(&check->cs->conn); + struct connection *conn = cs_conn(check->cs); + + if (conn) + conn_free(conn); cs_free(check->cs); check->cs = NULL; } diff --git a/src/http_ana.c b/src/http_ana.c index c2d9d9b439..6cb248c415 100644 --- a/src/http_ana.c +++ b/src/http_ana.c @@ -1325,10 +1325,7 @@ int http_wait_for_response(struct stream *s, struct channel *rep, int an_bit) if (unlikely(htx_is_empty(htx) || htx->first == -1)) { /* 1: have we encountered a read error ? */ if (rep->flags & CF_READ_ERROR) { - struct connection *conn = NULL; - - if (objt_cs(s->si[1].end)) - conn = __objt_cs(s->si[1].end)->conn; + struct connection *conn = cs_conn(objt_cs(s->si[1].end)); /* Perform a L7 retry because server refuses the early data. */ if ((si_b->flags & SI_FL_L7_RETRY) && @@ -5007,7 +5004,7 @@ static void http_debug_stline(const char *dir, struct stream *s, const struct ht chunk_printf(&trash, "%08x:%s.%s[%04x:%04x]: ", s->uniq_id, s->be->id, dir, objt_conn(sess->origin) ? (unsigned short)__objt_conn(sess->origin)->handle.fd : -1, - objt_cs(s->si[1].end) ? (unsigned short)__objt_cs(s->si[1].end)->conn->handle.fd : -1); + cs_conn(objt_cs(s->si[1].end)) ? (unsigned short)(cs_conn(__objt_cs(s->si[1].end)))->handle.fd : -1); max = HTX_SL_P1_LEN(sl); UBOUND(max, trash.size - trash.data - 3); @@ -5038,7 +5035,7 @@ static void http_debug_hdr(const char *dir, struct stream *s, const struct ist n chunk_printf(&trash, "%08x:%s.%s[%04x:%04x]: ", s->uniq_id, s->be->id, dir, objt_conn(sess->origin) ? (unsigned short)__objt_conn(sess->origin)->handle.fd : -1, - objt_cs(s->si[1].end) ? (unsigned short)__objt_cs(s->si[1].end)->conn->handle.fd : -1); + cs_conn(objt_cs(s->si[1].end)) ? (unsigned short)(cs_conn(__objt_cs(s->si[1].end)))->handle.fd : -1); max = n.len; UBOUND(max, trash.size - trash.data - 3); diff --git a/src/stream.c b/src/stream.c index 628bdf56b6..98e34a993b 100644 --- a/src/stream.c +++ b/src/stream.c @@ -460,7 +460,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin, struct bu si_set_state(&s->si[0], SI_ST_EST); s->si[0].hcto = sess->fe->timeout.clientfin; - if (cs && cs->conn->mux) { + if (cs_conn(cs) && cs->conn->mux) { if (cs->conn->mux->flags & MX_FL_CLEAN_ABRT) s->si[0].flags |= SI_FL_CLEAN_ABRT; if (cs->conn->mux->flags & MX_FL_HTX) @@ -883,8 +883,7 @@ int stream_set_timeout(struct stream *s, enum act_timeout_name name, int timeout static void back_establish(struct stream *s) { struct stream_interface *si = &s->si[1]; - struct conn_stream *srv_cs = objt_cs(si->end); - struct connection *conn = srv_cs ? srv_cs->conn : objt_conn(si->end); + struct connection *conn = cs_conn(objt_cs(si->end)); struct channel *req = &s->req; struct channel *rep = &s->res; @@ -930,7 +929,7 @@ static void back_establish(struct stream *s) si_rx_endp_more(si); rep->flags |= CF_READ_ATTACHED; /* producer is now attached */ - if (objt_cs(si->end)) { + if (conn) { /* real connections have timeouts * if already defined, it means that a set-timeout rule has * been executed so do not overwrite them @@ -2164,9 +2163,9 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) if (!(req->flags & (CF_KERN_SPLICING|CF_SHUTR)) && req->to_forward && (global.tune.options & GTUNE_USE_SPLICE) && - (objt_cs(si_f->end) && __objt_cs(si_f->end)->conn->xprt && __objt_cs(si_f->end)->conn->xprt->rcv_pipe && + (cs_conn(objt_cs(si_f->end)) && __objt_cs(si_f->end)->conn->xprt && __objt_cs(si_f->end)->conn->xprt->rcv_pipe && __objt_cs(si_f->end)->conn->mux && __objt_cs(si_f->end)->conn->mux->rcv_pipe) && - (objt_cs(si_b->end) && __objt_cs(si_b->end)->conn->xprt && __objt_cs(si_b->end)->conn->xprt->snd_pipe && + (cs_conn(objt_cs(si_b->end)) && __objt_cs(si_b->end)->conn->xprt && __objt_cs(si_b->end)->conn->xprt->snd_pipe && __objt_cs(si_b->end)->conn->mux && __objt_cs(si_b->end)->conn->mux->snd_pipe) && (pipes_used < global.maxpipes) && (((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_REQ) || @@ -2357,9 +2356,9 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) if (!(res->flags & (CF_KERN_SPLICING|CF_SHUTR)) && res->to_forward && (global.tune.options & GTUNE_USE_SPLICE) && - (objt_cs(si_f->end) && __objt_cs(si_f->end)->conn->xprt && __objt_cs(si_f->end)->conn->xprt->snd_pipe && + (cs_conn(objt_cs(si_f->end)) && __objt_cs(si_f->end)->conn->xprt && __objt_cs(si_f->end)->conn->xprt->snd_pipe && __objt_cs(si_f->end)->conn->mux && __objt_cs(si_f->end)->conn->mux->snd_pipe) && - (objt_cs(si_b->end) && __objt_cs(si_b->end)->conn->xprt && __objt_cs(si_b->end)->conn->xprt->rcv_pipe && + (cs_conn(objt_cs(si_b->end)) && __objt_cs(si_b->end)->conn->xprt && __objt_cs(si_b->end)->conn->xprt->rcv_pipe && __objt_cs(si_b->end)->conn->mux && __objt_cs(si_b->end)->conn->mux->rcv_pipe) && (pipes_used < global.maxpipes) && (((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_RTR) || @@ -2436,18 +2435,18 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) if (si_b->state == SI_ST_CLO && si_b->prev_state == SI_ST_EST) { chunk_printf(&trash, "%08x:%s.srvcls[%04x:%04x]\n", - s->uniq_id, s->be->id, - objt_cs(si_f->end) ? (unsigned short)__objt_cs(si_f->end)->conn->handle.fd : -1, - objt_cs(si_b->end) ? (unsigned short)__objt_cs(si_b->end)->conn->handle.fd : -1); + s->uniq_id, s->be->id, + cs_conn(objt_cs(si_f->end)) ? (unsigned short)__objt_cs(si_f->end)->conn->handle.fd : -1, + cs_conn(objt_cs(si_b->end)) ? (unsigned short)__objt_cs(si_b->end)->conn->handle.fd : -1); DISGUISE(write(1, trash.area, trash.data)); } if (si_f->state == SI_ST_CLO && si_f->prev_state == SI_ST_EST) { chunk_printf(&trash, "%08x:%s.clicls[%04x:%04x]\n", - s->uniq_id, s->be->id, - objt_cs(si_f->end) ? (unsigned short)__objt_cs(si_f->end)->conn->handle.fd : -1, - objt_cs(si_b->end) ? (unsigned short)__objt_cs(si_b->end)->conn->handle.fd : -1); + s->uniq_id, s->be->id, + cs_conn(objt_cs(si_f->end)) ? (unsigned short)__objt_cs(si_f->end)->conn->handle.fd : -1, + cs_conn(objt_cs(si_b->end)) ? (unsigned short)__objt_cs(si_b->end)->conn->handle.fd : -1); DISGUISE(write(1, trash.area, trash.data)); } } @@ -2513,9 +2512,9 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) if (unlikely((global.mode & MODE_DEBUG) && (!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)))) { chunk_printf(&trash, "%08x:%s.closed[%04x:%04x]\n", - s->uniq_id, s->be->id, - objt_cs(si_f->end) ? (unsigned short)__objt_cs(si_f->end)->conn->handle.fd : -1, - objt_cs(si_b->end) ? (unsigned short)__objt_cs(si_b->end)->conn->handle.fd : -1); + s->uniq_id, s->be->id, + cs_conn(objt_cs(si_f->end)) ? (unsigned short)__objt_cs(si_f->end)->conn->handle.fd : -1, + cs_conn(objt_cs(si_b->end)) ? (unsigned short)__objt_cs(si_b->end)->conn->handle.fd : -1); DISGUISE(write(1, trash.area, trash.data)); } @@ -3291,7 +3290,8 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st TICKS_TO_MS(1000)) : "", strm->si[1].err_type, strm->si[1].wait_event.events); - if ((cs = objt_cs(strm->si[0].end)) != NULL) { + if (cs_conn(objt_cs(strm->si[0].end)) != NULL) { + cs = __objt_cs(strm->si[0].end); conn = cs->conn; chunk_appendf(&trash, @@ -3327,7 +3327,8 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st (unsigned long long)tmpctx->t->cpu_time, (unsigned long long)tmpctx->t->lat_time); } - if ((cs = objt_cs(strm->si[1].end)) != NULL) { + if (cs_conn(objt_cs(strm->si[1].end)) != NULL) { + cs = __objt_cs(strm->si[1].end); conn = cs->conn; chunk_appendf(&trash, diff --git a/src/stream_interface.c b/src/stream_interface.c index c32d566c5f..e6254de379 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -354,12 +354,11 @@ int conn_si_send_proxy(struct connection *conn, unsigned int flag) if (cs && cs->data_cb == &si_conn_cb) { struct stream_interface *si = cs->data; - struct conn_stream *remote_cs = objt_cs(si_opposite(si)->end); struct stream *strm = si_strm(si); ret = make_proxy_line(trash.area, trash.size, objt_server(conn->target), - remote_cs ? remote_cs->conn : NULL, + cs_conn(objt_cs(si_opposite(si)->end)), strm); } else { @@ -434,7 +433,7 @@ static void stream_int_notify(struct stream_interface *si) /* process consumer side */ if (channel_is_empty(oc)) { - struct connection *conn = objt_cs(si->end) ? __objt_cs(si->end)->conn : NULL; + struct connection *conn = cs_conn(objt_cs(si->end)); if (((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW) && (si->state == SI_ST_EST) && (!conn || !(conn->flags & (CO_FL_WAIT_XPRT | CO_FL_EARLY_SSL_HS)))) @@ -800,7 +799,7 @@ struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned int state) struct conn_stream *cs = objt_cs(si->end); int ret = 0; - if (!cs) + if (!cs_conn(cs)) return t; if (!(si->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(si_oc(si))) @@ -927,7 +926,7 @@ void si_sync_send(struct stream_interface *si) return; cs = objt_cs(si->end); - if (!cs || !cs->conn->mux) + if (!cs_conn(cs) || !cs->conn->mux) return; si_cs_send(cs);