}
}
+ static void clear()
+ {
+ t_downstreamConnections.clear();
+ }
+
private:
static thread_local map<std::shared_ptr<DownstreamState>, std::deque<std::shared_ptr<TCPConnectionToBackend>>> t_downstreamConnections;
static const size_t s_maxCachedConnectionsPerDownstream;
d_handler.close();
}
+void IncomingTCPConnectionState::clearAllDownstreamConnections()
+{
+ DownstreamConnectionsManager::clear();
+}
+
std::shared_ptr<TCPConnectionToBackend> IncomingTCPConnectionState::getDownstreamConnection(std::shared_ptr<DownstreamState>& ds, const std::unique_ptr<std::vector<ProxyProtocolValue>>& tlvs, const struct timeval& now)
{
std::shared_ptr<TCPConnectionToBackend> downstream{nullptr};
return IOState::Done;
}
-static void handleResponseSent(std::shared_ptr<IncomingTCPConnectionState>& state)
+static void handleResponseSent(std::shared_ptr<IncomingTCPConnectionState>& state, const TCPResponse& currentResponse)
{
if (state->d_isXFR) {
return;
--state->d_currentQueriesCount;
- const auto& currentResponse = state->d_currentResponse;
if (currentResponse.d_selfGenerated == false && currentResponse.d_connection && currentResponse.d_connection->getDS()) {
const auto& ds = currentResponse.d_connection->getDS();
struct timespec answertime;
bool IncomingTCPConnectionState::canAcceptNewQueries(const struct timeval& now)
{
+ if (d_hadErrors) {
+ DEBUGLOG("not accepting new queries because we encountered some error during the processing already");
+ return false;
+ }
+
if (d_isXFR) {
DEBUGLOG("not accepting new queries because used for XFR");
return false;
auto iostate = state->d_handler.tryWrite(state->d_currentResponse.d_buffer, state->d_currentPos, state->d_currentResponse.d_buffer.size());
if (iostate == IOState::Done) {
DEBUGLOG("response sent from "<<__PRETTY_FUNCTION__);
- handleResponseSent(state);
+ handleResponseSent(state, state->d_currentResponse);
return iostate;
} else {
state->d_lastIOBlocked = true;
void IncomingTCPConnectionState::terminateClientConnection()
{
- cerr<<"terminating client connection"<<endl;
+ DEBUGLOG("terminating client connection");
d_queuedResponses.clear();
/* we have already released idle connections that could be reused,
we don't care about the ones still waiting for responses */
state->d_state == IncomingTCPConnectionState::State::waitingForQuery) {
auto iostate = sendQueuedResponses(state, now);
- if (iostate == IOState::Done && state->canAcceptNewQueries(now)) {
- state->resetForNewQuery();
- state->d_state = IncomingTCPConnectionState::State::waitingForQuery;
- iostate = IOState::NeedRead;
+ if (iostate == IOState::Done) {
+ if (state->canAcceptNewQueries(now)) {
+ state->resetForNewQuery();
+ state->d_state = IncomingTCPConnectionState::State::waitingForQuery;
+ iostate = IOState::NeedRead;
+ }
+ else {
+ state->d_state = IncomingTCPConnectionState::State::idle;
+ }
}
// for the same reason we need to update the state right away, nobody will do that for us
/* called from the backend code when a new response has been received */
void IncomingTCPConnectionState::handleResponse(std::shared_ptr<IncomingTCPConnectionState> state, const struct timeval& now, TCPResponse&& response)
{
- cerr<<"in "<<__PRETTY_FUNCTION__<<endl;
if (!state->d_isXFR && response.d_connection && response.d_connection->isIdle()) {
// if we have added a TCP Proxy Protocol payload to a connection, don't release it to the general pool yet, no one else will be able to use it anyway
if (response.d_connection->canBeReused()) {
}
if (response.d_buffer.size() < sizeof(dnsheader)) {
- cerr<<"too small!"<<endl;
+ state->terminateClientConnection();
return;
}
- auto& ids = response.d_idstate;
- unsigned int qnameWireLength;
- if (!responseContentMatches(response.d_buffer, ids.qname, ids.qtype, ids.qclass, response.d_connection->getRemote(), qnameWireLength)) {
- cerr<<"does not match!"<<endl;
- return;
- }
+ try {
+ auto& ids = response.d_idstate;
+ unsigned int qnameWireLength;
+ if (!responseContentMatches(response.d_buffer, ids.qname, ids.qtype, ids.qclass, response.d_connection->getRemote(), qnameWireLength)) {
+ state->terminateClientConnection();
+ return;
+ }
- DNSResponse dr = makeDNSResponseFromIDState(ids, response.d_buffer, true);
+ DNSResponse dr = makeDNSResponseFromIDState(ids, response.d_buffer, true);
- memcpy(&response.d_cleartextDH, dr.getHeader(), sizeof(response.d_cleartextDH));
+ memcpy(&response.d_cleartextDH, dr.getHeader(), sizeof(response.d_cleartextDH));
- if (!processResponse(response.d_buffer, state->d_threadData.localRespRulactions, dr, false)) {
- cerr<<"processResponse failed"<<endl;
+ if (!processResponse(response.d_buffer, state->d_threadData.localRespRulactions, dr, false)) {
+ state->terminateClientConnection();
+ return;
+ }
+ }
+ catch (const std::exception& e) {
+ vinfolog("Unxpected exception while handling response from backend: %s", e.what());
+ state->terminateClientConnection();
return;
}
}
}
- cerr<<"calling queueResponse"<<endl;
queueResponse(state, now, std::move(response));
}
}
if (!state->d_lastIOBlocked && state->d_state == IncomingTCPConnectionState::State::readingProxyProtocolHeader) {
- DEBUGLOG("reading proxy protocol header");
do {
+ DEBUGLOG("reading proxy protocol header");
iostate = state->d_handler.tryRead(state->d_buffer, state->d_currentPos, state->d_proxyProtocolNeed);
if (iostate == IOState::Done) {
state->d_buffer.resize(state->d_currentPos);
state->d_state = IncomingTCPConnectionState::State::idle;
handleQuery(state, now);
- cerr<<"out of handleQuery, state is "<<(int)state->d_state<<", iostate is "<<(int)iostate<<endl;
/* the state might have been updated in the meantime, we don't want to override it
in that case */
if (state->active() && state->d_state != IncomingTCPConnectionState::State::idle) {
iostate = state->d_handler.tryWrite(state->d_currentResponse.d_buffer, state->d_currentPos, state->d_currentResponse.d_buffer.size());
if (iostate == IOState::Done) {
DEBUGLOG("response sent from "<<__PRETTY_FUNCTION__);
- handleResponseSent(state);
+ handleResponseSent(state, state->d_currentResponse);
state->d_state = IncomingTCPConnectionState::State::idle;
}
else {
state->d_state == IncomingTCPConnectionState::State::waitingForQuery))
{
// try sending querued responses
- cerr<<"send responses, if any"<<endl;
+ DEBUGLOG("send responses, if any");
iostate = sendQueuedResponses(state, now);
if (!state->d_lastIOBlocked && iostate == IOState::Done) {
// if the query has been passed to a backend, or dropped, and the responses have been sent,
// we can start reading again
- if (!state->d_isXFR && state->canAcceptNewQueries(now)) {
- cerr<<"reset for new query"<<endl;
- state->resetForNewQuery();
- iostate = IOState::NeedRead;
+ if (!state->d_isXFR) {
+ if (state->canAcceptNewQueries(now)) {
+ state->resetForNewQuery();
+ iostate = IOState::NeedRead;
+ }
+ else {
+ state->d_state = IncomingTCPConnectionState::State::idle;
+ iostate = IOState::Done;
+ }
}
}
}
DEBUGLOG("Closing TCP client connection: "<<e.what());
}
/* remove this FD from the IO multiplexer */
- iostate = IOState::Done;
+ state->terminateClientConnection();
}
if (!state->active()) {
- cerr<<"state is no longer active"<<endl;
+ DEBUGLOG("state is no longer active");
return;
}
void IncomingTCPConnectionState::notifyIOError(std::shared_ptr<IncomingTCPConnectionState>& state, IDState&& query, const struct timeval& now)
{
--state->d_currentQueriesCount;
+ state->d_hadErrors = true;
if (state->d_state == State::sendingResponse) {
/* if we have responses to send, let's do that first */
}
else {
// the backend code already tried to reconnect if it was possible
- state->d_ioState->reset();
+ state->terminateClientConnection();
}
}
void IncomingTCPConnectionState::handleTimeout(std::shared_ptr<IncomingTCPConnectionState>& state, bool write)
{
+ vinfolog("Timeout while %s TCP client %s", (write ? "writing to" : "reading from"), state->d_ci.remote.toStringWithPort());
DEBUGLOG("client timeout");
DEBUGLOG("Processed "<<state->d_queriesCount<<" queries, current count is "<<state->d_currentQueriesCount<<", "<<state->d_activeConnectionsToBackend.size()<<" active connections, "<<state->d_queuedResponses.size()<<" response queued");
if (write || state->d_currentQueriesCount == 0) {
++state->d_ci.cs->tcpClientTimeouts;
- state->d_ioState->reset();
+ state->d_ioState.reset();
}
else {
DEBUGLOG("Going idle");
/* we still have some queries in flight, let's just stop reading for now */
+ state->d_hadErrors = true;
state->d_state = IncomingTCPConnectionState::State::idle;
state->d_ioState->update(IOState::Done, handleIOCallback, state);
bool connectionDied = false;
IOState iostate = IOState::Done;
IOStateGuard ioGuard(conn->d_ioState);
+ bool reconnected = false;
- try {
- if (conn->d_state == State::sendingQueryToBackend) {
- iostate = sendQuery(conn, now);
+ do {
+ reconnected = false;
- while (iostate == IOState::Done && !conn->d_pendingQueries.empty()) {
- queueNextQuery(conn);
+ try {
+ if (conn->d_state == State::sendingQueryToBackend) {
iostate = sendQuery(conn, now);
+
+ while (iostate == IOState::Done && !conn->d_pendingQueries.empty()) {
+ queueNextQuery(conn);
+ iostate = sendQuery(conn, now);
+ }
+
+ if (iostate == IOState::Done && conn->d_pendingQueries.empty()) {
+ conn->d_state = State::readingResponseSizeFromBackend;
+ conn->d_currentPos = 0;
+ conn->d_responseBuffer.resize(sizeof(uint16_t));
+ iostate = IOState::NeedRead;
+ }
}
- if (iostate == IOState::Done && conn->d_pendingQueries.empty()) {
- conn->d_state = State::readingResponseSizeFromBackend;
- conn->d_currentPos = 0;
+ if (conn->d_state == State::readingResponseSizeFromBackend) {
+ DEBUGLOG("reading response size from backend");
+ // then we need to allocate a new buffer (new because we might need to re-send the query if the
+ // backend dies on us)
+ // We also might need to read and send to the client more than one response in case of XFR (yeah!)
conn->d_responseBuffer.resize(sizeof(uint16_t));
- iostate = IOState::NeedRead;
+ iostate = conn->d_handler->tryRead(conn->d_responseBuffer, conn->d_currentPos, sizeof(uint16_t));
+ if (iostate == IOState::Done) {
+ DEBUGLOG("got response size from backend");
+ conn->d_state = State::readingResponseFromBackend;
+ conn->d_responseSize = conn->d_responseBuffer.at(0) * 256 + conn->d_responseBuffer.at(1);
+ conn->d_responseBuffer.reserve(conn->d_responseSize + /* we will need to prepend the size later */ 2);
+ conn->d_responseBuffer.resize(conn->d_responseSize);
+ conn->d_currentPos = 0;
+ }
}
- }
- if (conn->d_state == State::readingResponseSizeFromBackend) {
- DEBUGLOG("reading response size from backend");
- // then we need to allocate a new buffer (new because we might need to re-send the query if the
- // backend dies on us)
- // We also might need to read and send to the client more than one response in case of XFR (yeah!)
- conn->d_responseBuffer.resize(sizeof(uint16_t));
- iostate = conn->d_handler->tryRead(conn->d_responseBuffer, conn->d_currentPos, sizeof(uint16_t) - conn->d_currentPos);
- if (iostate == IOState::Done) {
- DEBUGLOG("got response size from backend");
- conn->d_state = State::readingResponseFromBackend;
- conn->d_responseSize = conn->d_responseBuffer.at(0) * 256 + conn->d_responseBuffer.at(1);
- conn->d_responseBuffer.reserve(conn->d_responseSize + /* we will need to prepend the size later */ 2);
- conn->d_responseBuffer.resize(conn->d_responseSize);
- conn->d_currentPos = 0;
+ if (conn->d_state == State::readingResponseFromBackend) {
+ DEBUGLOG("reading response from backend");
+ iostate = conn->d_handler->tryRead(conn->d_responseBuffer, conn->d_currentPos, conn->d_responseSize);
+ if (iostate == IOState::Done) {
+ DEBUGLOG("got response from backend");
+ try {
+ iostate = conn->handleResponse(conn, now);
+ }
+ catch (const std::exception& e) {
+ vinfolog("Got an exception while handling TCP response from %s (client is %s): %s", conn->d_ds ? conn->d_ds->getName() : "unknown", conn->d_currentQuery.d_idstate.origRemote.toStringWithPort(), e.what());
+ ioGuard.release();
+ conn->release();
+ return;
+ }
+ }
}
- }
- if (conn->d_state == State::readingResponseFromBackend) {
- DEBUGLOG("reading response from backend");
- iostate = conn->d_handler->tryRead(conn->d_responseBuffer, conn->d_currentPos, conn->d_responseSize - conn->d_currentPos);
- if (iostate == IOState::Done) {
- DEBUGLOG("got response from backend");
- try {
- iostate = conn->handleResponse(conn, now);
- }
- catch (const std::exception& e) {
- vinfolog("Got an exception while handling TCP response from %s (client is %s): %s", conn->d_ds ? conn->d_ds->getName() : "unknown", conn->d_currentQuery.d_idstate.origRemote.toStringWithPort(), e.what());
- ioGuard.release();
- conn->release();
- return;
- }
+ if (conn->d_state != State::idle &&
+ conn->d_state != State::sendingQueryToBackend &&
+ conn->d_state != State::readingResponseSizeFromBackend &&
+ conn->d_state != State::readingResponseFromBackend) {
+ vinfolog("Unexpected state %d in TCPConnectionToBackend::handleIO", static_cast<int>(conn->d_state));
}
}
+ catch (const std::exception& e) {
+ /* most likely an EOF because the other end closed the connection,
+ but it might also be a real IO error or something else.
+ Let's just drop the connection
+ */
+ vinfolog("Got an exception while handling (%s backend) TCP query from %s: %s", (conn->d_state == State::sendingQueryToBackend ? "writing to" : "reading from"), conn->d_currentQuery.d_idstate.origRemote.toStringWithPort(), e.what());
+ if (conn->d_state == State::sendingQueryToBackend) {
+ ++conn->d_ds->tcpDiedSendingQuery;
+ }
+ else {
+ ++conn->d_ds->tcpDiedReadingResponse;
+ }
- if (conn->d_state != State::idle &&
- conn->d_state != State::sendingQueryToBackend &&
- conn->d_state != State::readingResponseSizeFromBackend &&
- conn->d_state != State::readingResponseFromBackend) {
- vinfolog("Unexpected state %d in TCPConnectionToBackend::handleIO", static_cast<int>(conn->d_state));
- }
- }
- catch (const std::exception& e) {
- /* most likely an EOF because the other end closed the connection,
- but it might also be a real IO error or something else.
- Let's just drop the connection
- */
- vinfolog("Got an exception while handling (%s backend) TCP query from %s: %s", (conn->d_ioState->getState() == IOState::NeedRead ? "reading from" : "writing to"), conn->d_currentQuery.d_idstate.origRemote.toStringWithPort(), e.what());
- if (conn->d_state == State::sendingQueryToBackend) {
- ++conn->d_ds->tcpDiedSendingQuery;
- }
- else {
- ++conn->d_ds->tcpDiedReadingResponse;
- }
+ /* don't increase this counter when reusing connections */
+ if (conn->d_fresh) {
+ ++conn->d_downstreamFailures;
+ }
- /* don't increase this counter when reusing connections */
- if (conn->d_fresh) {
- ++conn->d_downstreamFailures;
+ /* remove this FD from the IO multiplexer */
+ iostate = IOState::Done;
+ connectionDied = true;
}
- /* remove this FD from the IO multiplexer */
- iostate = IOState::Done;
- connectionDied = true;
- }
-
- if (connectionDied) {
+ if (connectionDied) {
- bool reconnected = false;
- DEBUGLOG("connection died, number of failures is "<<conn->d_downstreamFailures<<", retries is "<<conn->d_ds->retries);
+ DEBUGLOG("connection died, number of failures is "<<conn->d_downstreamFailures<<", retries is "<<conn->d_ds->retries);
- if ((!conn->d_usedForXFR || conn->d_queries == 0) && conn->d_downstreamFailures < conn->d_ds->retries) {
+ if ((!conn->d_usedForXFR || conn->d_queries == 0) && conn->d_downstreamFailures < conn->d_ds->retries) {
- conn->d_ioState->reset();
- ioGuard.release();
+ conn->d_ioState.reset();
+ ioGuard.release();
- try {
- if (conn->reconnect()) {
- conn->d_ioState = make_unique<IOStateHandler>(conn->d_clientConn->getIOMPlexer(), conn->d_handler->getDescriptor());
+ try {
+ if (conn->reconnect()) {
+ conn->d_ioState = make_unique<IOStateHandler>(conn->d_clientConn->getIOMPlexer(), conn->d_handler->getDescriptor());
+
+ /* we need to resend the queries that were in flight, if any */
+ for (auto& pending : conn->d_pendingResponses) {
+ conn->d_pendingQueries.push_back(std::move(pending.second));
+ if (!conn->d_usedForXFR) {
+ --conn->d_ds->outstanding;
+ }
+ }
+ conn->d_pendingResponses.clear();
+ conn->d_currentPos = 0;
- /* we need to resend the queries that were in flight, if any */
- for (auto& pending : conn->d_pendingResponses) {
- conn->d_pendingQueries.push_back(std::move(pending.second));
- if (!conn->d_usedForXFR) {
- --conn->d_ds->outstanding;
+ if (conn->d_state == State::doingHandshake ||
+ conn->d_state == State::sendingQueryToBackend) {
+ iostate = IOState::NeedWrite;
+ // resume sending query
}
- }
- conn->d_pendingResponses.clear();
- conn->d_currentPos = 0;
+ else {
+ if (conn->d_pendingQueries.empty()) {
+ throw std::runtime_error("TCP connection to a backend in state " + std::to_string((int)conn->d_state) + " with no pending queries");
+ }
- if (conn->d_state == State::doingHandshake ||
- conn->d_state == State::sendingQueryToBackend) {
- iostate = IOState::NeedWrite;
- // resume sending query
- }
- else {
- if (conn->d_pendingQueries.empty()) {
- throw std::runtime_error("TCP connection to a backend in state " + std::to_string((int)conn->d_state) + " with no pending queries");
+ iostate = queueNextQuery(conn);
}
- iostate = queueNextQuery(conn);
- }
+ if (!conn->d_proxyProtocolPayloadAdded && !conn->d_proxyProtocolPayload.empty()) {
+ conn->d_currentQuery.d_buffer.insert(conn->d_currentQuery.d_buffer.begin(), conn->d_proxyProtocolPayload.begin(), conn->d_proxyProtocolPayload.end());
+ conn->d_proxyProtocolPayloadAdded = true;
+ }
- if (!conn->d_proxyProtocolPayloadAdded && !conn->d_proxyProtocolPayload.empty()) {
- conn->d_currentQuery.d_buffer.insert(conn->d_currentQuery.d_buffer.begin(), conn->d_proxyProtocolPayload.begin(), conn->d_proxyProtocolPayload.end());
- conn->d_proxyProtocolPayloadAdded = true;
+ reconnected = true;
+ connectionDied = false;
}
-
- reconnected = true;
+ }
+ catch (const std::exception& e) {
+ // reconnect might throw on failure, let's ignore that, we just need to know
+ // it failed
}
}
- catch (const std::exception& e) {
- // reconnect might throw on failure, let's ignore that, we just need to know
- // it failed
- }
- }
- if (!reconnected) {
- /* reconnect failed, we give up */
- DEBUGLOG("reconnect failed, we give up");
- ++conn->d_ds->tcpGaveUp;
- conn->notifyAllQueriesFailed(now, FailureReason::gaveUp);
+ if (!reconnected) {
+ /* reconnect failed, we give up */
+ DEBUGLOG("reconnect failed, we give up");
+ ++conn->d_ds->tcpGaveUp;
+ conn->notifyAllQueriesFailed(now, FailureReason::gaveUp);
+ }
}
- }
- if (conn->d_ioState) {
- if (iostate == IOState::Done) {
- conn->d_ioState->update(iostate, handleIOCallback, conn);
- }
- else {
- conn->d_ioState->update(iostate, handleIOCallback, conn, iostate == IOState::NeedRead ? conn->getBackendReadTTD(now) : conn->getBackendWriteTTD(now));
+ if (conn->d_ioState) {
+ if (iostate == IOState::Done) {
+ conn->d_ioState->update(iostate, handleIOCallback, conn);
+ }
+ else {
+ conn->d_ioState->update(iostate, handleIOCallback, conn, iostate == IOState::NeedRead ? conn->getBackendReadTTD(now) : conn->getBackendWriteTTD(now));
+ }
}
}
+ while (reconnected);
ioGuard.release();
}
catch(const std::runtime_error& e) {
vinfolog("Connection to downstream server %s failed: %s", d_ds->getName(), e.what());
d_downstreamFailures++;
- if (d_downstreamFailures > d_ds->retries) {
+ if (d_downstreamFailures >= d_ds->retries) {
throw;
}
}
}
- while (d_downstreamFailures <= d_ds->retries);
+ while (d_downstreamFailures < d_ds->retries);
return false;
}
void TCPConnectionToBackend::handleTimeout(const struct timeval& now, bool write)
{
/* in some cases we could retry, here, reconnecting and sending our pending responses again */
+ vinfolog("Timeout while %s TCP backend %s", (write ? "writing to" : "reading from"), d_ds->getName());
if (write) {
++d_ds->tcpWriteTimeouts;
}
// get ready to read the next packet, if any
return IOState::NeedRead;
}
- else {
- uint16_t queryId = 0;
- try {
- queryId = getQueryIdFromResponse();
- }
- catch (const std::exception& e) {
- DEBUGLOG("Unable to get query ID");
- notifyAllQueriesFailed(now, FailureReason::unexpectedQueryID);
- throw;
- }
- auto it = d_pendingResponses.find(queryId);
- if (it == d_pendingResponses.end()) {
- DEBUGLOG("could not find any corresponding query for ID "<<queryId<<". This is likely a duplicated ID over the same TCP connection, giving up!");
- notifyAllQueriesFailed(now, FailureReason::unexpectedQueryID);
- return IOState::Done;
- }
+ uint16_t queryId = 0;
+ try {
+ queryId = getQueryIdFromResponse();
+ }
+ catch (const std::exception& e) {
+ DEBUGLOG("Unable to get query ID");
+ notifyAllQueriesFailed(now, FailureReason::unexpectedQueryID);
+ throw;
+ }
- if (!conn->d_usedForXFR) {
- --conn->d_ds->outstanding;
- }
+ auto it = d_pendingResponses.find(queryId);
+ if (it == d_pendingResponses.end()) {
+ DEBUGLOG("could not find any corresponding query for ID "<<queryId<<". This is likely a duplicated ID over the same TCP connection, giving up!");
+ notifyAllQueriesFailed(now, FailureReason::unexpectedQueryID);
+ return IOState::Done;
+ }
- auto ids = std::move(it->second.d_idstate);
- d_pendingResponses.erase(it);
- DEBUGLOG("passing response to client connection for "<<ids.qname);
- /* marking as idle for now, so we can accept new queries if our queues are empty */
- if (d_pendingQueries.empty() && d_pendingResponses.empty()) {
- d_state = State::idle;
- }
- clientConn->handleResponse(clientConn, now, TCPResponse(std::move(d_responseBuffer), std::move(ids), conn));
-
- if (!d_pendingQueries.empty()) {
- DEBUGLOG("still have some queries to send");
- d_state = State::sendingQueryToBackend;
- d_currentQuery = std::move(d_pendingQueries.front());
- d_currentPos = 0;
- d_pendingQueries.pop_front();
- return IOState::NeedWrite;
- }
- else if (!d_pendingResponses.empty()) {
- DEBUGLOG("still have some responses to read");
- d_state = State::readingResponseSizeFromBackend;
- d_currentPos = 0;
- d_responseBuffer.resize(sizeof(uint16_t));
- return IOState::NeedRead;
- }
- else {
- DEBUGLOG("nothing to do, waiting for a new query");
- d_state = State::idle;
- d_clientConn.reset();
- return IOState::Done;
- }
+ if (!conn->d_usedForXFR) {
+ --conn->d_ds->outstanding;
+ }
+
+ auto ids = std::move(it->second.d_idstate);
+ d_pendingResponses.erase(it);
+ DEBUGLOG("passing response to client connection for "<<ids.qname);
+ /* marking as idle for now, so we can accept new queries if our queues are empty */
+ if (d_pendingQueries.empty() && d_pendingResponses.empty()) {
+ d_state = State::idle;
+ }
+ clientConn->handleResponse(clientConn, now, TCPResponse(std::move(d_responseBuffer), std::move(ids), conn));
+
+ if (!d_pendingQueries.empty()) {
+ DEBUGLOG("still have some queries to send");
+ d_state = State::sendingQueryToBackend;
+ d_currentQuery = std::move(d_pendingQueries.front());
+ d_currentPos = 0;
+ d_pendingQueries.pop_front();
+ return IOState::NeedWrite;
+ }
+ else if (!d_pendingResponses.empty()) {
+ DEBUGLOG("still have some responses to read");
+ d_state = State::readingResponseSizeFromBackend;
+ d_currentPos = 0;
+ d_responseBuffer.resize(sizeof(uint16_t));
+ return IOState::NeedRead;
+ }
+ else {
+ DEBUGLOG("nothing to do, waiting for a new query");
+ d_state = State::idle;
+ d_clientConn.reset();
+ return IOState::Done;
}
}