#include <haproxy/hstream-t.h>
#include <haproxy/http_htx.h>
#include <haproxy/http.h>
+#include <haproxy/istbuf.h>
#include <haproxy/pool.h>
#include <haproxy/proxy-t.h>
#include <haproxy/sc_strm.h>
static char *random_resp;
static int random_resp_len = RESPSIZE;
+static size_t hstream_add_ff_data(struct hstream *hs, struct sedesc *sd, unsigned long long len);
static size_t hstream_add_htx_data(struct hstream *hs, struct htx *htx, unsigned long long len);
#define TRACE_SOURCE &trace_haterm
goto leave;
}
+static int hstream_ff_snd(struct connection *conn, struct hstream *hs)
+{
+ size_t len;
+ unsigned int nego_flags = NEGO_FF_FL_NONE;
+ struct sedesc *sd = hs->sc->sedesc;
+ int ret = 0;
+
+ /* First try to resume FF*/
+ if (se_have_ff_data(sd)) {
+ ret = CALL_MUX_WITH_RET(conn->mux, resume_fastfwd(hs->sc, 0));
+ if (ret > 0)
+ sd->iobuf.flags &= ~IOBUF_FL_FF_BLOCKED;
+ }
+
+ nego_flags |= NEGO_FF_FL_EXACT_SIZE;
+ len = se_nego_ff(sd, &BUF_NULL, hs->to_write, nego_flags);
+ if (sd->iobuf.flags & IOBUF_FL_NO_FF) {
+ TRACE_DEVEL("Fast-forwarding not supported by endpoint, disable it", HS_EV_HSTRM_RESP, hs);
+ goto abort;
+ }
+ if (sd->iobuf.flags & IOBUF_FL_FF_BLOCKED) {
+ TRACE_DEVEL("Fast-forwarding blocked", HS_EV_HSTRM_RESP, hs);
+ goto out;
+ }
+
+ hs->to_write -= hstream_add_ff_data(hs, sd, len);
+ if (!hs->to_write)
+ sd->iobuf.flags |= IOBUF_FL_EOI;
+
+ if (se_done_ff(sd) != 0 || !(sd->iobuf.flags & (IOBUF_FL_FF_BLOCKED|IOBUF_FL_FF_WANT_ROOM))) {
+ /* Something was forwarding or the consumer states it is not
+ * blocked anyore, don't reclaim more room */
+ }
+
+ if (se_have_ff_data(sd)) {
+ TRACE_DEVEL("data not fully sent, wait", HS_EV_HSTRM_SEND, hs);
+ conn->mux->subscribe(hs->sc, SUB_RETRY_SEND, &hs->sc->wait_event);
+ }
+ else if (hs->to_write) {
+ TRACE_STATE("waking up task", HS_EV_HSTRM_IO_CB, hs);
+ task_wakeup(hs->task, TASK_WOKEN_IO);
+ }
+ out:
+ return ret;
+
+ abort:
+ return -1;
+}
+
/* Send HTX data prepared for <hs> haterm stream from <conn> connection */
static int hstream_htx_buf_snd(struct connection *conn, struct hstream *hs)
{
goto leave;
}
+static size_t hstream_add_ff_data(struct hstream *hs, struct sedesc *sd, unsigned long long len)
+{
+ size_t ret;
+ char *data_ptr;
+ unsigned int offset;
+ char *buffer;
+ size_t buffer_len;
+ int modulo;
+
+ TRACE_ENTER(HS_EV_HSTRM_ADD_DATA, hs);
+ b_add(sd->iobuf.buf, sd->iobuf.offset);
+
+ if (hs->req_random) {
+ buffer = random_resp;
+ buffer_len = random_resp_len;
+ modulo = random_resp_len;
+ }
+ else {
+ buffer = common_response;
+ buffer_len = sizeof(common_response);
+ modulo = HS_COMMON_RESPONSE_LINE_SZ;
+ }
+
+ offset = (hs->req_size - len) % modulo;
+ data_ptr = buffer + offset;
+ if (len > (unsigned long long)(buffer_len - offset))
+ len = (unsigned long long)(buffer_len - offset);
+
+ ret = b_putist(sd->iobuf.buf, ist2(data_ptr, len));
+ if (!ret)
+ TRACE_STATE("unable to fast-forward payload", HS_EV_HSTRM_ADD_DATA, hs);
+
+ b_sub(sd->iobuf.buf, sd->iobuf.offset);
+ sd->iobuf.data += ret;
+ TRACE_LEAVE(HS_EV_HSTRM_ADD_DATA, hs);
+ return ret;
+}
+
/* Add data to HTX response buffer from pre-built responses */
static size_t hstream_add_htx_data(struct hstream *hs, struct htx *htx, unsigned long long len)
{
return ret;
}
+static inline int hstream_is_fastfwd_supported(struct hstream *hs)
+{
+ return (!(global.tune.no_zero_copy_fwd & NO_ZERO_COPY_FWD) &&
+ sc_ep_test(hs->sc, SE_FL_MAY_FASTFWD_CONS) &&
+ !(hs->sc->sedesc->iobuf.flags & IOBUF_FL_NO_FF) &&
+ !hs->req_chunked && hs->to_write);
+}
+
/* haterm stream processing task */
static struct task *process_hstream(struct task *t, void *context, unsigned int state)
{
else {
struct buffer *buf;
struct htx *htx;
+ int ret = 0;
/* HTX RX part */
if (hstream_must_drain(hs)) {
if (!hstream_sl_hdrs_htx_buf_snd(hs, conn))
goto err;
- /* HTX TX part */
+ /* TX part */
+ if (hstream_is_fastfwd_supported(hs)) {
+ if (!htx_is_empty(htxbuf(&hs->res)))
+ goto flush_res_buf;
+ if (!hs->to_write && !se_have_ff_data(hs->sc->sedesc))
+ goto out;
+
+ ret = hstream_ff_snd(conn, hs);
+ if (ret >= 0)
+ goto send_done;
+ /* fallback to regular send */
+ }
+
if (!hs->to_write && htx_is_empty(htxbuf(&hs->res)))
goto out;
if (hs->to_write <= 0)
htx->flags |= HTX_FL_EOM;
htx_to_buf(htx, &hs->res);
+
+ flush_res_buf:
hstream_htx_buf_snd(conn, hs);
+ send_done:
if (hs->req_body && hs->req_after_res && !hs->to_write) {
/* Response sending has just complete. The body will be drained upon
* next wakeup.
}
out:
- if (!hs->to_write && !hs->req_body && htx_is_empty(htxbuf(&hs->res))) {
+ if (!hs->to_write && !hs->req_body && htx_is_empty(htxbuf(&hs->res)) && !se_have_ff_data(hs->sc->sedesc)) {
TRACE_DEVEL("shutting down stream", HS_EV_HSTRM_SEND, hs);
CALL_MUX_NO_RET(conn->mux, shut(hs->sc, SE_SHW_SILENT|SE_SHW_NORMAL, NULL));
}