state->d_currentPos = 0;
state->d_currentResponse = std::move(response);
- state->d_ioState->update(IOState::NeedWrite, handleIOCallback, state, getClientWriteTTD(now));
+ state->d_ioState->update(IOState::NeedWrite, handleIOCallback, state, state->getClientWriteTTD(now));
}
else {
// queue response
{
// if we have added a TCP Proxy Protocol payload to a connection, don't release it yet, no one else will be able to use it anyway
if (!state->d_isXFR && response.d_connection && response.d_connection->isIdle() && response.d_connection->canBeReused()) {
- auto& list = d_activeConnectionsToBackend.at(response.d_connection->getDS());
+ auto& list = state->d_activeConnectionsToBackend.at(response.d_connection->getDS());
for (auto it = list.begin(); it != list.end(); ++it) {
if (*it == response.d_connection) {
DownstreamConnectionsManager::releaseDownstreamConnection(std::move(*it));
void IncomingTCPConnectionState::notifyIOError(std::shared_ptr<IncomingTCPConnectionState>& state, IDState&& query, const struct timeval& now)
{
- if (d_state == State::sendingResponse) {
+ if (state->d_state == State::sendingResponse) {
/* if we have responses to send, let's do that first */
}
- else if (!d_queuedResponses.empty()) {
+ else if (!state->d_queuedResponses.empty()) {
/* stop reading and send what we have */
- TCPResponse resp = std::move(d_queuedResponses.front());
- d_queuedResponses.pop_front();
+ TCPResponse resp = std::move(state->d_queuedResponses.front());
+ state->d_queuedResponses.pop_front();
state->d_state = IncomingTCPConnectionState::State::idle;
sendResponse(state, now, std::move(resp));
}
else {
// the backend code already tried to reconnect if it was possible
- d_ioState->reset();
+ state->d_ioState->reset();
}
}
sendResponse(state, now, std::move(response));
}
-void IncomingTCPConnectionState::handleTimeout(bool write)
+void IncomingTCPConnectionState::handleTimeout(std::shared_ptr<IncomingTCPConnectionState>& state, bool write)
{
DEBUGLOG("client timeout");
- ++d_ci.cs->tcpClientTimeouts;
- d_ioState->reset();
+ DEBUGLOG("Processed "<<state->d_queriesCount<<" queries, current count is "<<state->d_currentQueriesCount<<", "<<state->d_activeConnectionsToBackend.size()<<" active connections, "<<state->d_queuedResponses.size()<<" response queued");
+
+ if (write || state->d_currentQueriesCount == 0) {
+ ++state->d_ci.cs->tcpClientTimeouts;
+ state->d_ioState->reset();
+ }
+ else {
+ DEBUGLOG("Going idle");
+ /* we still have some queries in flight, let's just stop reading for now */
+ state->d_state = IncomingTCPConnectionState::State::idle;
+ state->d_ioState->update(IOState::Done, handleIOCallback, state);
+ for (const auto& active : state->d_activeConnectionsToBackend) {
+ for (const auto& conn: active.second) {
+ DEBUGLOG("Connection to "<<active.first->getName()<<" is "<<(conn->isIdle() ? "idle" : "not idle"));
+ }
+ }
+ }
}
static void handleIncomingTCPQuery(int pipefd, FDMultiplexer::funcparam_t& param)
auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(cbData.second);
if (cbData.first == state->d_ci.fd) {
vinfolog("Timeout (read) from remote TCP client %s", state->d_ci.remote.toStringWithPort());
- state->handleTimeout(false);
+ state->handleTimeout(state, false);
}
}
else if (cbData.second.type() == typeid(std::shared_ptr<TCPConnectionToBackend>)) {
auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(cbData.second);
if (cbData.first == state->d_ci.fd) {
vinfolog("Timeout (write) from remote TCP client %s", state->d_ci.remote.toStringWithPort());
- state->handleTimeout(true);
+ state->handleTimeout(state, true);
}
}
else if (cbData.second.type() == typeid(std::shared_ptr<TCPConnectionToBackend>)) {
static void handleIO(std::shared_ptr<IncomingTCPConnectionState>& conn, const struct timeval& now);
static void handleIOCallback(int fd, FDMultiplexer::funcparam_t& param);
+ static void notifyIOError(std::shared_ptr<IncomingTCPConnectionState>& state, IDState&& query, const struct timeval& now);
+ static void sendResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response);
+ static void handleResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response);
+ static void handleXFRResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response);
+ static void handleTimeout(std::shared_ptr<IncomingTCPConnectionState>& state, bool write);
void queueQuery(TCPQuery&& query);
- void notifyIOError(std::shared_ptr<IncomingTCPConnectionState>& state, IDState&& query, const struct timeval& now);
- void sendResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response);
- void handleResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response);
- void handleXFRResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response);
- void handleTimeout(bool write);
bool canAcceptNewQueries() const;