From: Christopher Faulet Date: Wed, 31 Oct 2018 16:40:50 +0000 (+0100) Subject: MEDIUM: mux-h1: Handle errors and timeouts in the stream X-Git-Tag: v1.9-dev7~40 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=473652733a39604e4e803409cb00d55d02e9a502;p=thirdparty%2Fhaproxy.git MEDIUM: mux-h1: Handle errors and timeouts in the stream To do so, the stream is created as earlier as possible. It means, during the mux creation for the first request, and for others, just at the end of the previous transaction. Because all timeouts are handled by the strream, the mux's task is now useless, so it is removed. Finally, to report errors, flags are set on the HTX message. The HTX message is passed to the stream if there is some content to analyse or if there is some error to handle. All of this will probably be reworked later to handle errors and timeouts directly in the mux. For now, it is the simpler way to handle all of this. --- diff --git a/src/mux_h1.c b/src/mux_h1.c index 178465aa90..94925382a9 100644 --- a/src/mux_h1.c +++ b/src/mux_h1.c @@ -80,10 +80,6 @@ struct h1c { struct wait_event wait_event; /* To be used if we're waiting for I/Os */ struct h1s *h1s; /* H1 stream descriptor */ - struct task *task; /* timeout management task */ - - int idle_exp; /* expiration date for idle connections, in ticks (client-side only)*/ - int http_exp; /* expiration date for HTTP headers parsing (client-side only) */ }; /* H1 stream descriptor */ @@ -108,7 +104,6 @@ struct h1s { static struct pool_head *pool_head_h1c; static struct pool_head *pool_head_h1s; -static struct task *h1_timeout_task(struct task *t, void *context, unsigned short state); static int h1_recv(struct h1c *h1c); static int h1_send(struct h1c *h1c); static int h1_process(struct h1c *h1c); @@ -225,13 +220,36 @@ static int h1_avail_streams(struct connection *conn) /*****************************************************************/ /* functions below are dedicated to the mux setup and management */ /*****************************************************************/ +static struct conn_stream *h1s_new_cs(struct h1s *h1s) +{ + struct conn_stream *cs; + + cs = cs_new(h1s->h1c->conn); + if (!cs) + goto err; + h1s->cs = cs; + cs->ctx = h1s; + + if (h1s->flags & H1S_F_NOT_FIRST) + cs->flags |= CS_FL_NOT_FIRST; + + if (stream_create_from_cs(cs) < 0) + goto err; + return cs; + + err: + cs_free(cs); + h1s->cs = NULL; + return NULL; +} + static struct h1s *h1s_create(struct h1c *h1c, struct conn_stream *cs) { struct h1s *h1s; h1s = pool_alloc(pool_head_h1s); if (!h1s) - goto end; + goto fail; h1s->h1c = h1c; h1c->h1s = h1s; @@ -252,27 +270,36 @@ static struct h1s *h1s_create(struct h1c *h1c, struct conn_stream *cs) h1s->status = 0; h1s->meth = HTTP_METH_OTHER; + if (h1c->flags & H1C_F_WAIT_NEXT_REQ) + h1s->flags |= H1S_F_NOT_FIRST; + h1c->flags &= ~H1C_F_WAIT_NEXT_REQ; + if (!conn_is_back(h1c->conn)) { if (h1c->px->options2 & PR_O2_REQBUG_OK) h1s->req.err_pos = -1; - - if (h1c->flags & H1C_F_WAIT_NEXT_REQ) - h1s->flags |= H1S_F_NOT_FIRST; - h1c->flags &= ~H1C_F_WAIT_NEXT_REQ; - h1c->http_exp = tick_add_ifset(now_ms, h1c->px->timeout.httpreq); } else { if (h1c->px->options2 & PR_O2_RSPBUG_OK) h1s->res.err_pos = -1; } - /* If a conn_stream already exists, attach it to this H1S */ if (cs) { + /* If a conn_stream already exists, attach it to this H1S */ cs->ctx = h1s; h1s->cs = cs; } - end: +#if 1 + else { + cs = h1s_new_cs(h1s); + if (!cs) + goto fail; + } +#endif return h1s; + + fail: + pool_free(pool_head_h1s, h1s); + return NULL; } static void h1s_destroy(struct h1s *h1s) @@ -288,10 +315,9 @@ static void h1s_destroy(struct h1s *h1s) if (h1s->send_wait != NULL) h1s->send_wait->wait_reason &= ~SUB_CAN_SEND; - if (!conn_is_back(h1c->conn)) { - h1c->flags |= H1C_F_WAIT_NEXT_REQ; - h1c->http_exp = tick_add_ifset(now_ms, h1c->px->timeout.httpka); - } + h1c->flags |= H1C_F_WAIT_NEXT_REQ; + if (h1s->flags & (H1S_F_REQ_ERROR|H1S_F_RES_ERROR)) + h1c->flags |= H1C_F_CS_ERROR; h1_release_buf(h1c, &h1s->rxbuf); cs_free(h1s->cs); @@ -299,29 +325,6 @@ static void h1s_destroy(struct h1s *h1s) } } -static struct conn_stream *h1s_new_cs(struct h1s *h1s) -{ - struct conn_stream *cs; - - cs = cs_new(h1s->h1c->conn); - if (!cs) - goto err; - h1s->cs = cs; - cs->ctx = h1s; - - if (h1s->flags & H1S_F_NOT_FIRST) - cs->flags |= CS_FL_NOT_FIRST; - - if (stream_create_from_cs(cs) < 0) - goto err; - return cs; - - err: - cs_free(cs); - h1s->cs = NULL; - return NULL; -} - /* * Initialize the mux once it's attached. It is expected that conn->mux_ctx * points to the existing conn_stream (for outgoing connections) or NULL (for @@ -330,7 +333,6 @@ static struct conn_stream *h1s_new_cs(struct h1s *h1s) static int h1_init(struct connection *conn, struct proxy *proxy) { struct h1c *h1c; - struct task *t = NULL; h1c = pool_alloc(pool_head_h1c); if (!h1c) @@ -343,17 +345,6 @@ static int h1_init(struct connection *conn, struct proxy *proxy) h1c->obuf = BUF_NULL; h1c->h1s = NULL; - t = task_new(tid_bit); - if (!t) - goto fail; - h1c->task = t; - t->process = h1_timeout_task; - t->context = h1c; - t->expire = TICK_ETERNITY; - - h1c->idle_exp = TICK_ETERNITY; - h1c->http_exp = TICK_ETERNITY; - LIST_INIT(&h1c->buf_wait.list); h1c->wait_event.task = tasklet_new(); if (!h1c->wait_event.task) @@ -370,7 +361,6 @@ static int h1_init(struct connection *conn, struct proxy *proxy) goto fail; conn->mux_ctx = h1c; - task_wakeup(t, TASK_WOKEN_INIT); /* Try to read, if nothing is available yet we'll just subscribe */ if (h1_recv(h1c)) @@ -380,9 +370,7 @@ static int h1_init(struct connection *conn, struct proxy *proxy) return 0; fail: - if (t) - task_free(t); - if (h1c && h1c->wait_event.task) + if (h1c->wait_event.task) tasklet_free(h1c->wait_event.task); pool_free(pool_head_h1c, h1c); fail_h1c: @@ -410,11 +398,6 @@ static void h1_release(struct connection *conn) h1_release_buf(h1c, &h1c->ibuf); h1_release_buf(h1c, &h1c->obuf); - if (h1c->task) { - h1c->task->context = NULL; - task_wakeup(h1c->task, TASK_WOKEN_OTHER); - h1c->task = NULL; - } if (h1c->wait_event.task) tasklet_free(h1c->wait_event.task); @@ -438,21 +421,6 @@ static void h1_release(struct connection *conn) /******************************************************/ /* functions below are for the H1 protocol processing */ /******************************************************/ -/* - * Set the appropriate error message. It first tries to get it from the proxy if - * it exists. Otherwise, it falls back on default one. - */ -static void h1_cpy_error_message(struct h1c *h1c, struct buffer *dst, int status) -{ - const int msgnum = http_get_status_idx(status); - const struct buffer *err; - - err = (h1c->px->errmsg[msgnum].area - ? &h1c->px->errmsg[msgnum] - : &http_err_chunks[msgnum]); - b_putblk(dst, b_head(err), b_data(err)); -} - /* Parse the request version and set H1_MF_VER_11 on if the version is * greater or equal to 1.1 */ @@ -1099,22 +1067,22 @@ static size_t h1_process_input(struct h1c *h1c, struct buffer *buf, size_t count h1s = NULL; - /* Create a new H1S without CS if not already done */ + /* Create a new H1S if not already done */ if (!h1c->h1s && !h1s_create(h1c, NULL)) - goto err; + goto fatal_err; h1s = h1c->h1s; - #if 0 - // FIXME: Use a proxy option to enable early creation of the CS /* Create the CS if not already attached to the H1S */ if (!h1s->cs && !h1s_new_cs(h1s)) - goto err; + goto fatal_err; #endif - + if (!count) + goto end; if (!h1_get_buf(h1c, &h1s->rxbuf)) { h1c->flags |= H1C_F_RX_ALLOC; goto end; } + htx = htx_from_buf(&h1s->rxbuf); if (!conn_is_back(h1c->conn)) { @@ -1132,13 +1100,11 @@ static size_t h1_process_input(struct h1c *h1c, struct buffer *buf, size_t count ret = h1_process_headers(h1s, h1m, htx, buf, &total, max); if (!ret) break; - - /* Reset request timeout */ - h1s->h1c->http_exp = TICK_ETERNITY; - +#if 0 /* Create the CS if not already attached to the H1S */ if (!h1s->cs && !h1s_new_cs(h1s)) - goto err; + goto fatal_err; +#endif } else if (h1m->state <= H1_MSG_TRAILERS) { /* Do not parse the body if the header part is not yet @@ -1165,59 +1131,41 @@ static size_t h1_process_input(struct h1c *h1c, struct buffer *buf, size_t count max -= ret; } - if (h1s->flags & errflag) { - if (conn_is_back(h1c->conn)) - goto err; - - // FIXME: Do following actions when an error is catched during - // the request parsing: - // - // * Do same than stream_inc_http_req_ctr, - // stream_inc_http_err_ctr and proxy_inc_fe_req_ctr - // * Capture bad message for snapshots - // * Increment fe->fe_counters.failed_req and - // listeners->counters->failed_req - // - // FIXME: Do following actions when an error is catched during - // the response parsing: - // - // * Capture bad message for snapshots - // * increment be->be_counters.failed_resp - // * increment srv->counters.failed_resp (if srv assigned) - if (!h1_get_buf(h1c, &h1c->obuf)) { - h1c->flags |= H1C_F_OUT_ALLOC; - goto err; - } - h1_cpy_error_message(h1c, &h1c->obuf, 400); - goto err; - } + if (h1s->flags & errflag) + goto parsing_err; b_del(buf, total); - if (htx_is_not_empty(htx)) { b_set_data(&h1s->rxbuf, b_size(&h1s->rxbuf)); if (!htx_free_data_space(htx)) h1c->flags |= H1C_F_RX_FULL; - - if (h1s->recv_wait) { - h1s->recv_wait->wait_reason &= ~SUB_CAN_RECV; - tasklet_wakeup(h1s->recv_wait->task); - h1s->recv_wait = NULL; - } } else h1_release_buf(h1c, &h1s->rxbuf); ret = count - max; - + if (h1s->recv_wait) { + h1s->recv_wait->wait_reason &= ~SUB_CAN_RECV; + tasklet_wakeup(h1s->recv_wait->task); + h1s->recv_wait = NULL; + } end: return ret; - err: - //h1s_destroy(h1s); + fatal_err: h1c->flags |= H1C_F_CS_ERROR; - if (!h1s || !h1s->cs) - sess_log(h1c->conn->owner); + sess_log(h1c->conn->owner); + return 0; + + parsing_err: + // FIXME: create an error snapshot here + b_reset(&h1c->ibuf); + h1s->cs->flags |= CS_FL_REOS; + if (h1s->recv_wait) { + h1s->recv_wait->wait_reason &= ~SUB_CAN_RECV; + tasklet_wakeup(h1s->recv_wait->task); + h1s->recv_wait = NULL; + } return 0; } @@ -1236,6 +1184,8 @@ static size_t h1_process_output(struct h1c *h1c, struct buffer *buf, size_t coun size_t total = 0; int errflag; + if (!count) + goto end; chn_htx = htx_from_buf(buf); if (!h1_get_buf(h1c, &h1c->obuf)) { @@ -1391,7 +1341,6 @@ static size_t h1_process_output(struct h1c *h1c, struct buffer *buf, size_t coun htx_reset(chn_htx); b_set_data(buf, 0); } - end: return total; } @@ -1411,12 +1360,15 @@ static size_t h1_xfer(struct h1s *h1s, struct buffer *buf, int flags) h1m = (!conn_is_back(h1c->conn) ? &h1s->req : &h1s->res); mux_htx = htx_from_buf(&h1s->rxbuf); + chn_htx = htx_from_buf(buf); + if (h1s->flags & (H1S_F_REQ_ERROR|H1S_F_RES_ERROR)) { + chn_htx->flags |= HTX_FL_PARSING_ERROR; + b_set_data(buf, b_size(buf)); + goto end; + } if (htx_is_empty(mux_htx)) goto end; - - chn_htx = htx_from_buf(buf); - count = htx_free_space(chn_htx); if (flags & CO_RFL_KEEP_RSV) { if (count < global.tune.maxrewrite) @@ -1442,7 +1394,6 @@ static size_t h1_xfer(struct h1s *h1s, struct buffer *buf, int flags) if (htx_is_not_empty(chn_htx)) b_set_data(buf, b_size(buf)); - end: if (h1c->flags & H1C_F_RX_FULL && htx_free_data_space(mux_htx)) { h1c->flags &= ~H1C_F_RX_FULL; @@ -1501,8 +1452,11 @@ static int h1_recv(struct h1c *h1c) h1c->flags &= ~H1C_F_IN_FULL; ret = conn->xprt->rcv_buf(conn, &h1c->ibuf, max, 0); } - if (ret > 0) + if (ret > 0) { rcvd = 1; + if (h1c->h1s && h1c->h1s->cs) + h1c->h1s->cs->flags |= CS_FL_READ_PARTIAL; + } if (h1_recv_allowed(h1c)) conn->xprt->subscribe(conn, SUB_CAN_RECV, &h1c->wait_event); @@ -1609,7 +1563,7 @@ static int h1_process(struct h1c * h1c) { struct connection *conn = h1c->conn; - if (b_data(&h1c->ibuf) && !(h1c->flags & (H1C_F_CS_ERROR|H1C_F_RX_FULL|H1C_F_RX_ALLOC))) { + if (!(h1c->flags & (H1C_F_CS_ERROR|H1C_F_RX_FULL|H1C_F_RX_ALLOC))) { size_t ret; ret = h1_process_input(h1c, &h1c->ibuf, b_data(&h1c->ibuf)); @@ -1630,7 +1584,7 @@ static int h1_process(struct h1c * h1c) h1c->flags &= ~H1C_F_CS_WAIT_CONN; h1_wake_stream(h1c); } - return 0; + goto end; } if ((h1c->flags & H1C_F_CS_ERROR) || (conn->flags & CO_FL_ERROR) || conn_xprt_read0_pending(conn)) { @@ -1641,20 +1595,7 @@ static int h1_process(struct h1c * h1c) } } - /* If there is a stream attached to the mux, let it - * handle the timeout. - */ - if (h1c->h1s && h1c->h1s->cs) - h1c->idle_exp = TICK_ETERNITY; - else { - int tout = (!conn_is_back(conn) - ? h1c->px->timeout.client - : h1c->px->timeout.server); - h1c->idle_exp = tick_add_ifset(now_ms, tout); - } - h1c->task->expire = tick_first(h1c->http_exp, h1c->idle_exp); - if (tick_isset(h1c->task->expire)) - task_queue(h1c->task); + end: return 0; } @@ -1677,74 +1618,9 @@ static int h1_wake(struct connection *conn) { struct h1c *h1c = conn->mux_ctx; - //return 0; return (h1_process(h1c)); } - -/* Connection timeout management. The principle is that if there's no receipt - * nor sending for a certain amount of time, the connection is closed. - */ -static struct task *h1_timeout_task(struct task *t, void *context, unsigned short state) -{ - struct h1c *h1c = context; - int expired = tick_is_expired(t->expire, now_ms); - - if (!h1c) - goto end; - - if (!expired) { - t->expire = tick_first(t->expire, tick_first(h1c->idle_exp, h1c->http_exp)); - return t; - } - - h1c->flags |= H1C_F_CS_ERROR; - h1c->idle_exp = TICK_ETERNITY; - h1c->http_exp = TICK_ETERNITY; - t->expire = TICK_ETERNITY; - - /* Don't try send error message on the server-side */ - if (conn_is_back(h1c->conn)) - goto release; - - /* Don't send error message if no input data is pending _AND_ if null - * requests is ignored or it's not the first request. - */ - if (!b_data(&h1c->ibuf) && (h1c->px->options & PR_O_IGNORE_PRB || - h1c->flags & H1C_F_WAIT_NEXT_REQ)) - goto release; - - /* Try to allocate output buffer to store the error message. If - * allocation fails, just go away. - */ - if (!h1_get_buf(h1c, &h1c->obuf)) - goto release; - - // FIXME: Do the following: - // - // * Do same than stream_inc_http_req_ctr, - // stream_inc_http_err_ctr and proxy_inc_fe_req_ctr - // * Capture bad message for snapshots - // * Increment fe->fe_counters.failed_req and - // listeners->counters->failed_req - h1_cpy_error_message(h1c, &h1c->obuf, 408); - tasklet_wakeup(h1c->wait_event.task); - sess_log(h1c->conn->owner); - return t; - - release: - if (h1c->h1s) { - tasklet_wakeup(h1c->wait_event.task); - return t; - } - h1c->task = NULL; - h1_release(h1c->conn); - end: - task_delete(t); - task_free(t); - return NULL; -} - /*******************************************/ /* functions below are used by the streams */ /*******************************************/ diff --git a/src/proto_htx.c b/src/proto_htx.c index 382df6cbba..c16fc1e294 100644 --- a/src/proto_htx.c +++ b/src/proto_htx.c @@ -117,6 +117,16 @@ int htx_wait_for_request(struct stream *s, struct channel *req, int an_bit) * a bad request is. */ if (unlikely(htx_is_empty(htx) || htx_get_tail_type(htx) < HTX_BLK_EOH)) { + /* + * First catch invalid request + */ + if (htx->flags & HTX_FL_PARSING_ERROR) { + stream_inc_http_req_ctr(s); + stream_inc_http_err_ctr(s); + proxy_inc_fe_req_ctr(sess->fe); + goto return_bad_req; + } + /* 1: have we encountered a read error ? */ if (req->flags & CF_READ_ERROR) { if (!(s->flags & SF_ERR_MASK)) @@ -217,8 +227,7 @@ int htx_wait_for_request(struct stream *s, struct channel *req, int an_bit) setsockopt(__objt_conn(sess->origin)->handle.fd, IPPROTO_TCP, TCP_QUICKACK, &one, sizeof(one)); } #endif - - if ((msg->msg_state != HTTP_MSG_RQBEFORE) && (txn->flags & TX_WAIT_NEXT_RQ)) { + if ((req->flags & CF_READ_PARTIAL) && (txn->flags & TX_WAIT_NEXT_RQ)) { /* If the client starts to talk, let's fall back to * request timeout processing. */ @@ -228,9 +237,7 @@ int htx_wait_for_request(struct stream *s, struct channel *req, int an_bit) /* just set the request timeout once at the beginning of the request */ if (!tick_isset(req->analyse_exp)) { - if ((msg->msg_state == HTTP_MSG_RQBEFORE) && - (txn->flags & TX_WAIT_NEXT_RQ) && - tick_isset(s->be->timeout.httpka)) + if ((txn->flags & TX_WAIT_NEXT_RQ) && tick_isset(s->be->timeout.httpka)) req->analyse_exp = tick_add(now_ms, s->be->timeout.httpka); else req->analyse_exp = tick_add_ifset(now_ms, s->be->timeout.httpreq); @@ -1091,6 +1098,9 @@ int htx_wait_for_request_body(struct stream *s, struct channel *req, int an_bit) goto http_end; missing_data: + if (htx->flags & HTX_FL_PARSING_ERROR) + goto return_bad_req; + if ((req->flags & CF_READ_TIMEOUT) || tick_is_expired(req->analyse_exp, now_ms)) { txn->status = 408; htx_reply_and_close(s, txn->status, http_error_message(s)); @@ -1305,6 +1315,8 @@ int htx_request_forward_body(struct stream *s, struct channel *req, int an_bit) if (req->flags & CF_SHUTW) goto aborted_xfer; + if (htx->flags & HTX_FL_PARSING_ERROR) + goto return_bad_req; /* When TE: chunked is used, we need to get there again to parse remaining * chunks even if the client has closed, so we don't want to set CF_DONTCLOSE. @@ -1438,6 +1450,12 @@ int htx_wait_for_response(struct stream *s, struct channel *rep, int an_bit) * errors somewhere else. */ if (unlikely(htx_is_empty(htx) || htx_get_tail_type(htx) < HTX_BLK_EOH)) { + /* + * First catch invalid response + */ + if (htx->flags & HTX_FL_PARSING_ERROR) + goto return_bad_res; + /* 1: have we encountered a read error ? */ if (rep->flags & CF_READ_ERROR) { if (txn->flags & TX_NOT_FIRST) @@ -1704,6 +1722,23 @@ int htx_wait_for_response(struct stream *s, struct channel *rep, int an_bit) channel_auto_close(rep); return 1; + return_bad_res: + HA_ATOMIC_ADD(&s->be->be_counters.failed_resp, 1); + if (objt_server(s->target)) { + HA_ATOMIC_ADD(&objt_server(s->target)->counters.failed_resp, 1); + health_adjust(objt_server(s->target), HANA_STATUS_HTTP_HDRRSP); + } + txn->status = 502; + s->si[1].flags |= SI_FL_NOLINGER; + htx_reply_and_close(s, txn->status, http_error_message(s)); + rep->analysers &= AN_RES_FLT_END; + + if (!(s->flags & SF_ERR_MASK)) + s->flags |= SF_ERR_PRXCOND; + if (!(s->flags & SF_FINST_MASK)) + s->flags |= SF_FINST_H; + return 0; + abort_keep_alive: /* A keep-alive request to the server failed on a network error. * The client is required to retry. We need to close without returning @@ -2145,6 +2180,9 @@ int htx_response_forward_body(struct stream *s, struct channel *res, int an_bit) if (res->flags & CF_SHUTW) goto aborted_xfer; + if (htx->flags & HTX_FL_PARSING_ERROR) + goto return_bad_res; + /* stop waiting for data if the input is closed before the end. If the * client side was already closed, it means that the client has aborted, * so we don't want to count this as a server abort. Otherwise it's a