Let's start naively.
*/
/* Patch residue: global TCP tuning knobs. The leading '-'/'+' below are
   diff markers, not C++. The old per-thread Socket pool is removed here
   and replaced further down by t_downstreamConnections. */
-static thread_local map<ComboAddress, std::deque<std::unique_ptr<Socket>>> t_downstreamSockets;
/* Presumably guards tcpClientsCount (shared across TCP threads) — confirm. */
static std::mutex tcpClientsCountMutex;
static std::map<ComboAddress,size_t,ComboAddress::addressOnlyLessThan> tcpClientsCount;
/* Upper bound on idle backend connections cached per downstream, per thread. */
static const size_t g_maxCachedConnectionsPerDownstream = 20;
size_t g_maxTCPQueriesPerConn{0};
size_t g_maxTCPConnectionDuration{0};
size_t g_maxTCPConnectionsPerClient{0};
/* NOTE(review): this used to be std::atomic<uint16_t> (removed two lines
   below). Confirm it is only written at configuration time, before the
   TCP worker threads start; otherwise keep the atomic. */
+uint16_t g_downstreamTCPCleanupInterval{60};
bool g_useTCPSinglePipe{false};
-std::atomic<uint16_t> g_downstreamTCPCleanupInterval{60};
/* Opens a non-blocking TCP socket towards backend 'ds'. When compiled
   with TCP Fast Open support and the backend has tcpFastOpen enabled,
   the connect() is deferred to the first send (MSG_FASTOPEN); otherwise
   a non-blocking connect is started here. The patch removes the
   'timeout' parameter: I/O timeouts are now handled by the caller. */
