DownstreamConnectionsManager::cleanupClosedTCPConnections();
lastTCPCleanup = now.tv_sec;
- /*
+#if 0
+ /* just to keep things clean in the output, debug only */
+ static std::mutex s_lock;
+ std::lock_guard<decltype(s_lock)> lck(s_lock);
data.mplexer->runForAllWatchedFDs([](bool isRead, int fd, const FDMultiplexer::funcparam_t& param, struct timeval ttd)
{
- cerr<<"- "<<isRead<<" "<<fd<<": "<<param.type().name()<<" "<<ttd.tv_sec<<endl;
+ struct timeval lnow;
+ gettimeofday(&lnow, 0);
+ cerr<<"- "<<isRead<<" "<<fd<<": "<<" "<<(ttd.tv_sec-lnow.tv_sec)<<endl;
+ if (param.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) {
+ auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(param);
+ cerr<<" - "<<state->toString()<<endl;
+ }
+ else if (param.type() == typeid(std::shared_ptr<TCPConnectionToBackend>)) {
+ auto conn = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(param);
+ cerr<<" - "<<conn->toString()<<endl;
+ }
});
- */
+#endif
}
if (now.tv_sec > lastTimeoutScan) {
void setProxyProtocolPayloadAdded(bool added);
void setProxyProtocolValuesSent(std::unique_ptr<std::vector<ProxyProtocolValue>>&& proxyProtocolValuesSent);
+ std::string toString() const
+ {
+ ostringstream o;
+ o << "TCP connection to backend "<<(d_ds ? d_ds->getName() : "empty")<<" over FD "<<(d_socket ? std::to_string(d_socket->getHandle()) : "no socket")<<", state is "<<(int)d_state<<", io state is "<<(d_ioState ? std::to_string((int)d_ioState->getState()) : "empty")<<", queries count is "<<d_queries<<", pending queries count is "<<d_pendingQueries.size()<<", "<<d_pendingResponses.size()<<" pending responses, linked to "<<(d_clientConn ? " a client" : "no client");
+ return o.str();
+ }
+
private:
/* waitingForResponseFromBackend is a state where we have not yet started reading the size,
so we can still switch to sending instead */
return d_ioState != nullptr;
}
+ std::string toString() const
+ {
+ ostringstream o;
+ o << "Incoming TCP connection from "<<d_ci.remote.toStringWithPort()<<" over FD "<<d_ci.fd<<", state is "<<(int)d_state<<", io state is "<<(d_ioState ? std::to_string((int)d_ioState->getState()) : "empty")<<", queries count is "<<d_queriesCount<<", current queries count is "<<d_currentQueriesCount<<", "<<d_queuedResponses.size()<<" queued responses, "<<d_activeConnectionsToBackend.size()<<" active connections to a backend";
+ return o.str();
+ }
+
enum class State { doingHandshake, readingProxyProtocolHeader, readingQuerySize, readingQuery, sendingResponse, idle /* in case of XFR, we stop processing queries */ };
std::map<std::shared_ptr<DownstreamState>, std::deque<std::shared_ptr<TCPConnectionToBackend>>> d_activeConnectionsToBackend;