]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: tevt/stream/stconn: Report termination events for stream and sc
authorChristopher Faulet <cfaulet@haproxy.com>
Mon, 23 Dec 2024 13:30:33 +0000 (14:30 +0100)
committerChristopher Faulet <cfaulet@haproxy.com>
Fri, 31 Jan 2025 09:41:50 +0000 (10:41 +0100)
In this patch, events for the stream location are reported. These events are
first reported on the corresponding stream-connector. So front events on scf
and back event on scb. Then all events are both merged in the stream. But
only 4 events are saved on the stream.

Several internal events are for now grouped with the type
"tevt_type_intercepted". More events will be added to have a better
resolution. But at least the place to report these events are identified.

For now, when a event is reported on a SC, it is also reported on the stream
and vice versa.

include/haproxy/stconn-t.h
include/haproxy/stconn.h
include/haproxy/stream-t.h
include/haproxy/stream.h
src/http_ana.c
src/stconn.c
src/stream.c
src/tcp_rules.c

index 6db88f709ec054e6e4f9ca6bb62169df2175ca15..52fa3e88e90c7bf9b44de84f14931b195a967841 100644 (file)
@@ -347,6 +347,7 @@ struct stconn {
 
        unsigned int flags;                  /* SC_FL_* */
        unsigned int ioto;                   /* I/O activity timeout */
+       uint32_t term_evts_log;              /* termination events log aggregating SE + connection events */
        ssize_t room_needed;                 /* free space in the input buffer required to receive more data.
                                              *    -1   : the SC is waiting for room but not on a specific amount of data
                                              *    >= 0 : min free space required to progress. 0 means SC must be unblocked ASAP
index 53f761a2898800ed755e14252c66895fcd6801ca..0299815fdcf90b790a15b7cdceec5ca0797d42c8 100644 (file)
@@ -565,4 +565,13 @@ static inline size_t se_done_ff(struct sedesc *se)
        return ret;
 }
 
+static inline void sc_report_term_evt(struct stconn *sc, enum term_event_loc loc, enum term_event_type type)
+{
+       if (sc->flags & SC_FL_ISBACK)
+               loc += 8;
+       sc->term_evts_log = tevt_report_event(sc->term_evts_log, loc, type);
+       if (sc_strm(sc))
+               __sc_strm(sc)->term_evts_log = tevt_report_event(__sc_strm(sc)->term_evts_log, loc, type);
+}
+
 #endif /* _HAPROXY_STCONN_H */
index 0211adaaed80dd44ea8d25e4acc84d80a7773835..335fed8c94fbdb102bcabfaba3993de57aefe9da 100644 (file)
@@ -324,6 +324,7 @@ struct stream {
        } waiting_entity;                       /* The entity waiting to continue its processing and interrupted by an error/timeout */
 
        unsigned int stream_epoch;              /* copy of stream_epoch when the stream was created */
+       uint32_t term_evts_log;                 /* termination events log */
        struct hlua *hlua[2];                   /* lua runtime context (0: global, 1: per-thread) */
 
        /* Context */
index 4e503a32d247b77c85a88477d59cf75245413179..4070b603afe6a4e15714ae339963406f16f53a9b 100644 (file)
@@ -418,6 +418,19 @@ static inline unsigned int stream_map_task_state(unsigned int state)
                0;
 }
 
+static inline void stream_report_term_evt(struct stconn *sc, enum term_event_loc loc, enum term_event_type type)
+{
+       struct stream *s = sc_strm(sc);
+
+       if (!s)
+               return;
+
+       if (sc->flags & SC_FL_ISBACK)
+               loc += 8;
+       s->term_evts_log = tevt_report_event(s->term_evts_log, loc, type);
+       sc->term_evts_log = tevt_report_event(sc->term_evts_log, loc, type);
+}
+
 
 int stream_set_timeout(struct stream *s, enum act_timeout_name name, int timeout);
 void stream_retnclose(struct stream *s, const struct buffer *msg);
