std::unique_ptr<TCPClientCollection> g_tcpclientthreads;
-static IOState handleResponseSent(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now)
+static IOState sendQueuedResponses(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now)
+{
+ IOState result = IOState::Done;
+
+ while (!state->d_queuedResponses.empty()) {
+ DEBUGLOG("queue size is "<<state->d_queuedResponses.size()<<", sending the next one");
+ TCPResponse resp = std::move(state->d_queuedResponses.front());
+ state->d_queuedResponses.pop_front();
+ state->d_state = IncomingTCPConnectionState::State::idle;
+ result = state->sendResponse(state, now, std::move(resp));
+ if (result != IOState::Done) {
+ return result;
+ }
+ }
+
+ if (state->d_isXFR) {
+ /* we should still be reading from the backend, and we don't want to read from the client */
+ state->d_state = IncomingTCPConnectionState::State::idle;
+ state->d_currentPos = 0;
+ DEBUGLOG("idling for XFR completion");
+ return IOState::Done;
+ } else {
+ if (state->canAcceptNewQueries()) {
+ DEBUGLOG("waiting for new queries");
+ state->resetForNewQuery();
+ return IOState::NeedRead;
+ }
+ else {
+ DEBUGLOG("idling");
+ state->d_state = IncomingTCPConnectionState::State::idle;
+ return IOState::Done;
+ }
+ }
+}
+
+static bool handleResponseSent(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now)
{
--state->d_currentQueriesCount;
if (g_maxTCPQueriesPerConn && state->d_queriesCount > g_maxTCPQueriesPerConn) {
vinfolog("Terminating TCP connection from %s because it reached the maximum number of queries per conn (%d / %d)", state->d_ci.remote.toStringWithPort(), state->d_queriesCount, g_maxTCPQueriesPerConn);
- return IOState::Done;
+ return false;
}
if (state->maxConnectionDurationReached(g_maxTCPConnectionDuration, now)) {
vinfolog("Terminating TCP connection from %s because it reached the maximum TCP connection duration", state->d_ci.remote.toStringWithPort());
- return IOState::Done;
+ return false;
}
}
- if (state->d_queuedResponses.empty()) {
- if (state->d_isXFR) {
- /* we should still be reading from the backend, and we don't want to read from the client */
- state->d_state = IncomingTCPConnectionState::State::idle;
- state->d_currentPos = 0;
- DEBUGLOG("idling for XFR completion");
- return IOState::Done;
- } else {
- if (state->canAcceptNewQueries()) {
- DEBUGLOG("waiting for new queries");
- state->resetForNewQuery();
- return IOState::NeedRead;
- }
- else {
- DEBUGLOG("idling");
- state->d_state = IncomingTCPConnectionState::State::idle;
- return IOState::Done;
- }
- }
- }
- else {
- DEBUGLOG("queue size is "<<state->d_queuedResponses.size()<<", sending the next one");
- TCPResponse resp = std::move(state->d_queuedResponses.front());
- state->d_queuedResponses.pop_front();
- state->d_state = IncomingTCPConnectionState::State::idle;
- state->sendResponse(state, now, std::move(resp));
- return IOState::NeedWrite;
- }
+ return true;
}
bool IncomingTCPConnectionState::canAcceptNewQueries() const
d_activeConnectionsToBackend[conn->getDS()].push_front(conn);
}
-/* this version is called when the buffer has been set and the rules have been processed */
-void IncomingTCPConnectionState::sendResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response)
+/* called when the buffer has been set and the rules have been processed, and only from handleIO (sometimes indirectly via handleQuery) */
+IOState IncomingTCPConnectionState::sendResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response)
+{
+ state->d_state = IncomingTCPConnectionState::State::sendingResponse;
+
+ uint16_t responseSize = static_cast<uint16_t>(response.d_buffer.size());
+ const uint8_t sizeBytes[] = { static_cast<uint8_t>(responseSize / 256), static_cast<uint8_t>(responseSize % 256) };
+ /* prepend the size. Yes, this is not the most efficient way but it prevents mistakes
+ that could occur if we had to deal with the size during the processing,
+ especially alignment issues */
+ response.d_buffer.insert(response.d_buffer.begin(), sizeBytes, sizeBytes + 2);
+ state->d_currentPos = 0;
+ state->d_currentResponse = std::move(response);
+
+ auto iostate = state->d_handler.tryWrite(state->d_currentResponse.d_buffer, state->d_currentPos, state->d_currentResponse.d_buffer.size());
+ if (iostate == IOState::Done) {
+ DEBUGLOG("response sent");
+ if (!handleResponseSent(state, now)) {
+ return IOState::Done;
+ }
+ return sendQueuedResponses(state, now);
+ } else {
+ return IOState::NeedWrite;
+ DEBUGLOG("partial write");
+ }
+}
+
+/* called when handling a response or error coming from a backend */
+void IncomingTCPConnectionState::sendOrQueueResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response)
{
// if we already reading a query (not the query size, mind you), or sending a response we need to either queue the response
// otherwise we can start sending it right away
if (state->d_state == IncomingTCPConnectionState::State::idle ||
state->d_state == IncomingTCPConnectionState::State::readingQuerySize) {
- state->d_state = IncomingTCPConnectionState::State::sendingResponse;
-
- uint16_t responseSize = static_cast<uint16_t>(response.d_buffer.size());
- const uint8_t sizeBytes[] = { static_cast<uint8_t>(responseSize / 256), static_cast<uint8_t>(responseSize % 256) };
- /* prepend the size. Yes, this is not the most efficient way but it prevents mistakes
- that could occur if we had to deal with the size during the processing,
- especially alignment issues */
- response.d_buffer.insert(response.d_buffer.begin(), sizeBytes, sizeBytes + 2);
- state->d_currentPos = 0;
- state->d_currentResponse = std::move(response);
-
- state->d_ioState->update(IOState::NeedWrite, handleIOCallback, state, state->getClientWriteTTD(now));
+ auto iostate = sendResponse(state, now, std::move(response));
+ state->d_ioState->update(iostate, handleIOCallback, state, iostate == IOState::NeedWrite ? state->getClientWriteTTD(now) : state->getClientReadTTD(now));
}
else {
// queue response
}
}
-/* this version is called from the backend code when a new response has been received */
-void IncomingTCPConnectionState::handleResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response)
+/* called from the backend code when a new response has been received */
+void IncomingTCPConnectionState::handleResponse(std::shared_ptr<IncomingTCPConnectionState> state, const struct timeval& now, TCPResponse&& response)
{
- // if we have added a TCP Proxy Protocol payload to a connection, don't release it yet, no one else will be able to use it anyway
- if (!state->d_isXFR && response.d_connection && response.d_connection->isIdle() && response.d_connection->canBeReused()) {
- auto& list = state->d_activeConnectionsToBackend.at(response.d_connection->getDS());
- for (auto it = list.begin(); it != list.end(); ++it) {
- if (*it == response.d_connection) {
- DownstreamConnectionsManager::releaseDownstreamConnection(std::move(*it));
- list.erase(it);
- break;
+ if (!state->d_isXFR && response.d_connection && response.d_connection->isIdle()) {
+ // if we have added a TCP Proxy Protocol payload to a connection, don't release it to the general pool yet, no one else will be able to use it anyway
+ if (response.d_connection->canBeReused()) {
+ auto& list = state->d_activeConnectionsToBackend.at(response.d_connection->getDS());
+
+ for (auto it = list.begin(); it != list.end(); ++it) {
+ if (*it == response.d_connection) {
+ response.d_connection->release();
+ DownstreamConnectionsManager::releaseDownstreamConnection(std::move(*it));
+ list.erase(it);
+ break;
+ }
}
}
}
}
}
- sendResponse(state, now, std::move(response));
+ sendOrQueueResponse(state, now, std::move(response));
}
-static bool handleQuery(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now)
+static IOState handleQuery(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now)
{
if (state->d_querySize < sizeof(dnsheader)) {
++g_stats.nonCompliantQueries;
- return true;
+ return IOState::NeedRead;
}
state->d_readingFirstQuery = false;
response.d_buffer = std::move(*dnsCryptResponse);
state->d_state = IncomingTCPConnectionState::State::idle;
++state->d_currentQueriesCount;
- state->sendResponse(state, now, std::move(response));
- return false;
+ return state->sendResponse(state, now, std::move(response));
}
const auto& dh = reinterpret_cast<dnsheader*>(query);
if (!checkQueryHeaders(dh)) {
- return true;
+ return IOState::NeedRead;
}
uint16_t qtype, qclass;
auto result = processQuery(dq, *state->d_ci.cs, state->d_threadData.holders, ds);
if (result == ProcessQueryResult::Drop) {
- return true;
+ return IOState::Done;
}
if (result == ProcessQueryResult::SendAnswer) {
response.d_buffer = std::move(state->d_buffer);
state->d_state = IncomingTCPConnectionState::State::idle;
++state->d_currentQueriesCount;
- state->sendResponse(state, now, std::move(response));
- return false;
+ return state->sendResponse(state, now, std::move(response));
}
if (result != ProcessQueryResult::PassToBackend || ds == nullptr) {
- return true;
+ return IOState::Done;
}
IDState ids;
vinfolog("Got query for %s|%s from %s (%s, %d bytes), relayed to %s", ids.qname.toLogString(), QType(ids.qtype).getName(), state->d_ci.remote.toStringWithPort(), (state->d_ci.cs->tlsFrontend ? "DoT" : "TCP"), state->d_buffer.size(), ds->getName());
downstreamConnection->queueQuery(TCPQuery(std::move(state->d_buffer), std::move(ids)), downstreamConnection);
- return true;
+ return IOState::NeedRead;
}
void IncomingTCPConnectionState::handleIOCallback(int fd, FDMultiplexer::funcparam_t& param)
if (iostate == IOState::Done) {
DEBUGLOG("query received");
- if (handleQuery(state, now)) {
- // if the query has been passed to a backend, or dropped, we can start
- // reading again, or sending queued responses
+ iostate = handleQuery(state, now);
+ // if the query has been passed to a backend, or dropped, we can start
+ // reading again, or sending queued responses
+ if (iostate == IOState::NeedRead) {
if (state->d_queuedResponses.empty()) {
if (state->canAcceptNewQueries()) {
state->resetForNewQuery();
- iostate = IOState::NeedRead;
}
else {
state->d_state = IncomingTCPConnectionState::State::idle;
state->d_queuedResponses.pop_front();
ioGuard.release();
state->d_state = IncomingTCPConnectionState::State::idle;
- state->sendResponse(state, now, std::move(resp));
- return;
+ iostate = sendResponse(state, now, std::move(resp));
}
}
- else {
- /* otherwise the state should already be waiting for
- the socket to be writable */
- ioGuard.release();
- return;
- }
}
else {
wouldBlock = true;
iostate = state->d_handler.tryWrite(state->d_currentResponse.d_buffer, state->d_currentPos, state->d_currentResponse.d_buffer.size());
if (iostate == IOState::Done) {
DEBUGLOG("response sent");
- iostate = handleResponseSent(state, now);
+ if (!handleResponseSent(state, now)) {
+ iostate = IOState::Done;
+ }
+ else {
+ iostate = sendQueuedResponses(state, now);
+ }
} else {
wouldBlock = true;
DEBUGLOG("partial write");
}
ioGuard.release();
}
- while (state->d_state == IncomingTCPConnectionState::State::readingQuerySize && iostate == IOState::NeedRead && !wouldBlock);
+ while ((iostate == IOState::NeedRead || iostate == IOState::NeedWrite) && !wouldBlock);
}
void IncomingTCPConnectionState::notifyIOError(std::shared_ptr<IncomingTCPConnectionState>& state, IDState&& query, const struct timeval& now)
TCPResponse resp = std::move(state->d_queuedResponses.front());
state->d_queuedResponses.pop_front();
state->d_state = IncomingTCPConnectionState::State::idle;
- sendResponse(state, now, std::move(resp));
+ sendOrQueueResponse(state, now, std::move(resp));
}
else {
// the backend code already tried to reconnect if it was possible
void IncomingTCPConnectionState::handleXFRResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response)
{
- sendResponse(state, now, std::move(response));
+ sendOrQueueResponse(state, now, std::move(response));
}
void IncomingTCPConnectionState::handleTimeout(std::shared_ptr<IncomingTCPConnectionState>& state, bool write)
}
}
-IOState TCPConnectionToBackend::sendNextQuery(std::shared_ptr<TCPConnectionToBackend>& conn)
+void TCPConnectionToBackend::release()
+{
+ d_clientConn.reset();
+ if (d_ioState) {
+ d_ioState.reset();
+ }
+}
+
+IOState TCPConnectionToBackend::queueNextQuery(std::shared_ptr<TCPConnectionToBackend>& conn)
{
conn->d_currentQuery = std::move(conn->d_pendingQueries.front());
conn->d_pendingQueries.pop_front();
return IOState::Done;
}
+IOState TCPConnectionToBackend::sendQuery(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now)
+{
+ int fd = conn->d_socket->getHandle();
+ DEBUGLOG("sending query to backend "<<conn->getDS()->getName()<<" over FD "<<fd);
+ int socketFlags = 0;
+#ifdef MSG_FASTOPEN
+ if (conn->isFastOpenEnabled()) {
+ socketFlags |= MSG_FASTOPEN;
+ }
+#endif /* MSG_FASTOPEN */
+
+ size_t sent = sendMsgWithOptions(fd, reinterpret_cast<const char *>(&conn->d_currentQuery.d_buffer.at(conn->d_currentPos)), conn->d_currentQuery.d_buffer.size() - conn->d_currentPos, &conn->d_ds->remote, &conn->d_ds->sourceAddr, conn->d_ds->sourceItf, socketFlags);
+ if (sent == conn->d_currentQuery.d_buffer.size()) {
+ DEBUGLOG("query sent to backend");
+ /* request sent ! */
+ conn->incQueries();
+ conn->d_currentPos = 0;
+
+ DEBUGLOG("adding a pending response for ID "<<conn->d_currentQuery.d_idstate.origID<<" and QNAME "<<conn->d_currentQuery.d_idstate.qname);
+ conn->d_pendingResponses[conn->d_currentQuery.d_idstate.origID] = std::move(conn->d_currentQuery);
+ conn->d_currentQuery.d_buffer.clear();
+
+ return IOState::Done;
+ }
+ else {
+ conn->d_currentPos += sent;
+ /* disable fast open on partial write */
+ conn->disableFastOpen();
+ return IOState::NeedWrite;
+ }
+}
+
void TCPConnectionToBackend::handleIO(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now)
{
if (conn->d_socket == nullptr) {
try {
if (conn->d_state == State::sendingQueryToBackend) {
- DEBUGLOG("sending query to backend "<<conn->getDS()->getName()<<" over FD "<<fd);
- int socketFlags = 0;
-#ifdef MSG_FASTOPEN
- if (conn->isFastOpenEnabled()) {
- socketFlags |= MSG_FASTOPEN;
+ iostate = sendQuery(conn, now);
+
+ while (iostate == IOState::Done && !conn->d_pendingQueries.empty()) {
+ queueNextQuery(conn);
+ iostate = sendQuery(conn, now);
}
-#endif /* MSG_FASTOPEN */
- size_t sent = sendMsgWithOptions(fd, reinterpret_cast<const char *>(&conn->d_currentQuery.d_buffer.at(conn->d_currentPos)), conn->d_currentQuery.d_buffer.size() - conn->d_currentPos, &conn->d_ds->remote, &conn->d_ds->sourceAddr, conn->d_ds->sourceItf, socketFlags);
- if (sent == conn->d_currentQuery.d_buffer.size()) {
- DEBUGLOG("query sent to backend");
- /* request sent ! */
- conn->incQueries();
+ if (iostate == IOState::Done && conn->d_pendingQueries.empty()) {
+ conn->d_state = State::readingResponseSizeFromBackend;
conn->d_currentPos = 0;
-
- DEBUGLOG("adding a pending response for ID "<<conn->d_currentQuery.d_idstate.origID<<" and QNAME "<<conn->d_currentQuery.d_idstate.qname);
- conn->d_pendingResponses[conn->d_currentQuery.d_idstate.origID] = std::move(conn->d_currentQuery);
- conn->d_currentQuery.d_buffer.clear();
-
- if (conn->d_pendingQueries.empty()) {
- conn->d_state = State::readingResponseSizeFromBackend;
- conn->d_currentPos = 0;
- conn->d_responseBuffer.resize(sizeof(uint16_t));
- iostate = IOState::NeedRead;
- }
- else {
- iostate = sendNextQuery(conn);
- }
- }
- else {
- conn->d_currentPos += sent;
- iostate = IOState::NeedWrite;
- /* disable fast open on partial write */
- conn->disableFastOpen();
+ conn->d_responseBuffer.resize(sizeof(uint16_t));
+ iostate = IOState::NeedRead;
}
}
// resume sending query
}
else {
- iostate = sendNextQuery(conn);
+ if (conn->d_pendingQueries.empty()) {
+ throw std::runtime_error("TCP connection to a backend in state " + std::to_string((int)conn->d_state) + " with no pending queries");
+ }
+
+ iostate = queueNextQuery(conn);
}
if (!conn->d_proxyProtocolPayloadAdded && !conn->d_proxyProtocolPayload.empty()) {
}
}
- if (iostate == IOState::Done) {
- conn->d_ioState->update(iostate, handleIOCallback, conn);
- }
- else {
- conn->d_ioState->update(iostate, handleIOCallback, conn, iostate == IOState::NeedRead ? conn->getBackendReadTTD(now) : conn->getBackendWriteTTD(now));
+ if (conn->d_ioState) {
+ if (iostate == IOState::Done) {
+ conn->d_ioState->update(iostate, handleIOCallback, conn);
+ }
+ else {
+ conn->d_ioState->update(iostate, handleIOCallback, conn, iostate == IOState::NeedRead ? conn->getBackendReadTTD(now) : conn->getBackendWriteTTD(now));
+ }
}
+
ioGuard.release();
}
struct timeval now;
gettimeofday(&now, 0);
- d_ioState->update(IOState::NeedWrite, handleIOCallback, sharedSelf, getBackendWriteTTD(now));
+ handleIO(sharedSelf, now);
+ // d_ioState->update(IOState::NeedWrite, handleIOCallback, sharedSelf, getBackendWriteTTD(now));
}
else {
// store query in the list of queries to send
d_downstreamFailures = 0;
auto& clientConn = d_clientConn;
- if (!clientConn->active()) {
+ if (!clientConn || !clientConn->active()) {
// a client timeout occured, or something like that */
d_connectionDied = true;
d_clientConn.reset();
auto ids = std::move(it->second.d_idstate);
d_pendingResponses.erase(it);
DEBUGLOG("passing response to client connection for "<<ids.qname);
+ /* marking as idle for now, so we can accept new queries if our queues are empty */
+ if (d_pendingQueries.empty() && d_pendingResponses.empty()) {
+ d_state = State::idle;
+ }
clientConn->handleResponse(clientConn, now, TCPResponse(std::move(d_responseBuffer), std::move(ids), conn));
if (!d_pendingQueries.empty()) {