From 7c6f8b146de309e14f0b7ff9e680794683498c54 Mon Sep 17 00:00:00 2001 From: Olivier Houchard Date: Tue, 13 Nov 2018 16:48:36 +0100 Subject: [PATCH] MAJOR: connections: Detach connections from streams. Do not destroy the connection when we're about to destroy a stream. This prevents us from doing keepalive on server connections when the client is using HTTP/2, as a new stream is created for each request. Instead, the session is now responsible for destroying connections. When reusing connections, the attach() mux method is now used to create a new conn_stream. --- include/proto/connection.h | 4 +- include/proto/stream_interface.h | 19 ----- src/backend.c | 119 +++++++++++++++++++------------ src/mux_h2.c | 5 +- src/mux_pt.c | 42 +++++++++-- src/proto_http.c | 47 ++++++------ src/session.c | 9 +++ src/stream_interface.c | 29 -------- 8 files changed, 147 insertions(+), 127 deletions(-) diff --git a/include/proto/connection.h b/include/proto/connection.h index ade6994011..611e6ad2dc 100644 --- a/include/proto/connection.h +++ b/include/proto/connection.h @@ -673,10 +673,12 @@ static inline void conn_free(struct connection *conn) LIST_INIT(&sess->conn_list); } conn_force_unsubscribe(conn); + LIST_DEL(&conn->list); + LIST_INIT(&conn->list); pool_free(pool_head_connection, conn); } -/* Release a conn_stream, and kill the connection if it was the last one */ +/* Release a conn_stream */ static inline void cs_destroy(struct conn_stream *cs) { if (cs->conn->mux) diff --git a/include/proto/stream_interface.h b/include/proto/stream_interface.h index 90e395655c..3d2a6db6ef 100644 --- a/include/proto/stream_interface.h +++ b/include/proto/stream_interface.h @@ -45,7 +45,6 @@ extern struct si_ops si_embedded_ops; extern struct si_ops si_conn_ops; extern struct si_ops si_applet_ops; extern struct data_cb si_conn_cb; -extern struct data_cb si_idle_conn_cb; struct appctx *stream_int_register_handler(struct stream_interface *si, struct applet *app); void si_applet_wake_cb(struct stream_interface *si); @@ -182,24 +181,6 @@ static inline void si_release_endpoint(struct stream_interface *si) si_detach_endpoint(si); } -/* Turn an existing connection endpoint of stream interface to idle mode, - * which means that the connection will be polled for incoming events and might - * be killed by the underlying I/O handler. If is not null, the - * connection will also be added at the head of this list. This connection - * remains assigned to the stream interface it is currently attached to. - */ -static inline void si_idle_cs(struct stream_interface *si, struct list *pool) -{ - struct conn_stream *cs = __objt_cs(si->end); - struct connection *conn = cs->conn; - - conn_force_unsubscribe(conn); - if (pool) - LIST_ADD(pool, &conn->list); - - cs_attach(cs, si, &si_idle_conn_cb); -} - /* Attach conn_stream to the stream interface . The stream interface * is configured to work with a connection and the connection it configured * with a stream interface data layer. diff --git a/src/backend.c b/src/backend.c index d40202885d..0b45d85918 100644 --- a/src/backend.c +++ b/src/backend.c @@ -583,7 +583,7 @@ int assign_server(struct stream *s) srv = NULL; s->target = NULL; - conn = cs_conn(objt_cs(s->si[1].end)); + conn = s->sess->srv_conn; if (conn && (conn->flags & CO_FL_CONNECTED) && @@ -1056,28 +1056,22 @@ int connect_server(struct stream *s) { struct connection *cli_conn = NULL; struct connection *srv_conn; + struct connection *old_conn; struct conn_stream *srv_cs; - struct conn_stream *old_cs; struct server *srv; int reuse = 0; int err; + srv = objt_server(s->target); - srv_cs = objt_cs(s->si[1].end); - srv_conn = cs_conn(srv_cs); + old_conn = srv_conn = s->sess->srv_conn; if (srv_conn) - reuse = s->target == srv_conn->target; + reuse = (s->target == srv_conn->target) && + (srv_conn->mux->avail_streams(srv_conn) > 0) && + conn_xprt_ready(srv_conn); if (srv && !reuse) { - old_cs = srv_cs; - if (old_cs) { - srv_conn = NULL; - srv_cs->data = NULL; - si_detach_endpoint(&s->si[1]); - /* note: if the connection was in a server's idle - * queue, it doesn't get dequeued. - */ - } + srv_conn = NULL; /* Below we pick connections from the safe or idle lists based * on the strategy, the fact that this is a first or second @@ -1114,29 +1108,8 @@ int connect_server(struct stream *s) * other owner's. That way it may remain alive for others to * pick. */ - if (srv_conn) { - LIST_DEL(&srv_conn->list); - LIST_INIT(&srv_conn->list); - - /* XXX cognet: this assumes only 1 conn_stream per - * connection, has to be revisited later - */ - srv_cs = srv_conn->mux_ctx; - - if (srv_cs->data) { - si_detach_endpoint(srv_cs->data); - if (old_cs && !(old_cs->conn->flags & CO_FL_PRIVATE)) { - si_attach_cs(srv_cs->data, old_cs); - si_idle_cs(srv_cs->data, NULL); - } - } - si_attach_cs(&s->si[1], srv_cs); + if (srv_conn) reuse = 1; - } - - /* we may have to release our connection if we couldn't swap it */ - if (old_cs && !old_cs->data) - cs_destroy(old_cs); } if (reuse) { @@ -1155,13 +1128,74 @@ int connect_server(struct stream *s) } } + /* We're about to use another connection, let the mux know we're + * done with this one + */ + if (old_conn != srv_conn) { + int did_switch = 0; + + if (srv_conn && reuse) { + struct session *sess; + int count = 0; + + /* + * If we're attempting to reuse a connection, and + * the new connection has only one user, and there + * are no more streams available, attempt to give + * it our old connection + */ + list_for_each_entry(sess, &srv_conn->session_list, + conn_list) { + count++; + if (count > 1) + break; + } + if (count == 1) { + sess = LIST_ELEM(srv_conn->session_list.n, + struct session *, conn_list); + LIST_DEL(&sess->conn_list); + if (old_conn && + !(old_conn->flags & CO_FL_PRIVATE) && + (old_conn->mux->avail_streams(old_conn) > 0) && + (srv_conn->mux->avail_streams(srv_conn) == 1)) { + LIST_ADDQ(&old_conn->session_list, &sess->conn_list); + sess->srv_conn = old_conn; + } else { + LIST_INIT(&sess->conn_list); + sess->srv_conn = NULL; + } + did_switch = 1; + } + + } + /* + * We didn't manage to give our old connection, destroy it + */ + if (old_conn && !did_switch) { + old_conn->owner = NULL; + old_conn->mux->destroy(old_conn); + old_conn = NULL; + } + } + if (!reuse) { srv_cs = si_alloc_cs(&s->si[1], NULL); srv_conn = cs_conn(srv_cs); } else { - /* reusing our connection, take it out of the idle list */ - LIST_DEL(&srv_conn->list); - LIST_INIT(&srv_conn->list); + if (srv_conn->mux->avail_streams(srv_conn) == 1) { + /* No more streams available, remove it from the list */ + LIST_DEL(&srv_conn->list); + LIST_INIT(&srv_conn->list); + } + srv_cs = srv_conn->mux->attach(srv_conn); + if (srv_cs) + si_attach_cs(&s->si[1], srv_cs); + } + if (srv_conn && old_conn != srv_conn) { + srv_conn->owner = s->sess; + s->sess->srv_conn = srv_conn; + LIST_DEL(&s->sess->conn_list); + LIST_ADDQ(&srv_conn->session_list, &s->sess->conn_list); } if (!srv_cs) @@ -1203,15 +1237,10 @@ int connect_server(struct stream *s) conn_get_to_addr(cli_conn); } - si_attach_cs(&s->si[1], srv_cs); - assign_tproxy_address(s); } - else { - /* the connection is being reused, just re-attach it */ - si_attach_cs(&s->si[1], srv_cs); + else s->flags |= SF_SRV_REUSED; - } /* flag for logging source ip/port */ if (strm_fe(s)->options2 & PR_O2_SRC_ADDR) diff --git a/src/mux_h2.c b/src/mux_h2.c index 87b5cb1dcf..9e4f80178f 100644 --- a/src/mux_h2.c +++ b/src/mux_h2.c @@ -62,7 +62,6 @@ static struct pool_head *pool_head_h2s; #define H2_CF_WAIT_FOR_HS 0x00004000 // We did check that at least a stream was waiting for handshake #define H2_CF_IS_BACK 0x00008000 // this is an outgoing connection - /* H2 connection state, in h2c->st0 */ enum h2_cs { H2_CS_PREFACE, // init done, waiting for connection preface @@ -2264,7 +2263,7 @@ static int h2_recv(struct h2c *h2c) if (!b_data(buf)) { h2_release_buf(h2c, &h2c->dbuf); - return 0; + return conn_xprt_read0_pending(conn); } if (b_data(buf) == buf->size) @@ -2282,7 +2281,7 @@ static int h2_send(struct h2c *h2c) int sent = 0; if (conn->flags & CO_FL_ERROR) - return 0; + return 1; if (conn->flags & (CO_FL_HANDSHAKE|CO_FL_WAIT_L4_CONN|CO_FL_WAIT_L6_CONN)) { diff --git a/src/mux_pt.c b/src/mux_pt.c index a0f0397758..a974ec3020 100644 --- a/src/mux_pt.c +++ b/src/mux_pt.c @@ -30,6 +30,9 @@ static void mux_pt_destroy(struct mux_pt_ctx *ctx) LIST_DEL(&conn->list); conn_stop_tracking(conn); conn_full_close(conn); + tasklet_free(ctx->wait_event.task); + conn->mux = NULL; + conn->mux_ctx = NULL; if (conn->destroy_cb) conn->destroy_cb(conn); /* We don't bother unsubscribing here, as we're about to destroy @@ -45,7 +48,7 @@ static struct task *mux_pt_io_cb(struct task *t, void *tctx, unsigned short stat struct mux_pt_ctx *ctx = tctx; conn_sock_drain(ctx->conn); - if (ctx->conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH)) + if (ctx->conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_SOCK_WR_SH)) mux_pt_destroy(ctx); else ctx->conn->xprt->subscribe(ctx->conn, SUB_CAN_RECV, @@ -135,6 +138,16 @@ static int mux_pt_wake(struct connection *conn) */ static struct conn_stream *mux_pt_attach(struct connection *conn) { + struct conn_stream *cs; + struct mux_pt_ctx *ctx = conn->mux_ctx; + + cs = cs_new(conn); + if (!cs) + goto fail; + + ctx->cs = cs; + return (cs); +fail: return NULL; } @@ -149,10 +162,13 @@ static const struct conn_stream *mux_pt_get_first_cs(const struct connection *co return cs; } -/* Destroy the mux and the associated connection */ +/* Destroy the mux and the associated connection, if no longer used */ static void mux_pt_destroy_meth(struct connection *conn) { - mux_pt_destroy(conn->mux_ctx); + struct mux_pt_ctx *ctx = conn->mux_ctx; + + if (!(ctx->cs)) + mux_pt_destroy(ctx); } /* @@ -164,9 +180,13 @@ static void mux_pt_detach(struct conn_stream *cs) struct mux_pt_ctx *ctx = cs->conn->mux_ctx; /* Subscribe, to know if we got disconnected */ - conn->xprt->subscribe(conn, SUB_CAN_RECV, &ctx->wait_event); - ctx->cs = NULL; - mux_pt_destroy(ctx); + if (conn->owner != NULL && + !(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_SOCK_WR_SH))) { + ctx->cs = NULL; + conn->xprt->subscribe(conn, SUB_CAN_RECV, &ctx->wait_event); + } else + /* There's no session attached to that connection, destroy it */ + mux_pt_destroy(ctx); } static int mux_pt_avail_streams(struct connection *conn) @@ -184,6 +204,11 @@ static void mux_pt_shutr(struct conn_stream *cs, enum cs_shr_mode mode) cs->conn->xprt->shutr(cs->conn, (mode == CS_SHR_DRAIN)); if (cs->flags & CS_FL_SHW) conn_full_close(cs->conn); + /* Maybe we've been put in the list of available idle connections, + * get ouf of here + */ + LIST_DEL(&cs->conn->list); + LIST_INIT(&cs->conn->list); } static void mux_pt_shutw(struct conn_stream *cs, enum cs_shw_mode mode) @@ -196,6 +221,11 @@ static void mux_pt_shutw(struct conn_stream *cs, enum cs_shw_mode mode) conn_sock_shutw(cs->conn, (mode == CS_SHW_NORMAL)); else conn_full_close(cs->conn); + /* Maybe we've been put in the list of available idle connections, + * get ouf of here + */ + LIST_DEL(&cs->conn->list); + LIST_INIT(&cs->conn->list); } /* diff --git a/src/proto_http.c b/src/proto_http.c index 0ce03b3c15..c93afa9ede 100644 --- a/src/proto_http.c +++ b/src/proto_http.c @@ -3716,20 +3716,18 @@ void http_end_txn_clean_session(struct stream *s) * flags. We also need a more accurate method for computing per-request * data. */ - /* - * XXX cognet: This is probably wrong, this is killing a whole - * connection, in the new world order, we probably want to just kill - * the stream, this is to be revisited the day we handle multiple - * streams in one server connection. - */ cs = objt_cs(s->si[1].end); srv_conn = cs_conn(cs); /* unless we're doing keep-alive, we want to quickly close the connection * to the server. + * XXX cognet: If the connection doesn't have a owner then it may not + * be referenced anywhere, just kill it now, even if it could be reused. + * To be revisited later when revisited later when we handle connection + * pools properly. */ if (((s->txn->flags & TX_CON_WANT_MSK) != TX_CON_WANT_KAL) || - !si_conn_ready(&s->si[1])) { + !si_conn_ready(&s->si[1]) || !srv_conn->owner) { s->si[1].flags |= SI_FL_NOLINGER | SI_FL_NOHALF; si_shutr(&s->si[1]); si_shutw(&s->si[1]); @@ -3805,14 +3803,15 @@ void http_end_txn_clean_session(struct stream *s) s->target = NULL; - /* only release our endpoint if we don't intend to reuse the - * connection. + + /* If we're doing keepalive, first call the mux detach() method + * to let it know we want to detach without freing the connection. + * We then can call si_release_endpoint() to destroy the conn_stream */ if (((s->txn->flags & TX_CON_WANT_MSK) != TX_CON_WANT_KAL) || - !si_conn_ready(&s->si[1])) { - si_release_endpoint(&s->si[1]); + !si_conn_ready(&s->si[1]) || !srv_conn->owner) srv_conn = NULL; - } + si_release_endpoint(&s->si[1]); s->si[1].state = s->si[1].prev_state = SI_ST_INI; s->si[1].err_type = SI_ET_NONE; @@ -3867,18 +3866,18 @@ void http_end_txn_clean_session(struct stream *s) /* we're in keep-alive with an idle connection, monitor it if not already done */ if (srv_conn && LIST_ISEMPTY(&srv_conn->list)) { srv = objt_server(srv_conn->target); - if (!srv) - si_idle_cs(&s->si[1], NULL); - else if (srv_conn->flags & CO_FL_PRIVATE) - si_idle_cs(&s->si[1], (srv->priv_conns ? &srv->priv_conns[tid] : NULL)); - else if (prev_flags & TX_NOT_FIRST) - /* note: we check the request, not the connection, but - * this is valid for strategies SAFE and AGGR, and in - * case of ALWS, we don't care anyway. - */ - si_idle_cs(&s->si[1], (srv->safe_conns ? &srv->safe_conns[tid] : NULL)); - else - si_idle_cs(&s->si[1], (srv->idle_conns ? &srv->idle_conns[tid] : NULL)); + if (srv) { + if (srv_conn->flags & CO_FL_PRIVATE) + LIST_ADD(&srv->priv_conns[tid], &srv_conn->list); + else if (prev_flags & TX_NOT_FIRST) + /* note: we check the request, not the connection, but + * this is valid for strategies SAFE and AGGR, and in + * case of ALWS, we don't care anyway. + */ + LIST_ADD(&srv->safe_conns[tid], &srv_conn->list); + else + LIST_ADD(&srv->idle_conns[tid], &srv_conn->list); + } } s->req.analysers = strm_li(s) ? strm_li(s)->analysers : 0; s->res.analysers = 0; diff --git a/src/session.c b/src/session.c index d8c8d36cf0..7d21a6a6c0 100644 --- a/src/session.c +++ b/src/session.c @@ -67,11 +67,19 @@ struct session *session_new(struct proxy *fe, struct listener *li, enum obj_type void session_free(struct session *sess) { + struct connection *conn; + HA_ATOMIC_SUB(&sess->fe->feconn, 1); if (sess->listener) listener_release(sess->listener); session_store_counters(sess); vars_prune_per_sess(&sess->vars); + conn = objt_conn(sess->origin); + if (conn != NULL && conn->mux) + conn->mux->destroy(conn); + conn = sess->srv_conn; + if (conn != NULL && conn->mux) + conn->mux->destroy(conn); pool_free(pool_head_session, sess); HA_ATOMIC_SUB(&jobs, 1); } @@ -377,6 +385,7 @@ static void session_kill_embryonic(struct session *sess, unsigned short state) conn_stop_tracking(conn); conn_full_close(conn); conn_free(conn); + sess->origin = NULL; task_delete(task); task_free(task); diff --git a/src/stream_interface.c b/src/stream_interface.c index f88b4323b3..22c329fc15 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -53,7 +53,6 @@ static void stream_int_chk_rcv_applet(struct stream_interface *si); static void stream_int_chk_snd_applet(struct stream_interface *si); int si_cs_recv(struct conn_stream *cs); static int si_cs_process(struct conn_stream *cs); -static int si_idle_conn_wake_cb(struct conn_stream *cs); int si_cs_send(struct conn_stream *cs); /* stream-interface operations for embedded tasks */ @@ -85,11 +84,6 @@ struct data_cb si_conn_cb = { .name = "STRM", }; -struct data_cb si_idle_conn_cb = { - .wake = si_idle_conn_wake_cb, - .name = "IDLE", -}; - /* * This function only has to be called once after a wakeup event in case of * suspected timeout. It controls the stream interface timeouts and sets @@ -411,29 +405,6 @@ int conn_si_send_proxy(struct connection *conn, unsigned int flag) } -/* Callback to be used by connection I/O handlers when some activity is detected - * on an idle server connection. Its main purpose is to kill the connection once - * a close was detected on it. It returns 0 if it did nothing serious, or -1 if - * it killed the connection. - */ -static int si_idle_conn_wake_cb(struct conn_stream *cs) -{ - struct connection *conn = cs->conn; - struct stream_interface *si = cs->data; - - if (!conn_ctrl_ready(conn)) - return 0; - - conn_sock_drain(conn); - - if (conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH) || cs->flags & CS_FL_ERROR) { - /* warning, we can't do anything on after this call ! */ - si_release_endpoint(si); - return -1; - } - return 0; -} - /* This function is the equivalent to stream_int_update() except that it's * designed to be called from outside the stream handlers, typically the lower * layers (applets, connections) after I/O completion. After updating the stream -- 2.39.5