]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
BUG/MEDIUM: mux-fcgi: Properly handle read0 on partial records
authorChristopher Faulet <cfaulet@haproxy.com>
Mon, 27 Jan 2025 14:18:14 +0000 (15:18 +0100)
committerChristopher Faulet <cfaulet@haproxy.com>
Mon, 3 Feb 2025 06:49:50 +0000 (07:49 +0100)
A Read0 event could be ignored by the FCGI multiplexer if it is blocked on a
partial record. Instead of handling the event, it remained blocked, waiting
for the end of the record.

To fix the issue, the same solution than the H2 multiplexer is used. Two
flags are introduced. The first one, FCGI_CF_END_REACHED, is used to
acknowledge a read0. This flag is set when a read0 was received AND the FCGI
multiplexer must handle it. The second one, FCGI_CF_DEM_SHORT_READ, is set
when the demux is interrupted on a partial record. A short read and a read0
lead to set the FCGI_CF_END_REACHED flag.

With these changes, the FCGI mux should be able to properly handle read0 on
partial records.

This patch should be backported to all stable versions after a period of
observation.

include/haproxy/mux_fcgi-t.h
src/mux_fcgi.c

index 27973dbf41cdb83637cef6e1025414a975627681..a1c66205b540784d6910e7ac68faf9ec6d08ebd6 100644 (file)
@@ -58,6 +58,9 @@
 #define FCGI_CF_ERR_PENDING     0x00008000  /* A write error was detected (block sends but not reads) */
 #define FCGI_CF_ERROR           0x00010000  /* A read error was detected (handled has an abort) */
 
+#define FCGI_CF_DEM_SHORT_READ  0x00020000  /* demux blocked on incomplete frame */
+#define FCGI_CF_END_REACHED     0x00040000  /* pending data too short with RCVD_SHUT present */
+
 
 /* This function is used to report flags in debugging tools. Please reflect
  * below any single-bit flag addition above in the same order via the
index 4adf658b55651e98002fbf5655adb8a6f2968578..85d30efc7613899adc588ad236d370780fe5f199 100644 (file)
@@ -437,41 +437,57 @@ static void fcgi_trace(enum trace_level level, uint64_t mask, const struct trace
 /* Indicates whether or not the we may call the fcgi_recv() function to attempt
  * to receive data into the buffer and/or demux pending data. The condition is
  * a bit complex due to some API limits for now. The rules are the following :
- *   - if an error or a shutdown was detected on the connection and the buffer
- *     is empty, we must not attempt to receive
+ *   - if an error or a shutdown was detected on the connection,
+       we must not attempt to receive
+ *   - if we're subscribed for receving, no need to try again
  *   - if the demux buf failed to be allocated, we must not try to receive and
- *     we know there is nothing pending
- *   - if no flag indicates a blocking condition, we may attempt to receive,
- *     regardless of whether the demux buffer is full or not, so that only
- *     de demux part decides whether or not to block. This is needed because
- *     the connection API indeed prevents us from re-enabling receipt that is
- *     already enabled in a polled state, so we must always immediately stop
- *     as soon as the demux can't proceed so as never to hit an end of read
- *     with data pending in the buffers.
- *   - otherwise must may not attempt
+ *     we know there is nothing pending (we'll be woken up once allocated)
+ *   - if the demux buf is full, we will not be able to receive.
+ *   - otherwise we may attempt to receive
  */
 static inline int fcgi_recv_allowed(const struct fcgi_conn *fconn)
 {
-       if (fconn->flags & (FCGI_CF_EOS|FCGI_CF_ERROR))
+       if (fconn->flags & (FCGI_CF_EOS|FCGI_CF_ERROR) || fconn->state == FCGI_CS_CLOSED)
                return 0;
 
-       if (b_data(&fconn->dbuf) == 0 && fconn->state == FCGI_CS_CLOSED)
+       if ((fconn->wait_event.events & SUB_RETRY_RECV))
                return 0;
 
-       if (!(fconn->flags & FCGI_CF_DEM_DALLOC) &&
-           !(fconn->flags & FCGI_CF_DEM_BLOCK_ANY))
-               return 1;
+       if (!(fconn->flags & (FCGI_CF_DEM_DALLOC | FCGI_CF_DEM_DFULL)))
+           return 1;
 
        return 0;
 }
 
