]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MAJOR: stream interface: dynamically allocate the outgoing connection
authorWilly Tarreau <w@1wt.eu>
Fri, 11 Oct 2013 17:34:20 +0000 (19:34 +0200)
committerWilly Tarreau <w@1wt.eu>
Mon, 9 Dec 2013 14:40:23 +0000 (15:40 +0100)
The outgoing connection is now allocated dynamically upon the first attempt
to touch the connection's source or destination address. If this allocation
fails, we fail on SN_ERR_RESOURCE.

As we didn't use si->conn anymore, it was removed. The endpoints are released
upon session_free(), on the error path, and upon a new transaction. That way
we are able to carry the existing server's address across retries.

The stream interfaces are not initialized anymore before session_complete(),
so we could even think about allocating them dynamically as well, though
that would not provide much savings.

The session initialization now makes use of conn_new()/conn_free(). This
slightly simplifies the code and makes it more logical. The connection
initialization code is now shorter by about 120 bytes because it's done
at once, allowing the compiler to remove all redundant initializations.

The si_attach_applet() function now takes care of first detaching the
existing endpoint, and it is called from stream_int_register_handler(),
so we can safely remove the calls to si_release_endpoint() in the
application code around this call.

A call to si_detach() was made upon stream_int_unregister_handler() to
ensure we always free the allocated connection if one was allocated in
parallel to setting an applet (eg: detect HTTP proxy while proceeding
with stats maybe).

include/proto/stream_interface.h
include/types/stream_interface.h
src/backend.c
src/dumpstats.c
src/peers.c
src/proto_http.c
src/session.c
src/stream_interface.c

index 511099cd1750f5ba85968cfcd1b2bd904eb4fa22..2c6964982e0c231c4eefd3bf04153d5eddf17209 100644 (file)
@@ -70,13 +70,24 @@ static inline void si_set_state(struct stream_interface *si, int state)
        si->state = si->prev_state = state;
 }
 
-static inline void si_detach(struct stream_interface *si)
+/* release the endpoint if it's a connection, then nullify it */
+static inline void si_release_endpoint(struct stream_interface *si)
 {
-       si->ops = &si_embedded_ops;
+       struct connection *conn;
+
+       conn = objt_conn(si->end);
+       if (conn)
+               pool_free2(pool2_connection, conn);
        si->end = NULL;
        si->appctx.applet = NULL;
 }
 
