]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: stream: Pass an optional input buffer when a stream is created
authorChristopher Faulet <cfaulet@haproxy.com>
Mon, 14 Sep 2020 09:40:13 +0000 (11:40 +0200)
committerChristopher Faulet <cfaulet@haproxy.com>
Fri, 4 Dec 2020 13:41:48 +0000 (14:41 +0100)
It is now possible to set the buffer used by the channel request buffer when
a stream is created. It may be useful if input data are already received,
instead of waiting the first call to the mux rcv_buf() callback. This change
is mandatory to support H1 connection with no stream attached.

For now, the multiplexers don't pass any buffer. BUF_NULL is thus used to
call stream_create_from_cs().

include/haproxy/channel.h
include/haproxy/stream.h
src/flt_spoe.c
src/hlua.c
src/mux_h1.c
src/mux_h2.c
src/mux_pt.c
src/peers.c
src/sink.c
src/stream.c

index de51d046789e2e24d5923d606799d056588ef0ba..65fd57d2f9df06c97051f62fa511bacd259e9893 100644 (file)
@@ -312,16 +312,16 @@ static inline size_t ci_contig_data(const struct channel *c)
 }
 
 /* Initialize all fields in the channel. */
-static inline void channel_init(struct channel *chn)
+static inline void channel_init(struct channel *chn, struct buffer *input)
 {
-       chn->buf = BUF_NULL;
+       chn->buf = *input;
        chn->to_forward = 0;
        chn->last_read = now_ms;
        chn->xfer_small = chn->xfer_large = 0;
-       chn->total = 0;
+       chn->total = (IS_HTX_STRM(chn_strm(chn)) ? htxbuf(input)->data : b_data(input));
        chn->pipe = NULL;
        chn->analysers = 0;
-       chn->flags = 0;
+       chn->flags = (chn->total ? CF_READ_PARTIAL : 0);
        chn->output = 0;
 }
 
index e50fbc8af90ae8c55667b6a10aa98016a246538a..e5c9c2937f7631bcd7dc0cfedd80e9526ac241f4 100644 (file)
@@ -58,8 +58,8 @@ extern struct list streams;
 
 extern struct data_cb sess_conn_cb;
 
-struct stream *stream_new(struct session *sess, enum obj_type *origin);
-int stream_create_from_cs(struct conn_stream *cs);
+struct stream *stream_new(struct session *sess, enum obj_type *origin, struct buffer *input);
+int stream_create_from_cs(struct conn_stream *cs, struct buffer *input);
 
 /* kill a stream and set the termination flags to <why> (one of SF_ERR_*) */
 void stream_shutdown(struct stream *stream, int why);
index 58a7e7903c66835fb76fd9bfab2c1c1213bc29e4..2de26b4a59b2fbe040b09ab26bf27940db471ca5 100644 (file)
@@ -1983,7 +1983,7 @@ spoe_create_appctx(struct spoe_config *conf)
        if (!sess)
                goto out_free_spoe;
 
-       if ((strm = stream_new(sess, &appctx->obj_type)) == NULL)
+       if ((strm = stream_new(sess, &appctx->obj_type, &BUF_NULL)) == NULL)
                goto out_free_sess;
 
        stream_set_backend(strm, conf->agent->b.be);
index fbccaaaaaf418832ab284bda8d26fba690d1343a..b6138251aafd3a4d485d324e20157f24feb4ec0a 100644 (file)
@@ -2843,7 +2843,7 @@ __LJMP static int hlua_socket_new(lua_State *L)
                goto out_fail_sess;
        }
 
-       strm = stream_new(sess, &appctx->obj_type);
+       strm = stream_new(sess, &appctx->obj_type, &BUF_NULL);
        if (!strm) {
                hlua_pusherror(L, "socket: out of memory");
                goto out_fail_stream;
index fc4fde7bb1d89a291f5e4f8640bf735402f7e3e4..e2c969361ca36ec1d97b295b5de1de96f02e6dac 100644 (file)
@@ -508,7 +508,7 @@ static inline size_t h1s_data_pending(const struct h1s *h1s)
        return b_data(&h1s->h1c->ibuf);
 }
 
-static struct conn_stream *h1s_new_cs(struct h1s *h1s)
+static struct conn_stream *h1s_new_cs(struct h1s *h1s, struct buffer *input)
 {
        struct conn_stream *cs;
 
@@ -529,10 +529,11 @@ static struct conn_stream *h1s_new_cs(struct h1s *h1s)
                cs->flags |= CS_FL_MAY_SPLICE;
        }
 
