From: Christopher Faulet Date: Thu, 3 Aug 2023 13:47:49 +0000 (+0200) Subject: MINOR: connection: Add new mux callbacks to perform data fast-forwarding X-Git-Tag: v2.9-dev8~39 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=242c6f0dedc64defc1de3fb3334f2a349db79321;p=thirdparty%2Fhaproxy.git MINOR: connection: Add new mux callbacks to perform data fast-forwarding To perform the mux-to-mux data fast-forwarding, 4 new callbacks were added into the mux_ops structure. 2 callbacks will be used from the stconn for fast-forward data. The 2 other callbacks will be used by the endpoint to request an iobuf to the opposite endpoint. * fastfwd() callback function is used by a producer to forward data * resume_fastfwd() callback function is used by a consumer if some data are blocked in the iobuf, to resume the data forwarding. * init_fastfwd() must be used by an endpoint (the producer one), inside the fastfwd() callback to request an iobuf to the opposite side (the consumer one). * done_fastfwd() must be used by an endpoint (the producer one) at the end of fastfwd() to notify the opposite endpoint (the consumer one) if data were forwarded or not. This API is still under development, so it may evolved. Especially when the fast-forward will be extended to applets. 2 helper functions were also added into the SE api to wrap init_fastfwd() and done_fastfwd() callback function of the underlying endpoint. For now, this API is unsed and not implemented at all in muxes. --- diff --git a/include/haproxy/connection-t.h b/include/haproxy/connection-t.h index 3bee5fd43a..ce6bf1177e 100644 --- a/include/haproxy/connection-t.h +++ b/include/haproxy/connection-t.h @@ -274,6 +274,7 @@ enum { CO_RFL_READ_ONCE = 0x0004, /* don't loop even if the request/response is small */ CO_RFL_KEEP_RECV = 0x0008, /* Instruct the mux to still wait for read events */ CO_RFL_BUF_NOT_STUCK = 0x0010, /* Buffer is not stuck. Optims are possible during data copy */ + CO_RFL_MAY_SPLICE = 0x0020, /* The producer can use the kernel splicing */ }; /* flags that can be passed to xprt->snd_buf() and mux->snd_buf() */ @@ -422,6 +423,10 @@ struct mux_ops { int (*wake)(struct connection *conn); /* mux-layer callback to report activity, mandatory */ size_t (*rcv_buf)(struct stconn *sc, struct buffer *buf, size_t count, int flags); /* Called from the upper layer to get data */ size_t (*snd_buf)(struct stconn *sc, struct buffer *buf, size_t count, int flags); /* Called from the upper layer to send data */ + size_t (*init_fastfwd)(struct stconn *sc, struct buffer *input, size_t count, unsigned int may_splice); /* Callback to fill the SD iobuf */ + void (*done_fastfwd)(struct stconn *sc); /* Callback to terminate fast data forwarding */ + int (*fastfwd)(struct stconn *sc, unsigned int count, unsigned int flags); /* Callback to init fast data forwarding */ + int (*resume_fastfwd)(struct stconn *sc, unsigned int flags); /* Callback to resume fast data forwarding */ int (*rcv_pipe)(struct stconn *sc, struct pipe *pipe, unsigned int count); /* recv-to-pipe callback */ int (*snd_pipe)(struct stconn *sc, struct pipe *pipe); /* send-to-pipe callback */ void (*shutr)(struct stconn *sc, enum co_shr_mode); /* shutr function */ diff --git a/include/haproxy/stconn.h b/include/haproxy/stconn.h index 12ac81b213..71338ada11 100644 --- a/include/haproxy/stconn.h +++ b/include/haproxy/stconn.h @@ -132,6 +132,40 @@ static inline size_t se_ff_data(struct sedesc *se) return (se->iobuf.data + (se->iobuf.pipe ? se->iobuf.pipe->data : 0)); } +static inline size_t se_init_ff(struct sedesc *se, struct buffer *input, size_t count, unsigned int may_splice) +{ + size_t ret = 0; + + if (se_fl_test(se, SE_FL_T_MUX)) { + const struct mux_ops *mux = se->conn->mux; + + se->iobuf.flags &= ~IOBUF_FL_FF_BLOCKED; + if (mux->init_fastfwd && mux->done_fastfwd) { + ret = mux->init_fastfwd(se->sc, input, count, may_splice); + if ((se->iobuf.flags & IOBUF_FL_FF_BLOCKED) && !(se->sc->wait_event.events & SUB_RETRY_SEND)) { + /* The SC must be subs for send to be notify when some + * space is made + */ + mux->subscribe(se->sc, SUB_RETRY_SEND, &se->sc->wait_event); + } + goto end; + } + } + se->iobuf.flags |= IOBUF_FL_NO_FF; + + end: + return ret; +} + +static inline void se_done_ff(struct sedesc *se) +{ + if (se_fl_test(se, SE_FL_T_MUX)) { + const struct mux_ops *mux = se->conn->mux; + + BUG_ON(!mux->done_fastfwd); + mux->done_fastfwd(se->sc); + } +} /* stream connector version */ static forceinline void sc_ep_zero(struct stconn *sc)