From: Willy Tarreau Date: Fri, 11 Oct 2013 17:34:20 +0000 (+0200) Subject: MAJOR: stream interface: dynamically allocate the outgoing connection X-Git-Tag: v1.5-dev20~96 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=32e3c6a6079d15abed4075308afddaf2590f627d;p=thirdparty%2Fhaproxy.git MAJOR: stream interface: dynamically allocate the outgoing connection The outgoing connection is now allocated dynamically upon the first attempt to touch the connection's source or destination address. If this allocation fails, we fail on SN_ERR_RESOURCE. As we didn't use si->conn anymore, it was removed. The endpoints are released upon session_free(), on the error path, and upon a new transaction. That way we are able to carry the existing server's address across retries. The stream interfaces are not initialized anymore before session_complete(), so we could even think about allocating them dynamically as well, though that would not provide much savings. The session initialization now makes use of conn_new()/conn_free(). This slightly simplifies the code and makes it more logical. The connection initialization code is now shorter by about 120 bytes because it's done at once, allowing the compiler to remove all redundant initializations. The si_attach_applet() function now takes care of first detaching the existing endpoint, and it is called from stream_int_register_handler(), so we can safely remove the calls to si_release_endpoint() in the application code around this call. A call to si_detach() was made upon stream_int_unregister_handler() to ensure we always free the allocated connection if one was allocated in parallel to setting an applet (eg: detect HTTP proxy while proceeding with stats maybe). --- diff --git a/include/proto/stream_interface.h b/include/proto/stream_interface.h index 511099cd17..2c6964982e 100644 --- a/include/proto/stream_interface.h +++ b/include/proto/stream_interface.h @@ -70,13 +70,24 @@ static inline void si_set_state(struct stream_interface *si, int state) si->state = si->prev_state = state; } -static inline void si_detach(struct stream_interface *si) +/* release the endpoint if it's a connection, then nullify it */ +static inline void si_release_endpoint(struct stream_interface *si) { - si->ops = &si_embedded_ops; + struct connection *conn; + + conn = objt_conn(si->end); + if (conn) + pool_free2(pool2_connection, conn); si->end = NULL; si->appctx.applet = NULL; } +static inline void si_detach(struct stream_interface *si) +{ + si_release_endpoint(si); + si->ops = &si_embedded_ops; +} + /* Attach connection to the stream interface . The stream interface * is configured to work with a connection and the connection it configured * with a stream interface data layer. @@ -90,6 +101,7 @@ static inline void si_attach_conn(struct stream_interface *si, struct connection static inline void si_attach_applet(struct stream_interface *si, struct si_applet *applet) { + si_release_endpoint(si); si->ops = &si_embedded_ops; si->appctx.applet = applet; si->appctx.obj_type = OBJ_TYPE_APPCTX; @@ -129,6 +141,28 @@ static inline void si_applet_release(struct stream_interface *si) applet->release(si); } +/* Returns the stream interface's existing connection if one such already + * exists, or tries to allocate and initialize a new one which is then + * assigned to the stream interface. + */ +static inline struct connection *si_alloc_conn(struct stream_interface *si) +{ + struct connection *conn; + + /* we return the connection whether it's a real connection or NULL + * in case another entity (an applet) is registered instead. + */ + conn = objt_conn(si->end); + if (si->end) + return conn; + + conn = conn_new(); + if (conn) + si_attach_conn(si, conn); + + return conn; +} + /* Sends a shutr to the connection using the data layer */ static inline void si_shutr(struct stream_interface *si) { diff --git a/include/types/stream_interface.h b/include/types/stream_interface.h index bb3eb180ac..47bfc54a78 100644 --- a/include/types/stream_interface.h +++ b/include/types/stream_interface.h @@ -164,7 +164,6 @@ struct stream_interface { unsigned int err_type; /* first error detected, one of SI_ET_* */ enum obj_type *end; /* points to the end point (connection or appctx) */ - struct connection *conn; /* pre-allocated connection */ struct si_ops *ops; /* general operations at the stream interface layer */ /* struct members below are the "remote" part, as seen from the buffer side */ diff --git a/src/backend.c b/src/backend.c index a06a332a2a..fdfbb9bb96 100644 --- a/src/backend.c +++ b/src/backend.c @@ -708,14 +708,14 @@ int assign_server(struct session *s) * Upon successful return, the session flag SN_ADDR_SET is set. This flag is * not cleared, so it's to the caller to clear it if required. * - * The address is set on si->conn only. This connection is expected to be - * already allocated and initialized. + * The caller is responsible for having already assigned a connection + * to si->end. * */ int assign_server_address(struct session *s) { struct connection *cli_conn = objt_conn(s->req->prod->end); - struct connection *srv_conn = s->req->cons->conn; + struct connection *srv_conn = objt_conn(s->req->cons->end); #ifdef DEBUG_FULL fprintf(stderr,"assign_server_address : s=%p\n",s); @@ -907,7 +907,7 @@ int assign_server_and_queue(struct session *s) /* If an explicit source binding is specified on the server and/or backend, and * this source makes use of the transparent proxy, then it is extracted now and * assigned to the session's pending connection. This function assumes that an - * outgoing connection has already been allocated into s->req->cons->conn. + * outgoing connection has already been assigned to s->req->cons->end. */ static void assign_tproxy_address(struct session *s) { @@ -915,7 +915,7 @@ static void assign_tproxy_address(struct session *s) struct server *srv = objt_server(s->target); struct conn_src *src; struct connection *cli_conn; - struct connection *srv_conn = s->req->cons->conn; + struct connection *srv_conn = objt_conn(s->req->cons->end); if (srv && srv->conn_src.opts & CO_SRC_BIND) src = &srv->conn_src; @@ -982,10 +982,13 @@ static void assign_tproxy_address(struct session *s) int connect_server(struct session *s) { struct connection *cli_conn; - struct connection *srv_conn = s->req->cons->conn; + struct connection *srv_conn = si_alloc_conn(s->req->cons); struct server *srv; int err; + if (!srv_conn) + return SN_ERR_RESOURCE; + if (!(s->flags & SN_ADDR_SET)) { err = assign_server_address(s); if (err != SRV_STATUS_OK) diff --git a/src/dumpstats.c b/src/dumpstats.c index b3d49bfe18..9b1f2fb743 100644 --- a/src/dumpstats.c +++ b/src/dumpstats.c @@ -156,12 +156,6 @@ extern const char *stat_status_codes[]; */ static int stats_accept(struct session *s) { - /* we have a dedicated I/O handler for the CLI/stats, so we can safely - * release the pre-allocated connection that we will never use. - */ - pool_free2(pool2_connection, s->si[1].conn); - s->si[1].conn = NULL; - stream_int_register_handler(&s->si[1], &cli_applet); s->target = &cli_applet.obj_type; // for logging only s->si[1].appctx.st1 = 0; @@ -4034,13 +4028,12 @@ static int stats_dump_full_sess_to_buffer(struct stream_interface *si, struct se http_msg_state_str(sess->txn.req.msg_state), http_msg_state_str(sess->txn.rsp.msg_state)); chunk_appendf(&trash, - " si[0]=%p (state=%s flags=0x%02x endp0=%s:%p conn0=%p exp=%s, et=0x%03x)\n", + " si[0]=%p (state=%s flags=0x%02x endp0=%s:%p exp=%s, et=0x%03x)\n", &sess->si[0], si_state_str(sess->si[0].state), sess->si[0].flags, obj_type_name(sess->si[0].end), obj_base_ptr(sess->si[0].end), - sess->si[0].conn, sess->si[0].exp ? tick_is_expired(sess->si[0].exp, now_ms) ? "" : human_time(TICKS_TO_MS(sess->si[0].exp - now_ms), @@ -4048,13 +4041,12 @@ static int stats_dump_full_sess_to_buffer(struct stream_interface *si, struct se sess->si[0].err_type); chunk_appendf(&trash, - " si[1]=%p (state=%s flags=0x%02x endp1=%s:%p conn1=%p exp=%s, et=0x%03x)\n", + " si[1]=%p (state=%s flags=0x%02x endp1=%s:%p exp=%s, et=0x%03x)\n", &sess->si[1], si_state_str(sess->si[1].state), sess->si[1].flags, obj_type_name(sess->si[1].end), obj_base_ptr(sess->si[1].end), - sess->si[1].conn, sess->si[1].exp ? tick_is_expired(sess->si[1].exp, now_ms) ? "" : human_time(TICKS_TO_MS(sess->si[1].exp - now_ms), diff --git a/src/peers.c b/src/peers.c index 33f8eaa6a2..0cedaad885 100644 --- a/src/peers.c +++ b/src/peers.c @@ -1083,12 +1083,6 @@ static void peer_session_forceshutdown(struct session * session) */ int peer_accept(struct session *s) { - /* we have a dedicated I/O handler for the peers, so we can safely - * release the pre-allocated connection that we will never use. - */ - pool_free2(pool2_connection, s->si[1].conn); - s->si[1].conn = NULL; - stream_int_register_handler(&s->si[1], &peer_applet); s->target = &peer_applet.obj_type; // for logging only s->si[1].appctx.ctx.peers.ptr = s; @@ -1122,15 +1116,13 @@ static struct session *peer_session_create(struct peer *peer, struct peer_sessio struct session *s; struct http_txn *txn; struct task *t; + struct connection *conn; if ((s = pool_alloc2(pool2_session)) == NULL) { /* disable this proxy for a while */ Alert("out of memory in peer_session_create().\n"); goto out_close; } - if (unlikely((s->si[1].conn = pool_alloc2(pool2_connection)) == NULL)) - goto out_fail_conn1; - LIST_ADDQ(&sessions, &s->list); LIST_INIT(&s->back_refs); @@ -1151,7 +1143,6 @@ static struct session *peer_session_create(struct peer *peer, struct peer_sessio t->context = s; t->nice = l->nice; - memcpy(&s->si[1].conn->addr.to, &peer->addr, sizeof(s->si[1].conn->addr.to)); s->task = t; s->listener = l; @@ -1163,7 +1154,6 @@ static struct session *peer_session_create(struct peer *peer, struct peer_sessio s->req = s->rep = NULL; /* will be allocated later */ - s->si[0].conn = NULL; si_reset(&s->si[0], t); si_set_state(&s->si[0], SI_ST_EST); @@ -1183,15 +1173,19 @@ static struct session *peer_session_create(struct peer *peer, struct peer_sessio if (s->be->options2 & PR_O2_INDEPSTR) s->si[1].flags |= SI_FL_INDEP_STR; - /* will automatically prepare the stream interface to connect to the + /* automatically prepare the stream interface to connect to the * pre-initialized connection in si->conn. */ - conn_init(s->si[1].conn); - conn_prepare(s->si[1].conn, peer->proto, peer->xprt); - si_attach_conn(&s->si[1], s->si[1].conn); + if (unlikely((conn = conn_new()) == NULL)) + goto out_fail_conn1; + + conn_prepare(conn, peer->proto, peer->xprt); + si_attach_conn(&s->si[1], conn); + + conn->target = s->target = &s->be->obj_type; + memcpy(&conn->addr.to, &peer->addr, sizeof(conn->addr.to)); session_init_srv_conn(s); - s->si[1].conn->target = s->target = &s->be->obj_type; s->pend_pos = NULL; /* init store persistence */ @@ -1302,11 +1296,11 @@ static struct session *peer_session_create(struct peer *peer, struct peer_sessio out_fail_req_buf: pool_free2(pool2_channel, s->req); out_fail_req: + conn_free(conn); + out_fail_conn1: task_free(t); out_free_session: LIST_DEL(&s->list); - pool_free2(pool2_connection, s->si[1].conn); - out_fail_conn1: pool_free2(pool2_session, s); out_close: return s; diff --git a/src/proto_http.c b/src/proto_http.c index 70b3a13c09..64de612d48 100644 --- a/src/proto_http.c +++ b/src/proto_http.c @@ -3723,7 +3723,22 @@ int http_process_request(struct session *s, struct channel *req, int an_bit) * allocated on the server side. */ if ((s->be->options & PR_O_HTTP_PROXY) && !(s->flags & SN_ADDR_SET)) { - url2sa(req->buf->p + msg->sl.rq.u, msg->sl.rq.u_l, &s->req->cons->conn->addr.to); + struct connection *conn; + + if (unlikely((conn = si_alloc_conn(req->cons)) == NULL)) { + txn->req.msg_state = HTTP_MSG_ERROR; + txn->status = 500; + req->analysers = 0; + stream_int_retnclose(req->prod, http_error_message(s, HTTP_ERR_500)); + + if (!(s->flags & SN_ERR_MASK)) + s->flags |= SN_ERR_RESOURCE; + if (!(s->flags & SN_FINST_MASK)) + s->flags |= SN_FINST_R; + + return 0; + } + url2sa(req->buf->p + msg->sl.rq.u, msg->sl.rq.u_l, &conn->addr.to); } /* @@ -4291,11 +4306,8 @@ void http_end_txn_clean_session(struct session *s) s->target = NULL; - /* reinitialize the connection to the server */ - conn_init(s->req->cons->conn); - s->req->cons->state = s->req->cons->prev_state = SI_ST_INI; - s->req->cons->end = NULL; + si_release_endpoint(s->req->cons); s->req->cons->err_type = SI_ET_NONE; s->req->cons->conn_retries = 0; /* used for logging too */ s->req->cons->exp = TICK_ETERNITY; diff --git a/src/session.c b/src/session.c index 7218dae4a6..3122380a37 100644 --- a/src/session.c +++ b/src/session.c @@ -81,14 +81,18 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr) ret = -1; /* assume unrecoverable error by default */ - if (unlikely((s = pool_alloc2(pool2_session)) == NULL)) + if (unlikely((cli_conn = conn_new()) == NULL)) goto out_close; - if (unlikely((cli_conn = s->si[0].conn = pool_alloc2(pool2_connection)) == NULL)) - goto out_fail_conn0; + conn_prepare(cli_conn, l->proto, l->xprt); - if (unlikely((s->si[1].conn = pool_alloc2(pool2_connection)) == NULL)) - goto out_fail_conn1; + cli_conn->t.sock.fd = cfd; + cli_conn->addr.from = *addr; + cli_conn->flags |= CO_FL_ADDR_FROM_SET; + cli_conn->target = &l->obj_type; + + if (unlikely((s = pool_alloc2(pool2_session)) == NULL)) + goto out_free_conn; /* minimum session initialization required for an embryonic session is * fairly low. We need very little to execute L4 ACLs, then we need a @@ -105,16 +109,6 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr) s->listener = l; s->fe = p; - /* OK, we're keeping the session, so let's properly initialize the session. - * We first have to initialize the client-side connection. - */ - conn_init(cli_conn); - cli_conn->t.sock.fd = cfd; - cli_conn->flags |= CO_FL_ADDR_FROM_SET; - conn_prepare(cli_conn, l->proto, l->xprt); - cli_conn->addr.from = *addr; - cli_conn->target = &l->obj_type; - /* On a mini-session, the connection is directly attached to the * session's target so that we don't need to initialize the stream * interfaces. Another benefit is that it's easy to detect a mini- @@ -123,11 +117,6 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr) */ s->target = &cli_conn->obj_type; - /* The server side is not used yet, but just initialize it to avoid - * confusing some debugging or "show sess" for example. - */ - s->si[1].conn->obj_type = OBJ_TYPE_NONE; - s->logs.accept_date = date; /* user-visible date for logging */ s->logs.tv_accept = now; /* corrected date for internal use */ s->uniq_id = totalconn; @@ -185,7 +174,6 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr) goto out_free_session; } - /* wait for a PROXY protocol header */ if (l->options & LI_O_ACC_PROXY) { cli_conn->flags |= CO_FL_ACCEPT_PROXY; @@ -239,13 +227,11 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr) p->feconn--; if (s->stkctr[0].entry || s->stkctr[1].entry) session_store_counters(s); - pool_free2(pool2_connection, s->si[1].conn); - out_fail_conn1: - s->si[0].conn->flags &= ~CO_FL_XPRT_TRACKED; - conn_xprt_close(s->si[0].conn); - pool_free2(pool2_connection, s->si[0].conn); - out_fail_conn0: pool_free2(pool2_session, s); + out_free_conn: + cli_conn->flags &= ~CO_FL_XPRT_TRACKED; + conn_xprt_close(cli_conn); + conn_free(cli_conn); out_close: if (ret < 0 && l->xprt == &raw_sock && p->mode == PR_MODE_HTTP) { /* critical error, no more memory, try to emit a 500 response */ @@ -354,8 +340,7 @@ static void kill_mini_session(struct session *s) task_delete(s->task); task_free(s->task); - pool_free2(pool2_connection, s->si[1].conn); - pool_free2(pool2_connection, s->si[0].conn); + pool_free2(pool2_connection, conn); pool_free2(pool2_session, s); } @@ -425,17 +410,6 @@ int session_complete(struct session *s) LIST_ADDQ(&sessions, &s->list); LIST_INIT(&s->back_refs); - /* attach the incoming connection to the stream interface now */ - si_reset(&s->si[0], t); - si_set_state(&s->si[0], SI_ST_EST); - - if (likely(s->fe->options2 & PR_O2_INDEPSTR)) - s->si[0].flags |= SI_FL_INDEP_STR; - - s->si[0].conn = conn; - conn_prepare(conn, l->proto, l->xprt); - si_attach_conn(&s->si[0], conn); - s->flags |= SN_INITIALIZED; s->unique_id = NULL; @@ -470,12 +444,20 @@ int session_complete(struct session *s) s->stkctr[i].table->data_arg[STKTABLE_DT_SESS_RATE].u, 1); } + /* this part should be common with other protocols */ + si_reset(&s->si[0], t); + si_set_state(&s->si[0], SI_ST_EST); + + /* attach the incoming connection to the stream interface now */ + si_attach_conn(&s->si[0], conn); + + if (likely(s->fe->options2 & PR_O2_INDEPSTR)) + s->si[0].flags |= SI_FL_INDEP_STR; + /* pre-initialize the other side's stream interface to an INIT state. The * callbacks will be initialized before attempting to connect. */ si_reset(&s->si[1], t); - conn_init(s->si[1].conn); - s->si[1].conn->target = NULL; si_detach(&s->si[1]); if (likely(s->fe->options2 & PR_O2_INDEPSTR)) @@ -673,8 +655,8 @@ static void session_free(struct session *s) bref->ref = s->list.n; } LIST_DEL(&s->list); - pool_free2(pool2_connection, s->si[1].conn); - pool_free2(pool2_connection, s->si[0].conn); + si_release_endpoint(&s->si[1]); + si_release_endpoint(&s->si[0]); pool_free2(pool2_session, s); /* We may want to free the maximum amount of pools if the proxy is stopping */ diff --git a/src/stream_interface.c b/src/stream_interface.c index 00b28e6077..69902d74f0 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -364,8 +364,8 @@ struct task *stream_int_register_handler(struct stream_interface *si, struct si_ */ void stream_int_unregister_handler(struct stream_interface *si) { + si_detach(si); si->owner = NULL; - si->end = NULL; } /* This callback is used to send a valid PROXY protocol line to a socket being