}
}
-void IncomingTCPConnectionState::queueResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response)
+void IncomingTCPConnectionState::queueResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response, bool fromBackend)
{
// queue response
state->d_queuedResponses.emplace_back(std::move(response));
// for the same reason we need to update the state right away, nobody will do that for us
if (state->active()) {
updateIO(state, iostate, now);
+ // if we have not finished reading every available byte, we _need_ to do an actual read
+ // attempt before waiting for the socket to become readable again, because if there is
+ // buffered data available the socket might never become readable again.
+ // This is true as soon as we deal with TLS because TLS records are processed one by
+ // one and might not match what we see at the application layer, so data might already
+ // be available in the TLS library's buffers. This is especially true when OpenSSL's
+ // read-ahead mode is enabled because then it buffers even more than one SSL record
+ // for performance reasons.
+ if (fromBackend && !state->d_lastIOBlocked) {
+ state->handleIO();
+ }
}
}
}
++dnsdist::metrics::g_stats.responses;
++state->d_ci.cs->responses;
- queueResponse(state, now, std::move(response));
+ queueResponse(state, now, std::move(response), true);
}
struct TCPCrossProtocolResponse
TCPResponse response;
d_state = State::idle;
++d_currentQueriesCount;
- queueResponse(state, now, std::move(response));
+ queueResponse(state, now, std::move(response), false);
return QueryProcessingResult::SelfAnswered;
}
response.d_buffer = std::move(query);
d_state = State::idle;
++d_currentQueriesCount;
- queueResponse(state, now, std::move(response));
+ queueResponse(state, now, std::move(response), false);
return QueryProcessingResult::Empty;
}
}
d_state = State::idle;
++d_currentQueriesCount;
- queueResponse(state, now, std::move(response));
+ queueResponse(state, now, std::move(response), false);
return QueryProcessingResult::SelfAnswered;
}
}
std::shared_ptr<IncomingTCPConnectionState> state = shared_from_this();
- queueResponse(state, now, std::move(response));
+ queueResponse(state, now, std::move(response), true);
}
void IncomingTCPConnectionState::handleTimeout(std::shared_ptr<IncomingTCPConnectionState>& state, bool write)
iostate = conn->handleResponse(conn, now);
}
catch (const std::exception& e) {
- vinfolog("Got an exception while handling TCP response from %s (client is %s): %s", conn->d_ds ? conn->d_ds->getName() : "unknown", conn->d_currentQuery.d_query.d_idstate.origRemote.toStringWithPort(), e.what());
+ vinfolog("Got an exception while handling TCP response from %s (client is %s): %s", conn->d_ds ? conn->d_ds->getNameWithAddr() : "unknown", conn->d_currentQuery.d_query.d_idstate.origRemote.toStringWithPort(), e.what());
ioGuard.release();
conn->release();
return;
d_state = State::idle;
t_downstreamTCPConnectionsManager.moveToIdle(conn);
}
+ else if (!d_pendingResponses.empty()) {
+ d_currentPos = 0;
+ d_state = State::waitingForResponseFromBackend;
+ }
+ // be very careful that handleResponse() might trigger new queries being assigned to us,
+ // which may reset our d_currentPos, d_state and/or d_responseBuffer, so we cannot assume
+ // anything without checking first
auto shared = conn;
if (sender->active()) {
DEBUGLOG("passing response to client connection for "<<ids.qname);
}
else if (!d_pendingResponses.empty()) {
DEBUGLOG("still have some responses to read");
- d_state = State::waitingForResponseFromBackend;
- d_currentPos = 0;
- d_responseBuffer.resize(sizeof(uint16_t));
return IOState::NeedRead;
}
else {
static void handleAsyncReady(int fd, FDMultiplexer::funcparam_t& param);
static void updateIO(std::shared_ptr<IncomingTCPConnectionState>& state, IOState newState, const struct timeval& now);
- static void queueResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response);
+ static void queueResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response, bool fromBackend);
static void handleTimeout(std::shared_ptr<IncomingTCPConnectionState>& state, bool write);
virtual void handleIO();