+static inline void si_detach(struct stream_interface *si)
+{
+       si_release_endpoint(si);
+       si->ops = &si_embedded_ops;
+}
+
 /* Attach connection <conn> to the stream interface <si>. The stream interface
  * is configured to work with a connection and the connection it configured
  * with a stream interface data layer.
@@ -90,6 +101,7 @@ static inline void si_attach_conn(struct stream_interface *si, struct connection
 
 static inline void si_attach_applet(struct stream_interface *si, struct si_applet *applet)
 {
+       si_release_endpoint(si);
        si->ops = &si_embedded_ops;
        si->appctx.applet = applet;
        si->appctx.obj_type = OBJ_TYPE_APPCTX;
@@ -129,6 +141,28 @@ static inline void si_applet_release(struct stream_interface *si)
                applet->release(si);
 }
 
+/* Returns the stream interface's existing connection if one such already
+ * exists, or tries to allocate and initialize a new one which is then
+ * assigned to the stream interface.
+ */
+static inline struct connection *si_alloc_conn(struct stream_interface *si)
+{
+       struct connection *conn;
+
+       /* we return the connection whether it's a real connection or NULL
+        * in case another entity (an applet) is registered instead.
+        */
+       conn = objt_conn(si->end);
+       if (si->end)
+               return conn;
+
+       conn = conn_new();
+       if (conn)
+               si_attach_conn(si, conn);
+
+       return conn;
+}
+
 /* Sends a shutr to the connection using the data layer */
 static inline void si_shutr(struct stream_interface *si)
 {
index bb3eb180ac3060daba1cca3f38e25e6f300e0a69..47bfc54a78f483be0fa8fd75bfe864b460849f4e 100644 (file)
@@ -164,7 +164,6 @@ struct stream_interface {
        unsigned int err_type;  /* first error detected, one of SI_ET_* */
        enum obj_type *end;     /* points to the end point (connection or appctx) */
 
-       struct connection *conn; /* pre-allocated connection */
        struct si_ops *ops;     /* general operations at the stream interface layer */
 
        /* struct members below are the "remote" part, as seen from the buffer side */
index a06a332a2aca29235988444756d71be0c9706625..fdfbb9bb96b99b85312ab34ee409872fcbd4e74b 100644 (file)
@@ -708,14 +708,14 @@ int assign_server(struct session *s)
  * Upon successful return, the session flag SN_ADDR_SET is set. This flag is
  * not cleared, so it's to the caller to clear it if required.
  *
- * The address is set on si->conn only. This connection is expected to be
- * already allocated and initialized.
+ * The caller is responsible for having already assigned a connection
+ * to si->end.
  *
  */
 int assign_server_address(struct session *s)
 {
        struct connection *cli_conn = objt_conn(s->req->prod->end);
-       struct connection *srv_conn = s->req->cons->conn;
+       struct connection *srv_conn = objt_conn(s->req->cons->end);
 
 #ifdef DEBUG_FULL
        fprintf(stderr,"assign_server_address : s=%p\n",s);
@@ -907,7 +907,7 @@ int assign_server_and_queue(struct session *s)
 /* If an explicit source binding is specified on the server and/or backend, and
  * this source makes use of the transparent proxy, then it is extracted now and
  * assigned to the session's pending connection. This function assumes that an
- * outgoing connection has already been allocated into s->req->cons->conn.
+ * outgoing connection has already been assigned to s->req->cons->end.
  */
 static void assign_tproxy_address(struct session *s)
 {
@@ -915,7 +915,7 @@ static void assign_tproxy_address(struct session *s)
        struct server *srv = objt_server(s->target);
        struct conn_src *src;
        struct connection *cli_conn;
-       struct connection *srv_conn = s->req->cons->conn;
+       struct connection *srv_conn = objt_conn(s->req->cons->end);
 
        if (srv && srv->conn_src.opts & CO_SRC_BIND)
                src = &srv->conn_src;
@@ -982,10 +982,13 @@ static void assign_tproxy_address(struct session *s)
 int connect_server(struct session *s)
 {
        struct connection *cli_conn;
-       struct connection *srv_conn = s->req->cons->conn;
+       struct connection *srv_conn = si_alloc_conn(s->req->cons);
        struct server *srv;
        int err;
 
+       if (!srv_conn)
+               return SN_ERR_RESOURCE;
+
        if (!(s->flags & SN_ADDR_SET)) {
                err = assign_server_address(s);
                if (err != SRV_STATUS_OK)
index b3d49bfe1861035ced2760ed6921b8c832ce1233..9b1f2fb743283f9cdbafc1dead584e43aa57a628 100644 (file)
@@ -156,12 +156,6 @@ extern const char *stat_status_codes[];
  */
 static int stats_accept(struct session *s)
 {
-       /* we have a dedicated I/O handler for the CLI/stats, so we can safely
-        * release the pre-allocated connection that we will never use.
-        */
-       pool_free2(pool2_connection, s->si[1].conn);
-       s->si[1].conn = NULL;
-
        stream_int_register_handler(&s->si[1], &cli_applet);
        s->target = &cli_applet.obj_type; // for logging only
        s->si[1].appctx.st1 = 0;
@@ -4034,13 +4028,12 @@ static int stats_dump_full_sess_to_buffer(struct stream_interface *si, struct se
                             http_msg_state_str(sess->txn.req.msg_state), http_msg_state_str(sess->txn.rsp.msg_state));
 
                chunk_appendf(&trash,
-                            "  si[0]=%p (state=%s flags=0x%02x endp0=%s:%p conn0=%p exp=%s, et=0x%03x)\n",
+                            "  si[0]=%p (state=%s flags=0x%02x endp0=%s:%p exp=%s, et=0x%03x)\n",
                             &sess->si[0],
                             si_state_str(sess->si[0].state),
                             sess->si[0].flags,
                             obj_type_name(sess->si[0].end),
                             obj_base_ptr(sess->si[0].end),
-                            sess->si[0].conn,
                             sess->si[0].exp ?
                                     tick_is_expired(sess->si[0].exp, now_ms) ? "<PAST>" :
                                             human_time(TICKS_TO_MS(sess->si[0].exp - now_ms),
@@ -4048,13 +4041,12 @@ static int stats_dump_full_sess_to_buffer(struct stream_interface *si, struct se
                             sess->si[0].err_type);
 
                chunk_appendf(&trash,
-                            "  si[1]=%p (state=%s flags=0x%02x endp1=%s:%p conn1=%p exp=%s, et=0x%03x)\n",
+                            "  si[1]=%p (state=%s flags=0x%02x endp1=%s:%p exp=%s, et=0x%03x)\n",
                             &sess->si[1],
                             si_state_str(sess->si[1].state),
                             sess->si[1].flags,
                             obj_type_name(sess->si[1].end),
                             obj_base_ptr(sess->si[1].end),
-                            sess->si[1].conn,
                             sess->si[1].exp ?
                                     tick_is_expired(sess->si[1].exp, now_ms) ? "<PAST>" :
                                             human_time(TICKS_TO_MS(sess->si[1].exp - now_ms),
index 33f8eaa6a246039fb44b214b6fde6db192d755f9..0cedaad885eea8270a789bd1e457306862860bd5 100644 (file)
@@ -1083,12 +1083,6 @@ static void peer_session_forceshutdown(struct session * session)
  */
 int peer_accept(struct session *s)
 {
-       /* we have a dedicated I/O handler for the peers, so we can safely
-        * release the pre-allocated connection that we will never use.
-        */
-       pool_free2(pool2_connection, s->si[1].conn);
-       s->si[1].conn = NULL;
-
        stream_int_register_handler(&s->si[1], &peer_applet);
        s->target = &peer_applet.obj_type; // for logging only
        s->si[1].appctx.ctx.peers.ptr = s;
@@ -1122,15 +1116,13 @@ static struct session *peer_session_create(struct peer *peer, struct peer_sessio
        struct session *s;
        struct http_txn *txn;
        struct task *t;
+       struct connection *conn;
 
        if ((s = pool_alloc2(pool2_session)) == NULL) { /* disable this proxy for a while */
                Alert("out of memory in peer_session_create().\n");
                goto out_close;
        }
 
-       if (unlikely((s->si[1].conn = pool_alloc2(pool2_connection)) == NULL))
-               goto out_fail_conn1;
-
        LIST_ADDQ(&sessions, &s->list);
        LIST_INIT(&s->back_refs);
 
@@ -1151,7 +1143,6 @@ static struct session *peer_session_create(struct peer *peer, struct peer_sessio
        t->context = s;
        t->nice = l->nice;
 
-       memcpy(&s->si[1].conn->addr.to, &peer->addr, sizeof(s->si[1].conn->addr.to));
        s->task = t;
        s->listener = l;
 
@@ -1163,7 +1154,6 @@ static struct session *peer_session_create(struct peer *peer, struct peer_sessio
 
        s->req = s->rep = NULL; /* will be allocated later */
 
-       s->si[0].conn = NULL;
        si_reset(&s->si[0], t);
        si_set_state(&s->si[0], SI_ST_EST);
 
@@ -1183,15 +1173,19 @@ static struct session *peer_session_create(struct peer *peer, struct peer_sessio
        if (s->be->options2 & PR_O2_INDEPSTR)
                s->si[1].flags |= SI_FL_INDEP_STR;
 
-       /* will automatically prepare the stream interface to connect to the
+       /* automatically prepare the stream interface to connect to the
         * pre-initialized connection in si->conn.
         */
-       conn_init(s->si[1].conn);
-       conn_prepare(s->si[1].conn, peer->proto, peer->xprt);
-       si_attach_conn(&s->si[1], s->si[1].conn);
+       if (unlikely((conn = conn_new()) == NULL))
+               goto out_fail_conn1;
+
+       conn_prepare(conn, peer->proto, peer->xprt);
+       si_attach_conn(&s->si[1], conn);
+
+       conn->target = s->target = &s->be->obj_type;
+       memcpy(&conn->addr.to, &peer->addr, sizeof(conn->addr.to));
 
        session_init_srv_conn(s);
-       s->si[1].conn->target = s->target = &s->be->obj_type;
        s->pend_pos = NULL;
 
        /* init store persistence */
@@ -1302,11 +1296,11 @@ static struct session *peer_session_create(struct peer *peer, struct peer_sessio
  out_fail_req_buf:
        pool_free2(pool2_channel, s->req);
  out_fail_req:
+       conn_free(conn);
+ out_fail_conn1:
        task_free(t);
  out_free_session:
        LIST_DEL(&s->list);
-       pool_free2(pool2_connection, s->si[1].conn);
- out_fail_conn1:
        pool_free2(pool2_session, s);
  out_close:
        return s;
index 70b3a13c090c028598b92b97095c3e9fbbe2493a..64de612d483db0e7c8bd34ae2172a02e71a64851 100644 (file)
@@ -3723,7 +3723,22 @@ int http_process_request(struct session *s, struct channel *req, int an_bit)
         * allocated on the server side.
         */
        if ((s->be->options & PR_O_HTTP_PROXY) && !(s->flags & SN_ADDR_SET)) {
-               url2sa(req->buf->p + msg->sl.rq.u, msg->sl.rq.u_l, &s->req->cons->conn->addr.to);
+               struct connection *conn;
+
+               if (unlikely((conn = si_alloc_conn(req->cons)) == NULL)) {
+                       txn->req.msg_state = HTTP_MSG_ERROR;
+                       txn->status = 500;
+                       req->analysers = 0;
+                       stream_int_retnclose(req->prod, http_error_message(s, HTTP_ERR_500));
+
+                       if (!(s->flags & SN_ERR_MASK))
+                               s->flags |= SN_ERR_RESOURCE;
+                       if (!(s->flags & SN_FINST_MASK))
+                               s->flags |= SN_FINST_R;
+
+                       return 0;
+               }
+               url2sa(req->buf->p + msg->sl.rq.u, msg->sl.rq.u_l, &conn->addr.to);
        }
 
        /*
@@ -4291,11 +4306,8 @@ void http_end_txn_clean_session(struct session *s)
 
        s->target = NULL;
 
-       /* reinitialize the connection to the server */
-       conn_init(s->req->cons->conn);
-
        s->req->cons->state     = s->req->cons->prev_state = SI_ST_INI;
-       s->req->cons->end = NULL;
+       si_release_endpoint(s->req->cons);
        s->req->cons->err_type  = SI_ET_NONE;
        s->req->cons->conn_retries = 0;  /* used for logging too */
        s->req->cons->exp       = TICK_ETERNITY;
index 7218dae4a6395523ec325f349768c72202d02948..3122380a3792b97d0437e21c9d9ecb9a59ad6f6e 100644 (file)
@@ -81,14 +81,18 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr)
 
        ret = -1; /* assume unrecoverable error by default */
 
-       if (unlikely((s = pool_alloc2(pool2_session)) == NULL))
+       if (unlikely((cli_conn = conn_new()) == NULL))
                goto out_close;
 
-       if (unlikely((cli_conn = s->si[0].conn = pool_alloc2(pool2_connection)) == NULL))
-               goto out_fail_conn0;
+       conn_prepare(cli_conn, l->proto, l->xprt);
 
-       if (unlikely((s->si[1].conn = pool_alloc2(pool2_connection)) == NULL))
-               goto out_fail_conn1;
+       cli_conn->t.sock.fd = cfd;
+       cli_conn->addr.from = *addr;
+       cli_conn->flags |= CO_FL_ADDR_FROM_SET;
+       cli_conn->target = &l->obj_type;
+
+       if (unlikely((s = pool_alloc2(pool2_session)) == NULL))
+               goto out_free_conn;
 
        /* minimum session initialization required for an embryonic session is
         * fairly low. We need very little to execute L4 ACLs, then we need a
@@ -105,16 +109,6 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr)
        s->listener = l;
        s->fe  = p;
 
-       /* OK, we're keeping the session, so let's properly initialize the session.
-        * We first have to initialize the client-side connection.
-        */
-       conn_init(cli_conn);
-       cli_conn->t.sock.fd = cfd;
-       cli_conn->flags |= CO_FL_ADDR_FROM_SET;
-       conn_prepare(cli_conn, l->proto, l->xprt);
-       cli_conn->addr.from = *addr;
-       cli_conn->target = &l->obj_type;
-
        /* On a mini-session, the connection is directly attached to the
         * session's target so that we don't need to initialize the stream
         * interfaces. Another benefit is that it's easy to detect a mini-
@@ -123,11 +117,6 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr)
         */
        s->target = &cli_conn->obj_type;
 
-       /* The server side is not used yet, but just initialize it to avoid
-        * confusing some debugging or "show sess" for example.
-        */
-       s->si[1].conn->obj_type = OBJ_TYPE_NONE;
-
        s->logs.accept_date = date; /* user-visible date for logging */
        s->logs.tv_accept = now;  /* corrected date for internal use */
        s->uniq_id = totalconn;
@@ -185,7 +174,6 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr)
                goto out_free_session;
        }
 
-
        /* wait for a PROXY protocol header */
        if (l->options & LI_O_ACC_PROXY) {
                cli_conn->flags |= CO_FL_ACCEPT_PROXY;
@@ -239,13 +227,11 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr)
        p->feconn--;
        if (s->stkctr[0].entry || s->stkctr[1].entry)
                session_store_counters(s);
-       pool_free2(pool2_connection, s->si[1].conn);
- out_fail_conn1:
-       s->si[0].conn->flags &= ~CO_FL_XPRT_TRACKED;
-       conn_xprt_close(s->si[0].conn);
-       pool_free2(pool2_connection, s->si[0].conn);
- out_fail_conn0:
        pool_free2(pool2_session, s);
+ out_free_conn:
+       cli_conn->flags &= ~CO_FL_XPRT_TRACKED;
+       conn_xprt_close(cli_conn);
+       conn_free(cli_conn);
  out_close:
        if (ret < 0 && l->xprt == &raw_sock && p->mode == PR_MODE_HTTP) {
                /* critical error, no more memory, try to emit a 500 response */
@@ -354,8 +340,7 @@ static void kill_mini_session(struct session *s)
        task_delete(s->task);
        task_free(s->task);
 
-       pool_free2(pool2_connection, s->si[1].conn);
-       pool_free2(pool2_connection, s->si[0].conn);
+       pool_free2(pool2_connection, conn);
        pool_free2(pool2_session, s);
 }
 
@@ -425,17 +410,6 @@ int session_complete(struct session *s)
        LIST_ADDQ(&sessions, &s->list);
        LIST_INIT(&s->back_refs);
 
-       /* attach the incoming connection to the stream interface now */
-       si_reset(&s->si[0], t);
-       si_set_state(&s->si[0], SI_ST_EST);
-
-       if (likely(s->fe->options2 & PR_O2_INDEPSTR))
-               s->si[0].flags |= SI_FL_INDEP_STR;
-
-       s->si[0].conn = conn;
-       conn_prepare(conn, l->proto, l->xprt);
-       si_attach_conn(&s->si[0], conn);
-
        s->flags |= SN_INITIALIZED;
        s->unique_id = NULL;
 
@@ -470,12 +444,20 @@ int session_complete(struct session *s)
                                               s->stkctr[i].table->data_arg[STKTABLE_DT_SESS_RATE].u, 1);
        }
 
+       /* this part should be common with other protocols */
+       si_reset(&s->si[0], t);
+       si_set_state(&s->si[0], SI_ST_EST);
+
+       /* attach the incoming connection to the stream interface now */
+       si_attach_conn(&s->si[0], conn);
+
+       if (likely(s->fe->options2 & PR_O2_INDEPSTR))
+               s->si[0].flags |= SI_FL_INDEP_STR;
+
        /* pre-initialize the other side's stream interface to an INIT state. The
         * callbacks will be initialized before attempting to connect.
         */
        si_reset(&s->si[1], t);
-       conn_init(s->si[1].conn);
-       s->si[1].conn->target = NULL;
        si_detach(&s->si[1]);
 
        if (likely(s->fe->options2 & PR_O2_INDEPSTR))
@@ -673,8 +655,8 @@ static void session_free(struct session *s)
                bref->ref = s->list.n;
        }
        LIST_DEL(&s->list);
-       pool_free2(pool2_connection, s->si[1].conn);
-       pool_free2(pool2_connection, s->si[0].conn);
+       si_release_endpoint(&s->si[1]);
+       si_release_endpoint(&s->si[0]);
        pool_free2(pool2_session, s);
 
        /* We may want to free the maximum amount of pools if the proxy is stopping */
index 00b28e607766cc64ab56ad9946f1d7f77fca838f..69902d74f0f33a54b01b43eee434794e3db86fa3 100644 (file)
@@ -364,8 +364,8 @@ struct task *stream_int_register_handler(struct stream_interface *si, struct si_
  */
 void stream_int_unregister_handler(struct stream_interface *si)
 {
+       si_detach(si);
        si->owner = NULL;
-       si->end = NULL;
 }
 
 /* This callback is used to send a valid PROXY protocol line to a socket being