]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MAJOR: mux-h1: Drain requests on client side before shut a stream down
authorChristopher Faulet <cfaulet@haproxy.com>
Mon, 26 Feb 2024 06:50:23 +0000 (07:50 +0100)
committerChristopher Faulet <cfaulet@haproxy.com>
Wed, 28 Feb 2024 15:02:33 +0000 (16:02 +0100)
unlike for H2 and H3, there is no mechanism in H1 to notify the client it
must stop to upload data when a response is replied before the end of the
request without closing the connection. There is no RST_STREAM frame
equivalent.

Thus, there is only two ways to deal with this situation: closing the
connection or draining the request. Until now, HAProxy didn't support
draining H1 messages. Closing the connection in this case has however a
major drawback. It leads to send a TCP reset, dropping this way all in-fly
data. There is no warranty the client has fully received the response.

Draining H1 messages was never implemented because in old versions it was a
bit tricky to implement. However, it is now far simplier to support this
feature because it is possible to have a H1 stream without any applicative
stream. It is the purpose of this patch. Now, when a shutdown is requested
and the stream is detached from the connection, if the request is unfinished
while the response was fully sent, the request in drained.

To do so, in this case the shutdown and the detach are delayed. From the
upper layer point of view, there is no changes. The endpoint is shut down
and detached as usual. But on H1 mux point of view, the H1 stream is still
alive and is being able to drain data. However the stream-endpoint
descriptor is orphan. Once the request is fully received (and drained), the
connection is shut down if it cannot be reused for a new transaction and the
H1 stream is destroyed.

include/haproxy/mux_h1-t.h
src/mux_h1.c

