IOState state = conn->d_handler->tryWrite(conn->d_currentQuery.d_query.d_buffer, conn->d_currentPos, conn->d_currentQuery.d_query.d_buffer.size());
if (state != IOState::Done) {
+ conn->d_lastIOBlocked = true;
return state;
}
throw std::runtime_error("No downstream socket in " + std::string(__PRETTY_FUNCTION__) + "!");
}
+ if (conn->d_handlingIO) {
+ return;
+ }
+ conn->d_handlingIO = true;
+ dnsdist::tcp::HandlingIOGuard reentryGuard(conn->d_handlingIO);
+
bool connectionDied = false;
IOState iostate = IOState::Done;
IOStateGuard ioGuard(conn->d_ioState);
do {
reconnected = false;
+ conn->d_lastIOBlocked = false;
try {
if (conn->d_state == State::sendingQueryToBackend) {
conn->d_currentPos = 0;
conn->d_lastDataReceivedTime = now;
}
- else if (conn->d_state == State::waitingForResponseFromBackend && conn->d_currentPos > 0) {
- conn->d_state = State::readingResponseSizeFromBackend;
+ else {
+ conn->d_lastIOBlocked = true;
+ if (conn->d_state == State::waitingForResponseFromBackend && conn->d_currentPos > 0) {
+ conn->d_state = State::readingResponseSizeFromBackend;
+ }
}
}
return;
}
}
+ else {
+ conn->d_lastIOBlocked = true;
+ }
}
if (conn->d_state != State::idle &&
}
}
}
- while (reconnected);
+ while (reconnected || (iostate != IOState::Done && !conn->d_connectionDied && !conn->d_lastIOBlocked));
ioGuard.release();
}
DEBUGLOG("still have some queries to send");
return queueNextQuery(shared);
}
- else if (!d_pendingResponses.empty()) {
+ if (d_state == State::sendingQueryToBackend) {
+ DEBUGLOG("still have a query to send");
+ return IOState::NeedWrite;
+ }
+ if (!d_pendingResponses.empty()) {
DEBUGLOG("still have some responses to read");
return IOState::NeedRead;
}
- else {
- DEBUGLOG("nothing to do, waiting for a new query");
- d_state = State::idle;
- t_downstreamTCPConnectionsManager.moveToIdle(conn);
- return IOState::Done;
- }
+
+ DEBUGLOG("nothing to do, waiting for a new query");
+ d_state = State::idle;
+ t_downstreamTCPConnectionsManager.moveToIdle(conn);
+ return IOState::Done;
}
uint16_t TCPConnectionToBackend::getQueryIdFromResponse() const
}
{
-#if 1
TEST_INIT("=> 10k self-generated pipelined on the same connection");
/* 10k self-generated REFUSED pipelined on the same connection */
auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now);
state->handleIO();
BOOST_CHECK_EQUAL(s_writeBuffer.size(), query.size() * count);
-#endif
}
{
}
{
-#if 1
/* 101 queries on the same connection, check that the maximum number of queries kicks in */
TEST_INIT("=> 101 queries on the same connection");
dnsdist::configuration::updateRuntimeConfiguration([](dnsdist::configuration::RuntimeConfiguration& config) {
config.d_maxTCPQueriesPerConn = 0;
});
-#endif
}
{
/* set the client descriptor as NOT ready */
dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setNotReady(desc);
} },
- /* reading from the client (not ready) */
- { ExpectedStep::ExpectedRequest::readFromClient, IOState::NeedRead, 0 },
/* reading a response from the backend (5) */
{ ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(4).size() - 2 },
{ ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(4).size() },
dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setNotReady(desc);
timeout = true;
} },
+ /* reading from the client (not ready) */
+ { ExpectedStep::ExpectedRequest::readFromClient, IOState::NeedRead, 0 },
/* A timeout occurs */
{ ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 },
{ ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(0).size() - 2 },
/* sending it to the client */
- { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, responses.at(0).size(), [&threadData](int desc) {
+ { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, responses.at(0).size(), [&threadData,&backend1Desc](int desc) {
/* client becomes readable */
dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(desc);
+ /* first backend is no longer readable */
+ dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setNotReady(backend1Desc);
} },
+ /* no response ready from the backend yet */
+ { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0 },
/* reading a query from the client (5) */
{ ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 },
{ ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(4).size() - 2, [&threadData](int desc) {
dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setNotReady(backend1Desc);
dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(backend2Desc);
} },
+ /* no more response ready yet from backend 1 */
+ { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0 },
/* reading response (3) from the second backend (2) */
{ ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 },
{ ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(2).size() - 2 },
dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setNotReady(backend2Desc);
dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(backend1Desc);
} },
+ /* no more response ready yet from backend 2 */
+ { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0 },
/* reading response (5) from the first backend (1) */
{ ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 },
{ ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(4).size() - 2 },
dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setNotReady(backend1Desc);
dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(backend2Desc);
} },
+ /* no more response ready yet from backend 1 */
+ { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0 },
/* reading response (4) from the second backend (2) */
{ ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 },
{ ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(3).size() - 2 },