From: Willy Tarreau Date: Mon, 3 Nov 2008 05:26:53 +0000 (+0100) Subject: [MAJOR] add a connection error state to the stream_interface X-Git-Tag: v1.3.16-rc1~147 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=cff6411f9;p=thirdparty%2Fhaproxy.git [MAJOR] add a connection error state to the stream_interface Tracking connection status changes was hard, and some code was redundant. A new SI_ST_CER state was added to the stream interface to indicate a past connection error, and an SI_FL_ERR flag was added to report past I/O error. The stream_sock code does not set the connection to SI_ST_CLO anymore in case of I/O error, it's the upper layer which does it. This makes it possible to know exactly when the file descriptors are allocated. The new SI_ST_CER state permitted to split tcp_connection_status() in two parts, one processing SI_ST_CON and the other one SI_ST_CER. Synchronous connection errors now make use of this last state, hence eliminating duplicate code. Some ib<->ob copy paste errors were found and fixed, and all entities setting SI_ST_CLO also shut the buffers down. Some of these stream_interface specific functions and structures have migrated to a new stream_interface.c file. Some types of errors are still not detected by the buffers. For instance, let's assume the following scenario in one single pass of process_session: a connection sits in SI_ST_TAR state during a retry. At TAR expiration, a new connection attempt is made, the connection is obtained and srv->cur_sess is increased. Then the buffer timeout is fires and everything is cleared, the new state becomes SI_ST_CLO. The cleaning code checks that previous state was either SI_ST_CON or SI_ST_EST to release the connection. But that's wrong because last state is still SI_ST_TAR. So the server's connection count does not get decreased. This means that prev_state must not be used, and must be replaced by some transition detection instead of level detection. The following debugging line was useful to track state changes : fprintf(stderr, "%s:%d: cs=%d ss=%d(%d) rqf=0x%08x rpf=0x%08x\n", __FUNCTION__, __LINE__, s->si[0].state, s->si[1].state, s->si[1].err_type, s->req->flags, s-> rep->flags); --- diff --git a/Makefile b/Makefile index bf172f673f..1cdeac1256 100644 --- a/Makefile +++ b/Makefile @@ -452,7 +452,7 @@ OBJS = src/haproxy.o src/sessionhash.o src/base64.o src/protocols.o \ src/time.o src/fd.o src/regex.o src/cfgparse.o src/server.o \ src/checks.o src/queue.o src/client.o src/proxy.o src/proto_uxst.o \ src/proto_http.o src/stream_sock.o src/appsession.o src/backend.o \ - src/senddata.o src/dumpstats.o src/proto_tcp.o \ + src/stream_interface.o src/senddata.o src/dumpstats.o src/proto_tcp.o \ src/session.o src/hdr_idx.o src/ev_select.o \ src/acl.o src/memory.o \ src/ebtree.o src/eb32tree.o diff --git a/include/proto/stream_interface.h b/include/proto/stream_interface.h new file mode 100644 index 0000000000..9967a6e331 --- /dev/null +++ b/include/proto/stream_interface.h @@ -0,0 +1,42 @@ +/* + include/proto/stream_interface.h + This file contains stream_interface function prototypes + + Copyright (C) 2000-2008 Willy Tarreau - w@1wt.eu + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation, version 2.1 + exclusively. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#ifndef _PROTO_STREAM_INTERFACE_H +#define _PROTO_STREAM_INTERFACE_H + +#include + +#include +#include + + +/* main event functions used to move data between sockets and buffers */ +void stream_int_check_timeouts(struct stream_interface *si); +void stream_int_report_error(struct stream_interface *si); + +#endif /* _PROTO_STREAM_INTERFACE_H */ + +/* + * Local variables: + * c-indent-level: 8 + * c-basic-offset: 8 + * End: + */ diff --git a/include/proto/stream_sock.h b/include/proto/stream_sock.h index 7a2dd3d528..ddad638254 100644 --- a/include/proto/stream_sock.h +++ b/include/proto/stream_sock.h @@ -37,7 +37,6 @@ int stream_sock_data_update(int fd); int stream_sock_data_finish(int fd); int stream_sock_shutr(struct stream_interface *si); int stream_sock_shutw(struct stream_interface *si); -int stream_sock_check_timeouts(struct stream_interface *si); /* This either returns the sockname or the original destination address. Code diff --git a/include/types/stream_interface.h b/include/types/stream_interface.h index 9e5e4d95bd..fbd2ab06c4 100644 --- a/include/types/stream_interface.h +++ b/include/types/stream_interface.h @@ -38,8 +38,9 @@ enum { SI_ST_TAR, /* interface in turn-around state after failed connect attempt */ SI_ST_ASS, /* server just assigned to this interface */ SI_ST_CON, /* initiated connection request (resource exists) */ + SI_ST_CER, /* previous connection attempt failed (resource released) */ SI_ST_EST, /* connection established (resource exists) */ - SI_ST_CLO, /* stream interface closed, might not existing anymore */ + SI_ST_CLO, /* stream intf closed, might not existing anymore. Buffers shut. */ }; /* error types reported on the streams interface for more accurate reporting */ @@ -61,6 +62,7 @@ enum { enum { SI_FL_NONE = 0x0000, /* nothing */ SI_FL_EXP = 0x0001, /* timeout has expired */ + SI_FL_ERR = 0x0002, /* a non-recoverable error has occurred */ }; struct stream_interface { diff --git a/src/proto_http.c b/src/proto_http.c index b4e7f83017..6f2dab3cf1 100644 --- a/src/proto_http.c +++ b/src/proto_http.c @@ -50,6 +50,7 @@ #include #include #include +#include #include #include @@ -205,7 +206,8 @@ fd_set url_encode_map[(sizeof(fd_set) > (256/8)) ? 1 : ((256/8) / sizeof(fd_set) #error "Check if your OS uses bitfields for fd_sets" #endif -int tcp_connection_status(struct session *t); +int sess_update_st_con_tcp(struct session *s, struct stream_interface *si); +int sess_update_st_cer(struct session *s, struct stream_interface *si); void init_proto_http() { @@ -649,6 +651,7 @@ http_get_path(struct http_txn *txn) /* Update stream interface status for input states SI_ST_ASS, SI_ST_QUE, SI_ST_TAR. * Other input states are simply ignored. + * Possible output states are SI_ST_CLO, SI_ST_TAR, SI_ST_ASS, SI_ST_REQ, SI_ST_CON. * Flags must have previously been updated for timeouts and other conditions. */ void sess_update_stream_int(struct session *s, struct stream_interface *si) @@ -691,62 +694,23 @@ void sess_update_stream_int(struct session *s, struct stream_interface *si) process_srv_queue(s->srv); /* Failed and not retryable. */ - buffer_shutr(s->rep); - buffer_shutw(s->req); - s->req->flags |= BF_WRITE_ERROR; + buffer_shutr(si->ib); + buffer_shutw(si->ob); + si->ob->flags |= BF_WRITE_ERROR; s->logs.t_queue = tv_ms_elapsed(&s->logs.tv_accept, &now); si->state = SI_ST_CLO; return; } - /* We are facing a retryable error */ - s->conn_retries--; - if (s->conn_retries < 0) { - /* No retries left, abort */ - - if (!si->err_type) { - si->err_type = SI_ET_CONN_ERR; - si->err_loc = s->srv; - } - - if (s->srv) - s->srv->failed_conns++; - s->be->failed_conns++; - - /* We used to have a free connection slot. Since we'll never use it, - * we have to inform the server that it may be used by another session. - */ - if (may_dequeue_tasks(s->srv, s->be)) - process_srv_queue(s->srv); - - buffer_shutr(s->rep); - buffer_shutw(s->req); - s->req->flags |= BF_WRITE_ERROR; - - si->state = SI_ST_CLO; - return; - } - - /* If the "redispatch" option is set on the backend, we are allowed to - * retry on another server for the last retry. In order to achieve this, - * we must mark the session unassigned, and eventually clear the DIRECT - * bit to ignore any persistence cookie. We won't count a retry nor a - * redispatch yet, because this will depend on what server is selected. + /* We are facing a retryable error, but we don't want to run a + * turn-around now, as the problem is likely a source port + * allocation problem, so we want to retry now. */ - if (s->srv && s->conn_retries == 0 && s->be->options & PR_O_REDISP) { - if (may_dequeue_tasks(s->srv, s->be)) - process_srv_queue(s->srv); - - s->flags &= ~(SN_DIRECT | SN_ASSIGNED | SN_ADDR_SET); - s->prev_srv = s->srv; - si->state = SI_ST_REQ; - } else { - if (s->srv) - s->srv->retries++; - s->be->retries++; - si->state = SI_ST_ASS; - } + si->state = SI_ST_CER; + si->flags &= ~SI_FL_ERR; + sess_update_st_cer(s, si); + /* now si->state is one of SI_ST_CLO, SI_ST_TAR, SI_ST_ASS, SI_ST_REQ */ return; } else if (si->state == SI_ST_QUE) { @@ -775,7 +739,9 @@ void sess_update_stream_int(struct session *s, struct stream_interface *si) if (s->srv) s->srv->failed_conns++; s->be->failed_conns++; - s->req->flags |= BF_WRITE_TIMEOUT; + buffer_shutr(si->ib); + buffer_shutw(si->ob); + si->ob->flags |= BF_WRITE_TIMEOUT; if (!si->err_type) si->err_type = SI_ET_QUEUE_TO; si->state = SI_ST_CLO; @@ -783,14 +749,14 @@ void sess_update_stream_int(struct session *s, struct stream_interface *si) } /* Connection remains in queue, check if we have to abort it */ - if ((s->req->flags & (BF_READ_ERROR|BF_SHUTW_NOW)) | /* abort requested */ - ((s->req->flags & BF_SHUTR) && /* empty and client stopped */ - (s->req->flags & BF_EMPTY || s->be->options & PR_O_ABRT_CLOSE))) { + if ((si->ob->flags & (BF_READ_ERROR|BF_SHUTW_NOW)) || /* abort requested */ + ((si->ob->flags & BF_SHUTR) && /* empty and client stopped */ + (si->ob->flags & BF_EMPTY || s->be->options & PR_O_ABRT_CLOSE))) { /* give up */ si->exp = TICK_ETERNITY; s->logs.t_queue = tv_ms_elapsed(&s->logs.tv_accept, &now); - buffer_shutr(s->rep); - buffer_shutw(s->req); + buffer_shutr(si->ib); + buffer_shutw(si->ob); si->err_type |= SI_ET_QUEUE_ABRT; si->state = SI_ST_CLO; return; @@ -801,13 +767,13 @@ void sess_update_stream_int(struct session *s, struct stream_interface *si) } else if (si->state == SI_ST_TAR) { /* Connection request might be aborted */ - if ((s->req->flags & (BF_READ_ERROR|BF_SHUTW_NOW)) | /* abort requested */ - ((s->req->flags & BF_SHUTR) && /* empty and client stopped */ - (s->req->flags & BF_EMPTY || s->be->options & PR_O_ABRT_CLOSE))) { + if ((si->ob->flags & (BF_READ_ERROR|BF_SHUTW_NOW)) || /* abort requested */ + ((si->ob->flags & BF_SHUTR) && /* empty and client stopped */ + (si->ob->flags & BF_EMPTY || s->be->options & PR_O_ABRT_CLOSE))) { /* give up */ si->exp = TICK_ETERNITY; - buffer_shutr(s->rep); - buffer_shutw(s->req); + buffer_shutr(si->ib); + buffer_shutw(si->ob); si->err_type |= SI_ET_CONN_ABRT; si->state = SI_ST_CLO; return; @@ -870,6 +836,8 @@ static void perform_http_redirect(struct session *s, struct stream_interface *si rdr.len += 4; /* prepare to return without error. */ + buffer_shutr(si->ib); + buffer_shutw(si->ob); si->err_type = SI_ET_NONE; si->err_loc = NULL; si->state = SI_ST_CLO; @@ -909,9 +877,9 @@ static void sess_prepare_conn_req(struct session *s, struct stream_interface *si return; /* we did not get any server, let's check the cause */ - buffer_shutr(s->rep); - buffer_shutw(s->req); - s->req->flags |= BF_WRITE_ERROR; + buffer_shutr(si->ib); + buffer_shutw(si->ob); + si->ob->flags |= BF_WRITE_ERROR; if (!si->err_type) si->err_type = SI_ET_CONN_OTHER; si->state = SI_ST_CLO; @@ -975,77 +943,94 @@ void process_session(struct task *t, int *next) unsigned int rqf_cli, rpf_cli; unsigned int rqf_srv, rpf_srv; - /* 1a: Check for lower layer timeouts if needed */ - if (unlikely(t->state & TASK_WOKEN_TIMER)) { - stream_sock_check_timeouts(&s->si[0]); - stream_sock_check_timeouts(&s->si[1]); - } + //DPRINTF(stderr, "%s:%d: cs=%d ss=%d(%d) rqf=0x%08x rpf=0x%08x\n", __FUNCTION__, __LINE__, + // s->si[0].state, s->si[1].state, s->si[1].err_type, s->req->flags, s->rep->flags); - /* 1b: Check for upper layer timeouts if needed */ + /* 1a: Check for low level timeouts if needed. We just set a flag on + * buffers and/or stream interfaces when their timeouts have expired. + */ if (unlikely(t->state & TASK_WOKEN_TIMER)) { + stream_int_check_timeouts(&s->si[0]); + stream_int_check_timeouts(&s->si[1]); + buffer_check_timeouts(s->req); buffer_check_timeouts(s->rep); + } - if (unlikely(s->req->flags & (BF_READ_TIMEOUT|BF_WRITE_TIMEOUT))) { - if (s->req->flags & BF_READ_TIMEOUT) { - buffer_shutw(s->req); - s->req->cons->shutr(s->req->prod); - } - if (s->req->flags & BF_WRITE_TIMEOUT) { - buffer_shutw(s->req); - s->req->cons->shutw(s->req->cons); - } + /* 1b: check for low-level errors reported at the stream interface. + * First we check if it's a retryable error (in which case we don't + * want to tell the buffer). Otherwise we report the error one level + * upper by setting flags into the buffers. Note that the side towards + * the client cannot have connect (hence retryable) errors. + */ + if (unlikely(s->si[0].state == SI_ST_EST)) { + if (s->si[0].flags & SI_FL_ERR) { + s->si[0].state = SI_ST_CLO; + fd_delete(s->si[0].fd); + stream_int_report_error(&s->si[0]); } + } - if (unlikely(s->rep->flags & (BF_READ_TIMEOUT|BF_WRITE_TIMEOUT))) { - if (s->rep->flags & BF_READ_TIMEOUT) { - buffer_shutw(s->rep); - s->rep->cons->shutr(s->rep->prod); - } - if (s->rep->flags & BF_WRITE_TIMEOUT) { - buffer_shutw(s->rep); - s->rep->cons->shutw(s->rep->cons); - } + if (s->si[1].state == SI_ST_EST) { + if (s->si[1].flags & SI_FL_ERR) { + s->si[1].state = SI_ST_CLO; + fd_delete(s->si[1].fd); + stream_int_report_error(&s->si[1]); } - /* Note that we don't check nor indicate if we wake up because - * of a timeout on a stream interface. - */ } + else if (s->si[1].state != SI_ST_INI && s->si[1].state != SI_ST_CLO) { + /* Maybe we were trying to establish a connection on the server side ? */ + if (s->si[1].state == SI_ST_CON) + sess_update_st_con_tcp(s, &s->si[1]); + + if (s->si[1].state == SI_ST_CER) + sess_update_st_cer(s, &s->si[1]); + + /* now try to complete any initiated connection setup */ + if (s->si[1].state >= SI_ST_REQ && s->si[1].state < SI_ST_CON) { + do { + /* nb: step 1 might switch from QUE to ASS, but we first want + * to give a chance to step 2 to perform a redirect if needed. + */ + sess_update_stream_int(s, &s->si[1]); + if (s->si[1].state == SI_ST_REQ) + sess_prepare_conn_req(s, &s->si[1]); - /* Maybe we were trying to establish a connection on the server side ? */ - if (s->si[1].state == SI_ST_CON) - tcp_connection_status(s); - - /* now try to complete any initiated connection setup */ - if (s->si[1].state >= SI_ST_REQ && s->si[1].state < SI_ST_CON) { - do { - /* nb: step 1 might switch from QUE to ASS, but we first want - * to give a chance to step 2 to perform a redirect if needed. - */ - sess_update_stream_int(s, &s->si[1]); - if (s->si[1].state == SI_ST_REQ) - sess_prepare_conn_req(s, &s->si[1]); - - if (s->si[1].state == SI_ST_ASS && s->srv && - s->srv->rdr_len && (s->flags & SN_REDIRECTABLE)) - perform_http_redirect(s, &s->si[1]); + if (s->si[1].state == SI_ST_ASS && s->srv && + s->srv->rdr_len && (s->flags & SN_REDIRECTABLE)) + perform_http_redirect(s, &s->si[1]); - } while (s->si[1].state == SI_ST_ASS); + } while (s->si[1].state == SI_ST_ASS); + } } /* FIXME: we might have got an error above, and we should process them below */ - if (s->si[1].state == SI_ST_CLO && s->si[1].err_type != SI_ET_NONE) + if (s->si[1].state == SI_ST_CLO && s->si[1].prev_state != SI_ST_CLO && + s->si[1].err_type != SI_ET_NONE) return_srv_error(s, s->si[1].err_type); - /* Forward errors from stream interface to buffers */ - if (s->si[0].state == SI_ST_CLO && s->si[0].err_type != SI_ET_NONE) { - s->req->flags |= BF_READ_ERROR; - s->rep->flags |= BF_WRITE_ERROR; + + /* 1c: Manage buffer timeouts. */ + if (unlikely(s->req->flags & (BF_READ_TIMEOUT|BF_WRITE_TIMEOUT))) { + if (s->req->flags & BF_READ_TIMEOUT) { + buffer_shutr(s->req); + s->req->cons->shutr(s->req->prod); + } + if (s->req->flags & BF_WRITE_TIMEOUT) { + buffer_shutw(s->req); + s->req->cons->shutw(s->req->cons); + } } - if (s->si[1].state == SI_ST_CLO && s->si[1].err_type != SI_ET_NONE) { - s->req->flags |= BF_WRITE_ERROR; - s->rep->flags |= BF_READ_ERROR; + if (unlikely(s->rep->flags & (BF_READ_TIMEOUT|BF_WRITE_TIMEOUT))) { + if (s->rep->flags & BF_READ_TIMEOUT) { + buffer_shutr(s->rep); + s->rep->cons->shutr(s->rep->prod); + } + if (s->rep->flags & BF_WRITE_TIMEOUT) { + buffer_shutw(s->rep); + s->rep->cons->shutw(s->rep->cons); + } } /* 2: Check if we need to close the write side. This can only happen @@ -1067,9 +1052,14 @@ void process_session(struct task *t, int *next) /* 3: When a server-side connection is released, we have to * count it and check for pending connections on this server. + * FIXME: the test below is not accurate. An audit is needed + * to find all uncaught transitions. We need a way to ensure + * that shutdowns called right after connect() after TAR will + * correctly be caught for instance. In fact we need a way to + * track when the connection is assigned to the server. */ if (unlikely(s->req->cons->state == SI_ST_CLO && - s->req->cons->prev_state == SI_ST_EST)) { + (s->req->cons->prev_state == SI_ST_EST || s->req->cons->prev_state == SI_ST_CON))) { /* Count server-side errors (but not timeouts). */ if (s->req->flags & BF_WRITE_ERROR) { s->be->failed_resp++; @@ -1128,7 +1118,8 @@ void process_session(struct task *t, int *next) if (s->req->cons->state != SI_ST_CLO) { if (((rpf_srv ^ s->rep->flags) & BF_MASK_INTERFACE_I) || ((rqf_srv ^ s->req->flags) & BF_MASK_INTERFACE_O)) { - if (s->req->cons->state == SI_ST_INI && s->req->flags & BF_WRITE_ENA) { + if (s->req->cons->state == SI_ST_INI && + (s->req->flags & (BF_WRITE_ENA|BF_SHUTW|BF_SHUTW_NOW)) == BF_WRITE_ENA) { s->req->cons->state = SI_ST_REQ; do { sess_prepare_conn_req(s, &s->si[1]); @@ -1282,7 +1273,7 @@ void process_session(struct task *t, int *next) s->rep->flags &= BF_CLEAR_READ & BF_CLEAR_WRITE & BF_CLEAR_TIMEOUT; s->si[0].prev_state = s->si[0].state; s->si[1].prev_state = s->si[1].state; - s->si[0].flags = s->si[1].flags = 0; + s->si[0].flags = s->si[1].flags = SI_FL_NONE; /* Trick: if a request is being waiting for the server to respond, * and if we know the server can timeout, we don't want the timeout @@ -3714,176 +3705,201 @@ int process_response(struct session *t) } -/* Return 1 if the pending connection has failed AND should be retried, - * otherwise zero. We may only come here in SI_ST_CON state, which means that - * the socket's file descriptor is known. +/* This function is called with (si->state == SI_ST_CON) meaning that a + * connection was attempted and that the file descriptor is already allocated. + * We must check for establishment, error and abort. Possible output states + * are SI_ST_EST (established), SI_ST_CER (error), SI_ST_CLO (abort), and + * SI_ST_CON (no change). The function returns 0 if it switches to SI_ST_CER, + * otherwise 1. */ -int tcp_connection_status(struct session *t) +int sess_update_st_con_tcp(struct session *s, struct stream_interface *si) { - struct buffer *req = t->req; - struct buffer *rep = t->rep; - int conn_err = 0; + struct buffer *req = si->ob; + struct buffer *rep = si->ib; DPRINTF(stderr,"[%u] %s: c=%s exp(r,w)=%u,%u req=%08x rep=%08x rql=%d rpl=%d, fds=%d\n", now_ms, __FUNCTION__, - cli_stnames[t->cli_state], + cli_stnames[s->cli_state], rep->rex, req->wex, req->flags, rep->flags, req->l, rep->l, - fdtab[req->cons->fd].state); + fdtab[si->fd].state); + - if ((req->flags & BF_SHUTW_NOW) || - (rep->flags & BF_SHUTW) || - ((req->flags & BF_SHUTR) && /* FIXME: this should not prevent a connection from establishing */ - ((req->flags & BF_EMPTY && !(req->flags & BF_WRITE_ACTIVITY)) || - t->be->options & PR_O_ABRT_CLOSE))) { + /* If we got an error, or if nothing happened and the connection timed + * out, we must give up. The CER state handler will take care of retry + * attempts and error reports. + */ + if (unlikely(si->flags & (SI_FL_EXP|SI_FL_ERR))) { + si->state = SI_ST_CER; + fd_delete(si->fd); + + if (s->srv) { + s->srv->cur_sess--; + sess_change_server(s, NULL); + si->err_loc = s->srv; + } + + if (si->err_type) + return 0; + + if (si->flags & SI_FL_ERR) + si->err_type = SI_ET_CONN_ERR; + else + si->err_type = SI_ET_CONN_TO; + return 0; + } + + /* OK, maybe we want to abort */ + if (unlikely((req->flags & BF_SHUTW_NOW) || + (rep->flags & BF_SHUTW) || + ((req->flags & BF_SHUTR) && /* FIXME: this should not prevent a connection from establishing */ + ((req->flags & BF_EMPTY && !(req->flags & BF_WRITE_ACTIVITY)) || + s->be->options & PR_O_ABRT_CLOSE)))) { /* give up */ - trace_term(t, TT_HTTP_SRV_5); req->wex = TICK_ETERNITY; - fd_delete(req->cons->fd); - if (t->srv) { - t->srv->cur_sess--; - sess_change_server(t, NULL); + fd_delete(si->fd); + if (s->srv) { + s->srv->cur_sess--; + sess_change_server(s, NULL); } buffer_shutw(req); buffer_shutr(rep); - req->cons->state = SI_ST_CLO; - req->cons->err_type |= SI_ET_CONN_ABRT; - req->cons->err_loc = t->srv; - return 0; + si->state = SI_ST_CLO; + si->err_type |= SI_ET_CONN_ABRT; + si->err_loc = s->srv; + return 1; } - /* check for timeouts and asynchronous connect errors */ - if (fdtab[req->cons->fd].state == FD_STERROR) { - conn_err = SI_ET_CONN_ERR; - if (!req->cons->err_type) - req->cons->err_type = SI_ET_CONN_ERR; - } - else if (!(req->flags & BF_WRITE_ACTIVITY)) { - /* nothing happened, maybe we timed out */ - if (req->flags & BF_WRITE_TIMEOUT) { - conn_err = SI_ET_CONN_TO; - if (!req->cons->err_type) - req->cons->err_type = SI_ET_CONN_TO; + /* we need to wait a bit more if there was no activity either */ + if (!(req->flags & BF_WRITE_ACTIVITY)) + return 1; + + /* OK, this means that a connection succeeded */ + s->logs.t_connect = tv_ms_elapsed(&s->logs.tv_accept, &now); + si->state = SI_ST_EST; + si->err_type = SI_ET_NONE; + si->err_loc = NULL; + + if (req->flags & BF_EMPTY) { + EV_FD_CLR(si->fd, DIR_WR); + req->wex = TICK_ETERNITY; + } else { + EV_FD_SET(si->fd, DIR_WR); + req->wex = tick_add_ifset(now_ms, s->be->timeout.server); + if (tick_isset(req->wex)) { + /* FIXME: to prevent the server from expiring read + * timeouts during writes, we refresh it. */ + rep->rex = req->wex; } - else - return 0; /* let's wait a bit more */ } - if (conn_err) { - fd_delete(req->cons->fd); + if (s->be->mode == PR_MODE_TCP) { /* let's allow immediate data connection in this case */ + if (!(rep->flags & BF_HIJACK)) { + EV_FD_SET(si->fd, DIR_RD); + rep->rex = tick_add_ifset(now_ms, s->be->timeout.server); + } + buffer_set_rlim(rep, BUFSIZE); /* no rewrite needed */ - if (t->srv) { - t->srv->cur_sess--; - sess_change_server(t, NULL); - req->cons->err_loc = t->srv; + /* if the user wants to log as soon as possible, without counting + * bytes from the server, then this is the right moment. */ + if (s->fe->to_log && !(s->logs.logwait & LW_BYTES)) { + s->logs.t_close = s->logs.t_connect; /* to get a valid end date */ + tcp_sess_log(s); } +#ifdef CONFIG_HAP_TCPSPLICE + if ((s->fe->options & s->be->options) & PR_O_TCPSPLICE) { + /* TCP splicing supported by both FE and BE */ + tcp_splice_splicefd(req->prod->fd, si->fd, 0); + } +#endif + } + else { + rep->analysers |= AN_RTR_HTTP_HDR; + buffer_set_rlim(rep, BUFSIZE - MAXREWRITE); /* rewrite needed */ + s->txn.rsp.msg_state = HTTP_MSG_RPBEFORE; + /* reset hdr_idx which was already initialized by the request. + * right now, the http parser does it. + * hdr_idx_init(&s->txn.hdr_idx); + */ + } - /* ensure that we have enough retries left */ - t->conn_retries--; - if (t->conn_retries < 0) { - if (!t->req->cons->err_type) { - t->req->cons->err_type = SI_ET_CONN_ERR; - t->req->cons->err_loc = t->srv; - } + rep->flags |= BF_READ_ATTACHED; /* producer is now attached */ + req->wex = TICK_ETERNITY; + return 1; +} - if (t->srv) - t->srv->failed_conns++; - t->be->failed_conns++; - if (may_dequeue_tasks(t->srv, t->be)) - process_srv_queue(t->srv); - req->cons->state = SI_ST_CLO; - return 0; + +/* This function is called with (si->state == SI_ST_CER) meaning that a + * previous connection attempt has failed and that the file descriptor + * has already been released. Possible causes include asynchronous error + * notification and time out. Possible output states are SI_ST_CLO when + * retries are exhausted, SI_ST_TAR when a delay is wanted before a new + * connection attempt, SI_ST_ASS when it's wise to retry on the same server, + * and SI_ST_REQ when an immediate redispatch is wanted. The buffers are + * marked as in error state. It returns 0. + */ +int sess_update_st_cer(struct session *s, struct stream_interface *si) +{ + /* ensure that we have enough retries left */ + s->conn_retries--; + if (s->conn_retries < 0) { + if (!si->err_type) { + si->err_type = SI_ET_CONN_ERR; + si->err_loc = s->srv; } - /* If the "redispatch" option is set on the backend, we are allowed to - * retry on another server for the last retry. In order to achieve this, - * we must mark the session unassigned, and eventually clear the DIRECT - * bit to ignore any persistence cookie. We won't count a retry nor a - * redispatch yet, because this will depend on what server is selected. - */ - if (t->srv && t->conn_retries == 0 && t->be->options & PR_O_REDISP) { - if (may_dequeue_tasks(t->srv, t->be)) - process_srv_queue(t->srv); + if (s->srv) + s->srv->failed_conns++; + s->be->failed_conns++; + if (may_dequeue_tasks(s->srv, s->be)) + process_srv_queue(s->srv); - t->flags &= ~(SN_DIRECT | SN_ASSIGNED | SN_ADDR_SET); - t->prev_srv = t->srv; - req->cons->state = SI_ST_REQ; - } else { - if (t->srv) - t->srv->retries++; - t->be->retries++; - req->cons->state = SI_ST_ASS; - } + buffer_shutw(si->ob); + si->ob->flags |= BF_WRITE_ERROR; - if (conn_err == SI_ET_CONN_ERR) { - /* The error was an immediate connection error, and we - * will likely have to retry connecting to the same - * server, most likely leading to the same result. To - * avoid this, we wait one second before retrying. - */ - req->cons->state = SI_ST_TAR; - req->cons->exp = tick_add(now_ms, MS_TO_TICKS(1000)); - return 0; - } + buffer_shutr(si->ib); + si->ib->flags |= BF_READ_ERROR; - /* We'll rely on the caller to try to get a connection again */ - return 1; + si->state = SI_ST_CLO; + return 0; } - else { - /* no error and write OK : connection succeeded */ - t->logs.t_connect = tv_ms_elapsed(&t->logs.tv_accept, &now); - req->cons->state = SI_ST_EST; - req->cons->err_type = SI_ET_NONE; - req->cons->err_loc = NULL; - - if (req->flags & BF_EMPTY) { - EV_FD_CLR(req->cons->fd, DIR_WR); - req->wex = TICK_ETERNITY; - } else { - EV_FD_SET(req->cons->fd, DIR_WR); - req->wex = tick_add_ifset(now_ms, t->be->timeout.server); - if (tick_isset(req->wex)) { - /* FIXME: to prevent the server from expiring read timeouts during writes, - * we refresh it. */ - rep->rex = req->wex; - } - } - if (t->be->mode == PR_MODE_TCP) { /* let's allow immediate data connection in this case */ - if (!(rep->flags & BF_HIJACK)) { - EV_FD_SET(req->cons->fd, DIR_RD); - rep->rex = tick_add_ifset(now_ms, t->be->timeout.server); - } - buffer_set_rlim(rep, BUFSIZE); /* no rewrite needed */ + /* If the "redispatch" option is set on the backend, we are allowed to + * retry on another server for the last retry. In order to achieve this, + * we must mark the session unassigned, and eventually clear the DIRECT + * bit to ignore any persistence cookie. We won't count a retry nor a + * redispatch yet, because this will depend on what server is selected. + */ + if (s->srv && s->conn_retries == 0 && s->be->options & PR_O_REDISP) { + if (may_dequeue_tasks(s->srv, s->be)) + process_srv_queue(s->srv); - /* if the user wants to log as soon as possible, without counting - bytes from the server, then this is the right moment. */ - if (t->fe->to_log && !(t->logs.logwait & LW_BYTES)) { - t->logs.t_close = t->logs.t_connect; /* to get a valid end date */ - tcp_sess_log(t); - } -#ifdef CONFIG_HAP_TCPSPLICE - if ((t->fe->options & t->be->options) & PR_O_TCPSPLICE) { - /* TCP splicing supported by both FE and BE */ - tcp_splice_splicefd(req->prod->fd, req->cons->fd, 0); - } -#endif - } - else { - rep->analysers |= AN_RTR_HTTP_HDR; - buffer_set_rlim(rep, BUFSIZE - MAXREWRITE); /* rewrite needed */ - t->txn.rsp.msg_state = HTTP_MSG_RPBEFORE; - /* reset hdr_idx which was already initialized by the request. - * right now, the http parser does it. - * hdr_idx_init(&t->txn.hdr_idx); - */ - } + s->flags &= ~(SN_DIRECT | SN_ASSIGNED | SN_ADDR_SET); + s->prev_srv = s->srv; + si->state = SI_ST_REQ; + } else { + if (s->srv) + s->srv->retries++; + s->be->retries++; + si->state = SI_ST_ASS; + } - rep->flags |= BF_READ_ATTACHED; /* producer is now attached */ - req->wex = TICK_ETERNITY; + if (si->flags & SI_FL_ERR) { + /* The error was an asynchronous connection error, and we will + * likely have to retry connecting to the same server, most + * likely leading to the same result. To avoid this, we wait + * one second before retrying. + */ + + if (!si->err_type) + si->err_type = SI_ET_CONN_ERR; + + si->state = SI_ST_TAR; + si->exp = tick_add(now_ms, MS_TO_TICKS(1000)); return 0; } + return 0; } diff --git a/src/stream_interface.c b/src/stream_interface.c new file mode 100644 index 0000000000..8f075ba6de --- /dev/null +++ b/src/stream_interface.c @@ -0,0 +1,67 @@ +/* + * Functions managing stream_interface structures + * + * Copyright 2000-2008 Willy Tarreau + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version + * 2 of the License, or (at your option) any later version. + * + */ + +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +/* + * This function only has to be called once after a wakeup event in case of + * suspected timeout. It controls the stream interface timeouts and sets + * si->flags accordingly. It does NOT close anything, as this timeout may + * be used for any purpose. It returns 1 if the timeout fired, otherwise + * zero. + */ +int stream_int_check_timeouts(struct stream_interface *si) +{ + if (tick_is_expired(si->exp, now_ms)) { + si->flags |= SI_FL_EXP; + return 1; + } + return 0; +} + +void stream_int_report_error(struct stream_interface *si) +{ + if (!si->err_type) + si->err_type = SI_ET_DATA_ERR; + + buffer_shutw(si->ob); + si->ob->flags |= BF_WRITE_ERROR; + buffer_shutr(si->ib); + si->ib->flags |= BF_READ_ERROR; +} + +/* + * Local variables: + * c-indent-level: 8 + * c-basic-offset: 8 + * End: + */ diff --git a/src/stream_sock.c b/src/stream_sock.c index 3418e985d3..4de51ee2cb 100644 --- a/src/stream_sock.c +++ b/src/stream_sock.c @@ -259,31 +259,22 @@ int stream_sock_read(int fd) { goto out_wakeup; out_error: - /* There was an error. we must wakeup the task. No need to clear - * the events, the task will do it. + /* Read error on the file descriptor. We mark the FD as STERROR so + * that we don't use it anymore. The error is reported to the stream + * interface which will take proper action. We must not perturbate the + * buffer because the stream interface wants to ensure transparent + * connection retries. */ + fdtab[fd].state = FD_STERROR; fdtab[fd].ev &= ~FD_POLL_STICKY; - b->rex = TICK_ETERNITY; - - /* Read error on the file descriptor. We close the FD and set - * the error on both buffers. - * Note: right now we only support connected sockets. - */ - if (si->state != SI_ST_EST) - goto out_wakeup; - - if (!si->err_type) - si->err_type = SI_ET_DATA_ERR; - - buffer_shutr(b); - b->flags |= BF_READ_ERROR; - buffer_shutw(si->ob); - si->ob->flags |= BF_WRITE_ERROR; + si->flags |= SI_FL_ERR; + goto wakeup_return; do_close_and_return: - fd_delete(fd); si->state = SI_ST_CLO; + fd_delete(fd); + wakeup_return: task_wakeup(si->owner, TASK_WOKEN_IO); return 1; } @@ -457,29 +448,22 @@ int stream_sock_write(int fd) { return retval; out_error: - /* There was an error. we must wakeup the task. No need to clear - * the events, the task will do it. + /* Write error on the file descriptor. We mark the FD as STERROR so + * that we don't use it anymore. The error is reported to the stream + * interface which will take proper action. We must not perturbate the + * buffer because the stream interface wants to ensure transparent + * connection retries. */ + fdtab[fd].state = FD_STERROR; fdtab[fd].ev &= ~FD_POLL_STICKY; - b->wex = TICK_ETERNITY; - /* Read error on the file descriptor. We close the FD and set - * the error on both buffers. - * Note: right now we only support connected sockets. - */ - if (si->state != SI_ST_EST) - goto out_wakeup; - - if (!si->err_type) - si->err_type = SI_ET_DATA_ERR; + si->flags |= SI_FL_ERR; + goto wakeup_return; - buffer_shutw(b); - b->flags |= BF_WRITE_ERROR; - buffer_shutr(si->ib); - si->ib->flags |= BF_READ_ERROR; do_close_and_return: - fd_delete(fd); si->state = SI_ST_CLO; + fd_delete(fd); + wakeup_return: task_wakeup(si->owner, TASK_WOKEN_IO); return 1; } @@ -524,7 +508,7 @@ int stream_sock_shutr(struct stream_interface *si) if (si->state != SI_ST_EST && si->state != SI_ST_CON) return 0; - if (si->ib->flags & BF_SHUTW) { + if (si->ob->flags & BF_SHUTW) { fd_delete(si->fd); si->state = SI_ST_CLO; return 1; @@ -533,22 +517,6 @@ int stream_sock_shutr(struct stream_interface *si) return 0; } -/* - * This function only has to be called once after a wakeup event in case of - * suspected timeout. It controls the stream interface timeouts and sets - * si->flags accordingly. It does NOT close anything, as this timeout may - * be used for any purpose. It returns 1 if the timeout fired, otherwise - * zero. - */ -int stream_sock_check_timeouts(struct stream_interface *si) -{ - if (tick_is_expired(si->exp, now_ms)) { - si->flags |= SI_FL_EXP; - return 1; - } - return 0; -} - /* * Manages a stream_sock connection during its data phase. The buffers are * examined for various cases of shutdown, then file descriptor and buffers'