#include <common/config.h>
#include <proto/connection.h>
+#include <proto/h1.h>
+#include <proto/log.h>
#include <proto/stream.h>
#include <proto/stream_interface.h>
#define H1C_F_CS_SHUTW_NOW 0x00002000 /* connection must be shut down for writes ASAP */
#define H1C_F_CS_SHUTW 0x00004000 /* connection is already shut down */
+
/*
* H1 Stream flags (32 bits)
*/
-// TODO
+#define H1S_F_NONE 0x00000000
+#define H1S_F_ERROR 0x00000001 /* An error occurred on the H1 stream */
+#define H1S_F_MSG_XFERED 0x00000002 /* current message was transferred to the data layer */
+
/* H1 connection descriptor */
-//struct h1s;
struct h1c {
struct connection *conn;
struct proxy *px;
struct wait_event wait_event; /* To be used if we're waiting for I/Os */
struct h1s *h1s; /* H1 stream descriptor */
- int timeout; /* idle timeout */
struct task *task; /* timeout management task */
+
+ int idle_exp; /* expiration date for idle connections, in ticks (client-side only)*/
+ int http_exp; /* expiration date for HTTP headers parsing (client-side only) */
};
/* H1 stream descriptor */
struct wait_event *recv_wait; /* Address of the wait_event the conn_stream associated is waiting on */
struct wait_event *send_wait; /* Address of the wait_event the conn_stream associated is waiting on */
+
+ struct h1m req;
+ struct h1m res;
+
+ enum http_meth_t meth; /* HTTP resquest method */
+ uint16_t status; /* HTTP response status */
};
/* the h1c and h1s pools */
h1s->cs = NULL;
h1s->rxbuf = BUF_NULL;
+ h1s->flags = H1S_F_NONE;
h1s->recv_wait = NULL;
h1s->send_wait = NULL;
+
+ h1m_init_req(&h1s->req);
+ h1m_init_res(&h1s->res);
+
+ h1s->status = 0;
+ h1s->meth = HTTP_METH_OTHER;
+
+ if (!conn_is_back(h1c->conn)) {
+ if (h1c->px->options2 & PR_O2_REQBUG_OK)
+ h1s->req.err_pos = -1;
+ }
+ else {
+ if (h1c->px->options2 & PR_O2_RSPBUG_OK)
+ h1s->res.err_pos = -1;
+ }
end:
return h1s;
}
h1c->ibuf = BUF_NULL;
h1c->obuf = BUF_NULL;
h1c->h1s = NULL;
- h1c->timeout = 0;
t = task_new(tid_bit);
if (!t)
t->context = h1c;
t->expire = TICK_ETERNITY;
+ h1c->idle_exp = TICK_ETERNITY;
+ h1c->http_exp = TICK_ETERNITY;
+
LIST_INIT(&h1c->buf_wait.list);
h1c->wait_event.task = tasklet_new();
if (!h1c->wait_event.task)
h1c->wait_event.task->context = h1c;
h1c->wait_event.wait_reason = 0;
- conn->mux_ctx = h1c;
+ /* For backend mux connection, the CS already exists. In such case,
+ * create h1s and attached the cs to it.
+ */
if (cs) {
- struct h1s *h1s;
-
- h1s = h1s_create(h1c);
- if (!h1s)
- goto fail;
- h1s->cs = cs;
- cs->ctx = h1s;
+ struct h1s *h1s = cs->ctx;
+
+ if (!h1s) {
+ h1s = h1s_create(h1c);
+ if (!h1s)
+ goto fail;
+ cs->ctx = h1s;
+ h1s->cs = cs;
+ }
}
+ conn->mux_ctx = h1c;
+ task_wakeup(t, TASK_WOKEN_INIT);
+
/* Try to read, if nothing is available yet we'll just subscribe */
if (h1_recv(h1c))
h1_process(h1c);
if (h1c->wait_event.task)
tasklet_free(h1c->wait_event.task);
+ if (h1c->h1s)
+ h1s_destroy(h1c->h1s);
+
if (h1c->wait_event.wait_reason != 0)
conn->xprt->unsubscribe(conn, h1c->wait_event.wait_reason,
&h1c->wait_event);
/******************************************************/
/* functions below are for the H1 protocol processing */
/******************************************************/
-static void h1_process_input(struct h1c *h1c)
+/*
+ * Set the appropriate error message. It first tries to get it from the proxy if
+ * it exists. Otherwise, it falls back on default one.
+ */
+static void h1_cpy_error_message(struct h1c *h1c, struct buffer *dst, int status)
+{
+ const int msgnum = http_get_status_idx(status);
+ const struct buffer *err;
+
+ err = (h1c->px->errmsg[msgnum].area
+ ? &h1c->px->errmsg[msgnum]
+ : &http_err_chunks[msgnum]);
+ b_putblk(dst, b_head(err), b_data(err));
+}
+
+/*
+ * Parse HTTP/1 headers. It returns the number of bytes parsed if > 0, or 0 if
+ * it couldn't proceed. Parsing errors are reported by setting H1S_F_ERROR flag
+ * and filling h1s->err_pos and h1s->err_state fields. This functions is
+ * responsibile to update the parser state <h1m>.
+ */
+static size_t h1_process_headers(struct h1s *h1s, struct h1m *h1m,
+ struct buffer *buf, size_t ofs, size_t max)
+{
+ struct http_hdr hdrs[MAX_HTTP_HDR];
+ union h1_sl sl;
+ int ret = 0;
+
+ /* Realing input buffer if necessary */
+ if (b_head(buf) + b_data(buf) > b_wrap(buf))
+ b_slow_realign(buf, trash.area, 0);
+
+ ret = h1_headers_to_hdr_list(b_peek(buf, ofs), b_peek(buf, ofs) + max,
+ hdrs, sizeof(hdrs)/sizeof(hdrs[0]), h1m, &sl);
+ if (ret <= 0) {
+ /* Incomplete or invalid message. If the buffer is full, it's an
+ * error because headers are too large to be handled by the
+ * parser. */
+ if (ret < 0 || (!ret && b_full(buf))) {
+ h1s->flags |= H1S_F_ERROR;
+ h1m->err_state = h1m->state;
+ h1m->err_pos = h1m->next;
+ ret = 0;
+ }
+ goto end;
+ }
+
+ /* messages headers fully parsed, do some checks to prepare the body
+ * parsing.
+ */
+
+ /* Be sure to keep some space to do headers rewritting */
+ if (ret > (b_size(buf) - global.tune.maxrewrite)) {
+ h1s->flags |= H1S_F_ERROR;
+ h1m->err_state = h1m->state;
+ h1m->err_pos = h1m->next;
+ ret = 0;
+ goto end;
+ }
+
+ /* Save the request's method or the response's status and check if the
+ * body length is known */
+ if (!(h1m->flags & H1_MF_RESP)) {
+ h1s->meth = sl.rq.meth;
+ /* Request have always a known length */
+ h1m->flags |= H1_MF_XFER_LEN;
+ if (!(h1m->flags & H1_MF_CHNK) && !h1m->body_len)
+ h1m->state = H1_MSG_DONE;
+ }
+ else {
+ h1s->status = sl.st.status;
+
+ if ((h1s->meth == HTTP_METH_HEAD) ||
+ (h1s->status >= 100 && h1s->status < 200) ||
+ (h1s->status == 204) || (h1s->status == 304) ||
+ (h1s->meth == HTTP_METH_CONNECT && h1s->status == 200)) {
+ h1m->flags &= ~(H1_MF_CLEN|H1_MF_CHNK);
+ h1m->flags |= H1_MF_XFER_LEN;
+ h1m->curr_len = h1m->body_len = 0;
+ h1m->state = H1_MSG_DONE;
+ }
+ else if (h1m->flags & (H1_MF_CLEN|H1_MF_CHNK)) {
+ h1m->flags |= H1_MF_XFER_LEN;
+ if ((h1m->flags & H1_MF_CLEN) && !h1m->body_len)
+ h1m->state = H1_MSG_DONE;
+ }
+ else
+ h1m->state = H1_MSG_TUNNEL;
+ }
+
+ end:
+ return ret;
+}
+
+/*
+ * Parse HTTP/1 body. It returns the number of bytes parsed if > 0, or 0 if
+ * it couldn't proceed. Parsing errors are reported by setting H1S_F_ERROR flag
+ * and filling h1s->err_pos and h1s->err_state fields. This functions is
+ * responsibile to update the parser state <h1m>.
+ */
+static size_t h1_process_data(struct h1s *h1s, struct h1m *h1m,
+ struct buffer *buf, size_t ofs, size_t max)
+{
+ size_t total = 0;
+ int ret = 0;
+
+ if (h1m->flags & H1_MF_XFER_LEN) {
+ if (h1m->flags & H1_MF_CLEN) {
+ /* content-length: read only h2m->body_len */
+ ret = max;
+ if ((uint64_t)ret > h1m->curr_len)
+ ret = h1m->curr_len;
+ h1m->curr_len -= ret;
+ total += ret;
+ if (!h1m->curr_len)
+ h1m->state = H1_MSG_DONE;
+ }
+ else if (h1m->flags & H1_MF_CHNK) {
+ new_chunk:
+ /* te:chunked : parse chunks */
+ if (h1m->state == H1_MSG_CHUNK_CRLF) {
+ ret = h1_skip_chunk_crlf(buf, ofs, ofs + max);
+ if (ret <= 0)
+ goto end;
+ max -= ret;
+ ofs += ret;
+ total += ret;
+ h1m->state = H1_MSG_CHUNK_SIZE;
+ }
+
+ if (h1m->state == H1_MSG_CHUNK_SIZE) {
+ unsigned int chksz;
+
+ ret = h1_parse_chunk_size(buf, ofs, ofs + max, &chksz);
+ if (ret <= 0)
+ goto end;
+ h1m->curr_len = chksz;
+ h1m->body_len += chksz;
+ max -= ret;
+ ofs += ret;
+ total += ret;
+ h1m->state = (!chksz ? H1_MSG_TRAILERS : H1_MSG_DATA);
+ }
+
+ if (h1m->state == H1_MSG_DATA) {
+ ret = max;
+ if (!ret)
+ goto end;
+ if ((uint64_t)ret > h1m->curr_len)
+ ret = h1m->curr_len;
+ h1m->curr_len -= ret;
+ max -= ret;
+ ofs += ret;
+ total += ret;
+ if (h1m->curr_len)
+ goto end;
+ h1m->state = H1_MSG_CHUNK_CRLF;
+ goto new_chunk;
+ }
+
+ if (h1m->state == H1_MSG_TRAILERS) {
+ ret = h1_measure_trailers(buf, ofs, ofs + max);
+ if (ret <= 0)
+ goto end;
+ max -= ret;
+ ofs += ret;
+ total += ret;
+ h1m->state = H1_MSG_DONE;
+ }
+ }
+ else {
+ /* XFER_LEN is set but not CLEN nor CHNK, it means there
+ * is no body. Switch the message in DONE state
+ */
+ h1m->state = H1_MSG_DONE;
+ }
+ }
+ else {
+ /* no content length, read till SHUTW */
+ total = max;
+ }
+
+ end:
+ if (ret < 0) {
+ h1s->flags |= H1S_F_ERROR;
+ h1m->err_state = h1m->state;
+ h1m->err_pos = ofs + max + ret;
+ return 0;
+ }
+
+ return total;
+}
+
+/*
+ * Synchronize the request and the response before reseting them. Except for 1xx
+ * responses, we wait that the request and the response are in DONE state and
+ * that all data are forwarded for both. For 1xx responses, only the response is
+ * reset, waiting the final one. Many 1xx messages can be sent.
+ */
+static void h1_sync_messages(struct h1c *h1c)
+{
+ if (!h1c->h1s)
+ return;
+
+ if (h1c->h1s->res.state >= H1_MSG_DONE &&
+ (h1c->h1s->status < 200 && (h1c->h1s->status == 100 || h1c->h1s->status >= 102)) &&
+ ((conn_is_back(h1c->conn) && !b_data(&h1c->obuf)) || !b_data(&h1c->h1s->rxbuf))) {
+ /* For 100-Continue response or any other informational 1xx
+ * response which is non-final, don't reset the request, the
+ * transaction is not finished. We take care the response was
+ * transferred before.
+ */
+ h1m_init_res(&h1c->h1s->res);
+ }
+ else if (!b_data(&h1c->h1s->rxbuf) && !b_data(&h1c->obuf) &&
+ h1c->h1s->req.state >= H1_MSG_DONE && h1c->h1s->res.state >= H1_MSG_DONE) {
+ h1m_init_req(&h1c->h1s->req);
+ h1m_init_res(&h1c->h1s->res);
+
+ // TODO: For now, the Keep-alive timeout is handled by the stream.
+ //if (h1c->task && !conn_is_back(h1c->conn))
+ // h1c->http_exp = tick_add_ifset(now_ms, h1c->px->timeout.httpka);
+ }
+}
+
+/*
+ * Process incoming data. It parses data and transfer them from h1c->ibuf into
+ * h1s->rxbuf. It returns the number of bytes parsed and transferred if > 0, or
+ * 0 if it couldn't proceed.
+ */
+static size_t h1_process_input(struct h1c *h1c, struct buffer *buf, size_t count)
{
struct h1s *h1s = h1c->h1s;
struct conn_stream *cs = NULL;
+ struct h1m *h1m;
+ size_t total = 0;
+ size_t ret = 0;
if (h1c->flags & H1C_F_CS_ERROR)
goto end;
h1s = h1s_create(h1c);
if (h1s == NULL)
goto err;
-
- cs = cs_new(h1c->conn);
- if (!cs)
- goto err;
-
- h1s->cs = cs;
- cs->ctx = h1s;
- if (stream_create_from_cs(cs) < 0)
- goto err;
}
if (!h1_get_buf(h1c, &h1s->rxbuf)) {
goto end;
}
- b_xfer(&h1s->rxbuf, &h1c->ibuf, b_room(&h1s->rxbuf));
+ if (count > b_room(&h1s->rxbuf))
+ count = b_room(&h1s->rxbuf);
- if (!b_full(&h1c->ibuf)) {
- h1c->flags &= ~H1C_F_IN_FULL;
- if (!b_data(&h1c->ibuf))
- h1_release_buf(h1c, &h1c->ibuf);
+ h1m = (!conn_is_back(h1c->conn) ? &h1s->req : &h1s->res);
+ while (h1m->state < H1_MSG_DONE && count) {
+ if (h1m->state <= H1_MSG_LAST_LF) {
+ if (h1m->state == H1_MSG_RQBEFORE) {
+ if (h1c->task && !conn_is_back(h1c->conn))
+ if (!h1s->cs)
+ h1c->http_exp = tick_add_ifset(now_ms, h1c->px->timeout.httpreq);
+ }
+ ret = h1_process_headers(h1s, h1m, buf, total, count);
+ if (!ret)
+ break;
+
+ /* Create the CS if not already attached to the H1S */
+ if (!h1s->cs) {
+ cs = cs_new(h1c->conn);
+ if (!cs)
+ goto err;
+ h1s->cs = cs;
+ cs->ctx = h1s;
+ if (stream_create_from_cs(cs) < 0)
+ goto err;
+ }
+
+ if (h1c->task && !conn_is_back(h1c->conn))
+ h1c->http_exp = TICK_ETERNITY;
+ }
+ else if (h1m->state <= H1_MSG_TRAILERS) {
+ /* Do not parse the body if the header part is not yet
+ * transferred to the stream.
+ */
+ if (!(h1s->flags & H1S_F_MSG_XFERED))
+ break;
+ ret = h1_process_data(h1s, h1m, buf, total, count);
+ if (!ret)
+ break;
+ }
+ else {
+ h1s->flags |= H1S_F_ERROR;
+ break;
+ }
+
+ total += ret;
+ count -= ret;
+
+ if ((h1s->flags & H1S_F_ERROR))
+ break;
}
+
+ if (h1s->flags & H1S_F_ERROR) {
+ /* For now, if an error occurred during the message parsing when
+ * a stream is already attached to the mux, we transfer
+ * everything to let the stream handle the error itself. We
+ * suppose the stream will detect the same error of
+ * course. Otherwise, we generate the error here.
+ */
+ if (!h1s->cs) {
+ if (!h1_get_buf(h1c, &h1c->obuf)) {
+ h1c->flags |= H1C_F_OUT_ALLOC;
+ goto err;
+ }
+ h1_cpy_error_message(h1c, &h1c->obuf, 400);
+ goto err;
+ }
+ total += count;
+ }
+
+ ret = b_xfer(&h1s->rxbuf, buf, total);
+
if (b_data(&h1s->rxbuf)) {
h1s->cs->flags |= CS_FL_RCV_MORE;
if (b_full(&h1s->rxbuf))
h1c->flags |= H1C_F_RX_FULL;
}
+
end:
- return;
+ return ret;
err:
if (cs)
if (h1s)
h1s_destroy(h1s);
h1c->flags |= H1C_F_CS_ERROR;
+ sess_log(h1c->conn->owner);
+ ret = 0;
goto end;
}
+/*
+ * Process outgoing data. It parses data and transfer them from the channel buffer into
+ * h1c->obuf. It returns the number of bytes parsed and transferred if > 0, or
+ * 0 if it couldn't proceed.
+ */
static size_t h1_process_output(struct h1c *h1c, struct buffer *buf, size_t count)
{
+ struct h1s *h1s = h1c->h1s;
+ struct h1m *h1m;
+ size_t total = 0;
size_t ret = 0;
if (!h1_get_buf(h1c, &h1c->obuf)) {
if (count > b_room(&h1c->obuf))
count = b_room(&h1c->obuf);
- ret = b_xfer(&h1c->obuf, buf, count);
+ h1m = (!conn_is_back(h1c->conn) ? &h1s->res : &h1s->req);
+ while (h1m->state < H1_MSG_DONE && count) {
+ if (h1m->state <= H1_MSG_LAST_LF) {
+ ret = h1_process_headers(h1s, h1m, buf, total, count);
+ if (!ret) {
+ /* incomplete or invalid response, this is abnormal coming from
+ * haproxy and may only result in a bad errorfile or bad Lua code
+ * so that won't be fixed, raise an error now.
+ */
+ h1s->flags |= H1S_F_ERROR;
+ break;
+ }
+ }
+ else if (h1m->state <= H1_MSG_TRAILERS) {
+ ret = h1_process_data(h1s, h1m, buf, total, count);
+ if (!ret)
+ break;
+ }
+ else {
+ h1s->flags |= H1S_F_ERROR;
+ break;
+ }
+
+ total += ret;
+ count -= ret;
+
+ if ((h1s->flags & H1S_F_ERROR))
+ break;
+ }
+
+ // TODO: Handle H1S errors
+ ret = b_xfer(&h1c->obuf, buf, total);
if (b_full(&h1c->obuf))
h1c->flags |= H1C_F_OUT_FULL;
return ret;
}
+/*
+ * Transfer data from h1s->rxbuf into the channel buffer. It returns the number
+ * of bytes transferred.
+ */
static size_t h1_xfer(struct h1s *h1s, struct buffer *buf, size_t count)
{
struct h1c *h1c = h1s->h1c;
cs->flags |= CS_FL_RCV_MORE;
}
else {
+ if (!(h1s->flags & H1S_F_MSG_XFERED))
+ h1s->flags |= H1S_F_MSG_XFERED;
+
h1c->flags &= ~H1C_F_RX_FULL;
h1_release_buf(h1c, &h1s->rxbuf);
+ h1_sync_messages(h1c);
+
cs->flags &= ~CS_FL_RCV_MORE;
- if (!b_data(&h1c->ibuf) && cs->flags & CS_FL_REOS)
+ if (!b_data(&h1c->ibuf) && (cs->flags & CS_FL_REOS))
cs->flags |= CS_FL_EOS;
}
return ret;
if (ret > 0) {
h1c->flags &= ~H1C_F_OUT_FULL;
b_del(&h1c->obuf, ret);
+ h1_sync_messages(h1c);
sent = 1;
}
{
struct connection *conn = h1c->conn;
- if (b_data(&h1c->ibuf) && !(h1c->flags & (H1C_F_RX_FULL|H1C_F_RX_ALLOC)))
- h1_process_input(h1c);
+ if (b_data(&h1c->ibuf) && !(h1c->flags & (H1C_F_RX_FULL|H1C_F_RX_ALLOC))) {
+ size_t ret;
+
+ ret = h1_process_input(h1c, &h1c->ibuf, b_data(&h1c->ibuf));
+ if (ret > 0) {
+ h1c->flags &= ~H1C_F_IN_FULL;
+ if (!b_data(&h1c->ibuf))
+ h1_release_buf(h1c, &h1c->ibuf);
+ }
+ }
h1_send(h1c);
}
}
- if (h1c->task) {
- // TODO: update task's timeout and queue it if necessary
+ if (h1c->task && !conn_is_back(conn)) {
+ if (!h1c->h1s || !h1c->h1s->cs)
+ h1c->idle_exp = tick_add_ifset(now_ms, h1c->px->timeout.client);
+ else
+ h1c->idle_exp = TICK_ETERNITY;
+ h1c->task->expire = tick_first(h1c->http_exp, h1c->idle_exp);
}
return 0;
}
struct h1c *h1c = context;
int expired = tick_is_expired(t->expire, now_ms);
- if (!expired && h1c)
- return t;
+ if (!h1c)
+ goto end;
- task_delete(t);
- task_free(t);
+ if (!expired) {
+ /* For now, do not handle timeout for server-side mux */
+ if (!conn_is_back(h1c->conn))
+ t->expire = tick_first(t->expire, tick_first(h1c->idle_exp, h1c->http_exp));
+ return t;
+ }
- if (!h1c) {
- /* resources were already deleted */
- return NULL;
+ if (!(h1c->px->options & PR_O_IGNORE_PRB) && h1_get_buf(h1c, &h1c->obuf)) {
+ // TODO: do not send error if ka timeout
+ h1_cpy_error_message(h1c, &h1c->obuf, 408);
+ h1c->flags |= H1C_F_CS_ERROR;
+ h1c->idle_exp = TICK_ETERNITY;
+ h1c->http_exp = TICK_ETERNITY;
+ t->expire = TICK_ETERNITY;
+ tasklet_wakeup(h1c->wait_event.task);
+ sess_log(h1c->conn->owner);
+ return t;
}
h1c->task = NULL;
-
- // TODO
-
- /* either we can release everything now or it will be done later once
- * the stream closes.
- */
- if (!h1c->h1s)
+ if (!h1c->h1s || !h1c->h1s->cs)
h1_release(h1c->conn);
-
+ end:
+ task_delete(t);
+ task_free(t);
return NULL;
}