static std::shared_ptr<TCPConnectionToBackend> getConnectionToDownstream(std::unique_ptr<FDMultiplexer>& mplexer, std::shared_ptr<DownstreamState>& ds, const struct timeval& now)
{
std::shared_ptr<TCPConnectionToBackend> result;
+ struct timeval freshCutOff = now;
+ freshCutOff.tv_sec -= 1;
const auto& it = t_downstreamConnections.find(ds);
if (it != t_downstreamConnections.end()) {
auto& list = it->second;
- if (!list.empty()) {
- result = std::move(list.front());
- list.pop_front();
+ while (!list.empty()) {
+ result = std::move(list.back());
+ list.pop_back();
+
result->setReused();
- ++ds->tcpReusedConnections;
- return result;
+ /* for connections that have not been used very recently,
+ check whether they have been closed in the meantime */
+ if (freshCutOff < result->getLastDataReceivedTime()) {
+ /* used recently enough, skip the check */
+ ++ds->tcpReusedConnections;
+ return result;
+ }
+
+ if (isTCPSocketUsable(result->getHandle())) {
+ ++ds->tcpReusedConnections;
+ return result;
+ }
+
+ /* otherwise let's try the next one, if any */
}
}
}
}
- static void cleanupClosedTCPConnections()
+ static void cleanupClosedTCPConnections(struct timeval now)
{
+ struct timeval freshCutOff = now;
+ freshCutOff.tv_sec -= 1;
+
for (auto dsIt = t_downstreamConnections.begin(); dsIt != t_downstreamConnections.end(); ) {
for (auto connIt = dsIt->second.begin(); connIt != dsIt->second.end(); ) {
- if (*connIt && isTCPSocketUsable((*connIt)->getHandle())) {
+ if (!(*connIt)) {
+ ++connIt;
+ continue;
+ }
+
+ /* don't bother checking freshly used connections */
+ if (freshCutOff < (*connIt)->getLastDataReceivedTime()) {
+ ++connIt;
+ continue;
+ }
+
+ if (isTCPSocketUsable((*connIt)->getHandle())) {
++connIt;
}
else {
data.mplexer->run(&now);
if (g_downstreamTCPCleanupInterval > 0 && (now.tv_sec > (lastTCPCleanup + g_downstreamTCPCleanupInterval))) {
- DownstreamConnectionsManager::cleanupClosedTCPConnections();
+ DownstreamConnectionsManager::cleanupClosedTCPConnections(now);
lastTCPCleanup = now.tv_sec;
if (g_tcpStatesDumpRequested > 0) {
conn->d_responseBuffer.reserve(conn->d_responseSize + /* we will need to prepend the size later */ 2);
conn->d_responseBuffer.resize(conn->d_responseSize);
conn->d_currentPos = 0;
+ conn->d_lastDataReceivedTime = now;
}
else if (conn->d_state == State::waitingForResponseFromBackend && conn->d_currentPos > 0) {
conn->d_state = State::readingResponseSizeFromBackend;
if (iostate == IOState::Done) {
DEBUGLOG("got response from backend");
try {
+ conn->d_lastDataReceivedTime = now;
iostate = conn->handleResponse(conn, now);
}
catch (const std::exception& e) {
void setProxyProtocolValuesSent(std::unique_ptr<std::vector<ProxyProtocolValue>>&& proxyProtocolValuesSent);
+ struct timeval getLastDataReceivedTime() const
+ {
+ return d_lastDataReceivedTime;
+ }
+
std::string toString() const
{
ostringstream o;
std::shared_ptr<IncomingTCPConnectionState> d_clientConn;
TCPQuery d_currentQuery;
struct timeval d_connectionStartTime;
+ struct timeval d_lastDataReceivedTime;
size_t d_currentPos{0};
uint64_t d_queries{0};
uint64_t d_downstreamFailures{0};