-       if (stream_create_from_cs(cs) < 0) {
+       if (stream_create_from_cs(cs, input) < 0) {
                TRACE_DEVEL("leaving on stream creation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, h1s->h1c->conn, h1s);
                goto err;
        }
+       *input = BUF_NULL;
 
        TRACE_LEAVE(H1_EV_STRM_NEW, h1s->h1c->conn, h1s);
        return cs;
@@ -597,7 +598,7 @@ static struct h1s *h1s_create(struct h1c *h1c, struct conn_stream *cs, struct se
                h1s->cs = cs;
        }
        else {
-               cs = h1s_new_cs(h1s);
+               cs = h1s_new_cs(h1s, &BUF_NULL);
                if (!cs)
                        goto fail;
        }
index 741708d19df743a6b48df949d32f7a7fd7c36536..75874a38e64406a12733b89b0c59dd1c131d43bd 100644 (file)
@@ -1483,7 +1483,7 @@ static struct h2s *h2c_frt_stream_new(struct h2c *h2c, int id)
        cs->ctx = h2s;
        h2c->nb_cs++;
 
-       if (stream_create_from_cs(cs) < 0)
+       if (stream_create_from_cs(cs, &BUF_NULL) < 0)
                goto out_free_cs;
 
        /* We want the accept date presented to the next stream to be the one
index 57c1b9ef4178a712a61d6b0b522c42608a748aaa..3161d16768e58393f9e78c3fc0ddeb942e32b086 100644 (file)
@@ -112,7 +112,7 @@ static int mux_pt_init(struct connection *conn, struct proxy *prx, struct sessio
                if (!cs)
                        goto fail_free_ctx;
 
-               if (stream_create_from_cs(cs) < 0)
+               if (stream_create_from_cs(cs, &BUF_NULL) < 0)
                        goto fail_free;
 
        }
index abc2c596b815f1c9de27a2ebcdb57333a6d208c6..b5c1d429f33e52ed2747dfa51c98857929db2074 100644 (file)
@@ -2675,7 +2675,7 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
                goto out_free_appctx;
        }
 
-       if ((s = stream_new(sess, &appctx->obj_type)) == NULL) {
+       if ((s = stream_new(sess, &appctx->obj_type, &BUF_NULL)) == NULL) {
                ha_alert("Failed to initialize stream in peer_session_create().\n");
                goto out_free_sess;
        }
index a7f689780745fbccd44ac2140dda3073d425810e..35bf7707c404904470d21b6e1557fd02815814b9 100644 (file)
@@ -640,7 +640,7 @@ static struct appctx *sink_forward_session_create(struct sink *sink, struct sink
                goto out_free_appctx;
        }
 
-       if ((s = stream_new(sess, &appctx->obj_type)) == NULL) {
+       if ((s = stream_new(sess, &appctx->obj_type, &BUF_NULL)) == NULL) {
                ha_alert("Failed to initialize stream in peer_session_create().\n");
                goto out_free_sess;
        }
index abef21032a0d039df90efa87a6897ca6ec9aada8..0ca54afe03d3d50159d7ca6ca3e912aa203c3ffa 100644 (file)
@@ -269,11 +269,11 @@ static void strm_trace(enum trace_level level, uint64_t mask, const struct trace
  * valid right after the handshake, before the connection's data layer is
  * initialized, because it relies on the session to be in conn->owner.
  */
-int stream_create_from_cs(struct conn_stream *cs)
+int stream_create_from_cs(struct conn_stream *cs, struct buffer *input)
 {
        struct stream *strm;
 
-       strm = stream_new(cs->conn->owner, &cs->obj_type);
+       strm = stream_new(cs->conn->owner, &cs->obj_type, input);
        if (strm == NULL)
                return -1;
 
@@ -313,9 +313,11 @@ int stream_buf_available(void *arg)
  * end point is assigned to <origin>, which must be valid. The stream's task
  * is configured with a nice value inherited from the listener's nice if any.
  * The task's context is set to the new stream, and its function is set to
- * process_stream(). Target and analysers are null.
+ * process_stream(). Target and analysers are null. <input> is always used as
+ * Input buffer and may contain data. It is the caller responsibility to not
+ * reuse it anymore. <input> may point on BUF_NULL.
  */
-struct stream *stream_new(struct session *sess, enum obj_type *origin)
+struct stream *stream_new(struct session *sess, enum obj_type *origin, struct buffer *input)
 {
        struct stream *s;
        struct task *t;
@@ -405,8 +407,6 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin)
         * when the default backend is assigned.
         */
        s->be  = sess->fe;
-       s->req.buf = BUF_NULL;
-       s->res.buf = BUF_NULL;
        s->req_cap = NULL;
        s->res_cap = NULL;
 
@@ -462,7 +462,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin)
        /* init store persistence */
        s->store_count = 0;
 
-       channel_init(&s->req);
+       channel_init(&s->req, input);
        s->req.flags |= CF_READ_ATTACHED; /* the producer is already connected */
        s->req.analysers = sess->listener ? sess->listener->analysers : 0;
 
@@ -477,7 +477,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin)
        s->req.wex = TICK_ETERNITY;
        s->req.analyse_exp = TICK_ETERNITY;
 
-       channel_init(&s->res);
+       channel_init(&s->res, &BUF_NULL);
        s->res.flags |= CF_ISRESP;
        s->res.analysers = 0;