-static std::unique_ptr<Socket> setupTCPDownstream(shared_ptr<DownstreamState>& ds, uint16_t& downstreamFailures, int timeout)
+static std::unique_ptr<Socket> setupTCPDownstream(shared_ptr<DownstreamState>& ds, uint16_t& downstreamFailures)
{
std::unique_ptr<Socket> result;
/* NOTE(review): 'result' is dereferenced while still null here — this
   hunk is clearly missing the context lines that allocate the Socket
   (and the retry loop implied by the trailing 'return nullptr' below).
   Verify against the full file before relying on this excerpt. */
result->setNonBlocking();
#ifdef MSG_FASTOPEN
if (!ds->tcpFastOpen) {
-    SConnectWithTimeout(result->getHandle(), ds->remote, timeout);
+    SConnectWithTimeout(result->getHandle(), ds->remote, /* no timeout, we will handle it ourselves */ 0);
}
#else
-  SConnectWithTimeout(result->getHandle(), ds->remote, timeout);
+  SConnectWithTimeout(result->getHandle(), ds->remote, /* no timeout, we will handle it ourselves */ 0);
#endif /* MSG_FASTOPEN */
return result;
}
/* Tail of an omitted retry loop — unreachable as shown in this excerpt. */
return nullptr;
}
-static std::unique_ptr<Socket> getConnectionToDownstream(std::shared_ptr<DownstreamState>& ds, uint16_t& downstreamFailures, bool& isFresh)
+/* RAII wrapper around one TCP connection to a backend (downstream)
+   server. Construction opens the socket and bumps the backend's
+   current-connections gauge; destruction decrements the gauge and
+   reports the per-connection query count and duration (milliseconds)
+   into the backend's TCP metrics. */
+class TCPConnectionToBackend
{
-  std::unique_ptr<Socket> result;
+public:
+  /* 'now' is the connection start time, captured by the caller so we do
+     not need an extra gettimeofday() here. 'downstreamFailures' is
+     passed through to setupTCPDownstream(). */
+  TCPConnectionToBackend(std::shared_ptr<DownstreamState>& ds, uint16_t& downstreamFailures, const struct timeval& now): d_ds(ds), d_connectionStartTime(now)
+  {
+    d_socket = setupTCPDownstream(d_ds, downstreamFailures);
+    /* NOTE(review): the gauge is incremented even if d_socket ended up
+       null, while the destructor only decrements when d_socket is set —
+       confirm setupTCPDownstream() cannot return nullptr without
+       throwing, otherwise the gauge leaks. */
+    ++d_ds->tcpCurrentConnections;
+  }
-  const auto& it = t_downstreamSockets.find(ds->remote);
-  if (it != t_downstreamSockets.end()) {
+  /* Feeds queries-per-connection and connection duration (ms) back into
+     the backend's rolling TCP averages. */
+  ~TCPConnectionToBackend()
+  {
+    if (d_ds && d_socket) {
+      --d_ds->tcpCurrentConnections;
+      struct timeval now;
+      gettimeofday(&now, nullptr);
+
+      auto diff = now - d_connectionStartTime;
+      /* Integer ms here; the frontend counterpart uses floating point —
+         minor inconsistency, harmless for metrics. */
+      d_ds->updateTCPMetrics(d_queries, diff.tv_sec * 1000 + diff.tv_usec / 1000);
+    }
+  }
+
+  /* Throws if the connection was never established. */
+  int getHandle() const
+  {
+    if (!d_socket) {
+      throw std::runtime_error("Attempt to get the socket handle from a non-established TCP connection");
+    }
+
+    return d_socket->getHandle();
+  }
+
+  const ComboAddress& getRemote() const
+  {
+    return d_ds->remote;
+  }
+
+  /* True until setReused() is called; callers use this to decide whether
+     TCP Fast Open may still be attempted on the first send. */
+  bool isFresh() const
+  {
+    return d_fresh;
+  }
+
+  void incQueries()
+  {
+    ++d_queries;
+  }
+
+  /* Marks the connection as no longer fresh (taken from the idle pool,
+     or after a partial write disabled Fast Open). */
+  void setReused()
+  {
+    d_fresh = false;
+  }
+
+private:
+  std::unique_ptr<Socket> d_socket{nullptr};
+  std::shared_ptr<DownstreamState> d_ds{nullptr};
+  struct timeval d_connectionStartTime;
+  uint64_t d_queries{0};
+  bool d_fresh{true};
+};
+
+/* Per-thread pool of idle backend connections, keyed by backend address. */
+static thread_local map<ComboAddress, std::deque<std::unique_ptr<TCPConnectionToBackend>>> t_downstreamConnections;
+
+/* Returns a connection to backend 'ds': an idle one from this thread's
+   pool when available (marked reused, so Fast Open is skipped on send),
+   otherwise a freshly established TCPConnectionToBackend. */
+static std::unique_ptr<TCPConnectionToBackend> getConnectionToDownstream(std::shared_ptr<DownstreamState>& ds, uint16_t& downstreamFailures, const struct timeval& now)
+{
+  std::unique_ptr<TCPConnectionToBackend> result;
+
+  const auto& it = t_downstreamConnections.find(ds->remote);
+  if (it != t_downstreamConnections.end()) {
auto& list = it->second;
if (!list.empty()) {
/* Reuse the oldest pooled connection (FIFO). */
result = std::move(list.front());
list.pop_front();
-      isFresh = false;
+      result->setReused();
return result;
}
}
-  isFresh = true;
-  return setupTCPDownstream(ds, downstreamFailures, 0);
+  return std::unique_ptr<TCPConnectionToBackend>(new TCPConnectionToBackend(ds, downstreamFailures, now));
}
-static void releaseDownstreamConnection(std::shared_ptr<DownstreamState>& ds, std::unique_ptr<Socket>&& socket)
+/* Hands a finished connection back to this thread's idle pool, or drops
+   it if the pool for that backend is already full. Takes ownership of
+   'conn' in all cases. */
+static void releaseDownstreamConnection(std::unique_ptr<TCPConnectionToBackend>&& conn)
{
-  if (socket == nullptr) {
+  if (conn == nullptr) {
return;
}
-  const auto& it = t_downstreamSockets.find(ds->remote);
-  if (it != t_downstreamSockets.end()) {
+  const auto& remote = conn->getRemote();
+  const auto& it = t_downstreamConnections.find(remote);
+  if (it != t_downstreamConnections.end()) {
auto& list = it->second;
if (list.size() >= g_maxCachedConnectionsPerDownstream) {
/* too many connections queued already */
-      socket.reset();
+      conn.reset();
return;
}
-    list.push_back(std::move(socket));
+    list.push_back(std::move(conn));
}
else {
-    t_downstreamSockets[ds->remote].push_back(std::move(socket));
+    t_downstreamConnections[remote].push_back(std::move(conn));
}
}
struct ConnectionInfo
{
- ConnectionInfo(): cs(nullptr), fd(-1)
+ ConnectionInfo(ClientState* cs_): cs(cs_), fd(-1)
{
}
ConnectionInfo(ConnectionInfo&& rhs)
close(fd);
fd = -1;
}
+ if (cs) {
+ --cs->tcpCurrentConnections;
+ }
}
ComboAddress remote;
/* Walks this thread's idle pool and drops connections whose socket is no
   longer usable; empties map entries are erased. */
static void cleanupClosedTCPConnections()
{
-  for(auto dsIt = t_downstreamSockets.begin(); dsIt != t_downstreamSockets.end(); ) {
-    for (auto socketIt = dsIt->second.begin(); socketIt != dsIt->second.end(); ) {
-      if (*socketIt && isTCPSocketUsable((*socketIt)->getHandle())) {
-        ++socketIt;
+  for(auto dsIt = t_downstreamConnections.begin(); dsIt != t_downstreamConnections.end(); ) {
+    for (auto connIt = dsIt->second.begin(); connIt != dsIt->second.end(); ) {
+      if (*connIt && isTCPSocketUsable((*connIt)->getHandle())) {
+        ++connIt;
}
else {
-        socketIt = dsIt->second.erase(socketIt);
+        connIt = dsIt->second.erase(connIt);
}
}
/* NOTE(review): the guard line (presumably
   'if (!dsIt->second.empty()) {') is omitted context in this hunk —
   the dangling 'else' below belongs to it. Verify in the full file. */
++dsIt;
}
else {
-    dsIt = t_downstreamSockets.erase(dsIt);
+    dsIt = t_downstreamConnections.erase(dsIt);
}
}
}
~IncomingTCPConnectionState()
{
decrementTCPClientCount(d_ci.remote);
+ if (d_ci.cs != nullptr) {
+ struct timeval now;
+ gettimeofday(&now, nullptr);
+
+ auto diff = now - d_connectionStartTime;
+ d_ci.cs->updateTCPMetrics(d_queriesCount, diff.tv_sec * 1000.0 + diff.tv_usec / 1000.0);
+ }
if (d_ds != nullptr) {
if (d_outstanding) {
d_outstanding = false;
}
- if (d_downstreamSocket) {
+ if (d_downstreamConnection) {
try {
if (d_lastIOState == IOState::NeedRead) {
- cerr<<__func__<<": removing leftover backend read FD "<<d_downstreamSocket->getHandle()<<endl;
- d_threadData.mplexer->removeReadFD(d_downstreamSocket->getHandle());
+ cerr<<__func__<<": removing leftover backend read FD "<<d_downstreamConnection->getHandle()<<endl;
+ d_threadData.mplexer->removeReadFD(d_downstreamConnection->getHandle());
}
else if (d_lastIOState == IOState::NeedWrite) {
- cerr<<__func__<<": removing leftover backend write FD "<<d_downstreamSocket->getHandle()<<endl;
- d_threadData.mplexer->removeWriteFD(d_downstreamSocket->getHandle());
+ cerr<<__func__<<": removing leftover backend write FD "<<d_downstreamConnection->getHandle()<<endl;
+ d_threadData.mplexer->removeWriteFD(d_downstreamConnection->getHandle());
}
}
catch(const FDMultiplexerException& e) {
return now;
}
-  boost::optional<struct timeval> getBackendReadTTD() const
+  /* Deadline for reading from the backend: 'now' plus the backend's
+     tcpRecvTimeout. Taking 'now' as a parameter avoids a gettimeofday()
+     call per invocation. Throws if no backend has been selected. */
+  boost::optional<struct timeval> getBackendReadTTD(const struct timeval& now) const
{
if (d_ds == nullptr) {
throw std::runtime_error("getBackendReadTTD() without any backend selected");
/* Unreachable after the throw above; kept for defensiveness. */
return boost::none;
}
-    struct timeval res;
-    gettimeofday(&res, 0);
-
+    struct timeval res = now;
res.tv_sec += d_ds->tcpRecvTimeout;
return res;
}
- boost::optional<struct timeval> getClientWriteTTD(boost::optional<struct timeval> now=boost::none) const
+ boost::optional<struct timeval> getClientWriteTTD(const struct timeval& now) const
{
if (g_maxTCPConnectionDuration == 0 && g_tcpSendTimeout == 0) {
return boost::none;
}
- struct timeval res;
- if (now) {
- res = *now;
- }
- else {
- gettimeofday(&res, 0);
- }
+ struct timeval res = now;
if (g_maxTCPConnectionDuration > 0) {
auto elapsed = res.tv_sec - d_connectionStartTime.tv_sec;
return res;
}
- boost::optional<struct timeval> getBackendWriteTTD() const
+ boost::optional<struct timeval> getBackendWriteTTD(const struct timeval& now) const
{
if (d_ds == nullptr) {
throw std::runtime_error("getBackendReadTTD() called without any backend selected");
return boost::none;
}
- struct timeval res;
- gettimeofday(&res, 0);
-
+ struct timeval res = now;
res.tv_sec += d_ds->tcpSendTimeout;
return res;
return false;
}
+  /* Debugging aid: dumps the state machine position and the timing
+     checkpoints of this incoming connection to stderr. The static mutex
+     serializes output across threads so dumps do not interleave. */
+  void dump() const
+  {
+    static std::mutex s_mutex;
+
+    struct timeval now;
+    gettimeofday(&now, 0);
+
+    {
+      std::lock_guard<std::mutex> lock(s_mutex);
+      /* fprintf is used for the pointer to get a stable %p rendering. */
+      fprintf(stderr, "State is %p\n", this);
+      cerr << "Current state is " << static_cast<int>(d_state) << ", got "<<d_queriesCount<<" queries so far" << endl;
+      cerr << "Current time is " << now.tv_sec << " - " << now.tv_usec << endl;
+      cerr << "Connection started at " << d_connectionStartTime.tv_sec << " - " << d_connectionStartTime.tv_usec << endl;
+      if (d_state > State::doingHandshake) {
+        cerr << "Handshake done at " << d_handshakeDoneTime.tv_sec << " - " << d_handshakeDoneTime.tv_usec << endl;
+      }
+      if (d_state > State::readingQuerySize) {
+        cerr << "Got first query size at " << d_firstQuerySizeReadTime.tv_sec << " - " << d_firstQuerySizeReadTime.tv_usec << endl;
+      }
+      /* NOTE(review): same guard as the previous block — possibly meant
+         to be 'd_state > State::readingQuery' for the per-query size
+         timestamp; confirm intent. */
+      if (d_state > State::readingQuerySize) {
+        cerr << "Got query size at " << d_querySizeReadTime.tv_sec << " - " << d_querySizeReadTime.tv_usec << endl;
+      }
+      if (d_state > State::readingQuery) {
+        cerr << "Got query at " << d_queryReadTime.tv_sec << " - " << d_queryReadTime.tv_usec << endl;
+      }
+      if (d_state > State::sendingQueryToBackend) {
+        cerr << "Sent query at " << d_querySentTime.tv_sec << " - " << d_querySentTime.tv_usec << endl;
+      }
+      if (d_state > State::readingResponseFromBackend) {
+        cerr << "Got response at " << d_responseReadTime.tv_sec << " - " << d_responseReadTime.tv_usec << endl;
+      }
+    }
+  }
+
+
/* State machine of an incoming TCP connection, in chronological order. */
enum class State { doingHandshake, readingQuerySize, readingQuery, sendingQueryToBackend, readingResponseSizeFromBackend, readingResponseFromBackend, sendingResponse };
/* Scratch buffer for the query/response currently in flight. */
std::vector<uint8_t> d_buffer;
IDState d_ids;
ConnectionInfo d_ci;
TCPIOHandler d_handler;
-  std::unique_ptr<Socket> d_downstreamSocket{nullptr};
/* Owning handle on the backend connection (replaces the raw Socket). */
+  std::unique_ptr<TCPConnectionToBackend> d_downstreamConnection{nullptr};
std::shared_ptr<DownstreamState> d_ds{nullptr};
struct timeval d_connectionStartTime;
/* Timing checkpoints added by this patch, reported by dump(). */
+  struct timeval d_handshakeDoneTime;
+  struct timeval d_firstQuerySizeReadTime;
+  struct timeval d_querySizeReadTime;
+  struct timeval d_queryReadTime;
+  struct timeval d_querySentTime;
+  struct timeval d_responseReadTime;
size_t d_currentPos{0};
size_t d_queriesCount{0};
unsigned int d_remainingTime{0};
uint16_t d_downstreamFailures{0};
State d_state{State::doingHandshake};
IOState d_lastIOState{IOState::Done};
/* Freshness now lives on TCPConnectionToBackend (isFresh()/setReused()). */
-  bool d_freshDownstreamConnection{false};
bool d_readingFirstQuery{true};
bool d_outstanding{false};
bool d_firstResponsePacket{true};
{
handleNewIOState(state, IOState::Done, state->d_ci.fd, handleIOCallback);
- if (state->d_isXFR && state->d_downstreamSocket) {
+ if (state->d_isXFR && state->d_downstreamConnection) {
/* we need to resume reading from the backend! */
state->d_state = IncomingTCPConnectionState::State::readingResponseSizeFromBackend;
state->d_currentPos = 0;
state->d_state = IncomingTCPConnectionState::State::sendingQueryToBackend;
state->d_currentPos = 0;
state->d_firstResponsePacket = true;
- state->d_downstreamSocket.reset();
+ state->d_downstreamConnection.reset();
if (state->d_xfrStarted) {
/* sorry, but we are not going to resume a XFR if we have already sent some packets
while (state->d_downstreamFailures < state->d_ds->retries)
{
- state->d_downstreamSocket = getConnectionToDownstream(ds, state->d_downstreamFailures, state->d_freshDownstreamConnection);
+ state->d_downstreamConnection = getConnectionToDownstream(ds, state->d_downstreamFailures, now);
- if (!state->d_downstreamSocket) {
+ if (!state->d_downstreamConnection) {
++ds->tcpGaveUp;
++state->d_ci.cs->tcpGaveUp;
vinfolog("Downstream connection to %s failed %d times in a row, giving up.", ds->getName(), state->d_downstreamFailures);
static void handleDownstreamIO(std::shared_ptr<IncomingTCPConnectionState>& state, struct timeval& now)
{
- if (state->d_downstreamSocket == nullptr) {
+ if (state->d_downstreamConnection == nullptr) {
throw std::runtime_error("No downstream socket in " + std::string(__func__) + "!");
}
- int fd = state->d_downstreamSocket->getHandle();
+ int fd = state->d_downstreamConnection->getHandle();
IOState iostate = IOState::Done;
bool connectionDied = false;
if (state->d_state == IncomingTCPConnectionState::State::sendingQueryToBackend) {
int socketFlags = 0;
#ifdef MSG_FASTOPEN
- if (state->d_ds->tcpFastOpen && state->d_freshDownstreamConnection) {
+ if (state->d_ds->tcpFastOpen && state->d_downstreamConnection->isFresh()) {
socketFlags |= MSG_FASTOPEN;
}
#endif /* MSG_FASTOPEN */
size_t sent = sendMsgWithTimeout(fd, reinterpret_cast<const char *>(&state->d_buffer.at(state->d_currentPos)), state->d_buffer.size() - state->d_currentPos, 0, &state->d_ds->remote, &state->d_ds->sourceAddr, state->d_ds->sourceItf, 0, socketFlags);
if (sent == state->d_buffer.size()) {
/* request sent ! */
+ state->d_downstreamConnection->incQueries();
state->d_state = IncomingTCPConnectionState::State::readingResponseSizeFromBackend;
state->d_currentPos = 0;
+ state->d_querySentTime = now;
iostate = IOState::NeedRead;
if (!state->d_isXFR) {
/* don't bother with the outstanding count for XFR queries */
state->d_currentPos += sent;
iostate = IOState::NeedWrite;
/* disable fast open on partial write */
- state->d_freshDownstreamConnection = false;
+ state->d_downstreamConnection->setReused();
}
}
/* but don't reset it either, we will need to read more messages */
}
else {
- releaseDownstreamConnection(state->d_ds, std::move(state->d_downstreamSocket));
+ releaseDownstreamConnection(std::move(state->d_downstreamConnection));
}
fd = -1;
+ state->d_responseReadTime = now;
handleResponse(state, now);
return;
}
}
/* don't increase this counter when reusing connections */
- if (state->d_freshDownstreamConnection) {
+ if (state->d_downstreamConnection->isFresh()) {
++state->d_downstreamFailures;
}
if (state->d_outstanding && state->d_ds != nullptr) {
handleNewIOState(state, iostate, fd, handleDownstreamIOCallback);
}
else {
- handleNewIOState(state, iostate, fd, handleDownstreamIOCallback, iostate == IOState::NeedRead ? state->getBackendReadTTD() : state->getBackendWriteTTD());
+ handleNewIOState(state, iostate, fd, handleDownstreamIOCallback, iostate == IOState::NeedRead ? state->getBackendReadTTD(now) : state->getBackendWriteTTD(now));
}
if (connectionDied) {
static void handleDownstreamIOCallback(int fd, FDMultiplexer::funcparam_t& param)
{
auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(param);
- if (state->d_downstreamSocket == nullptr) {
+ if (state->d_downstreamConnection == nullptr) {
throw std::runtime_error("No downstream socket in " + std::string(__func__) + "!");
}
- if (fd != state->d_downstreamSocket->getHandle()) {
- throw std::runtime_error("Unexpected socket descriptor " + std::to_string(fd) + " received in " + std::string(__func__) + ", expected " + std::to_string(state->d_downstreamSocket->getHandle()));
+ if (fd != state->d_downstreamConnection->getHandle()) {
+ throw std::runtime_error("Unexpected socket descriptor " + std::to_string(fd) + " received in " + std::string(__func__) + ", expected " + std::to_string(state->d_downstreamConnection->getHandle()));
}
struct timeval now;
if (state->d_state == IncomingTCPConnectionState::State::doingHandshake) {
iostate = state->d_handler.tryHandshake();
if (iostate == IOState::Done) {
+ state->d_handshakeDoneTime = now;
state->d_state = IncomingTCPConnectionState::State::readingQuerySize;
}
}
iostate = state->d_handler.tryRead(state->d_buffer, state->d_currentPos, sizeof(uint16_t) - state->d_currentPos);
if (iostate == IOState::Done) {
state->d_state = IncomingTCPConnectionState::State::readingQuery;
+ state->d_querySizeReadTime = now;
+ if (state->d_queriesCount == 0) {
+ state->d_firstQuerySizeReadTime = now;
+ }
state->d_querySize = state->d_buffer.at(0) * 256 + state->d_buffer.at(1);
if (state->d_querySize < sizeof(dnsheader)) {
/* go away */
tcpClientCountIncremented = false;
try {
socklen_t remlen = remote.getSocklen();
- ci = std::unique_ptr<ConnectionInfo>(new ConnectionInfo);
- ci->cs = cs;
+ ci = std::unique_ptr<ConnectionInfo>(new ConnectionInfo(cs));
#ifdef HAVE_ACCEPT4
ci->fd = accept4(cs->tcpFD, reinterpret_cast<struct sockaddr*>(&remote), &remlen, SOCK_NONBLOCK);
#else
ci->fd = accept(cs->tcpFD, reinterpret_cast<struct sockaddr*>(&remote), &remlen);
#endif
+ ++cs->tcpCurrentConnections;
+
if(ci->fd < 0) {
throw std::runtime_error((boost::format("accepting new connection on socket: %s") % strerror(errno)).str());
}
output << "# TYPE " << statesbase << "tcpreadtimeouts " << "counter" << "\n";
output << "# HELP " << statesbase << "tcpwritetimeouts " << "The number of TCP write timeouts" << "\n";
output << "# TYPE " << statesbase << "tcpwritetimeouts " << "counter" << "\n";
+ output << "# HELP " << statesbase << "tcpcurrentconnections " << "The number of current TCP connections" << "\n";
+ output << "# TYPE " << statesbase << "tcpcurrentconnections " << "gauge" << "\n";
+ output << "# HELP " << statesbase << "tcpavgqueriesperconn " << "The average number of queries per TCP connection" << "\n";
+ output << "# TYPE " << statesbase << "tcpavgqueriesperconn " << "gauge" << "\n";
+ output << "# HELP " << statesbase << "tcpavgconnduration " << "The average duration of a TCP connection (ms)" << "\n";
+ output << "# TYPE " << statesbase << "tcpavgconnduration " << "gauge" << "\n";
for (const auto& state : *states) {
string serverName;
const std::string label = boost::str(boost::format("{server=\"%1%\",address=\"%2%\"}")
% serverName % state->remote.toStringWithPort());
- output << statesbase << "queries" << label << " " << state->queries.load() << "\n";
- output << statesbase << "drops" << label << " " << state->reuseds.load() << "\n";
- output << statesbase << "latency" << label << " " << state->latencyUsec/1000.0 << "\n";
- output << statesbase << "senderrors" << label << " " << state->sendErrors.load() << "\n";
- output << statesbase << "outstanding" << label << " " << state->outstanding.load() << "\n";
- output << statesbase << "order" << label << " " << state->order << "\n";
- output << statesbase << "weight" << label << " " << state->weight << "\n";
- output << statesbase << "tcpdiedsendingquery" << label << " " << state->tcpDiedSendingQuery << "\n";
- output << statesbase << "tcpdiedreadingresponse" << label << " " << state->tcpDiedReadingResponse << "\n";
- output << statesbase << "tcpgaveup" << label << " " << state->tcpGaveUp << "\n";
- output << statesbase << "tcpreadtimeouts" << label << " " << state->tcpReadTimeouts << "\n";
- output << statesbase << "tcpwritetimeouts" << label << " " << state->tcpWriteTimeouts << "\n";
+ output << statesbase << "queries" << label << " " << state->queries.load() << "\n";
+ output << statesbase << "drops" << label << " " << state->reuseds.load() << "\n";
+ output << statesbase << "latency" << label << " " << state->latencyUsec/1000.0 << "\n";
+ output << statesbase << "senderrors" << label << " " << state->sendErrors.load() << "\n";
+ output << statesbase << "outstanding" << label << " " << state->outstanding.load() << "\n";
+ output << statesbase << "order" << label << " " << state->order << "\n";
+ output << statesbase << "weight" << label << " " << state->weight << "\n";
+ output << statesbase << "tcpdiedsendingquery" << label << " " << state->tcpDiedSendingQuery << "\n";
+ output << statesbase << "tcpdiedreadingresponse" << label << " " << state->tcpDiedReadingResponse << "\n";
+ output << statesbase << "tcpgaveup" << label << " " << state->tcpGaveUp << "\n";
+ output << statesbase << "tcpreadtimeouts" << label << " " << state->tcpReadTimeouts << "\n";
+ output << statesbase << "tcpwritetimeouts" << label << " " << state->tcpWriteTimeouts << "\n";
+ output << statesbase << "tcpcurrentconnections" << label << " " << state->tcpCurrentConnections << "\n";
+ output << statesbase << "tcpavgqueriesperconn" << label << " " << state->tcpAvgQueriesPerConnection << "\n";
+ output << statesbase << "tcpavgconnduration" << label << " " << state->tcpAvgConnectionDuration << "\n";
}
for (const auto& front : g_frontends) {
{"tcpGaveUp", (double)a->tcpGaveUp},
{"tcpReadTimeouts", (double)a->tcpReadTimeouts},
{"tcpWriteTimeouts", (double)a->tcpWriteTimeouts},
+ {"tcpCurrentConnections", (double)a->tcpCurrentConnections},
+ {"tcpAvgQueriesPerConnection", (double)a->tcpAvgQueriesPerConnection},
+ {"tcpAvgConnectionDuration", (double)a->tcpAvgConnectionDuration},
{"dropRate", (double)a->dropRate}
};
{ "tcpGaveUp", (double) front->tcpGaveUp.load() },
{ "tcpClientTimeouts", (double) front->tcpClientTimeouts },
{ "tcpDownstreamTimeouts", (double) front->tcpDownstreamTimeouts },
+ { "tcpCurrentConnections", (double) front->tcpCurrentConnections },
+ { "tcpAvgQueriesPerConnection", (double) front->tcpAvgQueriesPerConnection },
+ { "tcpAvgConnectionDuration", (double) front->tcpAvgConnectionDuration },
};
frontends.push_back(frontend);
}