index 2f49a495eb0aaf609e53fef91cf7151b6762a30e..5f4cd7f5f0ba7d132f460c893811e17bf5320471 100644 (file)
@@ -134,6 +134,7 @@ enum h1_cs {
        H1_CS_EMBRYONIC,   /* Connection is waiting for the message headers (H1S is not NULL, not attached to a SC - Frontend connection only) */
        H1_CS_UPGRADING,   /* TCP>H1 upgrade in-progress (H1S is not NULL and attached to a SC - Frontend connection only) */
        H1_CS_RUNNING,     /* Connection fully established and the H1S is processing data (H1S is not NULL and attached to a SC) */
+       H1_CS_DRAINING,    /* H1C is draining the message before destroying the H1S (H1S is not NULL but no SC attached) */
        H1_CS_CLOSING,     /* Send pending outgoing data and close the connection ASAP  (H1S may be NULL) */
        H1_CS_CLOSED,      /* Connection must be closed now and H1C must be released (H1S is NULL) */
        H1_CS_ENTRIES,
@@ -150,6 +151,7 @@ static inline const char *h1c_st_to_str(enum h1_cs st)
        case H1_CS_EMBRYONIC: return "EMB";
        case H1_CS_UPGRADING: return "UPG";
        case H1_CS_RUNNING:   return "RUN";
+       case H1_CS_DRAINING:  return "DRN";
        case H1_CS_CLOSING:   return "CLI";
        case H1_CS_CLOSED:    return "CLD";
        default:              return "???";
index 0f91ec016fd04438b7ea1c0001641a8899404adc..581e59d729394bc5abaf421d855bb7fc606a73b9 100644 (file)
@@ -556,11 +556,11 @@ static inline void h1_release_buf(struct h1c *h1c, struct buffer *bptr)
 }
 
 /* Returns 1 if the H1 connection is alive (IDLE, EMBRYONIC, RUNNING or
- * RUNNING). Ortherwise 0 is returned.
+ * DRAINING). Ortherwise 0 is returned.
  */
 static inline int h1_is_alive(const struct h1c *h1c)
 {
-       return (h1c->state <= H1_CS_RUNNING);
+       return (h1c->state <= H1_CS_DRAINING);
 }
 
 /* Switch the H1 connection to CLOSING or CLOSED mode, depending on the output
@@ -952,6 +952,10 @@ static int h1s_must_shut_conn(struct h1s *h1s)
                TRACE_STATE("keep connection alive (UPGRADING)", H1_EV_STRM_SHUT, h1c->conn, h1s);
                ret = 0;
        }
+       else if (!(h1c->flags & H1C_F_IS_BACK) && h1s->req.state != H1_MSG_DONE && h1s->res.state == H1_MSG_DONE) {
+               TRACE_STATE("defer shutdown to drain request first", H1_EV_STRM_SHUT, h1c->conn, h1s);
+               ret = 0;
+       }
        else if (((h1s->flags & H1S_F_WANT_KAL) && h1s->req.state == H1_MSG_DONE && h1s->res.state == H1_MSG_DONE)) {
                TRACE_STATE("keep connection alive (want_kal)", H1_EV_STRM_SHUT, h1c->conn, h1s);
                ret = 0;
@@ -3485,6 +3489,11 @@ static int h1_handle_internal_err(struct h1c *h1c)
        struct session *sess = h1c->conn->owner;
        int ret = 0;
 
+       if (h1c->state == H1_CS_DRAINING) {
+               h1c->flags = (h1c->flags & ~H1C_F_WAIT_NEXT_REQ) | H1C_F_ABRTED;
+               h1s_destroy(h1c->h1s);
+               goto end;
+       }
        session_inc_http_req_ctr(sess);
        proxy_inc_fe_req_ctr(sess->listener, sess->fe, 1);
        _HA_ATOMIC_INC(&sess->fe->fe_counters.p.http.rsp[5]);
@@ -3495,6 +3504,7 @@ static int h1_handle_internal_err(struct h1c *h1c)
        h1c->errcode = 500;
        ret = h1_send_error(h1c);
        sess_log(sess);
+  end:
        return ret;
 }
 
@@ -3508,6 +3518,11 @@ static int h1_handle_parsing_error(struct h1c *h1c)
        struct session *sess = h1c->conn->owner;
        int ret = 0;
 
+       if (h1c->state == H1_CS_DRAINING) {
+               h1c->flags = (h1c->flags & ~H1C_F_WAIT_NEXT_REQ) | H1C_F_ABRTED;
+               h1s_destroy(h1c->h1s);
+               goto end;
+       }
        if (!b_data(&h1c->ibuf) && ((h1c->flags & H1C_F_WAIT_NEXT_REQ) || (sess->fe->options & PR_O_IGNORE_PRB))) {
                h1c->flags = (h1c->flags & ~H1C_F_WAIT_NEXT_REQ) | H1C_F_ABRTED;
                h1_close(h1c);
@@ -3541,6 +3556,11 @@ static int h1_handle_not_impl_err(struct h1c *h1c)
        struct session *sess = h1c->conn->owner;
        int ret = 0;
 
+       if (h1c->state == H1_CS_DRAINING) {
+               h1c->flags = (h1c->flags & ~H1C_F_WAIT_NEXT_REQ) | H1C_F_ABRTED;
+               h1s_destroy(h1c->h1s);
+               goto end;
+       }
        if (!b_data(&h1c->ibuf) && ((h1c->flags & H1C_F_WAIT_NEXT_REQ) || (sess->fe->options & PR_O_IGNORE_PRB))) {
                h1c->flags = (h1c->flags & ~H1C_F_WAIT_NEXT_REQ) | H1C_F_ABRTED;
                h1_close(h1c);
@@ -3571,6 +3591,11 @@ static int h1_handle_req_tout(struct h1c *h1c)
        struct session *sess = h1c->conn->owner;
        int ret = 0;
 
+       if (h1c->state == H1_CS_DRAINING) {
+               h1c->flags = (h1c->flags & ~H1C_F_WAIT_NEXT_REQ) | H1C_F_ABRTED;
+               h1s_destroy(h1c->h1s);
+               goto end;
+       }
        if (!b_data(&h1c->ibuf) && ((h1c->flags & H1C_F_WAIT_NEXT_REQ) || (sess->fe->options & PR_O_IGNORE_PRB))) {
                h1c->flags = (h1c->flags & ~H1C_F_WAIT_NEXT_REQ) | H1C_F_ABRTED;
                h1_close(h1c);
@@ -3788,7 +3813,7 @@ static int h1_process(struct h1c * h1c)
 
        /* Try to parse now the first block of a request, creating the H1 stream if necessary */
        if (b_data(&h1c->ibuf) &&                                                /* Input data to be processed */
-           (h1c->state < H1_CS_RUNNING) &&                                      /* IDLE, EMBRYONIC or UPGRADING */
+           ((h1c->state < H1_CS_RUNNING) || (h1c->state == H1_CS_DRAINING)) &&  /* IDLE, EMBRYONIC, UPGRADING or DRAINING */
            !(h1c->flags & (H1C_F_IN_SALLOC|H1C_F_ABRT_PENDING))) {              /* No allocation failure on the stream rxbuf and no ERROR on the H1C */
                struct h1s *h1s = h1c->h1s;
                struct buffer *buf;
@@ -3799,7 +3824,8 @@ static int h1_process(struct h1c * h1c)
                        goto release;
 
                /* First of all handle H1 to H2 upgrade (no need to create the H1 stream) */
-               if (!(h1c->flags & H1C_F_WAIT_NEXT_REQ) &&         /* First request */
+               if (h1c->state != H1_CS_DRAINING &&                /* Not draining message */
+                   !(h1c->flags & H1C_F_WAIT_NEXT_REQ) &&         /* First request */
                    !(h1c->px->options2 & PR_O2_NO_H2_UPGRADE) &&  /* H2 upgrade supported by the proxy */
                    !(conn->mux->flags & MX_FL_NO_UPG)) {          /* the current mux supports upgrades */
                        /* Try to match H2 preface before parsing the request headers. */
@@ -3840,7 +3866,7 @@ static int h1_process(struct h1c * h1c)
                h1_process_demux(h1c, buf, count);
                h1_release_buf(h1c, &h1s->rxbuf);
                h1_set_idle_expiration(h1c);
-               if (h1c->state < H1_CS_RUNNING) {
+               if (h1c->state != H1_CS_RUNNING) { // TODO: be sure state cannot change in h1_process_demux
                        if (h1s->flags & H1S_F_INTERNAL_ERROR) {
                                h1_handle_internal_err(h1c);
                                TRACE_ERROR("internal error detected", H1_EV_H1C_WAKE|H1_EV_H1C_ERR);
@@ -3883,6 +3909,11 @@ static int h1_process(struct h1c * h1c)
                                if (h1_send_error(h1c))
                                        h1_send(h1c);
                        }
+                       else if (h1c->state == H1_CS_DRAINING) {
+                               BUG_ON(h1c->h1s->sd && !se_fl_test(h1c->h1s->sd, SE_FL_ORPHAN));
+                               h1s_destroy(h1c->h1s);
+                               TRACE_STATE("abort/error when draining message. destroy h1s and close h1c", H1_EV_H1S_END, h1c->conn);
+                       }
                        else {
                                h1_close(h1c);
                                TRACE_STATE("close h1c", H1_EV_H1S_END, h1c->conn);
@@ -3911,6 +3942,17 @@ static int h1_process(struct h1c * h1c)
                        h1_alert(h1s);
                }
        }
+       else if (h1c->state == H1_CS_DRAINING) {
+               BUG_ON(!h1c->h1s);
+               if (se_fl_test(h1c->h1s->sd, SE_FL_EOI)) {
+                       if (h1s_must_shut_conn(h1c->h1s)) {
+                               h1_shutw_conn(conn);
+                               goto release;
+                       }
+                       h1s_finish_detach(h1c->h1s);
+                       goto end;
+               }
+       }
 
        if (!b_data(&h1c->ibuf))
                h1_release_buf(h1c, &h1c->ibuf);
@@ -4218,6 +4260,7 @@ static void h1_destroy(void *ctx)
 static void h1_detach(struct sedesc *sd)
 {
        struct h1s *h1s = sd->se;
+       struct h1c *h1c;
 
        TRACE_ENTER(H1_EV_STRM_END, h1s ? h1s->h1c->conn : NULL, h1s);
 
@@ -4225,7 +4268,25 @@ static void h1_detach(struct sedesc *sd)
                TRACE_LEAVE(H1_EV_STRM_END);
                return;
        }
-       h1s_finish_detach(h1s);
+       h1c = h1s->h1c;
+
+       if (h1c->state == H1_CS_RUNNING && !(h1c->flags & H1C_F_IS_BACK) && h1s->req.state != H1_MSG_DONE) {
+               h1c->state = H1_CS_DRAINING;
+               TRACE_DEVEL("Deferring H1S destroy to drain message", H1_EV_STRM_END, h1s->h1c->conn, h1s);
+               /* If we have a pending data, process it immediately or
+                * subscribe for reads waiting for new data
+                */
+               if (unlikely(b_data(&h1c->ibuf))) {
+                       if (h1_process(h1c) == -1)
+                               goto end;
+               }
+               else
+                       h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event);
+               h1_set_idle_expiration(h1c);
+               h1_refresh_timeout(h1c);
+       }
+       else
+               h1s_finish_detach(h1s);
 
   end:
        TRACE_LEAVE(H1_EV_STRM_END);