#include <haproxy/istbuf.h>
#include <haproxy/log.h>
#include <haproxy/mux_h1-t.h>
-#include <haproxy/pipe-t.h>
+#include <haproxy/pipe.h>
#include <haproxy/proxy.h>
#include <haproxy/session-t.h>
#include <haproxy/stats.h>
#include <haproxy/stconn.h>
#include <haproxy/stream.h>
#include <haproxy/trace.h>
+#include <haproxy/xref.h>
/* H1 connection descriptor */
struct h1c {
}
/* Here h1s_sc(h1s) is always defined */
- if ((!(h1m->flags & H1_MF_RESP) || !(h1s->flags & H1S_F_BODYLESS_RESP)) &&
+ if (!(h1c->flags & H1C_F_CANT_FASTFWD) &&
+ (!(h1m->flags & H1_MF_RESP) || !(h1s->flags & H1S_F_BODYLESS_RESP)) &&
(h1m->state == H1_MSG_DATA || h1m->state == H1_MSG_TUNNEL)) {
TRACE_STATE("notify the mux can use fast-forward", H1_EV_RX_DATA|H1_EV_RX_BODY, h1c->conn, h1s);
se_fl_set(h1s->sd, SE_FL_MAY_FASTFWD);
h1c->flags &= ~H1C_F_WANT_FASTFWD;
}
- se_fl_clr(h1s->sd, SE_FL_MAY_FASTFWD);
- h1c->flags &= ~H1C_F_WANT_FASTFWD;
-
/* Set EOI on stream connector in DONE state iff:
* - it is a response
 * - it is a request but not a protocol upgrade nor a CONNECT
return total;
}
+/* Return the stream-endpoint descriptor of the opposite side by following
+ * the xref attached to this H1 stream's sedesc, or NULL if no peer is
+ * attached yet. The xref lock is held only while resolving the peer; the
+ * returned pointer is used unlocked by the caller.
+ */
+static inline struct sedesc *h1s_opposite_sd(struct h1s *h1s)
+{
+	struct xref *peer = xref_get_peer_and_lock(&h1s->sd->xref);
+	struct sedesc *sdo = NULL;
+
+	if (peer) {
+		sdo = container_of(peer, struct sedesc, xref);
+		xref_unlock(&h1s->sd->xref, peer);
+	}
+	return sdo;
+}
+
+/* .init_fastfwd callback: prepare a zero-copy transfer of at most <count>
+ * bytes from the stream toward the H1 connection. Depending on the situation,
+ * data will go either through a kernel pipe (splicing) or through the
+ * connection output buffer exposed via the iobuf. Returns the number of bytes
+ * the producer may forward, or 0 on blocking (buffer allocation failure, full
+ * output buffer, or input data still pending).
+ */
+static size_t h1_init_ff(struct stconn *sc, struct buffer *input, size_t count, unsigned int may_splice)
+{
+	struct h1s *h1s = __sc_mux_strm(sc);
+	struct h1c *h1c = h1s->h1c;
+	/* output side: the response message on the front, the request on the back */
+	struct h1m *h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->res : &h1s->req);
+	size_t ret = 0;
+
+	TRACE_ENTER(H1_EV_STRM_SEND, h1c->conn, h1s, 0, (size_t[]){count});
+
+	/* TODO: add check on curr_len if CLEN */
+
+	if (h1m->flags & H1_MF_CHNK) {
+		if (h1m->curr_len) {
+			/* inside a chunk: never forward more than its remaining payload */
+			BUG_ON(h1m->state != H1_MSG_DATA);
+			if (count > h1m->curr_len)
+				count = h1m->curr_len;
+		}
+		else {
+			/* between chunks: emit a new chunk envelope sized for <count> bytes */
+			BUG_ON(h1m->state != H1_MSG_CHUNK_CRLF && h1m->state != H1_MSG_CHUNK_SIZE);
+			if (!h1_make_chunk(h1s, h1m, count))
+				goto out;
+			h1m->curr_len = count;
+		}
+	}
+
+	/* Use kernel splicing if it is supported by the sender and if there
+	 * are no input data _AND_ no output data.
+	 *
+	 * TODO: It may be good to add a flag to send obuf data first if any,
+	 * and then data in pipe, or the opposite. For now, it is not
+	 * supported to mix data.
+	 */
+	if (!b_data(input) && !b_data(&h1c->obuf) && may_splice) {
+		/* reuse the existing pipe or try to grab a new one, within the global limit */
+		if (h1c->conn->xprt->snd_pipe && (h1s->sd->iobuf.pipe || (pipes_used < global.maxpipes && (h1s->sd->iobuf.pipe = get_pipe())))) {
+			h1s->sd->iobuf.offset = 0;
+			h1s->sd->iobuf.data = 0;
+			ret = count;
+			goto out;
+		}
+		/* no pipe available: permanently switch this iobuf to the buffered path */
+		h1s->sd->iobuf.flags |= IOBUF_FL_NO_SPLICING;
+		TRACE_DEVEL("Unable to allocate pipe for splicing, fallback to buffer", H1_EV_STRM_SEND, h1c->conn, h1s);
+	}
+
+	/* buffered path: make sure the output buffer is allocated */
+	if (!h1_get_buf(h1c, &h1c->obuf)) {
+		h1c->flags |= H1C_F_OUT_ALLOC;
+		TRACE_STATE("waiting for opposite h1c obuf allocation", H1_EV_STRM_SEND|H1_EV_H1S_BLK, h1c->conn, h1s);
+		goto out;
+	}
+
+	/* realign so the producer gets one contiguous free area */
+	if (b_space_wraps(&h1c->obuf))
+		b_slow_realign(&h1c->obuf, trash.area, b_data(&h1c->obuf));
+
+	/* expose obuf to the producer through the iobuf */
+	h1s->sd->iobuf.buf = &h1c->obuf;
+	h1s->sd->iobuf.offset = 0;
+	h1s->sd->iobuf.data = 0;
+
+	/* Cannot forward more than available room in output buffer */
+	if (count > b_room(&h1c->obuf))
+		count = b_room(&h1c->obuf);
+
+	if (!count) {
+		h1c->flags |= H1C_F_OUT_FULL;
+		h1s->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
+		TRACE_STATE("output buffer full", H1_EV_STRM_SEND|H1_EV_H1S_BLK, h1c->conn, h1s);
+		goto out;
+	}
+
+	/* forward remaining input data */
+	if (b_data(input)) {
+		size_t xfer = count;
+
+		if (xfer > b_data(input))
+			xfer = b_data(input);
+		h1s->sd->iobuf.data = b_xfer(&h1c->obuf, input, xfer);
+
+		/* Cannot forward more data, wait for room */
+		if (b_data(input))
+			goto out;
+	}
+
+	/* room left for the producer after the input data already copied */
+	ret = count - h1s->sd->iobuf.data;
+
+  out:
+	TRACE_LEAVE(H1_EV_STRM_SEND, h1c->conn, h1s, 0, (size_t[]){ret});
+	return ret;
+}
+
+/* .done_fastfwd callback: commit data previously fast-forwarded into the
+ * iobuf by h1_init_ff(). Spliced data are pushed to the transport via
+ * snd_pipe(); buffered data trigger a synchronous h1_send(). The H1 message
+ * state is advanced when the announced payload (CLEN) or the current chunk
+ * (CHNK) is fully consumed, and connection errors are reported to the SE.
+ */
+static void h1_done_ff(struct stconn *sc)
+{
+	struct h1s *h1s = __sc_mux_strm(sc);
+	struct h1c *h1c = h1s->h1c;
+	/* output side: the response message on the front, the request on the back */
+	struct h1m *h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->res : &h1s->req);
+	struct sedesc *sd = h1s->sd;
+	size_t total = 0;
+
+	TRACE_ENTER(H1_EV_STRM_SEND, h1c->conn, h1s);
+
+
+	if (sd->iobuf.pipe) {
+		/* spliced path: push the whole pipe content to the transport layer */
+		total = h1c->conn->xprt->snd_pipe(h1c->conn, h1c->conn->xprt_ctx, sd->iobuf.pipe, sd->iobuf.pipe->data);
+		if (total > 0)
+			HA_ATOMIC_ADD(&h1c->px_counters->spliced_bytes_out, total);
+		if (!sd->iobuf.pipe->data) {
+			/* pipe fully drained: release it */
+			put_pipe(sd->iobuf.pipe);
+			sd->iobuf.pipe = NULL;
+		}
+	}
+	else {
+		/* NOTE(review): obuf is deemed full when the free room exactly
+		 * equals the reserved iobuf offset — confirm offset semantics */
+		if (b_room(&h1c->obuf) == sd->iobuf.offset)
+			h1c->flags |= H1C_F_OUT_FULL;
+
+		total = sd->iobuf.data;
+		/* detach the iobuf from obuf before sending */
+		sd->iobuf.buf = NULL;
+		sd->iobuf.offset = 0;
+		sd->iobuf.data = 0;
+
+		/* Perform a synchronous send but in all cases, consider
+		 * everything was already sent from the SC point of view.
+		 */
+		h1_send(h1c);
+	}
+
+	/* account the committed bytes against the announced payload/chunk size */
+	if (h1m->curr_len)
+		h1m->curr_len -= total;
+
+	if (!h1m->curr_len && (h1m->flags & H1_MF_CLEN))
+		h1m->state = H1_MSG_DONE;
+	else if (!h1m->curr_len && (h1m->flags & H1_MF_CHNK)) {
+		/* current chunk fully sent: its trailing CRLF is expected next */
+		if (h1m->state == H1_MSG_DATA)
+			h1m->state = H1_MSG_CHUNK_CRLF;
+	}
+
+	HA_ATOMIC_ADD(&h1c->px_counters->bytes_out, total);
+
+	/* NOTE(review): this label is never referenced by a goto in this function */
+  out:
+	// TODO: should we call h1_process() instead ?
+	if (h1c->conn->flags & CO_FL_ERROR) {
+		h1c->flags = (h1c->flags & ~H1C_F_WANT_FASTFWD) | H1C_F_ERR_PENDING;
+		if (h1c->flags & H1C_F_EOS)
+			h1c->flags |= H1C_F_ERROR;
+		else if (!(h1c->wait_event.events & SUB_RETRY_RECV)) {
+			/* EOS not seen, so subscribe for reads to be able to
+			 * catch the error on the reading path. It is especially
+			 * important if EOI was reached.
+			 */
+			h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event);
+		}
+		se_fl_set_error(h1s->sd);
+		TRACE_DEVEL("connection error", H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, h1c->conn, h1s);
+	}
+
+	/* NOTE(review): LEAVE uses H1_EV_STRM_RECV while ENTER uses
+	 * H1_EV_STRM_SEND — looks like a copy/paste slip, confirm */
+	TRACE_LEAVE(H1_EV_STRM_RECV, h1c->conn, h1s, 0, (size_t[]){total});
+}
+
+/* .fastfwd callback: try to fast-forward up to <count> bytes of payload
+ * received on the H1 connection directly to the opposite endpoint (into a
+ * kernel pipe or its output buffer), bypassing the channel buffers. <flags>
+ * are CO_RFL_* flags. Returns the number of bytes forwarded. Fast-forwarding
+ * is disabled (H1C_F_CANT_FASTFWD) when the consumer does not support it or
+ * when a splicing error occurs.
+ */
+static int h1_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags)
+{
+	struct h1s *h1s = __sc_mux_strm(sc);
+	struct h1c *h1c = h1s->h1c;
+	/* input side: the request message on the front, the response on the back */
+	struct h1m *h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->req : &h1s->res);
+	struct sedesc *sdo = NULL;
+	size_t total = 0, try = 0;
+	int ret = 0;
+
+	TRACE_ENTER(H1_EV_STRM_RECV, h1c->conn, h1s, 0, (size_t[]){count});
+
+	if (h1m->state != H1_MSG_DATA && h1m->state != H1_MSG_TUNNEL) {
+		h1c->flags &= ~H1C_F_WANT_FASTFWD;
+		TRACE_STATE("Cannot fast-forward data now !(msg_data|msg_tunnel)", H1_EV_STRM_RECV, h1c->conn, h1s);
+		goto end;
+	}
+
+	se_fl_clr(h1s->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM);
+	h1c->conn->flags &= ~CO_FL_WAIT_ROOM;
+	h1c->flags |= H1C_F_WANT_FASTFWD;
+
+	if (h1c->flags & (H1C_F_EOS|H1C_F_ERROR)) {
+		h1c->flags &= ~H1C_F_WANT_FASTFWD;
+		TRACE_DEVEL("leaving on (EOS|ERROR)", H1_EV_STRM_RECV, h1c->conn, h1s);
+		goto end;
+	}
+
+	sdo = h1s_opposite_sd(h1s);
+	if (!sdo) {
+		TRACE_STATE("Opposite endpoint not available yet", H1_EV_STRM_RECV, h1c->conn, h1s);
+		goto out;
+	}
+
+	/* never read more than the announced payload/chunk size */
+	if (h1m->state == H1_MSG_DATA && (h1m->flags & (H1_MF_CHNK|H1_MF_CLEN)) && count > h1m->curr_len)
+		count = h1m->curr_len;
+
+	/* let the consumer prepare its iobuf (pipe or buffer); splicing is only
+	 * proposed when supported by the transport, allowed by the caller and
+	 * not already refused by the consumer
+	 */
+	try = se_init_ff(sdo, &h1c->ibuf, count, h1c->conn->xprt->rcv_pipe && !!(flags & CO_RFL_MAY_SPLICE) && !(sdo->iobuf.flags & IOBUF_FL_NO_SPLICING));
+	if (b_room(&h1c->ibuf) && (h1c->flags & H1C_F_IN_FULL)) {
+		h1c->flags &= ~H1C_F_IN_FULL;
+		TRACE_STATE("h1c ibuf not full anymore", H1_EV_STRM_RECV|H1_EV_H1C_BLK, h1c->conn, h1s);
+	}
+
+	if (sdo->iobuf.flags & IOBUF_FL_NO_FF) {
+		/* Fast forwarding is not supported by the consumer */
+		h1c->flags = (h1c->flags & ~H1C_F_WANT_FASTFWD) | H1C_F_CANT_FASTFWD;
+		TRACE_DEVEL("Fast-forwarding not supported by opposite endpoint, disable it", H1_EV_STRM_RECV, h1c->conn, h1s);
+		goto end;
+	}
+	if (sdo->iobuf.flags & IOBUF_FL_FF_BLOCKED) {
+		se_fl_set(h1s->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM);
+		TRACE_STATE("waiting for more room", H1_EV_STRM_RECV|H1_EV_H1S_ERR, h1c->conn, h1s);
+		goto out;
+	}
+
+	/* account data the consumer may already have moved during se_init_ff() */
+	total += sdo->iobuf.data;
+	if (sdo->iobuf.pipe) {
+		/* Here, no data were xferred yet: receive directly into the pipe */
+		ret = h1c->conn->xprt->rcv_pipe(h1c->conn, h1c->conn->xprt_ctx, sdo->iobuf.pipe, try);
+		if (ret < 0) {
+			h1c->flags = (h1c->flags & ~H1C_F_WANT_FASTFWD) | H1C_F_CANT_FASTFWD;
+			TRACE_ERROR("Error when trying to fast-forward data, disable it and abort",
+				    H1_EV_STRM_RECV|H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, h1c->conn, h1s);
+			BUG_ON(sdo->iobuf.pipe->data);
+			put_pipe(sdo->iobuf.pipe);
+			sdo->iobuf.pipe = NULL;
+			goto end;
+		}
+		total += ret;
+		if (!ret) {
+			TRACE_STATE("failed to receive data, subscribing", H1_EV_STRM_RECV, h1c->conn);
+			h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event);
+		}
+		HA_ATOMIC_ADD(&h1c->px_counters->spliced_bytes_in, ret);
+	}
+	else {
+		/* buffered path: receive past the reserved offset of the consumer's buffer */
+		b_add(sdo->iobuf.buf, sdo->iobuf.offset);
+		ret = h1c->conn->xprt->rcv_buf(h1c->conn, h1c->conn->xprt_ctx, sdo->iobuf.buf, try, flags);
+		if (ret < try) {
+			TRACE_STATE("failed to receive data, subscribing", H1_EV_STRM_RECV, h1c->conn);
+			h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event);
+		}
+		b_sub(sdo->iobuf.buf, sdo->iobuf.offset);
+		total += ret;
+		sdo->iobuf.data += ret;
+	}
+
+	if (h1m->state == H1_MSG_DATA && (h1m->flags & (H1_MF_CHNK|H1_MF_CLEN))) {
+		if (total > h1m->curr_len) {
+			/* more payload than announced: parsing error */
+			h1s->flags |= H1S_F_PARSING_ERROR;
+			se_fl_set(h1s->sd, SE_FL_ERROR);
+			TRACE_ERROR("too much payload, more than announced",
+				    H1_EV_STRM_RECV|H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, h1c->conn, h1s);
+			goto out;
+		}
+		h1m->curr_len -= total;
+		if (!h1m->curr_len) {
+			if (h1m->flags & H1_MF_CLEN) {
+				h1m->state = H1_MSG_DONE;
+				se_fl_set(h1s->sd, SE_FL_EOI); /* TODO: this line is tricky and must be evaluated first
+								* Its purpose is to avoid to set CO_SFL_MSG_MORE on the
+								* next calls to ->complete_fastfwd().
+								*/
+			}
+			else
+				h1m->state = H1_MSG_CHUNK_CRLF;
+			h1c->flags &= ~H1C_F_WANT_FASTFWD;
+
+			if (!(h1c->flags & H1C_F_IS_BACK)) {
+				/* The request was fully received. It means the H1S now
+				 * expect data from the opposite side
+				 */
+				se_expect_data(h1s->sd);
+			}
+
+			TRACE_STATE("payload fully received", H1_EV_STRM_RECV, h1c->conn, h1s);
+		}
+	}
+
+	HA_ATOMIC_ADD(&h1c->px_counters->bytes_in, total);
+	ret = total;
+	se_done_ff(sdo);
+
+	if (sdo->iobuf.pipe) {
+		/* data still pending in the pipe: ask to be called back for more room */
+		se_fl_set(h1s->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM);
+	}
+
+	TRACE_DEVEL("Data fast-forwarded", H1_EV_STRM_RECV, h1c->conn, h1s, 0, (size_t[]){ret});
+
+  out:
+	if (conn_xprt_read0_pending(h1c->conn)) {
+		se_fl_set(h1s->sd, SE_FL_EOS);
+		TRACE_STATE("report EOS to SE", H1_EV_STRM_RECV, h1c->conn, h1s);
+		if (h1m->state >= H1_MSG_DONE || !(h1m->flags & H1_MF_XFER_LEN)) {
+			/* DONE or TUNNEL or SHUTR without XFER_LEN, set
+			 * EOI on the stream connector */
+			se_fl_set(h1s->sd, SE_FL_EOI);
+			TRACE_STATE("report EOI to SE", H1_EV_STRM_RECV, h1c->conn, h1s);
+		}
+		else {
+			/* read0 in the middle of a message: abort it */
+			se_fl_set(h1s->sd, SE_FL_ERROR);
+			h1c->flags = (h1c->flags & ~H1C_F_WANT_FASTFWD) | H1C_F_ERROR;
+			TRACE_ERROR("message aborted, set error on SC", H1_EV_STRM_RECV|H1_EV_H1S_ERR, h1c->conn, h1s);
+		}
+		h1c->flags = (h1c->flags & ~H1C_F_WANT_FASTFWD) | H1C_F_EOS;
+		TRACE_STATE("Allow xprt rcv_buf on read0", H1_EV_STRM_RECV, h1c->conn, h1s);
+	}
+	if (h1c->conn->flags & CO_FL_ERROR) {
+		se_fl_set(h1s->sd, SE_FL_ERROR);
+		h1c->flags = (h1c->flags & ~H1C_F_WANT_FASTFWD) | H1C_F_ERROR;
+		TRACE_DEVEL("connection error", H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, h1c->conn, h1s);
+	}
+
+  end:
+	if (!(h1c->flags & H1C_F_WANT_FASTFWD)) {
+		TRACE_STATE("notify the mux can't use fast-forward anymore", H1_EV_STRM_RECV, h1c->conn, h1s);
+		se_fl_clr(h1s->sd, SE_FL_MAY_FASTFWD);
+		if (!(h1c->wait_event.events & SUB_RETRY_RECV)) {
+			TRACE_STATE("restart receiving data, subscribing", H1_EV_STRM_RECV, h1c->conn, h1s);
+			h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event);
+		}
+	}
+
+	TRACE_LEAVE(H1_EV_STRM_RECV, h1c->conn, h1s, 0, (size_t[]){ret});
+	return ret;
+}
+
+/* .resume_fastfwd callback: try to send data still pending in the kernel pipe
+ * after an earlier fast-forward attempt. <flags> are CO_SFL_* flags. Returns
+ * the number of bytes sent. The H1 message state is advanced when the
+ * announced payload (CLEN) or the current chunk (CHNK) is fully consumed, and
+ * connection errors are reported to the SE.
+ */
+static int h1_resume_fastfwd(struct stconn *sc, unsigned int flags)
+{
+	struct h1s *h1s = __sc_mux_strm(sc);
+	struct h1c *h1c = h1s->h1c;
+	/* output side: the response message on the front, the request on the back */
+	struct h1m *h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->res : &h1s->req);
+	struct sedesc *sd = h1s->sd;
+	int ret = 0;
+
+	TRACE_ENTER(H1_EV_STRM_SEND, h1c->conn, h1s, 0, (size_t[]){flags});
+
+	if (sd->iobuf.pipe) {
+		ret = h1c->conn->xprt->snd_pipe(h1c->conn, h1c->conn->xprt_ctx, sd->iobuf.pipe, sd->iobuf.pipe->data);
+		if (ret > 0)
+			HA_ATOMIC_ADD(&h1c->px_counters->spliced_bytes_out, ret);
+
+		if (!sd->iobuf.pipe->data) {
+			/* pipe fully drained: release it */
+			put_pipe(sd->iobuf.pipe);
+			sd->iobuf.pipe = NULL;
+		}
+	}
+
+	/* Only account against curr_len when a payload length is being tracked,
+	 * as done in h1_done_ff(). An unconditional subtraction would underflow
+	 * the unsigned counter in tunnel mode where curr_len remains 0.
+	 */
+	if (h1m->curr_len)
+		h1m->curr_len -= ret;
+
+	if (!h1m->curr_len && (h1m->flags & H1_MF_CLEN))
+		h1m->state = H1_MSG_DONE;
+	else if (!h1m->curr_len && (h1m->flags & H1_MF_CHNK)) {
+		/* current chunk fully sent: its trailing CRLF is expected next */
+		if (h1m->state == H1_MSG_DATA)
+			h1m->state = H1_MSG_CHUNK_CRLF;
+	}
+
+	HA_ATOMIC_ADD(&h1c->px_counters->bytes_out, ret);
+
+	// TODO: should we call h1_process() instead ?
+	if (h1c->conn->flags & CO_FL_ERROR) {
+		h1c->flags = (h1c->flags & ~H1C_F_WANT_FASTFWD) | H1C_F_ERR_PENDING;
+		if (h1c->flags & H1C_F_EOS)
+			h1c->flags |= H1C_F_ERROR;
+		else if (!(h1c->wait_event.events & SUB_RETRY_RECV)) {
+			/* EOS not seen, so subscribe for reads to be able to
+			 * catch the error on the reading path. It is especially
+			 * important if EOI was reached.
+			 */
+			h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event);
+		}
+		se_fl_set_error(h1s->sd);
+		TRACE_DEVEL("connection error", H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, h1c->conn, h1s);
+	}
+
+	/* leave with the SEND event, matching TRACE_ENTER above */
+	TRACE_LEAVE(H1_EV_STRM_SEND, h1c->conn, h1s, 0, (size_t[]){ret});
+	return ret;
+}
+
static int h1_ctl(struct connection *conn, enum mux_ctl_type mux_ctl, void *output)
{
const struct h1c *h1c = conn->ctx;
.used_streams = h1_used_streams,
.rcv_buf = h1_rcv_buf,
.snd_buf = h1_snd_buf,
+ .init_fastfwd = h1_init_ff,
+ .done_fastfwd = h1_done_ff,
+ .fastfwd = h1_fastfwd,
+ .resume_fastfwd = h1_resume_fastfwd,
.subscribe = h1_subscribe,
.unsubscribe = h1_unsubscribe,
.shutr = h1_shutr,
.used_streams = h1_used_streams,
.rcv_buf = h1_rcv_buf,
.snd_buf = h1_snd_buf,
+ .init_fastfwd = h1_init_ff,
+ .done_fastfwd = h1_done_ff,
+ .fastfwd = h1_fastfwd,
+ .resume_fastfwd = h1_resume_fastfwd,
.subscribe = h1_subscribe,
.unsubscribe = h1_unsubscribe,
.shutr = h1_shutr,