From: Remi Gacogne Date: Mon, 14 Jun 2021 10:56:50 +0000 (+0200) Subject: dnsdist: Properly resume operations after XFR, add tests X-Git-Tag: dnsdist-1.7.0-alpha1~124^2 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=refs%2Fpull%2F10489%2Fhead;p=thirdparty%2Fpdns.git dnsdist: Properly resume operations after XFR, add tests --- diff --git a/pdns/dnsdistdist/dnsdist-tcp-downstream.cc b/pdns/dnsdistdist/dnsdist-tcp-downstream.cc index b3ad94801a..dd3be5a414 100644 --- a/pdns/dnsdistdist/dnsdist-tcp-downstream.cc +++ b/pdns/dnsdistdist/dnsdist-tcp-downstream.cc @@ -477,22 +477,52 @@ IOState TCPConnectionToBackend::handleResponse(std::shared_ptrd_usedForXFR) { + --conn->d_ds->outstanding; + } + if (d_usedForXFR) { DEBUGLOG("XFR!"); bool done = false; TCPResponse response; response.d_buffer = std::move(d_responseBuffer); response.d_connection = conn; - /* could be a IXFR but that does not matter, - we only need to know that this is a AXFR or IXFR response */ - response.d_idstate.qtype = QType::AXFR; + /* we don't move the whole IDS because we will need for the responses to come */ + response.d_idstate.qtype = it->second.d_idstate.qtype; + response.d_idstate.qname = it->second.d_idstate.qname; + DEBUGLOG("passing XFRresponse to client connection for "<handleXFRResponse(clientConn, now, std::move(response)); if (done) { - conn->d_usedForXFR = false; + d_pendingResponses.erase(it); + /* marking as idle for now, so we can accept new queries if our queues are empty */ + if (d_pendingQueries.empty() && d_pendingResponses.empty()) { + d_state = State::idle; + } clientConn->d_isXFR = false; + conn->d_usedForXFR = false; + } + + clientConn->handleXFRResponse(clientConn, now, std::move(response)); + if (done) { d_state = State::idle; d_clientConn.reset(); return IOState::Done; @@ -505,34 +535,14 @@ IOState TCPConnectionToBackend::handleResponse(std::shared_ptrd_usedForXFR) { - --conn->d_ds->outstanding; - } - auto ids = std::move(it->second.d_idstate); d_pendingResponses.erase(it); - DEBUGLOG("passing response to client connection for "<handleResponse(clientConn, now, TCPResponse(std::move(d_responseBuffer), std::move(ids), conn)); if (!d_pendingQueries.empty()) { diff --git a/pdns/dnsdistdist/test-dnsdisttcp_cc.cc b/pdns/dnsdistdist/test-dnsdisttcp_cc.cc index 78737a7ca2..b3445422c1 100644 --- a/pdns/dnsdistdist/test-dnsdisttcp_cc.cc +++ b/pdns/dnsdistdist/test-dnsdisttcp_cc.cc @@ -2519,7 +2519,9 @@ BOOST_AUTO_TEST_CASE(test_IncomingConnectionOOOR_BackendOOOR) TEST_INIT("=> AXFR"); PacketBuffer axfrQuery; + PacketBuffer secondQuery; std::vector axfrResponses(3); + PacketBuffer secondResponse; GenericDNSPacketWriter pwAXFRQuery(axfrQuery, DNSName("powerdns.com."), QType::AXFR, QClass::IN, 0); pwAXFRQuery.getHeader()->rd = 0; @@ -2528,12 +2530,26 @@ BOOST_AUTO_TEST_CASE(test_IncomingConnectionOOOR_BackendOOOR) const uint8_t axfrQuerySizeBytes[] = { static_cast(axfrQuerySize / 256), static_cast(axfrQuerySize % 256) }; axfrQuery.insert(axfrQuery.begin(), axfrQuerySizeBytes, axfrQuerySizeBytes + 2); - for (auto& response : axfrResponses) { - DNSName name("powerdns.com."); + const DNSName name("powerdns.com."); + { + /* first message */ + auto& response = axfrResponses.at(0); GenericDNSPacketWriter pwR(response, name, QType::A, QClass::IN, 0); pwR.getHeader()->qr = 1; pwR.getHeader()->id = 42; - // whatever + + /* insert SOA */ + pwR.startRecord(name, QType::SOA, 3600, QClass::IN, DNSResourceRecord::ANSWER); + pwR.xfrName(g_rootdnsname, true); + pwR.xfrName(g_rootdnsname, true); + pwR.xfr32BitInt(1 /* serial */); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.commit(); + + /* A record */ pwR.startRecord(name, QType::A, 7200, QClass::IN, DNSResourceRecord::ANSWER); pwR.xfr32BitInt(0x01020304); pwR.commit(); @@ -2542,17 +2558,85 @@ BOOST_AUTO_TEST_CASE(test_IncomingConnectionOOOR_BackendOOOR) const uint8_t sizeBytes[] = { static_cast(responseSize / 256), static_cast(responseSize % 256) }; response.insert(response.begin(), sizeBytes, sizeBytes + 2); } + { + /* second message */ + auto& response = axfrResponses.at(1); + GenericDNSPacketWriter pwR(response, name, QType::A, QClass::IN, 0); + pwR.getHeader()->qr = 1; + pwR.getHeader()->id = 42; + + /* A record */ + pwR.startRecord(name, QType::A, 7200, QClass::IN, DNSResourceRecord::ANSWER); + pwR.xfr32BitInt(0x01020304); + pwR.commit(); + + uint16_t responseSize = static_cast(response.size()); + const uint8_t sizeBytes[] = { static_cast(responseSize / 256), static_cast(responseSize % 256) }; + response.insert(response.begin(), sizeBytes, sizeBytes + 2); + } + { + /* third message */ + auto& response = axfrResponses.at(2); + GenericDNSPacketWriter pwR(response, name, QType::A, QClass::IN, 0); + pwR.getHeader()->qr = 1; + pwR.getHeader()->id = 42; + + /* A record */ + pwR.startRecord(name, QType::A, 7200, QClass::IN, DNSResourceRecord::ANSWER); + pwR.xfr32BitInt(0x01020304); + pwR.commit(); + + /* final SOA */ + pwR.startRecord(name, QType::SOA, 3600, QClass::IN, DNSResourceRecord::ANSWER); + pwR.xfrName(g_rootdnsname, true); + pwR.xfrName(g_rootdnsname, true); + pwR.xfr32BitInt(1 /* serial */); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.commit(); + + uint16_t responseSize = static_cast(response.size()); + const uint8_t sizeBytes[] = { static_cast(responseSize / 256), static_cast(responseSize % 256) }; + response.insert(response.begin(), sizeBytes, sizeBytes + 2); + } + + { + GenericDNSPacketWriter pwSecondQuery(secondQuery, DNSName("powerdns.com."), QType::A, QClass::IN, 0); + pwSecondQuery.getHeader()->rd = 1; + pwSecondQuery.getHeader()->id = 84; + uint16_t secondQuerySize = static_cast(secondQuery.size()); + const uint8_t secondQuerySizeBytes[] = { static_cast(secondQuerySize / 256), static_cast(secondQuerySize % 256) }; + secondQuery.insert(secondQuery.begin(), secondQuerySizeBytes, secondQuerySizeBytes + 2); + } + + { + GenericDNSPacketWriter pwSecondResponse(secondResponse, DNSName("powerdns.com."), QType::A, QClass::IN, 0); + pwSecondResponse.getHeader()->qr = 1; + pwSecondResponse.getHeader()->rd = 1; + pwSecondResponse.getHeader()->ra = 1; + pwSecondResponse.getHeader()->id = 84; + pwSecondResponse.startRecord(name, QType::A, 7200, QClass::IN, DNSResourceRecord::ANSWER); + pwSecondResponse.xfr32BitInt(0x01020304); + pwSecondResponse.commit(); + uint16_t responseSize = static_cast(secondResponse.size()); + const uint8_t sizeBytes[] = { static_cast(responseSize / 256), static_cast(responseSize % 256) }; + secondResponse.insert(secondResponse.begin(), sizeBytes, sizeBytes + 2); + } PacketBuffer expectedWriteBuffer; PacketBuffer expectedBackendWriteBuffer; s_readBuffer = axfrQuery; + s_readBuffer.insert(s_readBuffer.end(), secondQuery.begin(), secondQuery.end()); - expectedBackendWriteBuffer = axfrQuery; + expectedBackendWriteBuffer = s_readBuffer; for (const auto& response : axfrResponses) { s_backendReadBuffer.insert(s_backendReadBuffer.end(), response.begin(), response.end()); } + s_backendReadBuffer.insert(s_backendReadBuffer.end(), secondResponse.begin(), secondResponse.end()); expectedWriteBuffer = s_backendReadBuffer; @@ -2585,13 +2669,22 @@ BOOST_AUTO_TEST_CASE(test_IncomingConnectionOOOR_BackendOOOR) { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, axfrResponses.at(2).size() - 2 }, /* sending response (3) to the client */ - { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, axfrResponses.at(2).size() }, - /* trying to read from the backend but blocking and descriptor is no longer ready */ - { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0, [&threadData,&timeout](int desc, const ExpectedStep& step) { - /* the backend descriptor is not ready anymore */ - dynamic_cast(threadData.mplexer.get())->setNotReady(desc); - timeout = true; + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, axfrResponses.at(2).size(), [&threadData](int desc, const ExpectedStep& step) { + /* the client descriptor becomes ready */ + dynamic_cast(threadData.mplexer.get())->setReady(-1); } }, + /* trying to read from the client, getting a second query */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, secondQuery.size() - 2 }, + /* sending query (2) to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, secondQuery.size() }, + /* reading the response (4) from the backend */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, secondResponse.size() - 2 }, + /* sending response (4) to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, secondResponse.size() }, + /* trying to read from the client, getting EOF */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 0 }, /* closing the client connection */ { ExpectedStep::ExpectedRequest::closeClient, IOState::Done }, /* closing the backend connection */ @@ -2612,15 +2705,265 @@ BOOST_AUTO_TEST_CASE(test_IncomingConnectionOOOR_BackendOOOR) threadData.mplexer->run(&now); } - struct timeval later = now; - later.tv_sec += backend->tcpRecvTimeout + 1; - auto expiredConns = threadData.mplexer->getTimeouts(later, false); - BOOST_CHECK_EQUAL(expiredConns.size(), 1U); - for (const auto& cbData : expiredConns) { - if (cbData.second.type() == typeid(std::shared_ptr)) { - auto cbState = boost::any_cast>(cbData.second); - cbState->handleTimeout(later, false); - } + BOOST_CHECK_EQUAL(s_writeBuffer.size(), expectedWriteBuffer.size()); + BOOST_CHECK(s_writeBuffer == expectedWriteBuffer); + BOOST_CHECK_EQUAL(s_backendWriteBuffer.size(), expectedBackendWriteBuffer.size()); + BOOST_CHECK(s_backendWriteBuffer == expectedBackendWriteBuffer); + + /* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */ + IncomingTCPConnectionState::clearAllDownstreamConnections(); + } + + { + TEST_INIT("=> IXFR"); + + PacketBuffer firstQuery; + PacketBuffer ixfrQuery; + PacketBuffer secondQuery; + PacketBuffer firstResponse; + std::vector ixfrResponses(1); + PacketBuffer secondResponse; + + { + GenericDNSPacketWriter pwFirstQuery(firstQuery, DNSName("powerdns.com."), QType::SOA, QClass::IN, 0); + pwFirstQuery.getHeader()->rd = 1; + pwFirstQuery.getHeader()->id = 84; + uint16_t firstQuerySize = static_cast(firstQuery.size()); + const uint8_t firstQuerySizeBytes[] = { static_cast(firstQuerySize / 256), static_cast(firstQuerySize % 256) }; + firstQuery.insert(firstQuery.begin(), firstQuerySizeBytes, firstQuerySizeBytes + 2); + } + + { + GenericDNSPacketWriter pwFirstResponse(firstResponse, DNSName("powerdns.com."), QType::SOA, QClass::IN, 0); + pwFirstResponse.getHeader()->qr = 1; + pwFirstResponse.getHeader()->rd = 1; + pwFirstResponse.getHeader()->ra = 1; + pwFirstResponse.getHeader()->id = 84; + pwFirstResponse.startRecord(DNSName("powerdns.com."), QType::SOA, 3600, QClass::IN, DNSResourceRecord::ANSWER); + pwFirstResponse.xfrName(g_rootdnsname, true); + pwFirstResponse.xfrName(g_rootdnsname, true); + pwFirstResponse.xfr32BitInt(3 /* serial */); + pwFirstResponse.xfr32BitInt(0); + pwFirstResponse.xfr32BitInt(0); + pwFirstResponse.xfr32BitInt(0); + pwFirstResponse.xfr32BitInt(0); + pwFirstResponse.commit(); + + uint16_t responseSize = static_cast(firstResponse.size()); + const uint8_t sizeBytes[] = { static_cast(responseSize / 256), static_cast(responseSize % 256) }; + firstResponse.insert(firstResponse.begin(), sizeBytes, sizeBytes + 2); + } + + { + GenericDNSPacketWriter pwIXFRQuery(ixfrQuery, DNSName("powerdns.com."), QType::IXFR, QClass::IN, 0); + pwIXFRQuery.getHeader()->rd = 0; + pwIXFRQuery.getHeader()->id = 42; + uint16_t ixfrQuerySize = static_cast(ixfrQuery.size()); + const uint8_t ixfrQuerySizeBytes[] = { static_cast(ixfrQuerySize / 256), static_cast(ixfrQuerySize % 256) }; + ixfrQuery.insert(ixfrQuery.begin(), ixfrQuerySizeBytes, ixfrQuerySizeBytes + 2); + } + + const DNSName name("powerdns.com."); + { + /* first message */ + auto& response = ixfrResponses.at(0); + GenericDNSPacketWriter pwR(response, name, QType::A, QClass::IN, 0); + pwR.getHeader()->qr = 1; + pwR.getHeader()->id = 42; + + /* insert final SOA */ + pwR.startRecord(name, QType::SOA, 3600, QClass::IN, DNSResourceRecord::ANSWER); + pwR.xfrName(g_rootdnsname, true); + pwR.xfrName(g_rootdnsname, true); + pwR.xfr32BitInt(3 /* serial */); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.commit(); + + /* insert first SOA */ + pwR.startRecord(name, QType::SOA, 3600, QClass::IN, DNSResourceRecord::ANSWER); + pwR.xfrName(g_rootdnsname, true); + pwR.xfrName(g_rootdnsname, true); + pwR.xfr32BitInt(1 /* serial */); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.commit(); + + /* removals */ + /* A record */ + pwR.startRecord(name, QType::A, 7200, QClass::IN, DNSResourceRecord::ANSWER); + pwR.xfr32BitInt(0x01020304); + pwR.commit(); + + /* additions */ + /* insert second SOA */ + pwR.startRecord(name, QType::SOA, 3600, QClass::IN, DNSResourceRecord::ANSWER); + pwR.xfrName(g_rootdnsname, true); + pwR.xfrName(g_rootdnsname, true); + pwR.xfr32BitInt(2 /* serial */); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.commit(); + /* A record */ + pwR.startRecord(name, QType::A, 7200, QClass::IN, DNSResourceRecord::ANSWER); + pwR.xfr32BitInt(0x01020305); + pwR.commit(); + /* done with 1 -> 2 */ + pwR.startRecord(name, QType::SOA, 3600, QClass::IN, DNSResourceRecord::ANSWER); + pwR.xfrName(g_rootdnsname, true); + pwR.xfrName(g_rootdnsname, true); + pwR.xfr32BitInt(2 /* serial */); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.commit(); + + /* no removal */ + + /* additions */ + /* insert second SOA */ + pwR.startRecord(name, QType::SOA, 3600, QClass::IN, DNSResourceRecord::ANSWER); + pwR.xfrName(g_rootdnsname, true); + pwR.xfrName(g_rootdnsname, true); + pwR.xfr32BitInt(3 /* serial */); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.commit(); + + /* actually no addition either */ + /* done */ + pwR.startRecord(name, QType::SOA, 3600, QClass::IN, DNSResourceRecord::ANSWER); + pwR.xfrName(g_rootdnsname, true); + pwR.xfrName(g_rootdnsname, true); + pwR.xfr32BitInt(3 /* serial */); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.commit(); + + uint16_t responseSize = static_cast(response.size()); + const uint8_t sizeBytes[] = { static_cast(responseSize / 256), static_cast(responseSize % 256) }; + response.insert(response.begin(), sizeBytes, sizeBytes + 2); + } + + { + GenericDNSPacketWriter pwSecondQuery(secondQuery, DNSName("powerdns.com."), QType::A, QClass::IN, 0); + pwSecondQuery.getHeader()->rd = 1; + pwSecondQuery.getHeader()->id = 84; + uint16_t secondQuerySize = static_cast(secondQuery.size()); + const uint8_t secondQuerySizeBytes[] = { static_cast(secondQuerySize / 256), static_cast(secondQuerySize % 256) }; + secondQuery.insert(secondQuery.begin(), secondQuerySizeBytes, secondQuerySizeBytes + 2); + } + + { + GenericDNSPacketWriter pwSecondResponse(secondResponse, DNSName("powerdns.com."), QType::A, QClass::IN, 0); + pwSecondResponse.getHeader()->qr = 1; + pwSecondResponse.getHeader()->rd = 1; + pwSecondResponse.getHeader()->ra = 1; + pwSecondResponse.getHeader()->id = 84; + pwSecondResponse.startRecord(name, QType::A, 7200, QClass::IN, DNSResourceRecord::ANSWER); + pwSecondResponse.xfr32BitInt(0x01020304); + pwSecondResponse.commit(); + uint16_t responseSize = static_cast(secondResponse.size()); + const uint8_t sizeBytes[] = { static_cast(responseSize / 256), static_cast(responseSize % 256) }; + secondResponse.insert(secondResponse.begin(), sizeBytes, sizeBytes + 2); + } + + PacketBuffer expectedWriteBuffer; + PacketBuffer expectedBackendWriteBuffer; + + s_readBuffer = firstQuery; + s_readBuffer.insert(s_readBuffer.end(), ixfrQuery.begin(), ixfrQuery.end()); + s_readBuffer.insert(s_readBuffer.end(), secondQuery.begin(), secondQuery.end()); + + expectedBackendWriteBuffer = s_readBuffer; + + s_backendReadBuffer = firstResponse; + for (const auto& response : ixfrResponses) { + s_backendReadBuffer.insert(s_backendReadBuffer.end(), response.begin(), response.end()); + } + s_backendReadBuffer.insert(s_backendReadBuffer.end(), secondResponse.begin(), secondResponse.end()); + + expectedWriteBuffer = s_backendReadBuffer; + + bool timeout = false; + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + /* reading a query from the client (1) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, firstQuery.size() - 2 }, + /* opening a connection to the backend */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + /* sending query (1) to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, firstQuery.size() }, + /* no response ready yet, but setting the backend descriptor readable */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0, [&threadData](int desc, const ExpectedStep& step) { + /* the backend descriptor becomes ready */ + dynamic_cast(threadData.mplexer.get())->setReady(desc); + } }, + /* try to read a second query from the client, none yet */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::NeedRead, 0 }, + /* read the response (1) from the backend */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, firstResponse.size() - 2 }, + /* sending response (1) to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, firstResponse.size(), [&threadData](int desc, const ExpectedStep& step) { + /* client descriptor becomes ready */ + dynamic_cast(threadData.mplexer.get())->setReady(-1); + } }, + /* reading a query from the client (2) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, ixfrQuery.size() - 2 }, + /* sending query (2) to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, ixfrQuery.size() }, + /* read the response (ixfr 1) from the backend */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, ixfrResponses.at(0).size() - 2 }, + /* sending response (ixfr 1) to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, ixfrResponses.at(0).size(), [&threadData](int desc, const ExpectedStep& step) { + /* the client descriptor becomes ready */ + dynamic_cast(threadData.mplexer.get())->setReady(-1); + } }, + /* trying to read from the client, getting a second query */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, secondQuery.size() - 2 }, + /* sending query (2) to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, secondQuery.size() }, + /* reading the response (4) from the backend */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, secondResponse.size() - 2 }, + /* sending response (4) to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, secondResponse.size() }, + /* trying to read from the client, getting EOF */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 0 }, + /* closing the client connection */ + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done }, + /* closing the backend connection */ + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + }; + + s_processQuery = [backend](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr& selectedBackend) -> ProcessQueryResult { + selectedBackend = backend; + return ProcessQueryResult::PassToBackend; + }; + s_processResponse = [](PacketBuffer& response, LocalStateHolder >& localRespRuleActions, DNSResponse& dr, bool muted) -> bool { + return true; + }; + + auto state = std::make_shared(ConnectionInfo(&localCS), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + while (!timeout && (threadData.mplexer->getWatchedFDCount(false) != 0 || threadData.mplexer->getWatchedFDCount(true) != 0)) { + threadData.mplexer->run(&now); } BOOST_CHECK_EQUAL(s_writeBuffer.size(), expectedWriteBuffer.size());