}
}
- if (state->d_isXFR) {
- /* we should still be reading from the backend, and we don't want to read from the client */
- state->d_state = IncomingTCPConnectionState::State::idle;
- state->d_currentPos = 0;
- DEBUGLOG("idling for XFR completion");
- return IOState::Done;
- } else {
- if (state->canAcceptNewQueries()) {
- DEBUGLOG("waiting for new queries");
- state->resetForNewQuery();
- return IOState::NeedRead;
- }
- else {
- DEBUGLOG("idling");
- state->d_state = IncomingTCPConnectionState::State::idle;
- return IOState::Done;
- }
- }
+ state->d_state = IncomingTCPConnectionState::State::idle;
+ return IOState::Done;
}
-static bool handleResponseSent(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now)
+static void handleResponseSent(std::shared_ptr<IncomingTCPConnectionState>& state)
{
- if (!state->d_isXFR) {
- --state->d_currentQueriesCount;
-
- const auto& currentResponse = state->d_currentResponse;
- if (currentResponse.d_selfGenerated == false && currentResponse.d_connection && currentResponse.d_connection->getDS()) {
- const auto& ds = currentResponse.d_connection->getDS();
- struct timespec answertime;
- gettime(&answertime);
- const auto& ids = currentResponse.d_idstate;
- double udiff = ids.sentTime.udiff();
- g_rings.insertResponse(answertime, state->d_ci.remote, ids.qname, ids.qtype, static_cast<unsigned int>(udiff), static_cast<unsigned int>(currentResponse.d_buffer.size()), currentResponse.d_cleartextDH, ds->remote);
- vinfolog("Got answer from %s, relayed to %s (%s), took %f usec", ds->remote.toStringWithPort(), ids.origRemote.toStringWithPort(), (state->d_ci.cs->tlsFrontend ? "DoT" : "TCP"), udiff);
- }
-
- switch (currentResponse.d_cleartextDH.rcode) {
- case RCode::NXDomain:
- ++g_stats.frontendNXDomain;
- break;
- case RCode::ServFail:
- ++g_stats.servfailResponses;
- ++g_stats.frontendServFail;
- break;
- case RCode::NoError:
- ++g_stats.frontendNoError;
- break;
- }
+ if (state->d_isXFR) {
+ return;
+ }
- if (g_maxTCPQueriesPerConn && state->d_queriesCount > g_maxTCPQueriesPerConn) {
- vinfolog("Terminating TCP connection from %s because it reached the maximum number of queries per conn (%d / %d)", state->d_ci.remote.toStringWithPort(), state->d_queriesCount, g_maxTCPQueriesPerConn);
- return false;
- }
+ --state->d_currentQueriesCount;
- if (state->maxConnectionDurationReached(g_maxTCPConnectionDuration, now)) {
- vinfolog("Terminating TCP connection from %s because it reached the maximum TCP connection duration", state->d_ci.remote.toStringWithPort());
- return false;
- }
+ const auto& currentResponse = state->d_currentResponse;
+ if (currentResponse.d_selfGenerated == false && currentResponse.d_connection && currentResponse.d_connection->getDS()) {
+ const auto& ds = currentResponse.d_connection->getDS();
+ struct timespec answertime;
+ gettime(&answertime);
+ const auto& ids = currentResponse.d_idstate;
+ double udiff = ids.sentTime.udiff();
+ g_rings.insertResponse(answertime, state->d_ci.remote, ids.qname, ids.qtype, static_cast<unsigned int>(udiff), static_cast<unsigned int>(currentResponse.d_buffer.size()), currentResponse.d_cleartextDH, ds->remote);
+ vinfolog("Got answer from %s, relayed to %s (%s), took %f usec", ds->remote.toStringWithPort(), ids.origRemote.toStringWithPort(), (state->d_ci.cs->tlsFrontend ? "DoT" : "TCP"), udiff);
}
- return true;
+ switch (currentResponse.d_cleartextDH.rcode) {
+ case RCode::NXDomain:
+ ++g_stats.frontendNXDomain;
+ break;
+ case RCode::ServFail:
+ ++g_stats.servfailResponses;
+ ++g_stats.frontendServFail;
+ break;
+ case RCode::NoError:
+ ++g_stats.frontendNoError;
+ break;
+ }
}
-bool IncomingTCPConnectionState::canAcceptNewQueries() const
+bool IncomingTCPConnectionState::canAcceptNewQueries(const struct timeval& now)
{
if (d_isXFR) {
DEBUGLOG("not accepting new queries because used for XFR");
return false;
}
+ if (g_maxTCPQueriesPerConn && d_queriesCount > g_maxTCPQueriesPerConn) {
+ vinfolog("not accepting new queries from %s because it reached the maximum number of queries per conn (%d / %d)", d_ci.remote.toStringWithPort(), d_queriesCount, g_maxTCPQueriesPerConn);
+ return false;
+ }
+
+ if (maxConnectionDurationReached(g_maxTCPConnectionDuration, now)) {
+ vinfolog("not accepting new queries from %s because it reached the maximum TCP connection duration", d_ci.remote.toStringWithPort());
+ return false;
+ }
+
return true;
}
d_buffer.resize(sizeof(uint16_t));
d_currentPos = 0;
d_querySize = 0;
- d_state = State::readingQuerySize;
+ d_state = State::waitingForQuery;
}
std::shared_ptr<TCPConnectionToBackend> IncomingTCPConnectionState::getActiveDownstreamConnection(const std::shared_ptr<DownstreamState>& ds, const std::unique_ptr<std::vector<ProxyProtocolValue>>& tlvs)
try {
auto iostate = state->d_handler.tryWrite(state->d_currentResponse.d_buffer, state->d_currentPos, state->d_currentResponse.d_buffer.size());
if (iostate == IOState::Done) {
- DEBUGLOG("response sent");
- if (!handleResponseSent(state, now)) {
- return IOState::Done;
- }
- return sendQueuedResponses(state, now);
+ DEBUGLOG("response sent from "<<__PRETTY_FUNCTION__);
+ handleResponseSent(state);
+ return iostate;
} else {
- return IOState::NeedWrite;
+ state->d_lastIOBlocked = true;
+ return IOState::NeedWrite;
DEBUGLOG("partial write");
}
}
void IncomingTCPConnectionState::terminateClientConnection()
{
+ cerr<<"terminating client connection"<<endl;
d_queuedResponses.clear();
/* we have already released idle connections that could be reused,
we don't care about the ones still waiting for responses */
d_activeConnectionsToBackend.clear();
/* meaning we will no longer be 'active' when the backend
response or timeout comes in */
- d_ioState->reset();
+ d_ioState.reset();
d_handler.close();
}
-/* called when handling a response or error coming from a backend */
-void IncomingTCPConnectionState::sendOrQueueResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response)
+void IncomingTCPConnectionState::queueResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response)
{
- // if we were already reading a query (not the query size, mind you), or sending a response we need to queue the response
- // otherwise we can start sending it right away
+ // queue response
+ state->d_queuedResponses.push_back(std::move(response));
+ DEBUGLOG("queueing response, state is "<<(int)state->d_state<<", queue size is now "<<state->d_queuedResponses.size());
+
+ // when the response comes from a backend, there is a real possibility that we are currently
+ // idle, and thus not trying to send the response right away would make our ref count go to 0.
+ // Even if we are waiting for a query, we will not wake up before the new query arrives or a
+ // timeout occurs
if (state->d_state == IncomingTCPConnectionState::State::idle ||
- state->d_state == IncomingTCPConnectionState::State::readingProxyProtocolHeader ||
- state->d_state == IncomingTCPConnectionState::State::readingQuerySize) {
+ state->d_state == IncomingTCPConnectionState::State::waitingForQuery) {
+ auto iostate = sendQueuedResponses(state, now);
- auto iostate = sendResponse(state, now, std::move(response));
+ if (iostate == IOState::Done && state->canAcceptNewQueries(now)) {
+ state->resetForNewQuery();
+ state->d_state = IncomingTCPConnectionState::State::waitingForQuery;
+ iostate = IOState::NeedRead;
+ }
+
+ // for the same reason we need to update the state right away, nobody will do that for us
state->d_ioState->update(iostate, handleIOCallback, state, iostate == IOState::NeedWrite ? state->getClientWriteTTD(now) : state->getClientReadTTD(now));
}
- else {
- // queue response
- state->d_queuedResponses.push_back(std::move(response));
- DEBUGLOG("queueing response because state is "<<(int)state->d_state<<", queue size is now "<<state->d_queuedResponses.size());
- }
}
/* called from the backend code when a new response has been received */
void IncomingTCPConnectionState::handleResponse(std::shared_ptr<IncomingTCPConnectionState> state, const struct timeval& now, TCPResponse&& response)
{
+ cerr<<"in "<<__PRETTY_FUNCTION__<<endl;
if (!state->d_isXFR && response.d_connection && response.d_connection->isIdle()) {
// if we have added a TCP Proxy Protocol payload to a connection, don't release it to the general pool yet, no one else will be able to use it anyway
if (response.d_connection->canBeReused()) {
}
if (response.d_buffer.size() < sizeof(dnsheader)) {
+ cerr<<"too small!"<<endl;
return;
}
auto& ids = response.d_idstate;
unsigned int qnameWireLength;
if (!responseContentMatches(response.d_buffer, ids.qname, ids.qtype, ids.qclass, response.d_connection->getRemote(), qnameWireLength)) {
+ cerr<<"does not match!"<<endl;
return;
}
memcpy(&response.d_cleartextDH, dr.getHeader(), sizeof(response.d_cleartextDH));
if (!processResponse(response.d_buffer, state->d_threadData.localRespRulactions, dr, false)) {
+ cerr<<"processResponse failed"<<endl;
return;
}
}
}
- sendOrQueueResponse(state, now, std::move(response));
+ cerr<<"calling queueResponse"<<endl;
+ queueResponse(state, now, std::move(response));
}
-static IOState handleQuery(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now)
+static void handleQuery(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now)
{
if (state->d_querySize < sizeof(dnsheader)) {
++g_stats.nonCompliantQueries;
- return IOState::NeedRead;
+ state->terminateClientConnection();
+ return;
}
state->d_readingFirstQuery = false;
TCPResponse response;
state->d_state = IncomingTCPConnectionState::State::idle;
++state->d_currentQueriesCount;
- return state->sendResponse(state, now, std::move(response));
+ state->queueResponse(state, now, std::move(response));
+ return;
}
{
/* this pointer will be invalidated the second the buffer is resized, don't hold onto it! */
auto* dh = reinterpret_cast<dnsheader*>(state->d_buffer.data());
if (!checkQueryHeaders(dh)) {
- return IOState::NeedRead;
+ state->terminateClientConnection();
+ return;
}
if (dh->qdcount == 0) {
response.d_buffer = std::move(state->d_buffer);
state->d_state = IncomingTCPConnectionState::State::idle;
++state->d_currentQueriesCount;
- return state->sendResponse(state, now, std::move(response));
+ state->queueResponse(state, now, std::move(response));
+ return;
}
}
auto result = processQuery(dq, *state->d_ci.cs, state->d_threadData.holders, ds);
if (result == ProcessQueryResult::Drop) {
- return IOState::Done;
+ state->terminateClientConnection();
+ return;
}
// the buffer might have been invalidated by now
response.d_buffer = std::move(state->d_buffer);
state->d_state = IncomingTCPConnectionState::State::idle;
++state->d_currentQueriesCount;
- return state->sendResponse(state, now, std::move(response));
+ state->queueResponse(state, now, std::move(response));
+ return;
}
if (result != ProcessQueryResult::PassToBackend || ds == nullptr) {
- return IOState::Done;
+ state->terminateClientConnection();
+ return;
}
IDState ids;
++state->d_currentQueriesCount;
vinfolog("Got query for %s|%s from %s (%s, %d bytes), relayed to %s", ids.qname.toLogString(), QType(ids.qtype).getName(), state->d_proxiedRemote.toStringWithPort(), (state->d_ci.cs->tlsFrontend ? "DoT" : "TCP"), state->d_buffer.size(), ds->getName());
downstreamConnection->queueQuery(TCPQuery(std::move(state->d_buffer), std::move(ids)), downstreamConnection);
-
- return IOState::NeedRead;
}
void IncomingTCPConnectionState::handleIOCallback(int fd, FDMultiplexer::funcparam_t& param)
{
// why do we loop? Because the TLS layer does buffering, and thus can have data ready to read
// even though the underlying socket is not ready, so we need to actually ask for the data first
- bool wouldBlock = false;
IOState iostate = IOState::Done;
do {
iostate = IOState::Done;
return;
}
+ state->d_lastIOBlocked = false;
+
try {
if (state->d_state == IncomingTCPConnectionState::State::doingHandshake) {
DEBUGLOG("doing handshake");
}
}
else {
- wouldBlock = true;
+ state->d_lastIOBlocked = true;
}
}
- if (!wouldBlock && state->d_state == IncomingTCPConnectionState::State::readingProxyProtocolHeader) {
+ if (!state->d_lastIOBlocked && state->d_state == IncomingTCPConnectionState::State::readingProxyProtocolHeader) {
DEBUGLOG("reading proxy protocol header");
do {
iostate = state->d_handler.tryRead(state->d_buffer, state->d_currentPos, state->d_proxyProtocolNeed);
}
}
else {
- wouldBlock = true;
+ state->d_lastIOBlocked = true;
}
}
- while (!wouldBlock);
+ while (state->active() && !state->d_lastIOBlocked);
}
- if (!wouldBlock && state->d_state == IncomingTCPConnectionState::State::readingQuerySize) {
+ if (!state->d_lastIOBlocked && (state->d_state == IncomingTCPConnectionState::State::waitingForQuery ||
+ state->d_state == IncomingTCPConnectionState::State::readingQuerySize)) {
DEBUGLOG("reading query size");
iostate = state->d_handler.tryRead(state->d_buffer, state->d_currentPos, sizeof(uint16_t));
+ if (state->d_currentPos > 0) {
+ /* if we got at least one byte, we can't go around sending responses */
+ state->d_state = IncomingTCPConnectionState::State::readingQuerySize;
+ }
+
if (iostate == IOState::Done) {
DEBUGLOG("query size received");
state->d_state = IncomingTCPConnectionState::State::readingQuery;
state->d_currentPos = 0;
}
else {
- wouldBlock = true;
+ state->d_lastIOBlocked = true;
}
}
- if (!wouldBlock && state->d_state == IncomingTCPConnectionState::State::readingQuery) {
+ if (!state->d_lastIOBlocked && state->d_state == IncomingTCPConnectionState::State::readingQuery) {
DEBUGLOG("reading query");
iostate = state->d_handler.tryRead(state->d_buffer, state->d_currentPos, state->d_querySize);
if (iostate == IOState::Done) {
DEBUGLOG("query received");
state->d_buffer.resize(state->d_querySize);
- iostate = handleQuery(state, now);
- // if the query has been passed to a backend, or dropped, we can start
- // reading again, or sending queued responses
- if (iostate == IOState::NeedRead) {
- if (state->d_queuedResponses.empty()) {
- if (state->canAcceptNewQueries()) {
- state->resetForNewQuery();
- }
- else {
- state->d_state = IncomingTCPConnectionState::State::idle;
- iostate = IOState::Done;
- }
- }
- else {
- TCPResponse resp = std::move(state->d_queuedResponses.front());
- state->d_queuedResponses.pop_front();
- ioGuard.release();
- state->d_state = IncomingTCPConnectionState::State::idle;
- iostate = sendResponse(state, now, std::move(resp));
- if (iostate != IOState::Done) {
- wouldBlock = true;
- }
- }
- }
- else if (iostate != IOState::Done) {
- wouldBlock = true;
+ state->d_state = IncomingTCPConnectionState::State::idle;
+ handleQuery(state, now);
+ cerr<<"out of handleQuery, state is "<<(int)state->d_state<<", iostate is "<<(int)iostate<<endl;
+ /* the state might have been updated in the meantime, we don't want to override it
+ in that case */
+ if (state->active() && state->d_state != IncomingTCPConnectionState::State::idle) {
+ iostate = state->d_ioState->getState();
}
}
else {
- wouldBlock = true;
+ state->d_lastIOBlocked = true;
}
}
- if (!wouldBlock && state->d_state == IncomingTCPConnectionState::State::sendingResponse) {
+ if (!state->d_lastIOBlocked && state->d_state == IncomingTCPConnectionState::State::sendingResponse) {
DEBUGLOG("sending response");
iostate = state->d_handler.tryWrite(state->d_currentResponse.d_buffer, state->d_currentPos, state->d_currentResponse.d_buffer.size());
if (iostate == IOState::Done) {
- DEBUGLOG("response sent");
- if (!handleResponseSent(state, now)) {
- iostate = IOState::Done;
- }
- else {
- iostate = sendQueuedResponses(state, now);
+ DEBUGLOG("response sent from "<<__PRETTY_FUNCTION__);
+ handleResponseSent(state);
+ state->d_state = IncomingTCPConnectionState::State::idle;
+ }
+ else {
+ state->d_lastIOBlocked = true;
+ }
+ }
+
+ if (state->active() &&
+ !state->d_lastIOBlocked &&
+ iostate == IOState::Done &&
+ (state->d_state == IncomingTCPConnectionState::State::idle ||
+ state->d_state == IncomingTCPConnectionState::State::waitingForQuery))
+ {
+ // try sending querued responses
+ cerr<<"send responses, if any"<<endl;
+ iostate = sendQueuedResponses(state, now);
+
+ if (!state->d_lastIOBlocked && iostate == IOState::Done) {
+ // if the query has been passed to a backend, or dropped, and the responses have been sent,
+ // we can start reading again
+ if (!state->d_isXFR && state->canAcceptNewQueries(now)) {
+ cerr<<"reset for new query"<<endl;
+ state->resetForNewQuery();
+ iostate = IOState::NeedRead;
}
- } else {
- wouldBlock = true;
- DEBUGLOG("partial write");
}
}
if (state->d_state != IncomingTCPConnectionState::State::idle &&
state->d_state != IncomingTCPConnectionState::State::doingHandshake &&
state->d_state != IncomingTCPConnectionState::State::readingProxyProtocolHeader &&
+ state->d_state != IncomingTCPConnectionState::State::waitingForQuery &&
state->d_state != IncomingTCPConnectionState::State::readingQuerySize &&
state->d_state != IncomingTCPConnectionState::State::readingQuery &&
state->d_state != IncomingTCPConnectionState::State::sendingResponse) {
vinfolog("Unexpected state %d in handleIOCallback", static_cast<int>(state->d_state));
}
}
- catch(const std::exception& e) {
+ catch (const std::exception& e) {
/* most likely an EOF because the other end closed the connection,
but it might also be a real IO error or something else.
Let's just drop the connection
if (state->d_state == IncomingTCPConnectionState::State::idle ||
state->d_state == IncomingTCPConnectionState::State::doingHandshake ||
state->d_state != IncomingTCPConnectionState::State::readingProxyProtocolHeader ||
+ state->d_state == IncomingTCPConnectionState::State::waitingForQuery ||
state->d_state == IncomingTCPConnectionState::State::readingQuerySize ||
state->d_state == IncomingTCPConnectionState::State::readingQuery) {
++state->d_ci.cs->tcpDiedReadingQuery;
iostate = IOState::Done;
}
+ if (!state->active()) {
+ cerr<<"state is no longer active"<<endl;
+ return;
+ }
+
if (iostate == IOState::Done) {
state->d_ioState->update(iostate, handleIOCallback, state);
}
}
ioGuard.release();
}
- while ((iostate == IOState::NeedRead || iostate == IOState::NeedWrite) && !wouldBlock);
+ while ((iostate == IOState::NeedRead || iostate == IOState::NeedWrite) && !state->d_lastIOBlocked);
}
void IncomingTCPConnectionState::notifyIOError(std::shared_ptr<IncomingTCPConnectionState>& state, IDState&& query, const struct timeval& now)
state->d_queuedResponses.pop_front();
state->d_state = IncomingTCPConnectionState::State::idle;
try {
- sendOrQueueResponse(state, now, std::move(resp));
+ queueResponse(state, now, std::move(resp));
}
catch (const std::exception& e) {
vinfolog("exception in notifyIOError: %s", e.what());
void IncomingTCPConnectionState::handleXFRResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response)
{
- sendOrQueueResponse(state, now, std::move(response));
+ queueResponse(state, now, std::move(response));
}
void IncomingTCPConnectionState::handleTimeout(std::shared_ptr<IncomingTCPConnectionState>& state, bool write)