size_t g_tcpInternalPipeBufferSize{0};
uint64_t g_maxTCPQueuedConnections{1000};
#endif
-uint16_t g_downstreamTCPCleanupInterval{60};
+
int g_tcpRecvTimeout{2};
int g_tcpSendTimeout{2};
std::atomic<uint64_t> g_tcpStatesDumpRequested{0};
-class DownstreamConnectionsManager
-{
-public:
-
- static std::shared_ptr<TCPConnectionToBackend> getConnectionToDownstream(std::unique_ptr<FDMultiplexer>& mplexer, std::shared_ptr<DownstreamState>& ds, const struct timeval& now)
- {
- std::shared_ptr<TCPConnectionToBackend> result;
- struct timeval freshCutOff = now;
- freshCutOff.tv_sec -= 1;
-
- const auto& it = t_downstreamConnections.find(ds);
- if (it != t_downstreamConnections.end()) {
- auto& list = it->second;
- while (!list.empty()) {
- result = std::move(list.back());
- list.pop_back();
-
- result->setReused();
- /* for connections that have not been used very recently,
- check whether they have been closed in the meantime */
- if (freshCutOff < result->getLastDataReceivedTime()) {
- /* used recently enough, skip the check */
- ++ds->tcpReusedConnections;
- return result;
- }
-
- if (isTCPSocketUsable(result->getHandle())) {
- ++ds->tcpReusedConnections;
- return result;
- }
-
- /* otherwise let's try the next one, if any */
- }
- }
-
- return std::make_shared<TCPConnectionToBackend>(ds, mplexer, now);
- }
-
- static void releaseDownstreamConnection(std::shared_ptr<TCPConnectionToBackend>&& conn)
- {
- if (conn == nullptr) {
- return;
- }
-
- if (!conn->canBeReused()) {
- conn.reset();
- return;
- }
-
- const auto& ds = conn->getDS();
- auto& list = t_downstreamConnections[ds];
- while (list.size() >= s_maxCachedConnectionsPerDownstream) {
- /* too many connections queued already */
- list.pop_front();
- }
-
- list.push_back(std::move(conn));
- }
-
- static void cleanupClosedTCPConnections(struct timeval now)
- {
- struct timeval freshCutOff = now;
- freshCutOff.tv_sec -= 1;
-
- for (auto dsIt = t_downstreamConnections.begin(); dsIt != t_downstreamConnections.end(); ) {
- for (auto connIt = dsIt->second.begin(); connIt != dsIt->second.end(); ) {
- if (!(*connIt)) {
- ++connIt;
- continue;
- }
-
- /* don't bother checking freshly used connections */
- if (freshCutOff < (*connIt)->getLastDataReceivedTime()) {
- ++connIt;
- continue;
- }
-
- if (isTCPSocketUsable((*connIt)->getHandle())) {
- ++connIt;
- }
- else {
- connIt = dsIt->second.erase(connIt);
- }
- }
-
- if (!dsIt->second.empty()) {
- ++dsIt;
- }
- else {
- dsIt = t_downstreamConnections.erase(dsIt);
- }
- }
- }
-
- static size_t clear()
- {
- size_t count = 0;
- for (const auto& downstream : t_downstreamConnections) {
- count += downstream.second.size();
- }
-
- t_downstreamConnections.clear();
-
- return count;
- }
-
- static void setMaxCachedConnectionsPerDownstream(size_t max)
- {
- s_maxCachedConnectionsPerDownstream = max;
- }
-
-private:
- static thread_local map<std::shared_ptr<DownstreamState>, std::deque<std::shared_ptr<TCPConnectionToBackend>>> t_downstreamConnections;
- static size_t s_maxCachedConnectionsPerDownstream;
-};
-
-void setMaxCachedTCPConnectionsPerDownstream(size_t max)
-{
- DownstreamConnectionsManager::setMaxCachedConnectionsPerDownstream(max);
-}
-
-thread_local map<std::shared_ptr<DownstreamState>, std::deque<std::shared_ptr<TCPConnectionToBackend>>> DownstreamConnectionsManager::t_downstreamConnections;
-size_t DownstreamConnectionsManager::s_maxCachedConnectionsPerDownstream{10};
-
static void decrementTCPClientCount(const ComboAddress& client)
{
if (g_maxTCPConnectionsPerClient) {
struct timeval now;
gettimeofday(&now, nullptr);
- time_t lastTCPCleanup = now.tv_sec;
time_t lastTimeoutScan = now.tv_sec;
for (;;) {
data.mplexer->run(&now);
- if (g_downstreamTCPCleanupInterval > 0 && (now.tv_sec > (lastTCPCleanup + g_downstreamTCPCleanupInterval))) {
- DownstreamConnectionsManager::cleanupClosedTCPConnections(now);
- lastTCPCleanup = now.tv_sec;
+ if (now.tv_sec > lastTimeoutScan) {
+ lastTimeoutScan = now.tv_sec;
+ auto expiredReadConns = data.mplexer->getTimeouts(now, false);
+ for (const auto& cbData : expiredReadConns) {
+ if (cbData.second.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) {
+ auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(cbData.second);
+ if (cbData.first == state->d_handler.getDescriptor()) {
+ vinfolog("Timeout (read) from remote TCP client %s", state->d_ci.remote.toStringWithPort());
+ state->handleTimeout(state, false);
+ }
+ }
+ else if (cbData.second.type() == typeid(std::shared_ptr<TCPConnectionToBackend>)) {
+ auto conn = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(cbData.second);
+ vinfolog("Timeout (read) from remote backend %s", conn->getBackendName());
+ conn->handleTimeout(now, false);
+ }
+ }
+
+ auto expiredWriteConns = data.mplexer->getTimeouts(now, true);
+ for (const auto& cbData : expiredWriteConns) {
+ if (cbData.second.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) {
+ auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(cbData.second);
+ if (cbData.first == state->d_handler.getDescriptor()) {
+ vinfolog("Timeout (write) from remote TCP client %s", state->d_ci.remote.toStringWithPort());
+ state->handleTimeout(state, true);
+ }
+ }
+ else if (cbData.second.type() == typeid(std::shared_ptr<TCPConnectionToBackend>)) {
+ auto conn = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(cbData.second);
+ vinfolog("Timeout (write) from remote backend %s", conn->getBackendName());
+ conn->handleTimeout(now, true);
+ }
+ }
if (g_tcpStatesDumpRequested > 0) {
/* just to keep things clean in the output, debug only */
}
}
}
-
- if (now.tv_sec > lastTimeoutScan) {
- lastTimeoutScan = now.tv_sec;
- auto expiredReadConns = data.mplexer->getTimeouts(now, false);
- for (const auto& cbData : expiredReadConns) {
- if (cbData.second.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) {
- auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(cbData.second);
- if (cbData.first == state->d_handler.getDescriptor()) {
- vinfolog("Timeout (read) from remote TCP client %s", state->d_ci.remote.toStringWithPort());
- state->handleTimeout(state, false);
- }
- }
- else if (cbData.second.type() == typeid(std::shared_ptr<TCPConnectionToBackend>)) {
- auto conn = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(cbData.second);
- vinfolog("Timeout (read) from remote backend %s", conn->getBackendName());
- conn->handleTimeout(now, false);
- }
- }
-
- auto expiredWriteConns = data.mplexer->getTimeouts(now, true);
- for (const auto& cbData : expiredWriteConns) {
- if (cbData.second.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) {
- auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(cbData.second);
- if (cbData.first == state->d_handler.getDescriptor()) {
- vinfolog("Timeout (write) from remote TCP client %s", state->d_ci.remote.toStringWithPort());
- state->handleTimeout(state, true);
- }
- }
- else if (cbData.second.type() == typeid(std::shared_ptr<TCPConnectionToBackend>)) {
- auto conn = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(cbData.second);
- vinfolog("Timeout (write) from remote backend %s", conn->getBackendName());
- conn->handleTimeout(now, true);
- }
- }
- }
}
}
// get ready to read the next packet, if any
return IOState::NeedRead;
}
- else {
- --conn->d_ds->outstanding;
- }
+ --conn->d_ds->outstanding;
auto ids = std::move(it->second.d_idstate);
d_pendingResponses.erase(it);
/* marking as idle for now, so we can accept new queries if our queues are empty */
}
DEBUGLOG("passing response to client connection for "<<ids.qname);
+ // make sure that we still exist after calling handleResponse()
+ auto shared = shared_from_this();
+ bool release = canBeReused() && sender->releaseConnection();
sender->handleResponse(now, TCPResponse(std::move(d_responseBuffer), std::move(ids), conn));
if (!d_pendingQueries.empty()) {
DEBUGLOG("nothing to do, waiting for a new query");
d_state = State::idle;
d_sender.reset();
+ if (release) {
+ DownstreamConnectionsManager::releaseDownstreamConnection(std::move(shared));
+ }
return IOState::Done;
}
}
}
return done;
}
+
+std::shared_ptr<TCPConnectionToBackend> DownstreamConnectionsManager::getConnectionToDownstream(std::unique_ptr<FDMultiplexer>& mplexer, std::shared_ptr<DownstreamState>& ds, const struct timeval& now)
+{
+ std::shared_ptr<TCPConnectionToBackend> result;
+ struct timeval freshCutOff = now;
+ freshCutOff.tv_sec -= 1;
+
+ auto backendId = ds->getID();
+
+ if (s_cleanupInterval > 0 && (s_nextCleanup == 0 || s_nextCleanup <= now.tv_sec)) {
+ s_nextCleanup = now.tv_sec + s_cleanupInterval;
+ cleanupClosedTCPConnections(now);
+ }
+
+ {
+ std::lock_guard<decltype(s_lock)> lock(s_lock);
+ const auto& it = s_downstreamConnections.find(backendId);
+ if (it != s_downstreamConnections.end()) {
+ auto& list = it->second;
+ while (!list.empty()) {
+ result = std::move(list.back());
+ list.pop_back();
+
+ result->setReused();
+ /* for connections that have not been used very recently,
+ check whether they have been closed in the meantime */
+ if (freshCutOff < result->getLastDataReceivedTime()) {
+ /* used recently enough, skip the check */
+ ++ds->tcpReusedConnections;
+ return result;
+ }
+
+ if (isTCPSocketUsable(result->getHandle())) {
+ ++ds->tcpReusedConnections;
+ return result;
+ }
+
+ /* otherwise let's try the next one, if any */
+ }
+ }
+ }
+
+ return std::make_shared<TCPConnectionToBackend>(ds, mplexer, now);
+}
+
+void DownstreamConnectionsManager::releaseDownstreamConnection(std::shared_ptr<TCPConnectionToBackend>&& conn)
+{
+ if (conn == nullptr) {
+ return;
+ }
+
+ if (!conn->canBeReused()) {
+ conn.reset();
+ return;
+ }
+
+ const auto& ds = conn->getDS();
+ {
+ std::lock_guard<decltype(s_lock)> lock(s_lock);
+ auto& list = s_downstreamConnections[ds->getID()];
+ while (list.size() >= s_maxCachedConnectionsPerDownstream) {
+ /* too many connections queued already */
+ list.pop_front();
+ }
+
+ list.push_back(std::move(conn));
+ }
+}
+
+void DownstreamConnectionsManager::cleanupClosedTCPConnections(struct timeval now)
+{
+ struct timeval freshCutOff = now;
+ freshCutOff.tv_sec -= 1;
+
+ std::lock_guard<decltype(s_lock)> lock(s_lock);
+ for (auto dsIt = s_downstreamConnections.begin(); dsIt != s_downstreamConnections.end(); ) {
+ for (auto connIt = dsIt->second.begin(); connIt != dsIt->second.end(); ) {
+ if (!(*connIt)) {
+ ++connIt;
+ continue;
+ }
+
+ /* don't bother checking freshly used connections */
+ if (freshCutOff < (*connIt)->getLastDataReceivedTime()) {
+ ++connIt;
+ continue;
+ }
+
+ if (isTCPSocketUsable((*connIt)->getHandle())) {
+ ++connIt;
+ }
+ else {
+ connIt = dsIt->second.erase(connIt);
+ }
+ }
+
+ if (!dsIt->second.empty()) {
+ ++dsIt;
+ }
+ else {
+ dsIt = s_downstreamConnections.erase(dsIt);
+ }
+ }
+}
+
+size_t DownstreamConnectionsManager::clear()
+{
+ size_t count = 0;
+ std::lock_guard<decltype(s_lock)> lock(s_lock);
+ for (const auto& downstream : s_downstreamConnections) {
+ count += downstream.second.size();
+ }
+
+ s_downstreamConnections.clear();
+
+ return count;
+}
+
+void setMaxCachedTCPConnectionsPerDownstream(size_t max)
+{
+ DownstreamConnectionsManager::setMaxCachedConnectionsPerDownstream(max);
+}
+
+map<boost::uuids::uuid, std::deque<std::shared_ptr<TCPConnectionToBackend>>> DownstreamConnectionsManager::s_downstreamConnections;
+std::mutex DownstreamConnectionsManager::s_lock;
+size_t DownstreamConnectionsManager::s_maxCachedConnectionsPerDownstream{10};
+time_t DownstreamConnectionsManager::s_nextCleanup{0};
+uint16_t DownstreamConnectionsManager::s_cleanupInterval{60};