]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: connection: Be prepared to handle conn-stream with no connection
authorChristopher Faulet <cfaulet@haproxy.com>
Wed, 15 Dec 2021 08:50:17 +0000 (09:50 +0100)
committerChristopher Faulet <cfaulet@haproxy.com>
Thu, 24 Feb 2022 10:00:01 +0000 (11:00 +0100)
The conn-stream will progressively replace the stream-interface. Thus, a
stream will have to allocate the backend conn-stream during its
creation. This means it will be possible to have a conn-stream with no
connection. To prepare this change, we test the conn-stream's connection
when we retrieve it.

include/haproxy/connection.h
include/haproxy/stream_interface.h
src/backend.c
src/check.c
src/http_ana.c
src/stream.c
src/stream_interface.c

index 7cc852fd1e5930f88b61344a3d837c1d8f676519..e0af4c3aa067cab1e90c2861716405ef17a1d30d 100644 (file)
@@ -246,10 +246,16 @@ static inline void conn_xprt_shutw_hard(struct connection *c)
                c->xprt->shutw(c, c->xprt_ctx, 0);
 }
 
+/* Returns the conn from a cs. If cs is NULL, returns NULL */
+static inline struct connection *cs_conn(const struct conn_stream *cs)
+{
+       return cs ? cs->conn : NULL;
+}
+
 /* shut read */
 static inline void cs_shutr(struct conn_stream *cs, enum cs_shr_mode mode)
 {
-       if (cs->flags & CS_FL_SHR)
+       if (!cs_conn(cs) || cs->flags & CS_FL_SHR)
                return;
 
        /* clean data-layer shutdown */
@@ -261,7 +267,7 @@ static inline void cs_shutr(struct conn_stream *cs, enum cs_shr_mode mode)
 /* shut write */
 static inline void cs_shutw(struct conn_stream *cs, enum cs_shw_mode mode)
 {
-       if (cs->flags & CS_FL_SHW)
+       if (!cs_conn(cs) || cs->flags & CS_FL_SHW)
                return;
 
        /* clean data-layer shutdown */
@@ -387,29 +393,25 @@ static inline void conn_force_unsubscribe(struct connection *conn)
 /* Release a conn_stream */
 static inline void cs_destroy(struct conn_stream *cs)
 {
-       if (cs->conn->mux)
-               cs->conn->mux->detach(cs);
-       else {
-               /* It's too early to have a mux, let's just destroy
-                * the connection
-                */
-               struct connection *conn = cs->conn;
-
-               conn_stop_tracking(conn);
-               conn_full_close(conn);
-               if (conn->destroy_cb)
-                       conn->destroy_cb(conn);
-               conn_free(conn);
+       if (cs_conn(cs)) {
+               if (cs->conn->mux)
+                       cs->conn->mux->detach(cs);
+               else {
+                       /* It's too early to have a mux, let's just destroy
+                        * the connection
+                        */
+                       struct connection *conn = cs->conn;
+
+                       conn_stop_tracking(conn);
+                       conn_full_close(conn);
+                       if (conn->destroy_cb)
+                               conn->destroy_cb(conn);
+                       conn_free(conn);
+               }
        }
        cs_free(cs);
 }
 
-/* Returns the conn from a cs. If cs is NULL, returns NULL */
-static inline struct connection *cs_conn(const struct conn_stream *cs)
-{
-       return cs ? cs->conn : NULL;
-}
-
 /* Returns the source address of the connection or NULL if not set */
 static inline const struct sockaddr_storage *conn_src(struct connection *conn)
 {
index c1c2b03fa8dc07c492853db4767c1f8279a6f917..55c622b961757ba5948eeef960a0ddf988046565 100644 (file)
@@ -183,9 +183,9 @@ static inline void si_release_endpoint(struct stream_interface *si)
                appctx_free(appctx);
        }
        else if ((cs = objt_cs(si->end))) {
-               if (si->wait_event.events != 0)
+               if (cs_conn(cs) && si->wait_event.events != 0)
                        cs->conn->mux->unsubscribe(cs, si->wait_event.events,
-                           &si->wait_event);
+                                                  &si->wait_event);
                cs_destroy(cs);
        }
        si_detach_endpoint(si);
@@ -481,7 +481,7 @@ static inline int si_sync_recv(struct stream_interface *si)
                return 0;
 
        cs = objt_cs(si->end);
-       if (!cs || !cs->conn->mux)
+       if (!cs_conn(cs) || !cs->conn->mux)
                return 0; // only conn_streams are supported
 
        if (si->wait_event.events & SUB_RETRY_RECV)
@@ -578,7 +578,7 @@ static inline const struct sockaddr_storage *si_src(struct stream_interface *si)
        else {
                struct conn_stream *cs = objt_cs(si->end);
 
-               if (cs && cs->conn)
+               if (cs_conn(cs))
                        return conn_src(cs->conn);
        }
        return NULL;
@@ -598,7 +598,7 @@ static inline const struct sockaddr_storage *si_dst(struct stream_interface *si)
        else {
                struct conn_stream *cs = objt_cs(si->end);
 
-               if (cs && cs->conn)
+               if (cs_conn(cs))
                        return conn_dst(cs->conn);
        }
        return NULL;
@@ -622,7 +622,7 @@ static inline int si_get_src(struct stream_interface *si)
        else {
                struct conn_stream *cs = objt_cs(si->end);
 
-               if (cs && cs->conn)
+               if (cs_conn(cs))
                        src = conn_src(cs->conn);
        }
        if (!src)
@@ -653,7 +653,7 @@ static inline int si_get_dst(struct stream_interface *si)
        else {
                struct conn_stream *cs = objt_cs(si->end);
 
-               if (cs && cs->conn)
+               if (cs_conn(cs))
                        dst = conn_dst(cs->conn);
        }
        if (!dst)
index fb6313181f1410b8157133ec0e32c7ba6fbc74e3..6a793a0c7ba77ac5658427b194601a8784f29ea8 100644 (file)
@@ -2220,8 +2220,6 @@ void back_handle_st_con(struct stream *s)
 void back_handle_st_cer(struct stream *s)
 {
        struct stream_interface *si = &s->si[1];
-       struct conn_stream *cs = objt_cs(si->end);
-       struct connection *conn = cs_conn(cs);
 
        DBG_TRACE_ENTER(STRM_EV_STRM_PROC|STRM_EV_SI_ST, s);
 
@@ -2230,6 +2228,8 @@ void back_handle_st_cer(struct stream *s)
 
        /* we probably have to release last stream from the server */
        if (objt_server(s->target)) {
+               struct connection *conn = cs_conn(objt_cs(si->end));
+
                health_adjust(__objt_server(s->target), HANA_STATUS_L4_ERR);
 
                if (s->flags & SF_CURR_SESS) {
index cb1be9b91dad717e9c4b1f6db8565c5cd717db61..97d340a57e6b1078e08578c933b3a9bc986222a6 100644 (file)
@@ -233,7 +233,9 @@ static void check_trace(enum trace_level level, uint64_t mask,
 
 
        if (check->cs) {
-               chunk_appendf(&trace_buf, " - conn=%p(0x%08x)", check->cs->conn, check->cs->conn->flags);
+               struct connection *conn = cs_conn(check->cs);
+
+               chunk_appendf(&trace_buf, " - conn=%p(0x%08x)", conn, conn ? conn->flags : 0);
                chunk_appendf(&trace_buf, " cs=%p(0x%08x)", check->cs, check->cs->flags);
        }
 
@@ -791,7 +793,7 @@ void chk_report_conn_err(struct check *check, int errno_bck, int expired)
                retrieve_errno_from_socket(conn);
 
        if (conn && !(conn->flags & CO_FL_ERROR) &&
-           !(cs->flags & CS_FL_ERROR) && !expired)
+           cs && !(cs->flags & CS_FL_ERROR) && !expired)
                return;
 
        TRACE_ENTER(CHK_EV_HCHK_END|CHK_EV_HCHK_ERR, check, 0, 0, (size_t[]){expired});
@@ -904,7 +906,7 @@ void chk_report_conn_err(struct check *check, int errno_bck, int expired)
                set_server_check_status(check, HCHK_STATUS_SOCKERR, err_msg);
        }
 
-       if (!conn || !conn->ctrl) {
+       if (!cs || !conn || !conn->ctrl) {
                /* error before any connection attempt (connection allocation error or no control layer) */
                set_server_check_status(check, HCHK_STATUS_SOCKERR, err_msg);
        }
@@ -1016,7 +1018,7 @@ int httpchk_build_status_header(struct server *s, struct buffer *buf)
  */
 static int wake_srv_chk(struct conn_stream *cs)
 {
-       struct connection *conn = cs->conn;
+       struct connection *conn;
        struct check *check = cs->data;
        struct email_alertq *q = container_of(check, typeof(*q), check);
        int ret = 0;
@@ -1031,9 +1033,9 @@ static int wake_srv_chk(struct conn_stream *cs)
        ret = tcpcheck_main(check);
 
        cs = check->cs;
-       conn = cs->conn;
+       conn = cs_conn(cs);
 
-       if (unlikely(conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)) {
+       if (unlikely(!conn || !cs || conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)) {
                /* We may get error reports bypassing the I/O handlers, typically
                 * the case when sending a pure TCP check which fails, then the I/O
                 * handlers above are not called. This is completely handled by the
@@ -1053,7 +1055,7 @@ static int wake_srv_chk(struct conn_stream *cs)
                ret = -1;
 
                if (check->wait_list.events)
-                       cs->conn->mux->unsubscribe(cs, check->wait_list.events, &check->wait_list);
+                       conn->mux->unsubscribe(cs, check->wait_list.events, &check->wait_list);
 
                /* We may have been scheduled to run, and the
                 * I/O handler expects to have a cs, so remove
@@ -1171,6 +1173,8 @@ struct task *process_chk_conn(struct task *t, void *context, unsigned int state)
        TRACE_STATE("health-check complete or aborted", CHK_EV_TASK_WAKE|CHK_EV_HCHK_END, check);
 
        check->current_step = NULL;
+       cs = check->cs;
+       conn = cs_conn(cs);
 
        if (conn && conn->xprt) {
                /* The check was aborted and the connection was not yet closed.
@@ -1182,8 +1186,8 @@ struct task *process_chk_conn(struct task *t, void *context, unsigned int state)
        }
 
        if (cs) {
-               if (check->wait_list.events)
-                       cs->conn->mux->unsubscribe(cs, check->wait_list.events, &check->wait_list);
+               if (conn && check->wait_list.events)
+                       conn->mux->unsubscribe(cs, check->wait_list.events, &check->wait_list);
                /* We may have been scheduled to run, and the
                 * I/O handler expects to have a cs, so remove
                 * the tasklet
@@ -1352,7 +1356,10 @@ void free_check(struct check *check)
        check_release_buf(check, &check->bi);
        check_release_buf(check, &check->bo);
        if (check->cs) {
-               ha_free(&check->cs->conn);
+               struct connection *conn = cs_conn(check->cs);
+
+               if (conn)
+                       conn_free(conn);
                cs_free(check->cs);
                check->cs = NULL;
        }
index c2d9d9b439aedaecaaee7dbc080fd41bd2f53aa4..6cb248c415a5e7421d2f05c980f4dd5d0f6630af 100644 (file)
@@ -1325,10 +1325,7 @@ int http_wait_for_response(struct stream *s, struct channel *rep, int an_bit)
        if (unlikely(htx_is_empty(htx) || htx->first == -1)) {
                /* 1: have we encountered a read error ? */
                if (rep->flags & CF_READ_ERROR) {
-                       struct connection *conn = NULL;
-
-                       if (objt_cs(s->si[1].end))
-                               conn = __objt_cs(s->si[1].end)->conn;
+                       struct connection *conn = cs_conn(objt_cs(s->si[1].end));
 
                        /* Perform a L7 retry because server refuses the early data. */
                        if ((si_b->flags & SI_FL_L7_RETRY) &&
@@ -5007,7 +5004,7 @@ static void http_debug_stline(const char *dir, struct stream *s, const struct ht
         chunk_printf(&trash, "%08x:%s.%s[%04x:%04x]: ", s->uniq_id, s->be->id,
                      dir,
                      objt_conn(sess->origin) ? (unsigned short)__objt_conn(sess->origin)->handle.fd : -1,
-                     objt_cs(s->si[1].end) ? (unsigned short)__objt_cs(s->si[1].end)->conn->handle.fd : -1);
+                     cs_conn(objt_cs(s->si[1].end)) ? (unsigned short)(cs_conn(__objt_cs(s->si[1].end)))->handle.fd : -1);
 
         max = HTX_SL_P1_LEN(sl);
         UBOUND(max, trash.size - trash.data - 3);
@@ -5038,7 +5035,7 @@ static void http_debug_hdr(const char *dir, struct stream *s, const struct ist n
         chunk_printf(&trash, "%08x:%s.%s[%04x:%04x]: ", s->uniq_id, s->be->id,
                      dir,
                      objt_conn(sess->origin) ? (unsigned short)__objt_conn(sess->origin)->handle.fd : -1,
-                     objt_cs(s->si[1].end) ? (unsigned short)__objt_cs(s->si[1].end)->conn->handle.fd : -1);
+                     cs_conn(objt_cs(s->si[1].end)) ? (unsigned short)(cs_conn(__objt_cs(s->si[1].end)))->handle.fd : -1);
 
         max = n.len;
         UBOUND(max, trash.size - trash.data - 3);
index 628bdf56b60778b68d87c3401613b1e2074c841c..98e34a993b6f311611548d09c5a78789c8ac17e3 100644 (file)
@@ -460,7 +460,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin, struct bu
        si_set_state(&s->si[0], SI_ST_EST);
        s->si[0].hcto = sess->fe->timeout.clientfin;
 
-       if (cs && cs->conn->mux) {
+       if (cs_conn(cs) && cs->conn->mux) {
                if (cs->conn->mux->flags & MX_FL_CLEAN_ABRT)
                        s->si[0].flags |= SI_FL_CLEAN_ABRT;
                if (cs->conn->mux->flags & MX_FL_HTX)
@@ -883,8 +883,7 @@ int stream_set_timeout(struct stream *s, enum act_timeout_name name, int timeout
 static void back_establish(struct stream *s)
 {
        struct stream_interface *si = &s->si[1];
-       struct conn_stream *srv_cs = objt_cs(si->end);
-       struct connection *conn = srv_cs ? srv_cs->conn : objt_conn(si->end);
+       struct connection *conn = cs_conn(objt_cs(si->end));
        struct channel *req = &s->req;
        struct channel *rep = &s->res;
 
@@ -930,7 +929,7 @@ static void back_establish(struct stream *s)
 
        si_rx_endp_more(si);
        rep->flags |= CF_READ_ATTACHED; /* producer is now attached */
-       if (objt_cs(si->end)) {
+       if (conn) {
                /* real connections have timeouts
                 * if already defined, it means that a set-timeout rule has
                 * been executed so do not overwrite them
@@ -2164,9 +2163,9 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
        if (!(req->flags & (CF_KERN_SPLICING|CF_SHUTR)) &&
            req->to_forward &&
            (global.tune.options & GTUNE_USE_SPLICE) &&
-           (objt_cs(si_f->end) && __objt_cs(si_f->end)->conn->xprt && __objt_cs(si_f->end)->conn->xprt->rcv_pipe &&
+           (cs_conn(objt_cs(si_f->end)) && __objt_cs(si_f->end)->conn->xprt && __objt_cs(si_f->end)->conn->xprt->rcv_pipe &&
             __objt_cs(si_f->end)->conn->mux && __objt_cs(si_f->end)->conn->mux->rcv_pipe) &&
-           (objt_cs(si_b->end) && __objt_cs(si_b->end)->conn->xprt && __objt_cs(si_b->end)->conn->xprt->snd_pipe &&
+           (cs_conn(objt_cs(si_b->end)) && __objt_cs(si_b->end)->conn->xprt && __objt_cs(si_b->end)->conn->xprt->snd_pipe &&
             __objt_cs(si_b->end)->conn->mux && __objt_cs(si_b->end)->conn->mux->snd_pipe) &&
            (pipes_used < global.maxpipes) &&
            (((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_REQ) ||
@@ -2357,9 +2356,9 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
        if (!(res->flags & (CF_KERN_SPLICING|CF_SHUTR)) &&
            res->to_forward &&
            (global.tune.options & GTUNE_USE_SPLICE) &&
-           (objt_cs(si_f->end) && __objt_cs(si_f->end)->conn->xprt && __objt_cs(si_f->end)->conn->xprt->snd_pipe &&
+           (cs_conn(objt_cs(si_f->end)) && __objt_cs(si_f->end)->conn->xprt && __objt_cs(si_f->end)->conn->xprt->snd_pipe &&
             __objt_cs(si_f->end)->conn->mux && __objt_cs(si_f->end)->conn->mux->snd_pipe) &&
-           (objt_cs(si_b->end) && __objt_cs(si_b->end)->conn->xprt && __objt_cs(si_b->end)->conn->xprt->rcv_pipe &&
+           (cs_conn(objt_cs(si_b->end)) && __objt_cs(si_b->end)->conn->xprt && __objt_cs(si_b->end)->conn->xprt->rcv_pipe &&
             __objt_cs(si_b->end)->conn->mux && __objt_cs(si_b->end)->conn->mux->rcv_pipe) &&
            (pipes_used < global.maxpipes) &&
            (((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_RTR) ||
@@ -2436,18 +2435,18 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
                if (si_b->state == SI_ST_CLO &&
                    si_b->prev_state == SI_ST_EST) {
                        chunk_printf(&trash, "%08x:%s.srvcls[%04x:%04x]\n",
-                                     s->uniq_id, s->be->id,
-                                     objt_cs(si_f->end) ? (unsigned short)__objt_cs(si_f->end)->conn->handle.fd : -1,
-                                     objt_cs(si_b->end) ? (unsigned short)__objt_cs(si_b->end)->conn->handle.fd : -1);
+                                    s->uniq_id, s->be->id,
+                                    cs_conn(objt_cs(si_f->end)) ? (unsigned short)__objt_cs(si_f->end)->conn->handle.fd : -1,
+                                    cs_conn(objt_cs(si_b->end)) ? (unsigned short)__objt_cs(si_b->end)->conn->handle.fd : -1);
                        DISGUISE(write(1, trash.area, trash.data));
                }
 
                if (si_f->state == SI_ST_CLO &&
                    si_f->prev_state == SI_ST_EST) {
                        chunk_printf(&trash, "%08x:%s.clicls[%04x:%04x]\n",
-                                     s->uniq_id, s->be->id,
-                                     objt_cs(si_f->end) ? (unsigned short)__objt_cs(si_f->end)->conn->handle.fd : -1,
-                                     objt_cs(si_b->end) ? (unsigned short)__objt_cs(si_b->end)->conn->handle.fd : -1);
+                                    s->uniq_id, s->be->id,
+                                    cs_conn(objt_cs(si_f->end)) ? (unsigned short)__objt_cs(si_f->end)->conn->handle.fd : -1,
+                                    cs_conn(objt_cs(si_b->end)) ? (unsigned short)__objt_cs(si_b->end)->conn->handle.fd : -1);
                        DISGUISE(write(1, trash.area, trash.data));
                }
        }
@@ -2513,9 +2512,9 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
        if (unlikely((global.mode & MODE_DEBUG) &&
                     (!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)))) {
                chunk_printf(&trash, "%08x:%s.closed[%04x:%04x]\n",
-                             s->uniq_id, s->be->id,
-                             objt_cs(si_f->end) ? (unsigned short)__objt_cs(si_f->end)->conn->handle.fd : -1,
-                             objt_cs(si_b->end) ? (unsigned short)__objt_cs(si_b->end)->conn->handle.fd : -1);
+                            s->uniq_id, s->be->id,
+                            cs_conn(objt_cs(si_f->end)) ? (unsigned short)__objt_cs(si_f->end)->conn->handle.fd : -1,
+                            cs_conn(objt_cs(si_b->end)) ? (unsigned short)__objt_cs(si_b->end)->conn->handle.fd : -1);
                DISGUISE(write(1, trash.area, trash.data));
        }
 
@@ -3291,7 +3290,8 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st
                                             TICKS_TO_MS(1000)) : "<NEVER>",
                             strm->si[1].err_type, strm->si[1].wait_event.events);
 
-               if ((cs = objt_cs(strm->si[0].end)) != NULL) {
+               if (cs_conn(objt_cs(strm->si[0].end)) != NULL) {
+                       cs = __objt_cs(strm->si[0].end);
                        conn = cs->conn;
 
                        chunk_appendf(&trash,
@@ -3327,7 +3327,8 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st
                                      (unsigned long long)tmpctx->t->cpu_time, (unsigned long long)tmpctx->t->lat_time);
                }
 
-               if ((cs = objt_cs(strm->si[1].end)) != NULL) {
+               if (cs_conn(objt_cs(strm->si[1].end)) != NULL) {
+                       cs = __objt_cs(strm->si[1].end);
                        conn = cs->conn;
 
                        chunk_appendf(&trash,
index c32d566c5f69ed5afca9127f8a0d080f9ace59a1..e6254de379d93c9d7269c8f59aac81c40a73bf70 100644 (file)
@@ -354,12 +354,11 @@ int conn_si_send_proxy(struct connection *conn, unsigned int flag)
 
                if (cs && cs->data_cb == &si_conn_cb) {
                        struct stream_interface *si = cs->data;
-                       struct conn_stream *remote_cs = objt_cs(si_opposite(si)->end);
                        struct stream *strm = si_strm(si);
 
                        ret = make_proxy_line(trash.area, trash.size,
                                              objt_server(conn->target),
-                                             remote_cs ? remote_cs->conn : NULL,
+                                             cs_conn(objt_cs(si_opposite(si)->end)),
                                              strm);
                }
                else {
@@ -434,7 +433,7 @@ static void stream_int_notify(struct stream_interface *si)
 
        /* process consumer side */
        if (channel_is_empty(oc)) {
-               struct connection *conn = objt_cs(si->end) ? __objt_cs(si->end)->conn : NULL;
+               struct connection *conn = cs_conn(objt_cs(si->end));
 
                if (((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW) &&
                    (si->state == SI_ST_EST) && (!conn || !(conn->flags & (CO_FL_WAIT_XPRT | CO_FL_EARLY_SSL_HS))))
@@ -800,7 +799,7 @@ struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned int state)
        struct conn_stream *cs = objt_cs(si->end);
        int ret = 0;
 
-       if (!cs)
+       if (!cs_conn(cs))
                return t;
 
        if (!(si->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(si_oc(si)))
@@ -927,7 +926,7 @@ void si_sync_send(struct stream_interface *si)
                return;
 
        cs = objt_cs(si->end);
-       if (!cs || !cs->conn->mux)
+       if (!cs_conn(cs) || !cs->conn->mux)
                return;
 
        si_cs_send(cs);