#include "dnsdist-xpf.hh"
#include "dnsparser.hh"
#include "dolog.hh"
-#include "ednsoptions.hh"
#include "gettime.hh"
#include "lock.hh"
#include "sstuff.hh"
{
std::shared_ptr<TCPConnectionToBackend> result;
- const auto& it = t_downstreamConnections.find(ds->remote);
+ const auto& it = t_downstreamConnections.find(ds);
if (it != t_downstreamConnections.end()) {
auto& list = it->second;
if (!list.empty()) {
return;
}
- const auto& remote = conn->getRemote();
- const auto& it = t_downstreamConnections.find(remote);
+ const auto& ds = conn->getDS();
+ const auto& it = t_downstreamConnections.find(ds);
if (it != t_downstreamConnections.end()) {
auto& list = it->second;
if (list.size() >= s_maxCachedConnectionsPerDownstream) {
list.push_back(std::move(conn));
}
else {
- t_downstreamConnections[remote].push_back(std::move(conn));
+ t_downstreamConnections[ds].push_back(std::move(conn));
}
}
}
private:
- static thread_local map<ComboAddress, std::deque<std::shared_ptr<TCPConnectionToBackend>>> t_downstreamConnections;
+ static thread_local map<std::shared_ptr<DownstreamState>, std::deque<std::shared_ptr<TCPConnectionToBackend>>> t_downstreamConnections;
static const size_t s_maxCachedConnectionsPerDownstream;
};
-thread_local map<ComboAddress, std::deque<std::shared_ptr<TCPConnectionToBackend>>> DownstreamConnectionsManager::t_downstreamConnections;
+thread_local map<std::shared_ptr<DownstreamState>, std::deque<std::shared_ptr<TCPConnectionToBackend>>> DownstreamConnectionsManager::t_downstreamConnections;
const size_t DownstreamConnectionsManager::s_maxCachedConnectionsPerDownstream{20};
static void decrementTCPClientCount(const ComboAddress& client)
IncomingTCPConnectionState::~IncomingTCPConnectionState()
{
- DEBUGLOG("in "<<__PRETTY_FUNCTION__);
decrementTCPClientCount(d_ci.remote);
if (d_ci.cs != nullptr) {
}
if (state->d_queuedResponses.empty()) {
- DEBUGLOG("no response remaining");
if (state->d_isXFR) {
/* we should still be reading from the backend, and we don't want to read from the client */
state->d_state = IncomingTCPConnectionState::State::idle;
DEBUGLOG("idling for XFR completion");
return IOState::Done;
} else {
- DEBUGLOG("reading new queries if any");
if (state->canAcceptNewQueries()) {
+ DEBUGLOG("waiting for new queries");
state->resetForNewQuery();
return IOState::NeedRead;
}
else {
+ DEBUGLOG("idling");
state->d_state = IncomingTCPConnectionState::State::idle;
return IOState::Done;
}
return false;
}
- // d_state ?
if (d_currentQueriesCount >= d_ci.cs->d_maxInFlightQueriesPerConn) {
DEBUGLOG("not accepting new queries because we already have "<<d_currentQueriesCount<<" out of "<<d_ci.cs->d_maxInFlightQueriesPerConn);
return false;
}
- DEBUGLOG("accepting new queries");
return true;
}
std::shared_ptr<TCPConnectionToBackend> IncomingTCPConnectionState::getActiveDownstreamConnection(const std::shared_ptr<DownstreamState>& ds)
{
- auto it = d_activeConnectionsToBackend.find(ds->remote);
+ auto it = d_activeConnectionsToBackend.find(ds);
if (it == d_activeConnectionsToBackend.end()) {
- DEBUGLOG("no active connection found for "<<ds->remote.toString());
+ DEBUGLOG("no active connection found for "<<ds->getName());
return nullptr;
}
for (auto& conn : it->second) {
if (conn->canAcceptNewQueries()) {
- DEBUGLOG("Got one active connection accepting more for "<<ds->remote.toString());
+ DEBUGLOG("Got one active connection accepting more for "<<ds->getName());
return conn;
}
- DEBUGLOG("not accepting more for "<<ds->remote.toString());
+ DEBUGLOG("not accepting more for "<<ds->getName());
}
return nullptr;
void IncomingTCPConnectionState::registerActiveDownstreamConnection(std::shared_ptr<TCPConnectionToBackend>& conn)
{
- d_activeConnectionsToBackend[conn->getRemote()].push_front(conn);
+ d_activeConnectionsToBackend[conn->getDS()].push_front(conn);
}
/* this version is called when the buffer has been set and the rules have been processed */
void IncomingTCPConnectionState::sendResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response)
{
- DEBUGLOG("in "<<__PRETTY_FUNCTION__);
// if we already reading a query (not the query size, mind you), or sending a response we need to either queue the response
// otherwise we can start sending it right away
if (state->d_state == IncomingTCPConnectionState::State::idle ||
/* this version is called from the backend code when a new response has been received */
void IncomingTCPConnectionState::handleResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response)
{
- DEBUGLOG("in "<<__PRETTY_FUNCTION__);
- if (response.d_connection && response.d_connection->isIdle()) {
- auto& list = d_activeConnectionsToBackend.at(response.d_connection->getRemote());
+ // if we have added a TCP Proxy Protocol payload to a connection, don't release it yet, no one else will be able to use it anyway
+ if (!state->d_isXFR && response.d_connection && response.d_connection->isIdle() && response.d_connection->canBeReused()) {
+ auto& list = d_activeConnectionsToBackend.at(response.d_connection->getDS());
for (auto it = list.begin(); it != list.end(); ++it) {
if (*it == response.d_connection) {
DownstreamConnectionsManager::releaseDownstreamConnection(std::move(*it));
void IncomingTCPConnectionState::handleIO(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now)
{
- DEBUGLOG("in "<<__PRETTY_FUNCTION__);
// why do we loop? Because the TLS layer does buffering, and thus can have data ready to read
// even though the underlying socket is not ready, so we need to actually ask for the data first
bool wouldBlock = false;
DEBUGLOG("query received");
if (handleQuery(state, now)) {
- DEBUGLOG("handle query returned true");
// if the query has been passed to a backend, or dropped, we can start
// reading again, or sending queued responses
if (state->d_queuedResponses.empty()) {
else {
/* otherwise the state should already be waiting for
the socket to be writable */
- DEBUGLOG("should be waiting for writable socket");
ioGuard.release();
return;
}
void IncomingTCPConnectionState::notifyIOError(std::shared_ptr<IncomingTCPConnectionState>& state, IDState&& query, const struct timeval& now)
{
- DEBUGLOG("in "<<__PRETTY_FUNCTION__);
if (d_state == State::sendingResponse) {
/* if we have responses to send, let's do that first */
}
void TCPConnectionToBackend::assignToClientConnection(std::shared_ptr<IncomingTCPConnectionState>& clientConn, bool isXFR)
{
- DEBUGLOG("in "<<__PRETTY_FUNCTION__);
+ if (d_usedForXFR == true) {
+ throw std::runtime_error("Trying to send a query over a backend connection used for XFR");
+ }
+
if (isXFR) {
d_usedForXFR = true;
}
- d_clientConn = clientConn;
- d_ioState = make_unique<IOStateHandler>(clientConn->getIOMPlexer(), d_socket->getHandle());
+ if (!d_clientConn) {
+ d_clientConn = clientConn;
+ d_ioState = make_unique<IOStateHandler>(clientConn->getIOMPlexer(), d_socket->getHandle());
+ }
+ else if (d_clientConn != clientConn) {
+ throw std::runtime_error("Assigning a query from a different client to an existing backend connection with pending queries");
+ }
}
IOState TCPConnectionToBackend::sendNextQuery(std::shared_ptr<TCPConnectionToBackend>& conn)
void TCPConnectionToBackend::handleIO(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now)
{
- DEBUGLOG("in "<<__PRETTY_FUNCTION__);
if (conn->d_socket == nullptr) {
throw std::runtime_error("No downstream socket in " + std::string(__PRETTY_FUNCTION__) + "!");
}
try {
if (conn->d_state == State::sendingQueryToBackend) {
- DEBUGLOG("sending query to backend over FD "<<fd);
+ DEBUGLOG("sending query to backend "<<conn->getDS()->getName()<<" over FD "<<fd);
int socketFlags = 0;
#ifdef MSG_FASTOPEN
if (conn->isFastOpenEnabled()) {
/* request sent ! */
conn->incQueries();
conn->d_currentPos = 0;
- //conn->d_currentQuery.d_querySentTime = now;
+
DEBUGLOG("adding a pending response for ID "<<conn->d_currentQuery.d_idstate.origID<<" and QNAME "<<conn->d_currentQuery.d_idstate.qname);
conn->d_pendingResponses[conn->d_currentQuery.d_idstate.origID] = std::move(conn->d_currentQuery);
conn->d_currentQuery.d_buffer.clear();
// then we need to allocate a new buffer (new because we might need to re-send the query if the
// backend dies on us)
// We also might need to read and send to the client more than one response in case of XFR (yeah!)
- // should very likely be a TCPIOHandler d_downstreamHandler
+ // should very likely be a TCPIOHandler
conn->d_responseBuffer.resize(sizeof(uint16_t));
iostate = tryRead(fd, conn->d_responseBuffer, conn->d_currentPos, sizeof(uint16_t) - conn->d_currentPos);
if (iostate == IOState::Done) {
iostate = tryRead(fd, conn->d_responseBuffer, conn->d_currentPos, conn->d_responseSize - conn->d_currentPos);
if (iostate == IOState::Done) {
DEBUGLOG("got response from backend");
- //conn->d_responseReadTime = now;
try {
iostate = conn->handleResponse(conn, now);
}
++conn->d_downstreamFailures;
}
-#if 0
- if (conn->d_outstanding) {
- conn->d_outstanding = false;
-
- if (conn->d_ds != nullptr) {
- --conn->d_ds->outstanding;
- }
- }
-#endif
/* remove this FD from the IO multiplexer */
iostate = IOState::Done;
connectionDied = true;
DEBUGLOG("connection died, number of failures is "<<conn->d_downstreamFailures<<", retries is "<<conn->d_ds->retries);
if ((!conn->d_usedForXFR || conn->d_queries == 0) && conn->d_downstreamFailures < conn->d_ds->retries) {
- DEBUGLOG("reconnecting");
+
conn->d_ioState->reset();
ioGuard.release();
if (conn->reconnect()) {
- DEBUGLOG("reconnected");
-
conn->d_ioState = make_unique<IOStateHandler>(conn->d_clientConn->getIOMPlexer(), conn->d_socket->getHandle());
+ /* we need to resend the queries that were in flight, if any */
for (auto& pending : conn->d_pendingResponses) {
conn->d_pendingQueries.push_back(std::move(pending.second));
}
void TCPConnectionToBackend::queueQuery(TCPQuery&& query, std::shared_ptr<TCPConnectionToBackend>& sharedSelf)
{
- DEBUGLOG("in "<<__PRETTY_FUNCTION__);
if (d_ioState == nullptr) {
throw std::runtime_error("Trying to queue a query to a TCP connection that has no incoming client connection assigned");
}
d_proxyProtocolPayloadAdded = true;
}
- DEBUGLOG("need write");
-
struct timeval now;
gettimeofday(&now, 0);
// store query in the list of queries to send
d_pendingQueries.push_back(std::move(query));
}
- DEBUGLOG("out of "<<__PRETTY_FUNCTION__);
}
bool TCPConnectionToBackend::reconnect()
do {
vinfolog("TCP connecting to downstream %s (%d)", d_ds->getNameWithAddr(), d_downstreamFailures);
+ DEBUGLOG("Opening TCP connection to backend "<<d_ds->getNameWithAddr());
try {
result = std::unique_ptr<Socket>(new Socket(d_ds->remote.sin4.sin_family, SOCK_STREAM, 0));
DEBUGLOG("result of connect is "<<result->getHandle());
+
if (!IsAnyAddress(d_ds->sourceAddr)) {
SSetsockopt(result->getHandle(), SOL_SOCKET, SO_REUSEADDR, 1);
#ifdef IP_BIND_ADDRESS_NO_PORT
#endif /* MSG_FASTOPEN */
d_socket = std::move(result);
- DEBUGLOG("connected new socket "<<d_socket->getHandle());
++d_ds->tcpCurrentConnections;
return true;
}
void TCPConnectionToBackend::notifyAllQueriesFailed(const struct timeval& now, bool timeout)
{
- DEBUGLOG("in "<<__PRETTY_FUNCTION__);
d_connectionDied = true;
auto& clientConn = d_clientConn;
++clientConn->d_ci.cs->tcpDownstreamTimeouts;
}
- if (d_state == State::doingHandshake || d_state == State::sendingQueryToBackend) {
+ if (d_state == State::sendingQueryToBackend) {
clientConn->notifyIOError(clientConn, std::move(d_currentQuery.d_idstate), now);
}
IOState TCPConnectionToBackend::handleResponse(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now)
{
- DEBUGLOG("in "<<__PRETTY_FUNCTION__);
-
d_downstreamFailures = 0;
auto& clientConn = d_clientConn;
if (!clientConn->active()) {
- DEBUGLOG("client is not active");
// a client timeout occured, or something like that */
d_connectionDied = true;
d_clientConn.reset();
d_state = State::readingResponseSizeFromBackend;
d_currentPos = 0;
d_responseBuffer.resize(sizeof(uint16_t));
- return IOState::NeedRead;
// get ready to read the next packet, if any
+ return IOState::NeedRead;
}
else {
- DEBUGLOG("not XFR, phew");
uint16_t queryId = 0;
try {
queryId = getQueryIdFromResponse();
notifyAllQueriesFailed(now);
return IOState::Done;
}
+
auto ids = std::move(it->second.d_idstate);
d_pendingResponses.erase(it);
DEBUGLOG("passing response to client connection for "<<ids.qname);
return IOState::NeedRead;
}
else {
- DEBUGLOG("nothing to do, phewwwww");
+ DEBUGLOG("nothing to do, waiting for a new query");
d_state = State::idle;
d_clientConn.reset();
return IOState::Done;