index 41c5effd65f15729453aee75a48671bfe82c10c8..353333402fb942cc35605d6bedd4786e1207de56 100644 (file)
@@ -353,6 +353,8 @@ int http_wait_for_request(struct stream *s, struct channel *req, int an_bit)
        /* fall through */
 
  return_prx_cond:
+       // XXX: All errors are handled as intercepted here !
+       stream_report_term_evt(s->scf, tevt_loc_strm, tevt_type_intercepted);
        http_set_term_flags(s);
        http_reply_and_close(s, txn->status, http_error_message(s));
 
@@ -505,6 +507,7 @@ int http_process_req_common(struct stream *s, struct channel *req, int an_bit, s
                if (!http_apply_redirect_rule(rule, s, txn)) {
                        goto return_int_err;
                }
+               stream_report_term_evt(s->scf, tevt_loc_strm, tevt_type_intercepted);
                goto done;
        }
 
@@ -610,8 +613,9 @@ int http_process_req_common(struct stream *s, struct channel *req, int an_bit, s
        /* fall through */
 
  return_prx_cond:
+       // XXX: All errors are handled as intercepted here !
+       stream_report_term_evt(s->scf, tevt_loc_strm, tevt_type_intercepted);
        http_set_term_flags(s);
-
        req->analysers &= AN_REQ_FLT_END;
        req->analyse_exp = TICK_ETERNITY;
        s->current_rule = s->current_rule_list = NULL;
@@ -747,6 +751,8 @@ int http_process_request(struct stream *s, struct channel *req, int an_bit)
        if (sess->listener && sess->listener->counters)
                _HA_ATOMIC_INC(&sess->listener->counters->internal_errors);
 
+       // XXX: All errors are handled as intercepted here !
+       stream_report_term_evt(s->scf, tevt_loc_strm, tevt_type_intercepted);
        http_set_term_flags(s);
        http_reply_and_close(s, txn->status, http_error_message(s));
 
@@ -788,6 +794,8 @@ int http_process_tarpit(struct stream *s, struct channel *req, int an_bit)
         */
        s->logs.t_queue = ns_to_ms(now_ns - s->logs.accept_ts);
 
+       // XXX: All errors are handled as intercepted here !
+       stream_report_term_evt(s->scf, tevt_loc_strm, tevt_type_intercepted);
        http_set_term_flags(s);
        http_reply_and_close(s, txn->status, (!(s->scf->flags & SC_FL_ERROR) ? http_error_message(s) : NULL));
 
@@ -868,6 +876,8 @@ int http_wait_for_request_body(struct stream *s, struct channel *req, int an_bit
        /* fall through */
 
  return_prx_cond:
+       // XXX: All errors are handled as intercepted here !
+       stream_report_term_evt(s->scf, tevt_loc_strm, tevt_type_intercepted);
        http_set_term_flags(s);
 
        req->analysers &= AN_REQ_FLT_END;
@@ -1102,6 +1112,8 @@ int http_request_forward_body(struct stream *s, struct channel *req, int an_bit)
        goto return_prx_cond;
 
   return_int_err:
+       // XXX: All errors are handled as intercepted here !
+       stream_report_term_evt(s->scf, tevt_loc_strm, tevt_type_intercepted);
        if (!(s->flags & SF_ERR_MASK))
                s->flags |= SF_ERR_INTERNAL;
        _HA_ATOMIC_INC(&sess->fe->fe_counters.internal_errors);
@@ -1115,6 +1127,8 @@ int http_request_forward_body(struct stream *s, struct channel *req, int an_bit)
        goto return_prx_cond;
 
   return_bad_req:
+       // XXX: All errors are handled as intercepted here !
+       stream_report_term_evt(s->scf, tevt_loc_strm, tevt_type_intercepted);
        _HA_ATOMIC_INC(&sess->fe->fe_counters.failed_req);
        if (sess->listener && sess->listener->counters)
                _HA_ATOMIC_INC(&sess->listener->counters->failed_req);
@@ -1669,6 +1683,8 @@ int http_wait_for_response(struct stream *s, struct channel *rep, int an_bit)
        /* fall through */
 
  return_prx_cond:
+       // XXX: All errors are handled as intercepted here !
+       stream_report_term_evt(s->scb, tevt_loc_strm, tevt_type_intercepted);
        http_set_term_flags(s);
        http_reply_and_close(s, txn->status, http_error_message(s));
 
@@ -2004,6 +2020,8 @@ int http_process_res_common(struct stream *s, struct channel *rep, int an_bit, s
  return_prx_cond:
        s->scb->flags |= SC_FL_NOLINGER;
 
+       // XXX: All errors are handled as intercepted here !
+       stream_report_term_evt(s->scb, tevt_loc_strm, tevt_type_intercepted);
        http_set_term_flags(s);
 
        rep->analysers &= AN_RES_FLT_END;
@@ -2245,6 +2263,8 @@ int http_response_forward_body(struct stream *s, struct channel *res, int an_bit
        goto return_error;
 
   return_int_err:
+       // XXX: All errors are handled as intercepted here !
+       stream_report_term_evt(s->scb, tevt_loc_strm, tevt_type_intercepted);
        _HA_ATOMIC_INC(&sess->fe->fe_counters.internal_errors);
        _HA_ATOMIC_INC(&s->be->be_counters.internal_errors);
        if (sess->listener && sess->listener->counters)
@@ -2257,6 +2277,8 @@ int http_response_forward_body(struct stream *s, struct channel *res, int an_bit
        goto return_error;
 
   return_bad_res:
+       // XXX: All errors are handled as intercepted here !
+       stream_report_term_evt(s->scb, tevt_loc_strm, tevt_type_intercepted);
        _HA_ATOMIC_INC(&s->be->be_counters.failed_resp);
        if (objt_server(s->target)) {
                _HA_ATOMIC_INC(&__objt_server(s->target)->counters.failed_resp);
index 1e69abc48f7846653678723c473d1ae62bec1e3a..ef149c573b647e7cb68eec1b7489db9883ddf4e9 100644 (file)
@@ -100,7 +100,7 @@ void sedesc_init(struct sedesc *sedesc)
        sedesc->fsb = TICK_ETERNITY;
        sedesc->xref.peer = NULL;
        se_fl_setall(sedesc, SE_FL_NONE);
-
+       sedesc->term_evts_log = 0;
        sedesc->abort_info.info = 0;
        sedesc->abort_info.code = 0;
 
@@ -146,8 +146,10 @@ void se_shutdown(struct sedesc *sedesc, enum se_shut_mode mode)
        struct se_abort_info *reason = NULL;
        unsigned int flags = 0;
 
-       if ((mode & (SE_SHW_SILENT|SE_SHW_NORMAL)) && !se_fl_test(sedesc, SE_FL_SHW))
+       if ((mode & (SE_SHW_SILENT|SE_SHW_NORMAL)) && !se_fl_test(sedesc, SE_FL_SHW)) {
+               sc_report_term_evt(sedesc->sc, tevt_loc_strm, tevt_type_shutw);
                flags |= (mode & SE_SHW_NORMAL) ? SE_FL_SHWN : SE_FL_SHWS;
+       }
        if ((mode & (SE_SHR_RESET|SE_SHR_DRAIN)) && !se_fl_test(sedesc, SE_FL_SHR))
                flags |= (mode & SE_SHR_DRAIN) ? SE_FL_SHRD : SE_FL_SHRR;
 
@@ -208,6 +210,8 @@ static struct stconn *sc_new(struct sedesc *sedesc)
        sc->wait_event.tasklet = NULL;
        sc->wait_event.events = 0;
 
+       sc->term_evts_log = 0;
+
        /* If there is no endpoint, allocate a new one now */
        if (!sedesc) {
                sedesc = sedesc_new();
@@ -1233,7 +1237,7 @@ static void sc_conn_eos(struct stconn *sc)
        sc->flags |= SC_FL_EOS;
        ic->flags |= CF_READ_EVENT;
        sc_ep_report_read_activity(sc);
-
+       sc_report_term_evt(sc, tevt_loc_strm, (sc->flags & SC_FL_EOI ? tevt_type_shutr: tevt_type_truncated_shutr));
        if (sc->state != SC_ST_EST)
                return;
 
@@ -1520,6 +1524,8 @@ int sc_conn_recv(struct stconn *sc)
        }
        if (sc_ep_test(sc, SE_FL_ERROR)) {
                sc->flags |= SC_FL_ERROR;
+               if (!(sc->flags & SC_FL_EOS))
+                       sc_report_term_evt(sc, tevt_loc_strm, (sc->flags & SC_FL_EOI ? tevt_type_rcv_err: tevt_type_truncated_rcv_err));
                ret = 1;
        }
 
@@ -1745,6 +1751,7 @@ int sc_conn_send(struct stconn *sc)
        if (sc_ep_test(sc, SE_FL_ERROR | SE_FL_ERR_PENDING)) {
                oc->flags |= CF_WRITE_EVENT;
                BUG_ON(sc_ep_test(sc, SE_FL_EOS|SE_FL_ERROR|SE_FL_ERR_PENDING) == (SE_FL_EOS|SE_FL_ERR_PENDING));
+               sc_report_term_evt(sc, tevt_loc_strm, tevt_type_snd_err);
                if (sc_ep_test(sc, SE_FL_ERROR))
                        sc->flags |= SC_FL_ERROR;
                return 1;
@@ -1856,35 +1863,38 @@ int sc_conn_process(struct stconn *sc)
                        sc->state = SC_ST_RDY;
        }
 
-       /* Report EOS on the channel if it was reached from the mux point of
+       /* Report EOI on the channel if it was reached from the mux point of
         * view.
         *
         * Note: This test is only required because sc_conn_process is also the SI
         *       wake callback. Otherwise sc_conn_recv()/sc_conn_send() already take
         *       care of it.
         */
-       if (sc_ep_test(sc, SE_FL_EOS) && !(sc->flags & SC_FL_EOS)) {
-               /* we received a shutdown */
-               if (ic->flags & CF_AUTO_CLOSE)
-                       sc_schedule_shutdown(sc_opposite(sc));
-               sc_conn_eos(sc);
+       if (sc_ep_test(sc, SE_FL_EOI) && !(sc->flags & SC_FL_EOI)) {
+               sc->flags |= SC_FL_EOI;
+               ic->flags |= CF_READ_EVENT;
+               sc_ep_report_read_activity(sc);
        }
 
-       /* Report EOI on the channel if it was reached from the mux point of
+       /* Report EOS on the channel if it was reached from the mux point of
         * view.
         *
         * Note: This test is only required because sc_conn_process is also the SI
         *       wake callback. Otherwise sc_conn_recv()/sc_conn_send() already take
         *       care of it.
         */
-       if (sc_ep_test(sc, SE_FL_EOI) && !(sc->flags & SC_FL_EOI)) {
-               sc->flags |= SC_FL_EOI;
-               ic->flags |= CF_READ_EVENT;
-               sc_ep_report_read_activity(sc);
+       if (sc_ep_test(sc, SE_FL_EOS) && !(sc->flags & SC_FL_EOS)) {
+               /* we received a shutdown */
+               if (ic->flags & CF_AUTO_CLOSE)
+                       sc_schedule_shutdown(sc_opposite(sc));
+               sc_conn_eos(sc);
        }
 
-       if (sc_ep_test(sc, SE_FL_ERROR))
+       if (sc_ep_test(sc, SE_FL_ERROR) && !(sc->flags & SC_FL_ERROR)) {
+               if (!(sc->flags & SC_FL_EOS))
+                       sc_report_term_evt(sc, tevt_loc_strm, (sc->flags & SC_FL_EOI ? tevt_type_rcv_err: tevt_type_truncated_rcv_err));
                sc->flags |= SC_FL_ERROR;
+       }
 
        /* Second step : update the stream connector and channels, try to forward any
         * pending data, then possibly wake the stream up based on the new
index 819de896798fdda1d41e0ec7c890eb357324e0e3..c61c3fcb6c532ee9ff79ef14a7374f1e33d0125d 100644 (file)
@@ -413,6 +413,7 @@ struct stream *stream_new(struct session *sess, struct stconn *sc, struct buffer
 
        s->stream_epoch = _HA_ATOMIC_LOAD(&stream_epoch);
        s->uniq_id = _HA_ATOMIC_FETCH_ADD(&global.req_count, 1);
+       s->term_evts_log = 0;
 
        /* OK, we're keeping the stream, so let's properly initialize the stream */
        LIST_INIT(&s->back_refs);
@@ -1575,21 +1576,25 @@ static void stream_handle_timeouts(struct stream *s)
        channel_check_timeout(&s->res);
 
        if (unlikely(!(s->scb->flags & SC_FL_SHUT_DONE) && (s->req.flags & CF_WRITE_TIMEOUT))) {
+               stream_report_term_evt(s->scb, tevt_loc_strm, tevt_type_tout);
                s->scb->flags |= SC_FL_NOLINGER;
                sc_shutdown(s->scb);
        }
 
        if (unlikely(!(s->scf->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)) && (s->req.flags & CF_READ_TIMEOUT))) {
+               stream_report_term_evt(s->scf, tevt_loc_strm, tevt_type_tout);
                if (s->scf->flags & SC_FL_NOHALF)
                        s->scf->flags |= SC_FL_NOLINGER;
                sc_abort(s->scf);
        }
        if (unlikely(!(s->scf->flags & SC_FL_SHUT_DONE) && (s->res.flags & CF_WRITE_TIMEOUT))) {
+               stream_report_term_evt(s->scf, tevt_loc_strm, tevt_type_tout);
                s->scf->flags |= SC_FL_NOLINGER;
                sc_shutdown(s->scf);
        }
 
        if (unlikely(!(s->scb->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)) && (s->res.flags & CF_READ_TIMEOUT))) {
+               stream_report_term_evt(s->scb, tevt_loc_strm, tevt_type_tout);
                if (s->scb->flags & SC_FL_NOHALF)
                        s->scb->flags |= SC_FL_NOLINGER;
                sc_abort(s->scb);
index 9d450cfa9413d9d04128e7da4022e5545ca4c603..8bb5d00c96de3e97c6a1f522e1e68d71030d4078 100644 (file)
@@ -269,6 +269,8 @@ resume_execution:
        stream_abort(s);
 
  abort:
+       // XXX: All errors are handled as intercepted here !
+       stream_report_term_evt(s->scf, tevt_loc_strm, tevt_type_intercepted);
        req->analysers &= AN_REQ_FLT_END;
        s->current_rule = s->current_rule_list = NULL;
        req->analyse_exp = s->rules_exp = TICK_ETERNITY;
@@ -475,6 +477,8 @@ resume_execution:
        stream_abort(s);
 
   abort:
+       // XXX: All errors are handled as intercepted here !
+       stream_report_term_evt(s->scb, tevt_loc_strm, tevt_type_intercepted);
        rep->analysers &= AN_RES_FLT_END;
        s->current_rule = s->current_rule_list = NULL;
        rep->analyse_exp = s->rules_exp = TICK_ETERNITY;