/* Flags indicating why reading input data is blocked. */
#define H1C_F_IN_ALLOC 0x00000010 /* mux is blocked on lack of input buffer */
#define H1C_F_IN_FULL 0x00000020 /* mux is blocked on input buffer full */
-/* 0x00000040 - 0x00000080 unused */
-
-/* Flags indicating why parsing data are blocked */
-#define H1C_F_RX_ALLOC 0x00000100 /* mux is blocked on lack of rx buffer */
-#define H1C_F_RX_FULL 0x00000200 /* mux is blocked on rx buffer full */
-/* 0x00000400 - 0x00000800 unused */
+/* 0x00000040 - 0x00000800 unused */
#define H1C_F_CS_ERROR 0x00001000 /* connection must be closed ASAP because an error occurred */
#define H1C_F_CS_SHUTW_NOW 0x00002000 /* connection must be shut down for writes ASAP */
#define H1S_F_ERROR 0x00000001 /* An error occurred on the H1 stream */
#define H1S_F_REQ_ERROR 0x00000002 /* An error occurred during the request parsing/xfer */
#define H1S_F_RES_ERROR 0x00000004 /* An error occurred during the response parsing/xfer */
-#define H1S_F_MSG_XFERED 0x00000008 /* current message was transferred to the data layer */
+/* 0x00000008 unused */
#define H1S_F_WANT_KAL 0x00000010 /* Keep-alive is wanted for this stream */
#define H1S_F_WANT_TUN 0x00000020 /* Tunnel mode is wanted for this stream */
#define H1S_F_WANT_CLO 0x00000040 /* The connection must be closed after this stream */
#define H1S_F_WANT_MSK 0x00000070 /* Mask of the connection-mode flags above */
#define H1S_F_NOT_FIRST 0x00000080 /* The H1 stream is not the first one */
-#define H1S_F_BUF_FLUSH 0x00000100 /* Flush input buffers (ibuf and rxbuf) and don't read more data */
+#define H1S_F_BUF_FLUSH 0x00000100 /* Flush input buffer and don't read more data */
/* H1 stream descriptor */
struct conn_stream *cs;
	uint32_t flags; /* Stream flags: H1S_F_* */
- struct buffer rxbuf; /*receive buffer, always valid (buf_empty or real buffer) */
-
	struct wait_event *recv_wait; /* Address of the wait_event the associated conn_stream is waiting on (receive) */
	struct wait_event *send_wait; /* Address of the wait_event the associated conn_stream is waiting on (send) */
return 1;
}
- if ((h1c->flags & H1C_F_RX_ALLOC) && h1c->h1s && b_alloc_margin(&h1c->h1s->rxbuf, 0)) {
- h1c->flags &= ~H1C_F_RX_ALLOC;
- if (h1_recv_allowed(h1c))
- tasklet_wakeup(h1c->wait_event.task);
- return 1;
- }
-
return 0;
}
h1c->h1s = h1s;
h1s->cs = NULL;
- h1s->rxbuf = BUF_NULL;
h1s->flags = H1S_F_NONE;
h1s->recv_wait = NULL;
h1s->res.err_pos = -1;
}
+ /* If a conn_stream already exists, attach it to this H1S. Otherwise we
+ * create a new one.
+ */
if (cs) {
- /* If a conn_stream already exists, attach it to this H1S */
cs->ctx = h1s;
h1s->cs = cs;
}
-#if 1
else {
cs = h1s_new_cs(h1s);
if (!cs)
goto fail;
}
-#endif
return h1s;
fail:
struct h1c *h1c = h1s->h1c;
h1c->h1s = NULL;
- h1c->flags &= ~(H1C_F_RX_FULL|H1C_F_RX_ALLOC);
if (h1s->recv_wait != NULL)
h1s->recv_wait->wait_reason &= ~SUB_CAN_RECV;
if (h1s->flags & (H1S_F_REQ_ERROR|H1S_F_RES_ERROR))
h1c->flags |= H1C_F_CS_ERROR;
- h1_release_buf(h1c, &h1s->rxbuf);
cs_free(h1s->cs);
pool_free(pool_head_h1s, h1s);
}
if (h1s->res.state == H1_MSG_DONE &&
(h1s->status < 200 && (h1s->status == 100 || h1s->status >= 102)) &&
- ((!conn_is_back(h1c->conn) && !b_data(&h1c->obuf)) || !b_data(&h1s->rxbuf))) {
+ (conn_is_back(h1c->conn) || !b_data(&h1c->obuf))) {
/* For 100-Continue response or any other informational 1xx
* response which is non-final, don't reset the request, the
* transaction is not finished. We take care the response was
h1m_init_res(&h1s->res);
h1s->res.flags |= H1_MF_NO_PHDR;
}
- else if (!b_data(&h1s->rxbuf) && !b_data(&h1c->obuf) &&
+ else if (!b_data(&h1c->obuf) &&
h1s->req.state == H1_MSG_DONE && h1s->res.state == H1_MSG_DONE) {
if (h1s->flags & H1S_F_WANT_TUN) {
h1m_init_req(&h1s->req);
/*
- * Process incoming data. It parses data and transfer them from h1c->ibuf into
- * h1s->rxbuf. It returns the number of bytes parsed and transferred if > 0, or
- * 0 if it couldn't proceed.
+ * Process incoming data. It parses data and transfers them from h1c->ibuf into
+ * <buf>, according to the CO_RFL_* <flags>. It returns the number of bytes
+ * parsed and transferred if > 0, or 0 if it couldn't proceed.
*/
-static size_t h1_process_input(struct h1c *h1c, struct buffer *buf, size_t count)
+static size_t h1_process_input(struct h1c *h1c, struct buffer *buf, int flags)
{
- struct h1s *h1s = NULL;
+ struct h1s *h1s = h1c->h1s;
struct h1m *h1m;
struct htx *htx;
size_t total = 0;
size_t ret = 0;
- size_t max;
+ size_t count, max;
int errflag;
- h1s = NULL;
-
- /* Create a new H1S if not already done */
- if (!h1c->h1s && !h1s_create(h1c, NULL))
- goto fatal_err;
- h1s = h1c->h1s;
-#if 0
- /* Create the CS if not already attached to the H1S */
- if (!h1s->cs && !h1s_new_cs(h1s))
- goto fatal_err;
-#endif
- if (!count)
- goto end;
- if (!h1_get_buf(h1c, &h1s->rxbuf)) {
- h1c->flags |= H1C_F_RX_ALLOC;
- goto end;
+ htx = htx_from_buf(buf);
+ count = b_data(&h1c->ibuf);
+ max = htx_free_space(htx);
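+	/* When CO_RFL_KEEP_RSV is set, the channel's reserve
+	 * (tune.maxrewrite bytes) must be preserved in the HTX buffer.
+	 */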
+ if (flags & CO_RFL_KEEP_RSV) {
+ if (max < global.tune.maxrewrite)
+ goto end;
+ max -= global.tune.maxrewrite;
}
-
- htx = htx_from_buf(&h1s->rxbuf);
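+	/* Never parse more data than the destination HTX buffer can accept */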
+ if (count > max)
+ count = max;
if (!conn_is_back(h1c->conn)) {
h1m = &h1s->req;
		errflag = H1S_F_REQ_ERROR;
}
- max = count;
- while (!(h1s->flags & errflag) && max) {
+ while (!(h1s->flags & errflag) && count) {
if (h1m->state <= H1_MSG_LAST_LF) {
- ret = h1_process_headers(h1s, h1m, htx, buf, &total, max);
+ ret = h1_process_headers(h1s, h1m, htx, &h1c->ibuf, &total, count);
if (!ret)
break;
-#if 0
- /* Create the CS if not already attached to the H1S */
- if (!h1s->cs && !h1s_new_cs(h1s))
- goto fatal_err;
-#endif
}
else if (h1m->state <= H1_MSG_TRAILERS) {
- /* Do not parse the body if the header part is not yet
- * transferred to the stream.
- */
- if (!(h1s->flags & H1S_F_MSG_XFERED))
- break;
- ret = h1_process_data(h1s, h1m, htx, buf, &total, max);
+ ret = h1_process_data(h1s, h1m, htx, &h1c->ibuf, &total, count);
if (!ret)
break;
}
else if (h1m->state == H1_MSG_DONE)
break;
else if (h1m->state == H1_MSG_TUNNEL) {
- ret = h1_process_data(h1s, h1m, htx, buf, &total, max);
+ ret = h1_process_data(h1s, h1m, htx, &h1c->ibuf, &total, count);
if (!ret)
break;
}
break;
}
- max -= ret;
+ count -= ret;
}
if (h1s->flags & errflag)
goto parsing_err;
- b_del(buf, total);
- if (htx_is_not_empty(htx)) {
- b_set_data(&h1s->rxbuf, b_size(&h1s->rxbuf));
- if (!htx_free_data_space(htx))
- h1c->flags |= H1C_F_RX_FULL;
+ b_del(&h1c->ibuf, total);
+
+ end:
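+	/* A buffer carrying a non-empty HTX message is always reported as
+	 * full to the upper layer. Otherwise it is reset and reported as
+	 * empty.
+	 */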
+ if (htx_is_not_empty(htx))
+ b_set_data(buf, b_size(buf));
+ else {
+ htx_reset(htx);
+ b_set_data(buf, 0);
}
- else
- h1_release_buf(h1c, &h1s->rxbuf);
- ret = count - max;
- if (h1s->recv_wait) {
- h1s->recv_wait->wait_reason &= ~SUB_CAN_RECV;
- tasklet_wakeup(h1s->recv_wait->task);
- h1s->recv_wait = NULL;
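+	/* If the mux was blocked on a full input buffer and some room was
+	 * freed, wake it up to resume receiving.
+	 */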
+	if ((h1c->flags & H1C_F_IN_FULL) && b_room(&h1c->ibuf)) {
+ h1c->flags &= ~H1C_F_IN_FULL;
+ tasklet_wakeup(h1c->wait_event.task);
}
- end:
- return ret;
- fatal_err:
- h1c->flags |= H1C_F_CS_ERROR;
- sess_log(h1c->conn->owner);
- return 0;
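+	/* If some input data remain to be parsed, let the stream know more
+	 * data are available. Otherwise release the input buffer and report
+	 * the end of stream once the read0 was received.
+	 */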
+ if (b_data(&h1c->ibuf))
+ h1s->cs->flags |= CS_FL_RCV_MORE;
+ else {
+ h1_release_buf(h1c, &h1c->ibuf);
+ h1_sync_messages(h1c);
+
+ h1s->cs->flags &= ~CS_FL_RCV_MORE;
+ if (h1s->cs->flags & CS_FL_REOS)
+ h1s->cs->flags |= CS_FL_EOS;
+ }
+ return total;
parsing_err:
// FIXME: create an error snapshot here
b_reset(&h1c->ibuf);
- h1s->cs->flags |= CS_FL_REOS;
- if (h1s->recv_wait) {
- h1s->recv_wait->wait_reason &= ~SUB_CAN_RECV;
- tasklet_wakeup(h1s->recv_wait->task);
- h1s->recv_wait = NULL;
- }
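+	/* Report the parsing error to the stream: flag the HTX message and
+	 * switch the conn_stream to the end-of-stream state.
+	 */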
+ htx->flags |= HTX_FL_PARSING_ERROR;
+ b_set_data(buf, b_size(buf));
+ h1s->cs->flags |= CS_FL_EOS;
return 0;
}
return total;
}
-/*
- * Transfer data from h1s->rxbuf into the channel buffer. It returns the number
- * of bytes transferred.
- */
-static size_t h1_xfer(struct h1s *h1s, struct buffer *buf, int flags)
-{
- struct h1c *h1c = h1s->h1c;
- struct h1m *h1m;
- struct conn_stream *cs = h1s->cs;
- struct htx *mux_htx, *chn_htx;
- struct htx_ret htx_ret;
- size_t count, ret = 0;
-
- h1m = (!conn_is_back(h1c->conn) ? &h1s->req : &h1s->res);
- mux_htx = htx_from_buf(&h1s->rxbuf);
- chn_htx = htx_from_buf(buf);
-
- if (h1s->flags & (H1S_F_REQ_ERROR|H1S_F_RES_ERROR)) {
- chn_htx->flags |= HTX_FL_PARSING_ERROR;
- b_set_data(buf, b_size(buf));
- goto end;
- }
- if (htx_is_empty(mux_htx))
- goto end;
- count = htx_free_space(chn_htx);
- if (flags & CO_RFL_KEEP_RSV) {
- if (count < global.tune.maxrewrite)
- goto end;
- count -= global.tune.maxrewrite;
- }
-
- // FIXME: if chn empty and count > htx => b_xfer !
- if (!(h1s->flags & H1S_F_MSG_XFERED)) {
- htx_ret = htx_xfer_blks(chn_htx, mux_htx, count,
- ((h1m->state == H1_MSG_DONE) ? HTX_BLK_EOM : HTX_BLK_EOH));
- ret = htx_ret.ret;
- if (htx_ret.blk && htx_get_blk_type(htx_ret.blk) >= HTX_BLK_EOH)
- h1s->flags |= H1S_F_MSG_XFERED;
- }
- else {
- htx_ret = htx_xfer_blks(chn_htx, mux_htx, count, HTX_BLK_EOM);
- ret = htx_ret.ret;
- }
- chn_htx->extra = mux_htx->extra;
- if (h1m->flags & H1_MF_XFER_LEN)
- chn_htx->extra += mux_htx->data;
-
- if (htx_is_not_empty(chn_htx))
- b_set_data(buf, b_size(buf));
- end:
- if (h1c->flags & H1C_F_RX_FULL && htx_free_data_space(mux_htx)) {
- h1c->flags &= ~H1C_F_RX_FULL;
- tasklet_wakeup(h1c->wait_event.task);
- }
-
- if (htx_is_not_empty(mux_htx)) {
- cs->flags |= CS_FL_RCV_MORE;
- }
- else {
- h1c->flags &= ~H1C_F_RX_FULL;
- h1_release_buf(h1c, &h1s->rxbuf);
- h1_sync_messages(h1c);
-
- cs->flags &= ~CS_FL_RCV_MORE;
- if (!b_data(&h1c->ibuf) && (cs->flags & CS_FL_REOS))
- cs->flags |= CS_FL_EOS;
- }
- return ret;
-}
-
/*********************************************************/
/* functions below are I/O callbacks from the connection */
/*********************************************************/
if (h1c->wait_event.wait_reason & SUB_CAN_RECV)
return 0;
- if (!h1_recv_allowed(h1c)) {
- if (h1c->h1s && b_data(&h1c->h1s->rxbuf))
- rcvd = 1;
+ if (!h1_recv_allowed(h1c))
goto end;
- }
if (h1c->h1s && (h1c->h1s->flags & H1S_F_BUF_FLUSH)) {
rcvd = 1;
h1c->flags &= ~H1C_F_OUT_FULL;
b_del(&h1c->obuf, ret);
sent = 1;
-
- if (h1c->h1s && h1c->h1s->send_wait) {
- h1c->h1s->send_wait->wait_reason &= ~SUB_CAN_SEND;
- tasklet_wakeup(h1c->h1s->send_wait->task);
- h1c->h1s->send_wait = NULL;
- }
}
end:
{
struct connection *conn = h1c->conn;
- if (!(h1c->flags & (H1C_F_CS_ERROR|H1C_F_RX_FULL|H1C_F_RX_ALLOC))) {
- size_t ret;
-
- ret = h1_process_input(h1c, &h1c->ibuf, b_data(&h1c->ibuf));
- if (ret > 0) {
- h1c->flags &= ~H1C_F_IN_FULL;
- if (!b_data(&h1c->ibuf))
- h1_release_buf(h1c, &h1c->ibuf);
- }
- }
-
- h1_send(h1c);
-
if (!conn->mux_ctx)
return -1;
if (h1c->flags & H1C_F_CS_WAIT_CONN) {
- if (conn->flags & (CO_FL_CONNECTED|CO_FL_ERROR)) {
- h1c->flags &= ~H1C_F_CS_WAIT_CONN;
- h1_wake_stream(h1c);
- }
- goto end;
+ if (!(conn->flags & (CO_FL_CONNECTED|CO_FL_ERROR)))
+ goto end;
+ h1c->flags &= ~H1C_F_CS_WAIT_CONN;
}
- if ((h1c->flags & H1C_F_CS_ERROR) || (conn->flags & CO_FL_ERROR) || conn_xprt_read0_pending(conn)) {
- h1_wake_stream(h1c);
- if (!h1c->h1s || !h1c->h1s->cs) {
- h1_release(conn);
- return -1;
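+	/* No stream is attached to the connection: release it on error or
+	 * read0, otherwise try to create a new stream unless a shutdown is
+	 * pending.
+	 */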
+ if (!h1c->h1s) {
+		if ((h1c->flags & H1C_F_CS_ERROR) ||
+		    (conn->flags & CO_FL_ERROR) ||
+		    conn_xprt_read0_pending(conn))
+ goto release;
+ if (!(h1c->flags & (H1C_F_CS_SHUTW_NOW|H1C_F_CS_SHUTW))) {
+ if (!h1s_create(h1c, NULL))
+ goto release;
}
}
+ h1_wake_stream(h1c);
end:
return 0;
+
+ release:
+ h1_release(conn);
+ return -1;
}
static struct task *h1_io_cb(struct task *t, void *ctx, unsigned short status)
ret = h1_send(h1c);
if (!(h1c->wait_event.wait_reason & SUB_CAN_RECV))
ret |= h1_recv(h1c);
- if (ret || b_data(&h1c->ibuf) || (h1c->h1s && b_data(&h1c->h1s->rxbuf)))
+ if (ret/* || b_data(&h1c->ibuf)*/)
h1_process(h1c);
return NULL;
}
{
struct h1c *h1c = conn->mux_ctx;
- return (h1_process(h1c));
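+	/* Try to flush pending outgoing data before processing the connection */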
+ h1_send(h1c);
+ return h1_process(h1c);
}
/*******************************************/
static size_t h1_rcv_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags)
{
struct h1s *h1s = cs->ctx;
+ struct h1c *h1c = h1s->h1c;
size_t ret = 0;
- if (!h1s)
- return ret;
-
- if (!(h1s->h1c->flags & H1C_F_RX_ALLOC))
- ret = h1_xfer(h1s, buf, flags);
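+	/* Decode pending input directly into the channel's buffer, unless the
+	 * mux is still waiting for an input buffer to be allocated.
+	 */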
+ if (!(h1c->flags & H1C_F_IN_ALLOC))
+ ret = h1_process_input(h1c, buf, flags);
if (flags & CO_RFL_BUF_FLUSH)
h1s->flags |= H1S_F_BUF_FLUSH;
else if (ret > 0 || (h1s->flags & H1S_F_BUF_FLUSH)) {
h1s->flags &= ~H1S_F_BUF_FLUSH;
- if (!(h1s->h1c->wait_event.wait_reason & SUB_CAN_RECV))
- tasklet_wakeup(h1s->h1c->wait_event.task);
+ if (!(h1c->wait_event.wait_reason & SUB_CAN_RECV))
+ tasklet_wakeup(h1c->wait_event.task);
}
return ret;
}
if (h1c->flags & H1C_F_CS_WAIT_CONN)
return 0;
- if (!(h1c->flags & (H1C_F_OUT_FULL|H1C_F_OUT_ALLOC)) && b_data(buf))
+ if (!(h1c->flags & (H1C_F_OUT_FULL|H1C_F_OUT_ALLOC)))
ret = h1_process_output(h1c, buf, count);
if (ret > 0) {
h1_send(h1c);
- /* We need to do that because of the infinite forwarding. */
+		/* We need to do this because of infinite forwarding: <buf>
+		 * contains HTX messages, so when infinite forwarding is
+		 * enabled, <count> is equal to the buffer size. From the
+		 * outside, the buffer appears full.
+		 */
if (!b_data(buf))
ret = count;
}
struct h1m *h1m = (!conn_is_back(cs->conn) ? &h1s->req : &h1s->res);
int ret = 0;
- if (b_data(&h1s->rxbuf) || b_data(&h1s->h1c->ibuf))
+ if (b_data(&h1s->h1c->ibuf))
goto end;
if (h1m->state == H1_MSG_DATA && count > h1m->curr_len)
count = h1m->curr_len;