if (response.d_connection && response.d_connection->isIdle()) {
// if we have added a TCP Proxy Protocol payload to a connection, don't release it to the general pool yet, no one else will be able to use it anyway
if (response.d_connection->canBeReused()) {
- auto& list = state->d_activeConnectionsToBackend.at(response.d_connection->getDS());
-
- for (auto it = list.begin(); it != list.end(); ++it) {
- if (*it == response.d_connection) {
- try {
- response.d_connection->release();
- DownstreamConnectionsManager::releaseDownstreamConnection(std::move(*it));
- }
- catch (const std::exception& e) {
- vinfolog("Error releasing connection: %s", e.what());
+ const auto connIt = state->d_activeConnectionsToBackend.find(response.d_connection->getDS());
+ if (connIt != state->d_activeConnectionsToBackend.end()) {
+ auto& list = connIt->second;
+
+ for (auto it = list.begin(); it != list.end(); ++it) {
+ if (*it == response.d_connection) {
+ try {
+ response.d_connection->release();
+ DownstreamConnectionsManager::releaseDownstreamConnection(std::move(*it));
+ }
+ catch (const std::exception& e) {
+ vinfolog("Error releasing connection: %s", e.what());
+ }
+ list.erase(it);
+ break;
}
- list.erase(it);
- break;
}
}
}
queueResponse(state, now, std::move(response));
}
+class TCPCrossProtocolQuery : public CrossProtocolQuery
+{
+public:
+ TCPCrossProtocolQuery(PacketBuffer&& buffer, IDState&& ids, std::shared_ptr<DownstreamState>& ds, std::shared_ptr<IncomingTCPConnectionState>& sender): d_sender(sender)
+ {
+ query = InternalQuery(std::move(buffer), std::move(ids));
+ downstream = ds;
+ #warning handle proxy protocol payload
+ proxyProtocolPayloadSize = 0;
+ }
+
+ ~TCPCrossProtocolQuery()
+ {
+ }
+
+ std::shared_ptr<TCPQuerySender> getTCPQuerySender() override
+ {
+ return d_sender;
+ }
+
+private:
+ std::shared_ptr<IncomingTCPConnectionState> d_sender;
+};
+
static void handleQuery(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now)
{
if (state->d_querySize < sizeof(dnsheader)) {
setIDStateFromDNSQuestion(ids, dq, std::move(qname));
ids.origID = dh->id;
+ ++state->d_currentQueriesCount;
+
+ if (ds->isDoH()) {
+ std::shared_ptr<TCPQuerySender> incoming = state;
+ auto cpq = std::make_unique<TCPCrossProtocolQuery>(std::move(state->d_buffer), std::move(ids), ds, state);
+
+ ds->passCrossProtocolQuery(std::move(cpq));
+ return;
+ }
+
prependSizeToTCPQuery(state->d_buffer, 0);
#warning FIXME: handle DoH backends here
query.d_proxyProtocolPayload = std::move(proxyProtocolPayload);
}
- ++state->d_currentQueriesCount;
vinfolog("Got query for %s|%s from %s (%s, %d bytes), relayed to %s", query.d_idstate.qname.toLogString(), QType(query.d_idstate.qtype).toString(), state->d_proxiedRemote.toStringWithPort(), (state->d_handler.isTLS() ? "DoT" : "TCP"), query.d_buffer.size(), ds->getName());
std::shared_ptr<TCPQuerySender> incoming = state;
downstreamConnection->queueQuery(incoming, std::move(query));
//cerr<<"trying to read "<<conn->d_in.size()<<endl;
try {
IOState newState = conn->d_handler->tryRead(conn->d_in, conn->d_inPos, conn->d_in.size(), true);
- // userData.d_handler->tryRead(userData.d_in, pos, userData.d_in.size());
//cerr<<"got a "<<(int)newState<<" state and "<<conn->d_inPos<<" bytes"<<endl;
conn->d_in.resize(conn->d_inPos);
if (newState == IOState::Done) {
return;
}
nghttp2_session_send(conn->d_session.get());
+ if (conn->getConcurrentStreamsCount() == 0) {
+ conn->stopIO();
+ ioGuard.release();
+ break;
+ }
}
else {
if (newState == IOState::NeedWrite) {
case NGHTTP2_DATA:
cerr<<"got data"<<endl;
break;
- case NGHTTP2_GOAWAY;
}
#endif