/* Indicates whether or not the we may call the fcgi_recv() function to attempt
* to receive data into the buffer and/or demux pending data. The condition is
* a bit complex due to some API limits for now. The rules are the following :
- * - if an error or a shutdown was detected on the connection and the buffer
- * is empty, we must not attempt to receive
+ * - if an error or a shutdown was detected on the connection,
+ we must not attempt to receive
+ * - if we're subscribed for receving, no need to try again
* - if the demux buf failed to be allocated, we must not try to receive and
- * we know there is nothing pending
- * - if no flag indicates a blocking condition, we may attempt to receive,
- * regardless of whether the demux buffer is full or not, so that only
- * de demux part decides whether or not to block. This is needed because
- * the connection API indeed prevents us from re-enabling receipt that is
- * already enabled in a polled state, so we must always immediately stop
- * as soon as the demux can't proceed so as never to hit an end of read
- * with data pending in the buffers.
- * - otherwise must may not attempt
+ * we know there is nothing pending (we'll be woken up once allocated)
+ * - if the demux buf is full, we will not be able to receive.
+ * - otherwise we may attempt to receive
*/
static inline int fcgi_recv_allowed(const struct fcgi_conn *fconn)
{
- if (fconn->flags & (FCGI_CF_EOS|FCGI_CF_ERROR))
+ if (fconn->flags & (FCGI_CF_EOS|FCGI_CF_ERROR) || fconn->state == FCGI_CS_CLOSED)
return 0;
- if (b_data(&fconn->dbuf) == 0 && fconn->state == FCGI_CS_CLOSED)
+ if ((fconn->wait_event.events & SUB_RETRY_RECV))
return 0;
- if (!(fconn->flags & FCGI_CF_DEM_DALLOC) &&
- !(fconn->flags & FCGI_CF_DEM_BLOCK_ANY))
- return 1;
+ if (!(fconn->flags & (FCGI_CF_DEM_DALLOC | FCGI_CF_DEM_DFULL)))
+ return 1;
return 0;
}
-/* Restarts reading on the connection if it was not enabled */
+/* Indicates whether it's worth waking up the I/O handler to restart demuxing.
+ * Its conditions are the following:
+ * - if the buffer is empty and the connection is closed, there's nothing
+ * to demux
+ * - if a short read was reported, no need to try demuxing again
+ * - if some blocking conditions remain, no need to try again
+ * - otherwise it's safe to try demuxing again
+ */
+static inline int fcgi_may_demux(const struct fcgi_conn *fconn)
+{
+ if (fconn->state == FCGI_CS_CLOSED && !b_data(&fconn->dbuf))
+ return 0;
+
+ if (fconn->flags & FCGI_CF_DEM_SHORT_READ)
+ return 0;
+
+ if (fconn->flags & FCGI_CF_DEM_BLOCK_ANY)
+ return 0;
+
+ return 1;
+}
+
+
+/* restarts reading/processing on the connection if we can receive or demux
+ * (both are called from the same tasklet).
+ */
static inline void fcgi_conn_restart_reading(const struct fcgi_conn *fconn, int consider_buffer)
{
- if (!fcgi_recv_allowed(fconn))
- return;
- if ((!consider_buffer || !b_data(&fconn->dbuf)) &&
- (fconn->wait_event.events & SUB_RETRY_RECV))
+ if (!fcgi_recv_allowed(fconn) && !fcgi_may_demux(fconn))
return;
tasklet_wakeup(fconn->wait_event.tasklet);
}
}
}
-/* Detect a pending read0 for a FCGI connection. It happens if a read0 is
- * pending on the connection AND if there is no more data in the demux
- * buffer. The function returns 1 to report a read0 or 0 otherwise.
+/* Detect a pending read0 for a FCGI connection. It happens if a read0 was
+ * already reported on a previous xprt->rcvbuf() AND a record parser failed
+ * to parse pending data, confirming no more progress is possible because
+ * we're facing a truncated frame. The function returns 1 to report a read0
+ * or 0 otherwise.
*/
static int fcgi_conn_read0_pending(struct fcgi_conn *fconn)
{
- if ((fconn->flags & FCGI_CF_EOS) && !b_data(&fconn->dbuf))
- return 1;
- return 0;
+ return !!(fconn->flags & FCGI_CF_END_REACHED);
}
/* process full record only */
if (b_data(dbuf) < (fconn->drl + fconn->drp)) {
+ fconn->flags |= FCGI_CF_DEM_SHORT_READ;
TRACE_DEVEL("leaving on missing data", FCGI_EV_RX_RECORD|FCGI_EV_RX_GETVAL, fconn->conn);
return 0;
}
if (fconn->state == FCGI_CS_RECORD_P)
goto end_transfer;
- if (b_data(dbuf) < (fconn->drl + fconn->drp) &&
- b_size(dbuf) > (fconn->drl + fconn->drp) &&
- buf_room_for_htx_data(dbuf))
+ if (b_data(dbuf) < (fconn->drl + fconn->drp) && !b_full(dbuf)) {
+ fconn->flags |= FCGI_CF_DEM_SHORT_READ;
goto fail; // incomplete record
+ }
if (!fcgi_get_buf(fconn, &fstrm->rxbuf)) {
fconn->flags |= FCGI_CF_DEM_SALLOC;
TRACE_ENTER(FCGI_EV_RX_RECORD|FCGI_EV_RX_STDOUT, fconn->conn, fstrm);
+ if (b_data(&fconn->dbuf) < (fconn->drl + fconn->drp) && !b_full(&fconn->dbuf)) {
+ fconn->flags |= FCGI_CF_DEM_SHORT_READ;
+ goto fail; // incomplete record
+ }
+
fconn->state = FCGI_CS_RECORD_P;
TRACE_STATE("switching to RECORD_P", FCGI_EV_RX_RECORD|FCGI_EV_RX_STDOUT, fconn->conn, fstrm);
+
fconn->drl += fconn->drp;
fconn->drp = 0;
ret = MIN(b_data(&fconn->dbuf), fconn->drl);
b_del(&fconn->dbuf, ret);
fconn->drl -= ret;
- if (fconn->drl) {
- TRACE_DEVEL("leaving on missing data or error", FCGI_EV_RX_RECORD|FCGI_EV_RX_STDOUT, fconn->conn, fstrm);
- return 0;
- }
+ if (fconn->drl)
+ goto fail;
fconn->state = FCGI_CS_RECORD_H;
fstrm->flags |= FCGI_SF_ES_RCVD;
TRACE_PROTO("FCGI STDOUT record rcvd", FCGI_EV_RX_RECORD|FCGI_EV_RX_STDOUT, fconn->conn, fstrm, 0, (size_t[]){0});
TRACE_STATE("stdout data fully send, switching to RECORD_H", FCGI_EV_RX_RECORD|FCGI_EV_RX_FHDR|FCGI_EV_RX_EOI, fconn->conn, fstrm);
TRACE_LEAVE(FCGI_EV_RX_RECORD|FCGI_EV_RX_STDOUT, fconn->conn, fstrm);
return 1;
+
+ fail:
+ TRACE_DEVEL("leaving on missing data or error", FCGI_EV_RX_RECORD|FCGI_EV_RX_STDOUT, fconn->conn, fstrm);
+ return 0;
}
/* Processes a STDERR record. Returns > 0 on success, 0 if it couldn't do
if (fconn->state == FCGI_CS_RECORD_P || !fconn->drl)
goto end_transfer;
- if (b_data(dbuf) < (fconn->drl + fconn->drp) &&
- b_size(dbuf) > (fconn->drl + fconn->drp) &&
- buf_room_for_htx_data(dbuf))
+ if (b_data(dbuf) < (fconn->drl + fconn->drp) && !b_full(dbuf)) {
+ fconn->flags |= FCGI_CF_DEM_SHORT_READ;
goto fail; // incomplete record
+ }
chunk_reset(&trash);
- ret = b_force_xfer(&trash, dbuf, MIN(b_room(&trash), fconn->drl));
+ ret = b_force_xfer(&trash, dbuf, MIN(b_room(&trash) - 2, fconn->drl));
if (!ret)
goto fail;
fconn->drl -= ret;
/* process full record only */
if (b_data(dbuf) < (fconn->drl + fconn->drp)) {
+ fconn->flags |= FCGI_CF_DEM_SHORT_READ;
TRACE_DEVEL("leaving on missing data", FCGI_EV_RX_RECORD|FCGI_EV_RX_ENDREQ, fconn->conn);
return 0;
}
TRACE_STATE("receiving FCGI record header", FCGI_EV_RX_RECORD|FCGI_EV_RX_FHDR, fconn->conn);
ret = fcgi_decode_record_hdr(&fconn->dbuf, 0, &hdr);
if (!ret) {
+ fconn->flags |= FCGI_CF_DEM_SHORT_READ;
TRACE_ERROR("header record decoding failure", FCGI_EV_RX_RECORD|FCGI_EV_RX_ENDREQ|FCGI_EV_FSTRM_ERR, fconn->conn, fstrm);
goto fail;
}
/* process as many incoming records as possible below */
while (1) {
+ /* Make sure to clear DFULL if contents were deleted */
+ if (!b_full(&fconn->dbuf))
+ fconn->flags &= ~FCGI_CF_DEM_DFULL;
+
if (!b_data(&fconn->dbuf)) {
TRACE_DEVEL("no more Rx data", FCGI_EV_RX_RECORD, fconn->conn);
+ fconn->flags |= FCGI_CF_DEM_SHORT_READ;
break;
}
if (fconn->state == FCGI_CS_RECORD_H) {
TRACE_PROTO("receiving FCGI record header", FCGI_EV_RX_RECORD|FCGI_EV_RX_FHDR, fconn->conn);
ret = fcgi_decode_record_hdr(&fconn->dbuf, 0, &hdr);
- if (!ret)
+ if (!ret) {
+ fconn->flags |= FCGI_CF_DEM_SHORT_READ;
break;
+ }
b_del(&fconn->dbuf, ret);
new_record:
TRACE_STATE("FCGI record header rcvd, switching to RECORD_D", FCGI_EV_RX_RECORD|FCGI_EV_RX_FHDR, fconn->conn);
}
+ /* Make sure to clear DFULL if contents were deleted */
+ if (!b_full(&fconn->dbuf))
+ fconn->flags &= ~FCGI_CF_DEM_DFULL;
+
/* Only FCGI_CS_RECORD_D or FCGI_CS_RECORD_P */
tmp_fstrm = fcgi_conn_st_by_id(fconn, fconn->dsi);
* larger than the buffer so we drain all of
* their contents until we reach the end.
*/
+ if (b_data(&fconn->dbuf) < (fconn->drl + fconn->drp) && !b_full(&fconn->dbuf)) {
+ fconn->flags |= FCGI_CF_DEM_SHORT_READ;
+ ret = 0;
+ break;
+ }
+
fconn->state = FCGI_CS_RECORD_P;
fconn->drl += fconn->drp;
fconn->drp = 0;
}
fail:
+ if (fconn->state == FCGI_CS_CLOSED || (fconn->flags & FCGI_CF_DEM_SHORT_READ)) {
+ if (fconn->flags & FCGI_CF_EOS)
+ fconn->flags |= FCGI_CF_END_REACHED;
+ }
+
+ /* Make sure to clear DFULL if contents were deleted */
+ if (!b_full(&fconn->dbuf))
+ fconn->flags &= ~FCGI_CF_DEM_DFULL;
+
/* we can go here on missing data, blocked response or error */
if (fstrm && fcgi_strm_sc(fstrm) &&
(b_data(&fstrm->rxbuf) ||
TRACE_DATA("failed to receive data, subscribing", FCGI_EV_FCONN_RECV, conn);
conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_RECV, &fconn->wait_event);
}
- else
+ else if (ret) {
+ fconn->flags &= ~FCGI_CF_DEM_SHORT_READ;
TRACE_DATA("recv data", FCGI_EV_FCONN_RECV, conn, 0, 0, (size_t[]){ret});
+ }
if (conn_xprt_read0_pending(conn)) {
TRACE_DATA("received read0", FCGI_EV_FCONN_RECV, conn);
fconn->flags |= FCGI_CF_EOS;
}
- if (conn->flags & CO_FL_ERROR) {
+ if (conn->flags & CO_FL_ERROR &&
+ (!b_data(&fconn->dbuf) || (fconn->flags & FCGI_CF_DEM_SHORT_READ))) {
TRACE_DATA("connection error", FCGI_EV_FCONN_RECV, conn);
fconn->flags |= FCGI_CF_ERROR;
}
TRACE_POINT(FCGI_EV_FCONN_WAKE, conn);
- if (b_data(&fconn->dbuf) && !(fconn->flags & FCGI_CF_DEM_BLOCK_ANY)) {
- fcgi_process_demux(fconn);
+ if (!(fconn->flags & FCGI_CF_DEM_BLOCK_ANY) &&
+ (b_data(&fconn->dbuf) || (fconn->flags & FCGI_CF_EOS))) {
+ do {
+ fcgi_process_demux(fconn);
+
+ /* hint: if we ended up aligned on a record, we've very
+ * likely reached the end, no point trying again.
+ */
+ if (fconn->state == FCGI_CS_RECORD_H)
+ break;
+
+ if (!fcgi_recv_allowed(fconn))
+ break;
+
+ /* OK, it's worth trying to grab a few more records */
+ fcgi_recv(fconn);
+
+ } while ((b_data(&fconn->dbuf) && fcgi_may_demux(fconn)) || (fconn->flags & FCGI_CF_EOS));
+
+ /* now's time to wake the task up */
+ fcgi_conn_restart_reading(fconn, 0);
if (fconn->state == FCGI_CS_CLOSED || (fconn->flags & FCGI_CF_ERROR))
b_reset(&fconn->dbuf);
-
- if (buf_room_for_htx_data(&fconn->dbuf))
- fconn->flags &= ~FCGI_CF_DEM_DFULL;
}
fcgi_send(fconn);