return t_downstreamTCPConnectionsManager.clear();
}
+static std::pair<std::shared_ptr<TCPConnectionToBackend>, bool> getOwnedDownstreamConnection(std::map<std::shared_ptr<DownstreamState>, std::deque<std::shared_ptr<TCPConnectionToBackend>>>& ownedConnectionsToBackend, const std::shared_ptr<DownstreamState>& backend, const std::unique_ptr<std::vector<ProxyProtocolValue>>& tlvs)
+{
+ bool tlvsMismatch = false;
+ auto connIt = ownedConnectionsToBackend.find(backend);
+ if (connIt == ownedConnectionsToBackend.end()) {
+ DEBUGLOG("no owned connection found for " << backend->getName());
+ return {nullptr, tlvsMismatch};
+ }
+
+ for (auto& conn : connIt->second) {
+ if (conn->canBeReused(true)) {
+ if (conn->matchesTLVs(tlvs)) {
+ DEBUGLOG("Got one owned connection accepting more for " << backend->getName());
+ conn->setReused();
+ return {conn, tlvsMismatch};
+ }
+ DEBUGLOG("Found one connection to " << backend->getName() << " but with different TLV values");
+ tlvsMismatch = true;
+ }
+ DEBUGLOG("not accepting more for " << backend->getName());
+ }
+
+ return {nullptr, tlvsMismatch};
+}
+
std::shared_ptr<TCPConnectionToBackend> IncomingTCPConnectionState::getDownstreamConnection(std::shared_ptr<DownstreamState>& backend, const std::unique_ptr<std::vector<ProxyProtocolValue>>& tlvs, const struct timeval& now)
{
- auto downstream = getOwnedDownstreamConnection(backend, tlvs);
+ auto [downstream, tlvsMismatch] = getOwnedDownstreamConnection(d_ownedConnectionsToBackend, backend, tlvs);
if (!downstream) {
/* we don't have a connection to this backend owned yet, let's get one (it might not be a fresh one, though) */
downstream = t_downstreamTCPConnectionsManager.getConnectionToDownstream(d_threadData.mplexer, backend, now, std::string());
- if (backend->d_config.useProxyProtocol) {
+ // if we had an existing connection but the TLVs are different, they are likely unique per query so do not bother keeping the connection
+ // around
+ if (backend->d_config.useProxyProtocol && !tlvsMismatch) {
registerOwnedDownstreamConnection(downstream);
}
}
d_state = State::waitingForQuery;
}
-std::shared_ptr<TCPConnectionToBackend> IncomingTCPConnectionState::getOwnedDownstreamConnection(const std::shared_ptr<DownstreamState>& backend, const std::unique_ptr<std::vector<ProxyProtocolValue>>& tlvs)
+void IncomingTCPConnectionState::registerOwnedDownstreamConnection(std::shared_ptr<TCPConnectionToBackend>& conn)
{
- auto connIt = d_ownedConnectionsToBackend.find(backend);
- if (connIt == d_ownedConnectionsToBackend.end()) {
- DEBUGLOG("no owned connection found for " << backend->getName());
- return nullptr;
- }
+ const auto& downstream = conn->getDS();
- for (auto& conn : connIt->second) {
- if (conn->canBeReused(true) && conn->matchesTLVs(tlvs)) {
- DEBUGLOG("Got one owned connection accepting more for " << backend->getName());
- conn->setReused();
- return conn;
- }
- DEBUGLOG("not accepting more for " << backend->getName());
- }
+ auto& queue = d_ownedConnectionsToBackend[downstream];
+ // how many proxy-protocol enabled connections do we want to keep around?
+ // - they are only usable for this incoming connection because of the proxy protocol header containing the source and destination addresses and ports
+ // - if we have TLV values, and they are unique per query, keeping these is useless
+ // - if there is no, or identical, TLV values, a single outgoing connection is enough if maxInFlight == 1, or if incoming maxInFlight == outgoing maxInFlight
+ // so it makes sense to keep a few of them around if incoming maxInFlight is greater than outgoing maxInFlight
- return nullptr;
-}
+ auto incomingMaxInFlightQueriesPerConn = d_ci.cs->d_maxInFlightQueriesPerConn;
+ incomingMaxInFlightQueriesPerConn = std::max(incomingMaxInFlightQueriesPerConn, static_cast<size_t>(1U));
+ auto outgoingMaxInFlightQueriesPerConn = downstream->d_config.d_maxInFlightQueriesPerConn;
+ outgoingMaxInFlightQueriesPerConn = std::max(outgoingMaxInFlightQueriesPerConn, static_cast<size_t>(1U));
+ size_t maxCachedOutgoingConnections = std::min(static_cast<size_t>(std::round(incomingMaxInFlightQueriesPerConn / outgoingMaxInFlightQueriesPerConn)), static_cast<size_t>(5U));
-void IncomingTCPConnectionState::registerOwnedDownstreamConnection(std::shared_ptr<TCPConnectionToBackend>& conn)
-{
- d_ownedConnectionsToBackend[conn->getDS()].push_front(conn);
+ queue.push_front(conn);
+ if (queue.size() > maxCachedOutgoingConnections) {
+ queue.pop_back();
+ }
}
/* called when the buffer has been set and the rules have been processed, and only from handleIO (sometimes indirectly via handleQuery) */