-/* Restarts reading on the connection if it was not enabled */
+/* Indicates whether it's worth waking up the I/O handler to restart demuxing.
+ * Its conditions are the following:
+ *   - if the buffer is empty and the connection is closed, there's nothing
+ *     to demux
+ *   - if a short read was reported, no need to try demuxing again
+ *   - if some blocking conditions remain, no need to try again
+ *   - otherwise it's safe to try demuxing again
+ */
+static inline int fcgi_may_demux(const struct fcgi_conn *fconn)
+{
+       if (fconn->state == FCGI_CS_CLOSED && !b_data(&fconn->dbuf))
+               return 0;
+
+       if (fconn->flags & FCGI_CF_DEM_SHORT_READ)
+               return 0;
+
+       if (fconn->flags & FCGI_CF_DEM_BLOCK_ANY)
+               return 0;
+
+       return 1;
+}
+
+
+/* restarts reading/processing on the connection if we can receive or demux
+ * (both are called from the same tasklet).
+ */
 static inline void fcgi_conn_restart_reading(const struct fcgi_conn *fconn, int consider_buffer)
 {
-       if (!fcgi_recv_allowed(fconn))
-               return;
-       if ((!consider_buffer || !b_data(&fconn->dbuf)) &&
-           (fconn->wait_event.events & SUB_RETRY_RECV))
+       if (!fcgi_recv_allowed(fconn) && !fcgi_may_demux(fconn))
                return;
        tasklet_wakeup(fconn->wait_event.tasklet);
 }
@@ -782,15 +798,15 @@ static void fcgi_release(struct fcgi_conn *fconn)
        }
 }
 
-/* Detect a pending read0 for a FCGI connection. It happens if a read0 is
- * pending on the connection AND if there is no more data in the demux
- * buffer. The function returns 1 to report a read0 or 0 otherwise.
+/* Detect a pending read0 for a FCGI connection. It happens if a read0 was
+ * already reported on a previous xprt->rcvbuf() AND a record parser failed
+ * to parse pending data, confirming no more progress is possible because
+ * we're facing a truncated frame. The function returns 1 to report a read0
+ * or 0 otherwise.
  */
 static int fcgi_conn_read0_pending(struct fcgi_conn *fconn)
 {
-       if ((fconn->flags & FCGI_CF_EOS) && !b_data(&fconn->dbuf))
-               return 1;
-       return 0;
+       return !!(fconn->flags & FCGI_CF_END_REACHED);
 }
 
 
@@ -1531,6 +1547,7 @@ static int fcgi_conn_handle_values_result(struct fcgi_conn *fconn)
 
        /* process full record only */
        if (b_data(dbuf) < (fconn->drl + fconn->drp)) {
+               fconn->flags |= FCGI_CF_DEM_SHORT_READ;
                TRACE_DEVEL("leaving on missing data", FCGI_EV_RX_RECORD|FCGI_EV_RX_GETVAL, fconn->conn);
                return 0;
        }
