}
/* Initialize all fields in the channel. */
-static inline void channel_init(struct channel *chn)
+static inline void channel_init(struct channel *chn, struct buffer *input)
{
- chn->buf = BUF_NULL;
+ chn->buf = *input;
chn->to_forward = 0;
chn->last_read = now_ms;
chn->xfer_small = chn->xfer_large = 0;
- chn->total = 0;
+ chn->total = (IS_HTX_STRM(chn_strm(chn)) ? htxbuf(input)->data : b_data(input));
chn->pipe = NULL;
chn->analysers = 0;
- chn->flags = 0;
+ chn->flags = (chn->total ? CF_READ_PARTIAL : 0);
chn->output = 0;
}
extern struct data_cb sess_conn_cb;
-struct stream *stream_new(struct session *sess, enum obj_type *origin);
-int stream_create_from_cs(struct conn_stream *cs);
+struct stream *stream_new(struct session *sess, enum obj_type *origin, struct buffer *input);
+int stream_create_from_cs(struct conn_stream *cs, struct buffer *input);
/* kill a stream and set the termination flags to <why> (one of SF_ERR_*) */
void stream_shutdown(struct stream *stream, int why);
if (!sess)
goto out_free_spoe;
- if ((strm = stream_new(sess, &appctx->obj_type)) == NULL)
+ if ((strm = stream_new(sess, &appctx->obj_type, &BUF_NULL)) == NULL)
goto out_free_sess;
stream_set_backend(strm, conf->agent->b.be);
goto out_fail_sess;
}
- strm = stream_new(sess, &appctx->obj_type);
+ strm = stream_new(sess, &appctx->obj_type, &BUF_NULL);
if (!strm) {
hlua_pusherror(L, "socket: out of memory");
goto out_fail_stream;
return b_data(&h1s->h1c->ibuf);
}
-static struct conn_stream *h1s_new_cs(struct h1s *h1s)
+static struct conn_stream *h1s_new_cs(struct h1s *h1s, struct buffer *input)
{
struct conn_stream *cs;
cs->flags |= CS_FL_MAY_SPLICE;
}
- if (stream_create_from_cs(cs) < 0) {
+ if (stream_create_from_cs(cs, input) < 0) {
TRACE_DEVEL("leaving on stream creation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, h1s->h1c->conn, h1s);
goto err;
}
+ *input = BUF_NULL;
TRACE_LEAVE(H1_EV_STRM_NEW, h1s->h1c->conn, h1s);
return cs;
h1s->cs = cs;
}
else {
- cs = h1s_new_cs(h1s);
+ cs = h1s_new_cs(h1s, &BUF_NULL);
if (!cs)
goto fail;
}
cs->ctx = h2s;
h2c->nb_cs++;
- if (stream_create_from_cs(cs) < 0)
+ if (stream_create_from_cs(cs, &BUF_NULL) < 0)
goto out_free_cs;
/* We want the accept date presented to the next stream to be the one
if (!cs)
goto fail_free_ctx;
- if (stream_create_from_cs(cs) < 0)
+ if (stream_create_from_cs(cs, &BUF_NULL) < 0)
goto fail_free;
}
goto out_free_appctx;
}
- if ((s = stream_new(sess, &appctx->obj_type)) == NULL) {
+ if ((s = stream_new(sess, &appctx->obj_type, &BUF_NULL)) == NULL) {
ha_alert("Failed to initialize stream in peer_session_create().\n");
goto out_free_sess;
}
goto out_free_appctx;
}
- if ((s = stream_new(sess, &appctx->obj_type)) == NULL) {
+ if ((s = stream_new(sess, &appctx->obj_type, &BUF_NULL)) == NULL) {
ha_alert("Failed to initialize stream in peer_session_create().\n");
goto out_free_sess;
}
* valid right after the handshake, before the connection's data layer is
* initialized, because it relies on the session to be in conn->owner.
*/
-int stream_create_from_cs(struct conn_stream *cs)
+int stream_create_from_cs(struct conn_stream *cs, struct buffer *input)
{
struct stream *strm;
- strm = stream_new(cs->conn->owner, &cs->obj_type);
+ strm = stream_new(cs->conn->owner, &cs->obj_type, input);
if (strm == NULL)
return -1;
* end point is assigned to <origin>, which must be valid. The stream's task
* is configured with a nice value inherited from the listener's nice if any.
* The task's context is set to the new stream, and its function is set to
- * process_stream(). Target and analysers are null.
+ * process_stream(). Target and analysers are null. <input> is always used as
+ * Input buffer and may contain data. It is the caller responsibility to not
+ * reuse it anymore. <input> may point on BUF_NULL.
*/
-struct stream *stream_new(struct session *sess, enum obj_type *origin)
+struct stream *stream_new(struct session *sess, enum obj_type *origin, struct buffer *input)
{
struct stream *s;
struct task *t;
* when the default backend is assigned.
*/
s->be = sess->fe;
- s->req.buf = BUF_NULL;
- s->res.buf = BUF_NULL;
s->req_cap = NULL;
s->res_cap = NULL;
/* init store persistence */
s->store_count = 0;
- channel_init(&s->req);
+ channel_init(&s->req, input);
s->req.flags |= CF_READ_ATTACHED; /* the producer is already connected */
s->req.analysers = sess->listener ? sess->listener->analysers : 0;
s->req.wex = TICK_ETERNITY;
s->req.analyse_exp = TICK_ETERNITY;
- channel_init(&s->res);
+ channel_init(&s->res, &BUF_NULL);
s->res.flags |= CF_ISRESP;
s->res.analysers = 0;