}
}
- static void clear()
+ static size_t clear()
{
+ size_t count = 0;
+ for (const auto downstream : t_downstreamConnections) {
+ count += downstream.second.size();
+ }
+
t_downstreamConnections.clear();
+
+ return count;
}
private:
d_handler.close();
}
-void IncomingTCPConnectionState::clearAllDownstreamConnections()
+size_t IncomingTCPConnectionState::clearAllDownstreamConnections()
{
- DownstreamConnectionsManager::clear();
+ return DownstreamConnectionsManager::clear();
}
std::shared_ptr<TCPConnectionToBackend> IncomingTCPConnectionState::getDownstreamConnection(std::shared_ptr<DownstreamState>& ds, const std::unique_ptr<std::vector<ProxyProtocolValue>>& tlvs, const struct timeval& now)
downstreamConnection->setProxyProtocolValuesSent(std::move(dq.proxyProtocolValues));
}
+ TCPQuery query(std::move(state->d_buffer), std::move(ids));
if (proxyProtocolPayloadAdded) {
- downstreamConnection->setProxyProtocolPayloadAdded(true);
+ query.d_proxyProtocolPayloadAdded = true;
}
else {
- downstreamConnection->setProxyProtocolPayload(std::move(proxyProtocolPayload));
+ query.d_proxyProtocolPayload = std::move(proxyProtocolPayload);
}
++state->d_currentQueriesCount;
- vinfolog("Got query for %s|%s from %s (%s, %d bytes), relayed to %s", ids.qname.toLogString(), QType(ids.qtype).getName(), state->d_proxiedRemote.toStringWithPort(), (state->d_ci.cs->tlsFrontend ? "DoT" : "TCP"), state->d_buffer.size(), ds->getName());
- downstreamConnection->queueQuery(TCPQuery(std::move(state->d_buffer), std::move(ids)), downstreamConnection);
+ vinfolog("Got query for %s|%s from %s (%s, %d bytes), relayed to %s", query.d_idstate.qname.toLogString(), QType(query.d_idstate.qtype).getName(), state->d_proxiedRemote.toStringWithPort(), (state->d_ci.cs->tlsFrontend ? "DoT" : "TCP"), query.d_buffer.size(), ds->getName());
+ downstreamConnection->queueQuery(std::move(query), downstreamConnection);
}
void IncomingTCPConnectionState::handleIOCallback(int fd, FDMultiplexer::funcparam_t& param)
++state->d_ci.cs->tcpDiedReadingQuery;
}
else if (state->d_state == IncomingTCPConnectionState::State::sendingResponse) {
+ /* unlikely to happen here, the exception should be handled in sendResponse() */
++state->d_ci.cs->tcpDiedSendingResponse;
}
DEBUGLOG("query sent to backend");
/* request sent ! */
+ if (conn->d_currentQuery.d_proxyProtocolPayloadAdded) {
+ conn->d_proxyProtocolPayloadSent = true;
+ }
conn->incQueries();
conn->d_currentPos = 0;
iostate = queueNextQuery(conn);
}
- if (!conn->d_proxyProtocolPayloadAdded && !conn->d_proxyProtocolPayload.empty()) {
- conn->d_currentQuery.d_buffer.insert(conn->d_currentQuery.d_buffer.begin(), conn->d_proxyProtocolPayload.begin(), conn->d_proxyProtocolPayload.end());
- conn->d_proxyProtocolPayloadAdded = true;
+ if (conn->needProxyProtocolPayload() && !conn->d_currentQuery.d_proxyProtocolPayloadAdded && !conn->d_currentQuery.d_proxyProtocolPayload.empty()) {
+ conn->d_currentQuery.d_buffer.insert(conn->d_currentQuery.d_buffer.begin(), conn->d_currentQuery.d_proxyProtocolPayload.begin(), conn->d_currentQuery.d_proxyProtocolPayload.end());
+ conn->d_currentQuery.d_proxyProtocolPayloadAdded = true;
}
reconnected = true;
d_state = State::sendingQueryToBackend;
d_currentPos = 0;
d_currentQuery = std::move(query);
- if (!d_proxyProtocolPayloadAdded && !d_proxyProtocolPayload.empty()) {
- d_currentQuery.d_buffer.insert(d_currentQuery.d_buffer.begin(), d_proxyProtocolPayload.begin(), d_proxyProtocolPayload.end());
- d_proxyProtocolPayloadAdded = true;
+ if (needProxyProtocolPayload() && !d_currentQuery.d_proxyProtocolPayloadAdded && !d_currentQuery.d_proxyProtocolPayload.empty()) {
+ d_currentQuery.d_buffer.insert(d_currentQuery.d_buffer.begin(), d_currentQuery.d_proxyProtocolPayload.begin(), d_currentQuery.d_proxyProtocolPayload.end());
+ d_currentQuery.d_proxyProtocolPayloadAdded = true;
}
struct timeval now;
gettimeofday(&now, 0);
handleIO(sharedSelf, now);
- // d_ioState->update(IOState::NeedWrite, handleIOCallback, sharedSelf, getBackendWriteTTD(now));
}
else {
DEBUGLOG("Adding new query to the queue because we are in state "<<(int)d_state);
}
d_fresh = true;
+ d_proxyProtocolPayloadSent = false;
do {
vinfolog("TCP connecting to downstream %s (%d)", d_ds->getNameWithAddr(), d_downstreamFailures);
return ntohs(dh.id);
}
-void TCPConnectionToBackend::setProxyProtocolPayload(std::string&& payload)
-{
- d_proxyProtocolPayload = std::move(payload);
-}
-
-void TCPConnectionToBackend::setProxyProtocolPayloadAdded(bool added)
-{
- d_proxyProtocolPayloadAdded = added;
-}
-
void TCPConnectionToBackend::setProxyProtocolValuesSent(std::unique_ptr<std::vector<ProxyProtocolValue>>&& proxyProtocolValuesSent)
{
/* if we already have some values, we have already verified they match */
IDState d_idstate;
PacketBuffer d_buffer;
+ std::string d_proxyProtocolPayload;
+ bool d_proxyProtocolPayloadAdded{false};
};
class TCPConnectionToBackend;
void handleTimeout(const struct timeval& now, bool write);
void release();
- void setProxyProtocolPayload(std::string&& payload);
- void setProxyProtocolPayloadAdded(bool added);
void setProxyProtocolValuesSent(std::unique_ptr<std::vector<ProxyProtocolValue>>&& proxyProtocolValuesSent);
std::string toString() const
uint16_t getQueryIdFromResponse();
bool reconnect();
void notifyAllQueriesFailed(const struct timeval& now, FailureReason reason);
+ bool needProxyProtocolPayload() const
+ {
+ return !d_proxyProtocolPayloadSent && (d_ds && d_ds->useProxyProtocol);
+ }
boost::optional<struct timeval> getBackendReadTTD(const struct timeval& now) const
{
std::unique_ptr<IOStateHandler> d_ioState{nullptr};
std::shared_ptr<DownstreamState> d_ds{nullptr};
std::shared_ptr<IncomingTCPConnectionState> d_clientConn;
- std::string d_proxyProtocolPayload;
TCPQuery d_currentQuery;
struct timeval d_connectionStartTime;
size_t d_currentPos{0};
bool d_enableFastOpen{false};
bool d_connectionDied{false};
bool d_usedForXFR{false};
- bool d_proxyProtocolPayloadAdded{false};
+ bool d_proxyProtocolPayloadSent{false};
};
return d_threadData.mplexer;
}
- static void clearAllDownstreamConnections();
+ static size_t clearAllDownstreamConnections();
static void handleIO(std::shared_ptr<IncomingTCPConnectionState>& conn, const struct timeval& now);
static void handleIOCallback(int fd, FDMultiplexer::funcparam_t& param);