@@ -2256,10 +2273,10 @@ static int fcgi_strm_handle_stdout(struct fcgi_conn *fconn, struct fcgi_strm *fs
        if (fconn->state == FCGI_CS_RECORD_P)
                goto end_transfer;
 
-       if (b_data(dbuf) < (fconn->drl + fconn->drp) &&
-           b_size(dbuf) > (fconn->drl + fconn->drp) &&
-           buf_room_for_htx_data(dbuf))
+       if (b_data(dbuf) < (fconn->drl + fconn->drp) && !b_full(dbuf)) {
+               fconn->flags |= FCGI_CF_DEM_SHORT_READ;
                goto fail; // incomplete record
+       }
 
        if (!fcgi_get_buf(fconn, &fstrm->rxbuf)) {
                fconn->flags |= FCGI_CF_DEM_SALLOC;
@@ -2319,23 +2336,31 @@ static int fcgi_strm_handle_empty_stdout(struct fcgi_conn *fconn, struct fcgi_st
 
        TRACE_ENTER(FCGI_EV_RX_RECORD|FCGI_EV_RX_STDOUT, fconn->conn, fstrm);
 
+       if (b_data(&fconn->dbuf) < (fconn->drl + fconn->drp) && !b_full(&fconn->dbuf)) {
+               fconn->flags |= FCGI_CF_DEM_SHORT_READ;
+               goto fail; // incomplete record
+       }
+
        fconn->state = FCGI_CS_RECORD_P;
        TRACE_STATE("switching to RECORD_P", FCGI_EV_RX_RECORD|FCGI_EV_RX_STDOUT, fconn->conn, fstrm);
+
        fconn->drl += fconn->drp;
        fconn->drp = 0;
        ret = MIN(b_data(&fconn->dbuf), fconn->drl);
        b_del(&fconn->dbuf, ret);
        fconn->drl -= ret;
-       if (fconn->drl) {
-               TRACE_DEVEL("leaving on missing data or error", FCGI_EV_RX_RECORD|FCGI_EV_RX_STDOUT, fconn->conn, fstrm);
-               return 0;
-       }
+       if (fconn->drl)
+               goto fail;
        fconn->state = FCGI_CS_RECORD_H;
        fstrm->flags |= FCGI_SF_ES_RCVD;
        TRACE_PROTO("FCGI STDOUT record rcvd", FCGI_EV_RX_RECORD|FCGI_EV_RX_STDOUT, fconn->conn, fstrm, 0, (size_t[]){0});
        TRACE_STATE("stdout data fully send, switching to RECORD_H", FCGI_EV_RX_RECORD|FCGI_EV_RX_FHDR|FCGI_EV_RX_EOI, fconn->conn, fstrm);
        TRACE_LEAVE(FCGI_EV_RX_RECORD|FCGI_EV_RX_STDOUT, fconn->conn, fstrm);
        return 1;
+
+  fail:
+       TRACE_DEVEL("leaving on missing data or error", FCGI_EV_RX_RECORD|FCGI_EV_RX_STDOUT, fconn->conn, fstrm);
+       return 0;
 }
 
 /* Processes a STDERR record. Returns > 0 on success, 0 if it couldn't do
@@ -2354,13 +2379,13 @@ static int fcgi_strm_handle_stderr(struct fcgi_conn *fconn, struct fcgi_strm *fs
        if (fconn->state == FCGI_CS_RECORD_P || !fconn->drl)
                goto end_transfer;
 
-       if (b_data(dbuf) < (fconn->drl + fconn->drp) &&
-           b_size(dbuf) > (fconn->drl + fconn->drp) &&
-           buf_room_for_htx_data(dbuf))
+       if (b_data(dbuf) < (fconn->drl + fconn->drp) && !b_full(dbuf)) {
+               fconn->flags |= FCGI_CF_DEM_SHORT_READ;
                goto fail; // incomplete record
+       }
 
        chunk_reset(&trash);
-       ret = b_force_xfer(&trash, dbuf, MIN(b_room(&trash), fconn->drl));
+       ret = b_force_xfer(&trash, dbuf, MIN(b_room(&trash) - 2, fconn->drl));
        if (!ret)
                goto fail;
        fconn->drl -= ret;
@@ -2415,6 +2440,7 @@ static int fcgi_strm_handle_end_request(struct fcgi_conn *fconn, struct fcgi_str
 
        /* process full record only */
        if (b_data(dbuf) < (fconn->drl + fconn->drp)) {
+               fconn->flags |= FCGI_CF_DEM_SHORT_READ;
                TRACE_DEVEL("leaving on missing data", FCGI_EV_RX_RECORD|FCGI_EV_RX_ENDREQ, fconn->conn);
                return 0;
        }
@@ -2476,6 +2502,7 @@ static void fcgi_process_demux(struct fcgi_conn *fconn)
                        TRACE_STATE("receiving FCGI record header", FCGI_EV_RX_RECORD|FCGI_EV_RX_FHDR, fconn->conn);
                        ret = fcgi_decode_record_hdr(&fconn->dbuf, 0, &hdr);
                        if (!ret) {
+                               fconn->flags |= FCGI_CF_DEM_SHORT_READ;
                                TRACE_ERROR("header record decoding failure", FCGI_EV_RX_RECORD|FCGI_EV_RX_ENDREQ|FCGI_EV_FSTRM_ERR, fconn->conn, fstrm);
                                goto fail;
                        }
@@ -2493,8 +2520,13 @@ static void fcgi_process_demux(struct fcgi_conn *fconn)
 
        /* process as many incoming records as possible below */
        while (1) {
+               /* Make sure to clear DFULL if contents were deleted */
+               if (!b_full(&fconn->dbuf))
+                       fconn->flags &= ~FCGI_CF_DEM_DFULL;
+
                if (!b_data(&fconn->dbuf)) {
                        TRACE_DEVEL("no more Rx data", FCGI_EV_RX_RECORD, fconn->conn);
+                       fconn->flags |= FCGI_CF_DEM_SHORT_READ;
                        break;
                }
 
@@ -2506,8 +2538,10 @@ static void fcgi_process_demux(struct fcgi_conn *fconn)
                if (fconn->state == FCGI_CS_RECORD_H) {
                        TRACE_PROTO("receiving FCGI record header", FCGI_EV_RX_RECORD|FCGI_EV_RX_FHDR, fconn->conn);
                        ret = fcgi_decode_record_hdr(&fconn->dbuf, 0, &hdr);
-                       if (!ret)
+                       if (!ret) {
+                               fconn->flags |= FCGI_CF_DEM_SHORT_READ;
                                break;
+                       }
                        b_del(&fconn->dbuf, ret);
 
                  new_record:
@@ -2519,6 +2553,10 @@ static void fcgi_process_demux(struct fcgi_conn *fconn)
                        TRACE_STATE("FCGI record header rcvd, switching to RECORD_D", FCGI_EV_RX_RECORD|FCGI_EV_RX_FHDR, fconn->conn);
                }
 
+               /* Make sure to clear DFULL if contents were deleted */
+               if (!b_full(&fconn->dbuf))
+                       fconn->flags &= ~FCGI_CF_DEM_DFULL;
+
                /* Only FCGI_CS_RECORD_D or FCGI_CS_RECORD_P */
                tmp_fstrm = fcgi_conn_st_by_id(fconn, fconn->dsi);
 
@@ -2578,6 +2616,12 @@ static void fcgi_process_demux(struct fcgi_conn *fconn)
                                 * larger than the buffer so we drain all of
                                 * their contents until we reach the end.
                                 */
+                               if (b_data(&fconn->dbuf) < (fconn->drl + fconn->drp) && !b_full(&fconn->dbuf)) {
+                                       fconn->flags |= FCGI_CF_DEM_SHORT_READ;
+                                       ret = 0;
+                                       break;
+                               }
+
                                fconn->state = FCGI_CS_RECORD_P;
                                fconn->drl += fconn->drp;
                                fconn->drp = 0;
@@ -2602,6 +2646,15 @@ static void fcgi_process_demux(struct fcgi_conn *fconn)
        }
 
  fail:
+       if (fconn->state == FCGI_CS_CLOSED || (fconn->flags & FCGI_CF_DEM_SHORT_READ)) {
+               if (fconn->flags & FCGI_CF_EOS)
+                       fconn->flags |= FCGI_CF_END_REACHED;
+       }
+
+       /* Make sure to clear DFULL if contents were deleted */
+       if (!b_full(&fconn->dbuf))
+               fconn->flags &= ~FCGI_CF_DEM_DFULL;
+
        /* we can go here on missing data, blocked response or error */
        if (fstrm && fcgi_strm_sc(fstrm) &&
            (b_data(&fstrm->rxbuf) ||
@@ -2745,14 +2798,17 @@ static int fcgi_recv(struct fcgi_conn *fconn)
                TRACE_DATA("failed to receive data, subscribing", FCGI_EV_FCONN_RECV, conn);
                conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_RECV, &fconn->wait_event);
        }
-       else
+       else if (ret) {
+               fconn->flags &= ~FCGI_CF_DEM_SHORT_READ;
                TRACE_DATA("recv data", FCGI_EV_FCONN_RECV, conn, 0, 0, (size_t[]){ret});
+       }
 
        if (conn_xprt_read0_pending(conn)) {
                TRACE_DATA("received read0", FCGI_EV_FCONN_RECV, conn);
                fconn->flags |= FCGI_CF_EOS;
        }
-       if (conn->flags & CO_FL_ERROR) {
+       if (conn->flags & CO_FL_ERROR &&
+           (!b_data(&fconn->dbuf) || (fconn->flags & FCGI_CF_DEM_SHORT_READ))) {
                TRACE_DATA("connection error", FCGI_EV_FCONN_RECV, conn);
                fconn->flags |= FCGI_CF_ERROR;
        }
@@ -2999,14 +3055,30 @@ static int fcgi_process(struct fcgi_conn *fconn)
 
        TRACE_POINT(FCGI_EV_FCONN_WAKE, conn);
 
-       if (b_data(&fconn->dbuf) && !(fconn->flags & FCGI_CF_DEM_BLOCK_ANY)) {
-               fcgi_process_demux(fconn);
+       if (!(fconn->flags & FCGI_CF_DEM_BLOCK_ANY) &&
+           (b_data(&fconn->dbuf) || (fconn->flags & FCGI_CF_EOS))) {
+               do {
+                       fcgi_process_demux(fconn);
+
+                       /* hint: if we ended up aligned on a record, we've very
+                        * likely reached the end, no point trying again.
+                        */
+                       if (fconn->state == FCGI_CS_RECORD_H)
+                               break;
+
+                       if (!fcgi_recv_allowed(fconn))
+                               break;
+
+                       /* OK, it's worth trying to grab a few more records */
+                       fcgi_recv(fconn);
+
+               } while ((b_data(&fconn->dbuf) && fcgi_may_demux(fconn)) || (fconn->flags & FCGI_CF_EOS));
+
+               /* now's time to wake the task up */
+               fcgi_conn_restart_reading(fconn, 0);
 
                if (fconn->state == FCGI_CS_CLOSED || (fconn->flags & FCGI_CF_ERROR))
                        b_reset(&fconn->dbuf);
-
-               if (buf_room_for_htx_data(&fconn->dbuf))
-                       fconn->flags &= ~FCGI_CF_DEM_DFULL;
        }
        fcgi_send(fconn);