}
auto states = g_dstates.getLocal();
for(const auto& state : *states) {
- string serverName = state->getName().empty() ? state->remote.toStringWithPort() : state->getName();
+ string serverName = state->getName().empty() ? state->d_config.remote.toStringWithPort() : state->getName();
boost::replace_all(serverName, ".", "_");
const string base = namespace_name + "." + hostname + "." + instance_name + ".servers." + serverName + ".";
str<<base<<"queries" << ' ' << state->queries.load() << " " << now << "\r\n";
str<<base<<"responses" << ' ' << state->responses.load() << " " << now << "\r\n";
str<<base<<"drops" << ' ' << state->reuseds.load() << " " << now << "\r\n";
- str<<base<<"latency" << ' ' << (state->availability != DownstreamState::Availability::Down ? state->latencyUsec/1000.0 : 0) << " " << now << "\r\n";
+ str<<base<<"latency" << ' ' << (state->d_config.availability != DownstreamState::Availability::Down ? state->latencyUsec/1000.0 : 0) << " " << now << "\r\n";
str<<base<<"senderrors" << ' ' << state->sendErrors.load() << " " << now << "\r\n";
str<<base<<"outstanding" << ' ' << state->outstanding.load() << " " << now << "\r\n";
str<<base<<"tcpdiedsendingquery" << ' '<< state->tcpDiedSendingQuery.load() << " " << now << "\r\n";
auto localPools = g_pools.getCopy();
addServerToPool(localPools, pool, s);
g_pools.setState(localPools);
- s->pools.insert(pool);
+ s->d_config.pools.insert(pool);
});
luaCtx.registerFunction<void(std::shared_ptr<DownstreamState>::*)(string)>("rmPool", [](std::shared_ptr<DownstreamState> s, string pool) {
auto localPools = g_pools.getCopy();
removeServerFromPool(localPools, pool, s);
g_pools.setState(localPools);
- s->pools.erase(pool);
+ s->d_config.pools.erase(pool);
});
luaCtx.registerFunction<uint64_t(DownstreamState::*)()const>("getOutstanding", [](const DownstreamState& s) { return s.outstanding.load(); });
luaCtx.registerFunction<uint64_t(DownstreamState::*)()const>("getDrops", [](const DownstreamState& s) { return s.reuseds.load(); });
luaCtx.registerFunction<std::string(DownstreamState::*)()const>("getNameWithAddr", [](const DownstreamState& s) { return s.getNameWithAddr(); });
luaCtx.registerMember("upStatus", &DownstreamState::upStatus);
luaCtx.registerMember<int (DownstreamState::*)>("weight",
- [](const DownstreamState& s) -> int {return s.weight;},
- [](DownstreamState& s, int newWeight) {s.setWeight(newWeight);}
+ [](const DownstreamState& s) -> int {return s.d_config.d_weight;},
+ [](DownstreamState& s, int newWeight) { s.setWeight(newWeight); }
+ );
+ luaCtx.registerMember<int (DownstreamState::*)>("order",
+ [](const DownstreamState& s) -> int {return s.d_config.order; },
+ [](DownstreamState& s, int newOrder) { s.d_config.order = newOrder; }
);
- luaCtx.registerMember("order", &DownstreamState::order);
luaCtx.registerMember<const std::string(DownstreamState::*)>("name", [](const DownstreamState& backend) -> const std::string { return backend.getName(); }, [](DownstreamState& backend, const std::string& newName) { backend.setName(newName); });
- luaCtx.registerFunction<std::string(DownstreamState::*)()const>("getID", [](const DownstreamState& s) { return boost::uuids::to_string(s.id); });
+ luaCtx.registerFunction<std::string(DownstreamState::*)()const>("getID", [](const DownstreamState& s) { return boost::uuids::to_string(*s.d_config.id); });
#endif /* DISABLE_DOWNSTREAM_BINDINGS */
#ifndef DISABLE_DNSHEADER_BINDINGS
auto states = g_dstates.getLocal();
counter = 0;
for(const auto& s : *states) {
- ret << (fmt % counter % s->getName() % s->remote.toStringWithPort() % s->tcpCurrentConnections % s->tcpMaxConcurrentConnections % s->tcpDiedSendingQuery % s->tcpDiedReadingResponse % s->tcpGaveUp % s->tcpReadTimeouts % s->tcpWriteTimeouts % s->tcpConnectTimeouts % s->tcpNewConnections % s->tcpReusedConnections % s->tlsResumptions % s->tcpAvgQueriesPerConnection % s->tcpAvgConnectionDuration) << endl;
+ ret << (fmt % counter % s->getName() % s->d_config.remote.toStringWithPort() % s->tcpCurrentConnections % s->tcpMaxConcurrentConnections % s->tcpDiedSendingQuery % s->tcpDiedReadingResponse % s->tcpGaveUp % s->tcpReadTimeouts % s->tcpWriteTimeouts % s->tcpConnectTimeouts % s->tcpNewConnections % s->tcpReusedConnections % s->tlsResumptions % s->tcpAvgQueriesPerConnection % s->tcpAvgConnectionDuration) << endl;
++counter;
}
return std::shared_ptr<DownstreamState>();
}
- ComboAddress sourceAddr;
- std::string sourceItfName;
- unsigned int sourceItf = 0;
- size_t numberOfSockets = 1;
- std::set<int> cpus;
+ DownstreamState::Config config;
if (vars.count("source")) {
/* handle source in the following forms:
if (pos == std::string::npos) {
/* no '@', try to parse that as a valid v4/v6 address */
try {
- sourceAddr = ComboAddress(source);
+ config.sourceAddr = ComboAddress(source);
parsed = true;
}
catch (...) {
if (parsed == false) {
/* try to parse as interface name, or v4/v6@itf */
- sourceItfName = source.substr(pos == std::string::npos ? 0 : pos + 1);
- unsigned int itfIdx = if_nametoindex(sourceItfName.c_str());
+ config.sourceItfName = source.substr(pos == std::string::npos ? 0 : pos + 1);
+ unsigned int itfIdx = if_nametoindex(config.sourceItfName.c_str());
if (itfIdx != 0) {
if (pos == 0 || pos == std::string::npos) {
/* "eth0" or "@eth0" */
- sourceItf = itfIdx;
+ config.sourceItf = itfIdx;
}
else {
/* "192.0.2.1@eth0" */
- sourceAddr = ComboAddress(source.substr(0, pos));
- sourceItf = itfIdx;
+ config.sourceAddr = ComboAddress(source.substr(0, pos));
+ config.sourceItf = itfIdx;
}
#ifdef SO_BINDTODEVICE
/* we need to retain CAP_NET_RAW to be able to set SO_BINDTODEVICE in the health checks */
#endif
}
else {
- warnlog("Dismissing source %s because '%s' is not a valid interface name", source, sourceItfName);
+ warnlog("Dismissing source %s because '%s' is not a valid interface name", source, config.sourceItfName);
}
}
}
if (vars.count("sockets")) {
- numberOfSockets = std::stoul(boost::get<string>(vars["sockets"]));
- if (numberOfSockets == 0) {
+ config.d_numberOfSockets = std::stoul(boost::get<string>(vars["sockets"]));
+ if (config.d_numberOfSockets == 0) {
warnlog("Dismissing invalid number of sockets '%s', using 1 instead", boost::get<string>(vars["sockets"]));
- numberOfSockets = 1;
+ config.d_numberOfSockets = 1;
}
}
- // create but don't connect the socket in client or check-config modes
- auto ret = std::make_shared<DownstreamState>(serverAddr, sourceAddr, sourceItf, sourceItfName);
- if (!(client || configCheck)) {
- infolog("Added downstream server %s", serverAddr.toStringWithPort());
- }
-
if (vars.count("qps")) {
- int qpsVal = std::stoi(boost::get<string>(vars["qps"]));
- ret->qps = QPSLimiter(qpsVal, qpsVal);
+ config.d_qpsLimit = std::stoi(boost::get<string>(vars["qps"]));
}
if (vars.count("order")) {
- ret->order = std::stoi(boost::get<string>(vars["order"]));
+ config.order = std::stoi(boost::get<string>(vars["order"]));
}
if (vars.count("weight")) {
try {
- int weightVal = std::stoi(boost::get<string>(vars["weight"]));
+ config.d_weight = std::stoi(boost::get<string>(vars["weight"]));
- if (weightVal < 1) {
+ if (config.d_weight < 1) {
errlog("Error creating new server: downstream weight value must be greater than 0.");
- return ret;
+ return std::shared_ptr<DownstreamState>();
}
-
- ret->setWeight(weightVal);
}
catch (const std::exception& e) {
// std::stoi will throw an exception if the string isn't in a value int range
errlog("Error creating new server: downstream weight value must be between %s and %s", 1, std::numeric_limits<int>::max());
- return ret;
+ return std::shared_ptr<DownstreamState>();
}
}
if (vars.count("retries")) {
- ret->d_retries = std::stoi(boost::get<string>(vars["retries"]));
+ config.d_retries = std::stoi(boost::get<string>(vars["retries"]));
}
if (vars.count("checkInterval")) {
- ret->checkInterval = static_cast<unsigned int>(std::stoul(boost::get<string>(vars["checkInterval"])));
+ config.checkInterval = static_cast<unsigned int>(std::stoul(boost::get<string>(vars["checkInterval"])));
}
if (vars.count("tcpConnectTimeout")) {
- ret->tcpConnectTimeout = std::stoi(boost::get<string>(vars["tcpConnectTimeout"]));
+ config.tcpConnectTimeout = std::stoi(boost::get<string>(vars["tcpConnectTimeout"]));
}
if (vars.count("tcpSendTimeout")) {
- ret->tcpSendTimeout = std::stoi(boost::get<string>(vars["tcpSendTimeout"]));
+ config.tcpSendTimeout = std::stoi(boost::get<string>(vars["tcpSendTimeout"]));
}
if (vars.count("tcpRecvTimeout")) {
- ret->tcpRecvTimeout = std::stoi(boost::get<string>(vars["tcpRecvTimeout"]));
+ config.tcpRecvTimeout = std::stoi(boost::get<string>(vars["tcpRecvTimeout"]));
}
if (vars.count("tcpFastOpen")) {
bool fastOpen = boost::get<bool>(vars["tcpFastOpen"]);
if (fastOpen) {
#ifdef MSG_FASTOPEN
- ret->tcpFastOpen = true;
+ config.tcpFastOpen = true;
#else
warnlog("TCP Fast Open has been configured on downstream server %s but is not supported", boost::get<string>(vars["address"]));
#endif
}
if (vars.count("maxInFlight")) {
- ret->d_maxInFlightQueriesPerConn = std::stoi(boost::get<string>(vars["maxInFlight"]));
+ config.d_maxInFlightQueriesPerConn = std::stoi(boost::get<string>(vars["maxInFlight"]));
}
if (vars.count("name")) {
- ret->setName(boost::get<string>(vars["name"]));
+ config.name = boost::get<string>(vars["name"]);
}
if (vars.count("id")) {
- ret->setId(boost::uuids::string_generator()(boost::get<string>(vars["id"])));
+ config.id = boost::uuids::string_generator()(boost::get<string>(vars["id"]));
}
if (vars.count("checkName")) {
- ret->checkName = DNSName(boost::get<string>(vars["checkName"]));
+ config.checkName = DNSName(boost::get<string>(vars["checkName"]));
}
if (vars.count("checkType")) {
- ret->checkType = boost::get<string>(vars["checkType"]);
+ config.checkType = boost::get<string>(vars["checkType"]);
}
if (vars.count("checkClass")) {
- ret->checkClass = std::stoi(boost::get<string>(vars["checkClass"]));
+ config.checkClass = std::stoi(boost::get<string>(vars["checkClass"]));
}
if (vars.count("checkFunction")) {
- ret->checkFunction = boost::get<DownstreamState::checkfunc_t>(vars["checkFunction"]);
+ config.checkFunction = boost::get<DownstreamState::checkfunc_t>(vars["checkFunction"]);
}
if (vars.count("checkTimeout")) {
- ret->checkTimeout = std::stoi(boost::get<string>(vars["checkTimeout"]));
+ config.checkTimeout = std::stoi(boost::get<string>(vars["checkTimeout"]));
}
if (vars.count("checkTCP")) {
- ret->d_tcpCheck = boost::get<bool>(vars.at("checkTCP"));
+ config.d_tcpCheck = boost::get<bool>(vars.at("checkTCP"));
}
if (vars.count("setCD")) {
- ret->setCD = boost::get<bool>(vars["setCD"]);
+ config.setCD = boost::get<bool>(vars["setCD"]);
}
if (vars.count("mustResolve")) {
- ret->mustResolve = boost::get<bool>(vars["mustResolve"]);
+ config.mustResolve = boost::get<bool>(vars["mustResolve"]);
}
if (vars.count("useClientSubnet")) {
- ret->useECS = boost::get<bool>(vars["useClientSubnet"]);
+ config.useECS = boost::get<bool>(vars["useClientSubnet"]);
}
if (vars.count("useProxyProtocol")) {
- ret->useProxyProtocol = boost::get<bool>(vars["useProxyProtocol"]);
+ config.useProxyProtocol = boost::get<bool>(vars["useProxyProtocol"]);
}
if (vars.count("disableZeroScope")) {
- ret->disableZeroScope = boost::get<bool>(vars["disableZeroScope"]);
+ config.disableZeroScope = boost::get<bool>(vars["disableZeroScope"]);
}
if (vars.count("ipBindAddrNoPort")) {
- ret->ipBindAddrNoPort = boost::get<bool>(vars["ipBindAddrNoPort"]);
+ config.ipBindAddrNoPort = boost::get<bool>(vars["ipBindAddrNoPort"]);
}
if (vars.count("addXPF")) {
- ret->xpfRRCode = std::stoi(boost::get<string>(vars["addXPF"]));
+ config.xpfRRCode = std::stoi(boost::get<string>(vars["addXPF"]));
}
if (vars.count("maxCheckFailures")) {
- ret->maxCheckFailures = std::stoi(boost::get<string>(vars["maxCheckFailures"]));
+ config.maxCheckFailures = std::stoi(boost::get<string>(vars["maxCheckFailures"]));
}
if (vars.count("rise")) {
- ret->minRiseSuccesses = std::stoi(boost::get<string>(vars["rise"]));
+ config.minRiseSuccesses = std::stoi(boost::get<string>(vars["rise"]));
}
if (vars.count("reconnectOnUp")) {
- ret->reconnectOnUp = boost::get<bool>(vars["reconnectOnUp"]);
+ config.reconnectOnUp = boost::get<bool>(vars["reconnectOnUp"]);
}
if (vars.count("cpus")) {
for (const auto& cpu : boost::get<vector<pair<int, string>>>(vars["cpus"])) {
- cpus.insert(std::stoi(cpu.second));
+ config.d_cpus.insert(std::stoi(cpu.second));
}
}
if (vars.count("tcpOnly")) {
- ret->d_tcpOnly = boost::get<bool>(vars.at("tcpOnly"));
+ config.d_tcpOnly = boost::get<bool>(vars.at("tcpOnly"));
}
+ std::shared_ptr<TLSCtx> tlsCtx;
if (vars.count("tls")) {
TLSContextParameters tlsParams;
std::string ciphers;
tlsParams.d_enableRenegotiation = boost::get<bool>(vars.at("enableRenegotiation"));
}
if (vars.count("subjectName")) {
- ret->d_tlsSubjectName = boost::get<string>(vars.at("subjectName"));
+ config.d_tlsSubjectName = boost::get<string>(vars.at("subjectName"));
}
- ret->d_tlsCtx = getTLSContext(tlsParams);
+ tlsCtx = getTLSContext(tlsParams);
if (vars.count("dohPath")) {
#ifdef HAVE_NGHTTP2
- ret->d_dohPath = boost::get<string>(vars.at("dohPath"));
- if (ret->d_tlsCtx) {
- setupDoHClientProtocolNegotiation(ret->d_tlsCtx);
- }
-
- if (g_configurationDone && g_outgoingDoHWorkerThreads && *g_outgoingDoHWorkerThreads == 0) {
- throw std::runtime_error("Error: setOutgoingDoHWorkerThreads() is set to 0 so no outgoing DoH worker thread is available to serve queries");
- }
-
- if (!g_outgoingDoHWorkerThreads || *g_outgoingDoHWorkerThreads == 0) {
- g_outgoingDoHWorkerThreads = 1;
- }
+ config.d_dohPath = boost::get<string>(vars.at("dohPath"));
if (vars.count("addXForwardedHeaders")) {
- ret->d_addXForwardedHeaders = boost::get<bool>(vars.at("addXForwardedHeaders"));
+ config.d_addXForwardedHeaders = boost::get<bool>(vars.at("addXForwardedHeaders"));
}
#else /* HAVE_NGHTTP2 */
- throw std::runtime_error("Outgoing DNS over HTTPS support requested (via 'dohPath' on newServer()) but nghttp2 support is not available");
+ throw std::runtime_error("Outgoing DNS over HTTPS support requested (via 'dohPath' on newServer()) but nghttp2 support is not available");
#endif /* HAVE_NGHTTP2 */
}
- else {
- setupDoTProtocolNegotiation(ret->d_tlsCtx);
- }
}
- if (!ret->isTCPOnly() && !(client || configCheck)) {
- if (!IsAnyAddress(ret->remote)) {
- ret->connectUDPSockets(numberOfSockets);
- }
- }
-
- /* this needs to be done _AFTER_ the order has been set,
- since the server are kept ordered inside the pool */
- auto localPools = g_pools.getCopy();
if (vars.count("pool")) {
if (auto* pool = boost::get<string>(&vars["pool"])) {
- ret->pools.insert(*pool);
+ config.pools.insert(*pool);
}
else {
auto pools = boost::get<vector<pair<int, string>>>(vars["pool"]);
for (auto& p : pools) {
- ret->pools.insert(p.second);
+ config.pools.insert(p.second);
}
}
- for (const auto& poolName : ret->pools) {
+ }
+
+ // create but don't connect the socket in client or check-config modes
+ auto ret = std::make_shared<DownstreamState>(std::move(config), std::move(tlsCtx), !(client || configCheck));
+ if (!(client || configCheck)) {
+ infolog("Added downstream server %s", serverAddr.toStringWithPort());
+ }
+
+ /* this needs to be done _AFTER_ the order has been set,
+ since the server are kept ordered inside the pool */
+ auto localPools = g_pools.getCopy();
+ if (!ret->d_config.pools.empty()) {
+ for (const auto& poolName : ret->d_config.pools) {
addServerToPool(localPools, poolName, ret);
}
}
g_pools.setState(localPools);
if (ret->connected) {
- ret->threadStarted.test_and_set();
-
if (g_launchWork) {
- g_launchWork->push_back([ret, cpus]() {
- ret->tid = thread(responderThread, ret);
- if (!cpus.empty()) {
- mapThreadToCPUList(ret->tid.native_handle(), cpus);
- }
+ g_launchWork->push_back([&ret]() {
+ ret->start();
});
}
else {
- ret->tid = thread(responderThread, ret);
- if (!cpus.empty()) {
- mapThreadToCPUList(ret->tid.native_handle(), cpus);
- }
+ ret->start();
}
}
auto states = g_dstates.getCopy();
states.push_back(ret);
std::stable_sort(states.begin(), states.end(), [](const decltype(ret)& a, const decltype(ret)& b) {
- return a->order < b->order;
+ return a->d_config.order < b->d_config.order;
});
g_dstates.setState(states);
return ret;
else if (auto str = boost::get<std::string>(&var)) {
const auto uuid = getUniqueID(*str);
for (auto& state : states) {
- if (state->id == uuid) {
+ if (*state->d_config.id == uuid) {
server = state;
}
}
throw std::runtime_error("unable to locate the requested server");
}
auto localPools = g_pools.getCopy();
- for (const string& poolName : server->pools) {
+ for (const string& poolName : server->d_config.pools) {
removeServerFromPool(localPools, poolName, server);
}
/* the server might also be in the default pool */
for (const auto& s : *states) {
string status = s->getStatus();
string pools;
- for (auto& p : s->pools) {
- if (!pools.empty())
+ for (const auto& p : s->d_config.pools) {
+ if (!pools.empty()) {
pools += " ";
+ }
pools += p;
}
if (showUUIDs) {
- ret << (fmt % counter % s->getName() % s->remote.toStringWithPort() % status % s->queryLoad % s->qps.getRate() % s->order % s->weight % s->queries.load() % s->reuseds.load() % (s->dropRate) % (s->latencyUsec / 1000.0) % s->outstanding.load() % pools % s->id) << endl;
+ ret << (fmt % counter % s->getName() % s->d_config.remote.toStringWithPort() % status % s->queryLoad % s->qps.getRate() % s->d_config.order % s->d_config.d_weight % s->queries.load() % s->reuseds.load() % (s->dropRate) % (s->latencyUsec / 1000.0) % s->outstanding.load() % pools % *s->d_config.id) << endl;
}
else {
- ret << (fmt % counter % s->getName() % s->remote.toStringWithPort() % status % s->queryLoad % s->qps.getRate() % s->order % s->weight % s->queries.load() % s->reuseds.load() % (s->dropRate) % (s->latencyUsec / 1000.0) % s->outstanding.load() % pools) << endl;
+ ret << (fmt % counter % s->getName() % s->d_config.remote.toStringWithPort() % status % s->queryLoad % s->qps.getRate() % s->d_config.order % s->d_config.d_weight % s->queries.load() % s->reuseds.load() % (s->dropRate) % (s->latencyUsec / 1000.0) % s->outstanding.load() % pools) << endl;
}
totQPS += s->queryLoad;
totQueries += s->queries.load();
if (auto str = boost::get<std::string>(&i)) {
const auto uuid = getUniqueID(*str);
for (auto& state : states) {
- if (state->id == uuid) {
+ if (*state->d_config.id == uuid) {
return state;
}
}
servers += server.second->getName();
servers += " ";
}
- servers += server.second->remote.toStringWithPort();
+ servers += server.second->d_config.remote.toStringWithPort();
}
ret << (fmt % name % cache % policy % servers) << endl;
break;
case COLUMN_BACKENDWEIGHT:
DNSDistSNMPAgent::setCounter64Value(request,
- server->weight);
+ server->d_config.d_weight);
break;
case COLUMN_BACKENDOUTSTANDING:
DNSDistSNMPAgent::setCounter64Value(request,
}
case COLUMN_BACKENDADDRESS:
{
- std::string addr(server->remote.toStringWithPort());
+ std::string addr(server->d_config.remote.toStringWithPort());
snmp_set_var_typed_value(request->requestvb,
ASN_OCTET_STR,
addr.c_str(),
case COLUMN_BACKENDPOOLS:
{
std::string pools;
- for(auto& p : server->pools) {
- if(!pools.empty())
+ for (const auto& p : server->d_config.pools) {
+ if (!pools.empty()) {
pools+=" ";
- pools+=p;
+ }
+ pools += p;
}
snmp_set_var_typed_value(request->requestvb,
ASN_OCTET_STR,
DNSDistSNMPAgent::setCounter64Value(request, server->queries.load());
break;
case COLUMN_BACKENDORDER:
- DNSDistSNMPAgent::setCounter64Value(request, server->order);
+ DNSDistSNMPAgent::setCounter64Value(request, server->d_config.order);
break;
default:
netsnmp_set_request_error(reqinfo,
bool DNSDistSNMPAgent::sendBackendStatusChangeTrap(const std::shared_ptr<DownstreamState>& dss)
{
#ifdef HAVE_NET_SNMP
- const string backendAddress = dss->remote.toStringWithPort();
+ const string backendAddress = dss->d_config.remote.toStringWithPort();
const string backendStatus = dss->getStatus();
netsnmp_variable_list* varList = nullptr;
if (!downstream) {
/* we don't have a connection to this backend owned yet, let's get one (it might not be a fresh one, though) */
downstream = t_downstreamTCPConnectionsManager.getConnectionToDownstream(d_threadData.mplexer, ds, now, std::string());
- if (ds->useProxyProtocol) {
+ if (ds->d_config.useProxyProtocol) {
registerOwnedDownstreamConnection(downstream);
}
}
const auto& ds = currentResponse.d_connection->getDS();
const auto& ids = currentResponse.d_idstate;
double udiff = ids.sentTime.udiff();
- vinfolog("Got answer from %s, relayed to %s (%s, %d bytes), took %f usec", ds->remote.toStringWithPort(), ids.origRemote.toStringWithPort(), (state->d_handler.isTLS() ? "DoT" : "TCP"), currentResponse.d_buffer.size(), udiff);
+ vinfolog("Got answer from %s, relayed to %s (%s, %d bytes), took %f usec", ds->d_config.remote.toStringWithPort(), ids.origRemote.toStringWithPort(), (state->d_handler.isTLS() ? "DoT" : "TCP"), currentResponse.d_buffer.size(), udiff);
- ::handleResponseSent(ids, udiff, state->d_ci.remote, ds->remote, static_cast<unsigned int>(currentResponse.d_buffer.size()), currentResponse.d_cleartextDH, ds->getProtocol());
+ ::handleResponseSent(ids, udiff, state->d_ci.remote, ds->d_config.remote, static_cast<unsigned int>(currentResponse.d_buffer.size()), currentResponse.d_cleartextDH, ds->getProtocol());
updateTCPLatency(ds, udiff);
}
{
std::shared_ptr<IncomingTCPConnectionState> state = shared_from_this();
- if (response.d_connection && response.d_connection->getDS() && response.d_connection->getDS()->useProxyProtocol) {
+ if (response.d_connection && response.d_connection->getDS() && response.d_connection->getDS()->d_config.useProxyProtocol) {
// if we have added a TCP Proxy Protocol payload to a connection, don't release it to the general pool as no one else will be able to use it anyway
if (!response.d_connection->willBeReusable(true)) {
// if it can't be reused even by us, well
/* we need to do this _before_ creating the cross protocol query because
after that the buffer will have been moved */
- if (ds->useProxyProtocol) {
+ if (ds->d_config.useProxyProtocol) {
proxyProtocolPayload = getProxyProtocolPayload(dq);
}
auto downstreamConnection = state->getDownstreamConnection(ds, dq.proxyProtocolValues, now);
- if (ds->useProxyProtocol) {
+ if (ds->d_config.useProxyProtocol) {
/* if we ever sent a TLV over a connection, we can never go back */
if (!state->d_proxyProtocolPayloadHasTLV) {
state->d_proxyProtocolPayloadHasTLV = dq.proxyProtocolValues && !dq.proxyProtocolValues->empty();
for (const auto& state : *states) {
string serverName;
- if (state->getName().empty())
- serverName = state->remote.toStringWithPort();
- else
+ if (state->getName().empty()) {
+ serverName = state->d_config.remote.toStringWithPort();
+ }
+ else {
serverName = state->getName();
+ }
boost::replace_all(serverName, ".", "_");
const std::string label = boost::str(boost::format("{server=\"%1%\",address=\"%2%\"}")
- % serverName % state->remote.toStringWithPort());
+ % serverName % state->d_config.remote.toStringWithPort());
output << statesbase << "status" << label << " " << (state->isUp() ? "1" : "0") << "\n";
output << statesbase << "queries" << label << " " << state->queries.load() << "\n";
output << statesbase << "latency" << label << " " << state->latencyUsec/1000.0 << "\n";
output << statesbase << "senderrors" << label << " " << state->sendErrors.load() << "\n";
output << statesbase << "outstanding" << label << " " << state->outstanding.load() << "\n";
- output << statesbase << "order" << label << " " << state->order << "\n";
- output << statesbase << "weight" << label << " " << state->weight << "\n";
+ output << statesbase << "order" << label << " " << state->d_config.order << "\n";
+ output << statesbase << "weight" << label << " " << state->d_config.d_weight << "\n";
output << statesbase << "tcpdiedsendingquery" << label << " " << state->tcpDiedSendingQuery << "\n";
output << statesbase << "tcpdiedreadingresponse" << label << " " << state->tcpDiedReadingResponse << "\n";
output << statesbase << "tcpgaveup" << label << " " << state->tcpGaveUp << "\n";
static void addServerToJSON(Json::array& servers, int id, const std::shared_ptr<DownstreamState>& a)
{
string status;
- if (a->availability == DownstreamState::Availability::Up) {
+ if (a->d_config.availability == DownstreamState::Availability::Up) {
status = "UP";
}
- else if (a->availability == DownstreamState::Availability::Down) {
+ else if (a->d_config.availability == DownstreamState::Availability::Down) {
status = "DOWN";
}
else {
}
Json::array pools;
- for(const auto& p: a->pools) {
+ for (const auto& p: a->d_config.pools) {
pools.push_back(p);
}
Json::object server {
{"id", id},
{"name", a->getName()},
- {"address", a->remote.toStringWithPort()},
+ {"address", a->d_config.remote.toStringWithPort()},
{"state", status},
{"qps", (double)a->queryLoad},
{"qpsLimit", (double)a->qps.getRate()},
{"outstanding", (double)a->outstanding},
{"reuseds", (double)a->reuseds},
- {"weight", (double)a->weight},
- {"order", (double)a->order},
+ {"weight", (double)a->d_config.d_weight},
+ {"order", (double)a->d_config.order},
{"pools", pools},
{"latency", (double)(a->latencyUsec/1000.0)},
{"queries", (double)a->queries},
};
/* sending a latency for a DOWN server doesn't make sense */
- if (a->availability == DownstreamState::Availability::Down) {
+ if (a->d_config.availability == DownstreamState::Availability::Down) {
server["latency"] = nullptr;
}
int origFD = ids->origFD;
unsigned int qnameWireLength = 0;
- if (fd != ids->backendFD || !responseContentMatches(response, ids->qname, ids->qtype, ids->qclass, dss->remote, qnameWireLength)) {
+ if (fd != ids->backendFD || !responseContentMatches(response, ids->qname, ids->qtype, ids->qclass, dss->d_config.remote, qnameWireLength)) {
continue;
}
}
double udiff = ids->sentTime.udiff();
- vinfolog("Got answer from %s, relayed to %s, took %f usec", dss->remote.toStringWithPort(), ids->origRemote.toStringWithPort(), udiff);
+ vinfolog("Got answer from %s, relayed to %s, took %f usec", dss->d_config.remote.toStringWithPort(), ids->origRemote.toStringWithPort(), udiff);
- handleResponseSent(*ids, udiff, *dr.remote, dss->remote, static_cast<unsigned int>(got), cleartextDH, dss->getProtocol());
+ handleResponseSent(*ids, udiff, *dr.remote, dss->d_config.remote, static_cast<unsigned int>(got), cleartextDH, dss->getProtocol());
dss->releaseState(queryId);
dss->latencyUsec = (127.0 * dss->latencyUsec / 128.0) + udiff/128.0;
}
}
catch (const std::exception& e){
- vinfolog("Got an error in UDP responder thread while parsing a response from %s, id %d: %s", dss->remote.toStringWithPort(), queryId, e.what());
+ vinfolog("Got an error in UDP responder thread while parsing a response from %s, id %d: %s", dss->d_config.remote.toStringWithPort(), queryId, e.what());
}
}
}
{
ssize_t result;
- if (ss->sourceItf == 0) {
+ if (ss->d_config.sourceItf == 0) {
result = send(sd, request.data(), request.size(), 0);
}
else {
struct msghdr msgh;
struct iovec iov;
cmsgbuf_aligned cbuf;
- ComboAddress remote(ss->remote);
+ ComboAddress remote(ss->d_config.remote);
fillMSGHdr(&msgh, &iov, &cbuf, sizeof(cbuf), const_cast<char*>(reinterpret_cast<const char *>(request.data())), request.size(), &remote);
- addCMsgSrcAddr(&msgh, &cbuf, &ss->sourceAddr, ss->sourceItf);
+ addCMsgSrcAddr(&msgh, &cbuf, &ss->d_config.sourceAddr, ss->d_config.sourceItf);
result = sendmsg(sd, &msgh, 0);
}
if (result == -1) {
int savederrno = errno;
- vinfolog("Error sending request to backend %s: %d", ss->remote.toStringWithPort(), savederrno);
+ vinfolog("Error sending request to backend %s: %d", ss->d_config.remote.toStringWithPort(), savederrno);
/* This might sound silly, but on Linux send() might fail with EINVAL
if the interface the socket was bound to doesn't exist anymore.
dq.dnssecOK = (getEDNSZ(dq) & EDNS_HEADER_FLAG_DO);
}
- if (dq.useECS && ((selectedBackend && selectedBackend->useECS) || (!selectedBackend && serverPool->getECS()))) {
+ if (dq.useECS && ((selectedBackend && selectedBackend->d_config.useECS) || (!selectedBackend && serverPool->getECS()))) {
// we special case our cache in case a downstream explicitly gave us a universally valid response with a 0 scope
// we need ECS parsing (parseECS) to be true so we can be sure that the initial incoming query did not have an existing
// ECS option, which would make it unsuitable for the zero-scope feature.
- if (dq.packetCache && !dq.skipCache && (!selectedBackend || !selectedBackend->disableZeroScope) && dq.packetCache->isECSParsingEnabled()) {
+ if (dq.packetCache && !dq.skipCache && (!selectedBackend || !selectedBackend->d_config.disableZeroScope) && dq.packetCache->isECSParsingEnabled()) {
if (dq.packetCache->get(dq, dq.getHeader()->id, &dq.cacheKeyNoECS, dq.subnet, dq.dnssecOK, !dq.overTCP(), allowExpired)) {
if (!prepareOutgoingResponse(holders, cs, dq, true)) {
/* save the DNS flags as sent to the backend so we can cache the answer with the right flags later */
dq.cacheFlags = *getFlagsFromDNSHeader(dq.getHeader());
- if (dq.addXPF && selectedBackend->xpfRRCode != 0) {
- addXPF(dq, selectedBackend->xpfRRCode);
+ if (dq.addXPF && selectedBackend->d_config.xpfRRCode != 0) {
+ addXPF(dq, selectedBackend->d_config.xpfRRCode);
}
selectedBackend->incQueriesCount();
}
double udiff = ids.sentTime.udiff();
- vinfolog("Got answer from %s, relayed to %s (UDP), took %f usec", d_ds->remote.toStringWithPort(), ids.origRemote.toStringWithPort(), udiff);
+ vinfolog("Got answer from %s, relayed to %s (UDP), took %f usec", d_ds->d_config.remote.toStringWithPort(), ids.origRemote.toStringWithPort(), udiff);
- handleResponseSent(ids, udiff, *dr.remote, d_ds->remote, response.d_buffer.size(), cleartextDH, d_ds->getProtocol());
+ handleResponseSent(ids, udiff, *dr.remote, d_ds->d_config.remote, response.d_buffer.size(), cleartextDH, d_ds->getProtocol());
d_ds->latencyUsec = (127.0 * d_ds->latencyUsec / 128.0) + udiff/128.0;
std::string proxyProtocolPayload;
/* we need to do this _before_ creating the cross protocol query because
after that the buffer will have been moved */
- if (ss->useProxyProtocol) {
+ if (ss->d_config.useProxyProtocol) {
proxyProtocolPayload = getProxyProtocolPayload(dq);
}
dh = dq.getHeader();
dh->id = idOffset;
- if (ss->useProxyProtocol) {
+ if (ss->d_config.useProxyProtocol) {
addProxyProtocol(dq);
}
auto mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
auto states = g_dstates.getLocal(); // this points to the actual shared_ptrs!
for(auto& dss : *states) {
- if (++dss->lastCheck < dss->checkInterval) {
+ if (++dss->lastCheck < dss->d_config.checkInterval) {
continue;
}
dss->lastCheck = 0;
- if (dss->availability == DownstreamState::Availability::Auto) {
+ if (dss->d_config.availability == DownstreamState::Availability::Auto) {
if (!queueHealthCheck(mplexer, dss)) {
updateHealthCheckResult(dss, false, false);
}
// pre compute hashes
auto backends = g_dstates.getLocal();
for (auto& backend: *backends) {
- if (backend->weight < 100) {
- vinfolog("Warning, the backend '%s' has a very low weight (%d), which will not yield a good distribution of queries with the 'chashed' policy. Please consider raising it to at least '100'.", backend->getName(), backend->weight);
+ if (backend->d_config.d_weight < 100) {
+ vinfolog("Warning, the backend '%s' has a very low weight (%d), which will not yield a good distribution of queries with the 'chashed' policy. Please consider raising it to at least '100'.", backend->getName(), backend->d_config.d_weight);
}
backend->hash();
createPoolIfNotExists(localPools, "");
if (g_cmdLine.remotes.size()) {
for (const auto& address : g_cmdLine.remotes) {
- auto ret = std::make_shared<DownstreamState>(ComboAddress(address, 53));
- ret->connectUDPSockets(1);
+ DownstreamState::Config config;
+ config.remote = ComboAddress(address, 53);
+ auto ret = std::make_shared<DownstreamState>(std::move(config), nullptr, true);
addServerToPool(localPools, "", ret);
- if (ret->connected && !ret->threadStarted.test_and_set()) {
- ret->tid = thread(responderThread, ret);
- }
- g_dstates.modify([ret](servers_t& servers) { servers.push_back(ret); });
+ ret->start();
+ g_dstates.modify([&ret](servers_t& servers) { servers.push_back(std::move(ret)); });
}
}
g_pools.setState(localPools);
checkFileDescriptorsLimits(udpBindsCount, tcpBindsCount);
auto mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
- for(auto& dss : g_dstates.getCopy()) { // it is a copy, but the internal shared_ptrs are the real deal
- if (dss->availability == DownstreamState::Availability::Auto) {
+ for (auto& dss : g_dstates.getCopy()) { // it is a copy, but the internal shared_ptrs are the real deal
+ if (dss->d_config.availability == DownstreamState::Availability::Auto) {
if (!queueHealthCheck(mplexer, dss, true)) {
dss->setUpStatus(false);
warnlog("Marking downstream %s as 'down'", dss->getNameWithAddr());
struct CrossProtocolQuery;
-struct DownstreamState
+struct DownstreamState: public std::enable_shared_from_this<DownstreamState>
{
+ DownstreamState(const DownstreamState&) = delete;
+ DownstreamState(DownstreamState&&) = default;
+ DownstreamState& operator=(const DownstreamState&) = delete;
+ DownstreamState& operator=(DownstreamState&&) = default;
+
typedef std::function<std::tuple<DNSName, uint16_t, uint16_t>(const DNSName&, uint16_t, uint16_t, dnsheader*)> checkfunc_t;
+ enum class Availability : uint8_t { Up, Down, Auto};
+
+ struct Config
+ {
+ Config()
+ {
+ }
+ Config(const ComboAddress& remote_): remote(remote_)
+ {
+ }
+
+ set<string> pools;
+ std::set<int> d_cpus;
+ checkfunc_t checkFunction;
+ std::optional<boost::uuids::uuid> id;
+ DNSName checkName{"a.root-servers.net."};
+ ComboAddress remote;
+ ComboAddress sourceAddr;
+ std::string sourceItfName;
+ std::string d_tlsSubjectName;
+ std::string d_dohPath;
+ std::string name;
+ std::string nameWithAddr;
+ size_t d_numberOfSockets{1};
+ size_t d_maxInFlightQueriesPerConn{1};
+ size_t d_tcpConcurrentConnectionsLimit{0};
+ int order{1};
+ int d_weight{1};
+ int tcpConnectTimeout{5};
+ int tcpRecvTimeout{30};
+ int tcpSendTimeout{30};
+ int d_qpsLimit{0};
+ unsigned int checkInterval{1};
+ unsigned int sourceItf{0};
+ QType checkType{QType::A};
+ uint16_t checkClass{QClass::IN};
+ uint16_t d_retries{5};
+ uint16_t xpfRRCode{0};
+ uint16_t checkTimeout{1000}; /* in milliseconds */
+ uint8_t maxCheckFailures{1};
+ uint8_t minRiseSuccesses{1};
+ Availability availability{Availability::Auto};
+ bool mustResolve{false};
+ bool useECS{false};
+ bool useProxyProtocol{false};
+ bool setCD{false};
+ bool disableZeroScope{false};
+ bool tcpFastOpen{false};
+ bool ipBindAddrNoPort{true};
+ bool reconnectOnUp{false};
+ bool d_tcpCheck{false};
+ bool d_tcpOnly{false};
+ bool d_addXForwardedHeaders{false}; // for DoH backends
+ };
+
+ DownstreamState(DownstreamState::Config&& config, std::shared_ptr<TLSCtx> tlsCtx, bool connect);
+ DownstreamState(const ComboAddress& remote): DownstreamState(DownstreamState::Config(remote), nullptr, false)
+ {
+ }
- DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf, const std::string& sourceItfName);
- DownstreamState(const ComboAddress& remote_): DownstreamState(remote_, ComboAddress(), 0, std::string()) {}
~DownstreamState();
- void connectUDPSockets(size_t numberOfSockets);
+ Config d_config;
stat_t sendErrors{0};
stat_t outstanding{0};
stat_t reuseds{0};
pdns::stat_t_trait<double> tcpAvgConnectionDuration{0.0};
pdns::stat_t_trait<double> queryLoad{0.0};
pdns::stat_t_trait<double> dropRate{0.0};
- boost::uuids::uuid id;
- const ComboAddress remote;
- const ComboAddress sourceAddr;
+
SharedLockGuarded<std::vector<unsigned int>> hashes;
LockGuarded<std::unique_ptr<FDMultiplexer>> mplexer{nullptr};
- const std::string sourceItfName;
- std::string d_tlsSubjectName;
- std::string d_dohPath;
private:
- std::string name;
- std::string nameWithAddr;
LockGuarded<std::map<uint16_t, IDState>> d_idStatesMap;
vector<IDState> idStates;
public:
std::shared_ptr<TLSCtx> d_tlsCtx{nullptr};
std::vector<int> sockets;
- set<string> pools;
- std::mutex connectLock;
- std::thread tid;
- checkfunc_t checkFunction;
- DNSName checkName{"a.root-servers.net."};
StopWatch sw;
QPSLimiter qps;
std::atomic<uint64_t> idOffset{0};
size_t socketsOffset{0};
- size_t d_maxInFlightQueriesPerConn{1};
- size_t d_tcpConcurrentConnectionsLimit{0};
double latencyUsec{0.0};
double latencyUsecTCP{0.0};
- int order{1};
- int weight{1};
- int tcpConnectTimeout{5};
- int tcpRecvTimeout{30};
- int tcpSendTimeout{30};
- unsigned int checkInterval{1};
unsigned int lastCheck{0};
- const unsigned int sourceItf{0};
- QType checkType{QType::A};
- uint16_t checkClass{QClass::IN};
- uint16_t d_retries{5};
- uint16_t xpfRRCode{0};
- uint16_t checkTimeout{1000}; /* in milliseconds */
uint8_t currentCheckFailures{0};
uint8_t consecutiveSuccessfulChecks{0};
- uint8_t maxCheckFailures{1};
- uint8_t minRiseSuccesses{1};
- enum class Availability : uint8_t { Up, Down, Auto} availability{Availability::Auto};
-private:
- bool d_stopped{false};
-public:
std::atomic<bool> hashesComputed{false};
- bool mustResolve{false};
- bool upStatus{false};
- bool useECS{false};
- bool useProxyProtocol{false};
- bool setCD{false};
- bool disableZeroScope{false};
std::atomic<bool> connected{false};
+ bool upStatus{false};
+
+private:
+ void connectUDPSockets();
+
+ std::thread tid;
+ std::mutex connectLock;
std::atomic_flag threadStarted;
- bool tcpFastOpen{false};
- bool ipBindAddrNoPort{true};
- bool reconnectOnUp{false};
- bool d_tcpCheck{false};
- bool d_tcpOnly{false};
- bool d_addXForwardedHeaders{false}; // for DoH backends
+ bool d_stopped{false};
+public:
+
+ void start();
bool isUp() const
{
- if(availability == Availability::Down)
+ if (d_config.availability == Availability::Down) {
return false;
- if(availability == Availability::Up)
+ }
+ else if (d_config.availability == Availability::Up) {
return true;
+ }
return upStatus;
}
- void setUp() { availability = Availability::Up; }
+
+ void setUp() {
+ d_config.availability = Availability::Up;
+ }
+
void setUpStatus(bool newStatus)
{
upStatus = newStatus;
- if (!upStatus)
+ if (!upStatus) {
latencyUsec = 0.0;
+ }
}
void setDown()
{
- availability = Availability::Down;
+ d_config.availability = Availability::Down;
latencyUsec = 0.0;
}
- void setAuto() { availability = Availability::Auto; }
+ void setAuto() {
+ d_config.availability = Availability::Auto;
+ }
const string& getName() const {
- return name;
+ return d_config.name;
}
const string& getNameWithAddr() const {
- return nameWithAddr;
+ return d_config.nameWithAddr;
}
void setName(const std::string& newName)
{
- name = newName;
- nameWithAddr = newName.empty() ? remote.toStringWithPort() : (name + " (" + remote.toStringWithPort()+ ")");
+ d_config.name = newName;
+ d_config.nameWithAddr = newName.empty() ? d_config.remote.toStringWithPort() : (d_config.name + " (" + d_config.remote.toStringWithPort()+ ")");
}
string getStatus() const
{
string status;
- if(availability == DownstreamState::Availability::Up)
+ if (d_config.availability == DownstreamState::Availability::Up) {
status = "UP";
- else if(availability == DownstreamState::Availability::Down)
+ }
+ else if (d_config.availability == DownstreamState::Availability::Down) {
status = "DOWN";
- else
+ }
+ else {
status = (upStatus ? "up" : "down");
+ }
return status;
}
+
bool reconnect();
void hash();
void setId(const boost::uuids::uuid& newId);
}
const boost::uuids::uuid& getID() const
{
- return id;
+ return *d_config.id;
}
void updateTCPMetrics(size_t nbQueries, uint64_t durationMs)
bool doHealthcheckOverTCP() const
{
- return d_tcpOnly || d_tcpCheck || d_tlsCtx != nullptr;
+ return d_config.d_tcpOnly || d_config.d_tcpCheck || d_tlsCtx != nullptr;
}
bool isTCPOnly() const
{
- return d_tcpOnly || d_tlsCtx != nullptr;
+ return d_config.d_tcpOnly || d_tlsCtx != nullptr;
}
bool isDoH() const
{
- return !d_dohPath.empty();
+ return !d_config.d_dohPath.empty();
}
bool passCrossProtocolQuery(std::unique_ptr<CrossProtocolQuery>&& cpq);
private:
void handleTimeout(IDState& ids);
};
-using servers_t =vector<std::shared_ptr<DownstreamState>>;
+using servers_t = vector<std::shared_ptr<DownstreamState>>;
void responderThread(std::shared_ptr<DownstreamState> state);
extern LockGuarded<LuaContext> g_lua;
#include "dnsdist-tcp.hh"
#include "dolog.hh"
-
bool DownstreamState::passCrossProtocolQuery(std::unique_ptr<CrossProtocolQuery>&& cpq)
{
- if (d_dohPath.empty()) {
+ if (d_config.d_dohPath.empty()) {
return g_tcpclientthreads && g_tcpclientthreads->passCrossProtocolQueryToThread(std::move(cpq));
}
else {
close(fd);
fd = -1;
}
- if (!IsAnyAddress(remote)) {
- fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0);
- if (!IsAnyAddress(sourceAddr)) {
+ if (!IsAnyAddress(d_config.remote)) {
+ fd = SSocket(d_config.remote.sin4.sin_family, SOCK_DGRAM, 0);
+ if (!IsAnyAddress(d_config.sourceAddr)) {
SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
- if (!sourceItfName.empty()) {
+ if (!d_config.sourceItfName.empty()) {
#ifdef SO_BINDTODEVICE
- int res = setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, sourceItfName.c_str(), sourceItfName.length());
+ int res = setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, d_config.sourceItfName.c_str(), d_config.sourceItfName.length());
if (res != 0) {
- infolog("Error setting up the interface on backend socket '%s': %s", remote.toStringWithPort(), stringerror());
+ infolog("Error setting up the interface on backend socket '%s': %s", d_config.remote.toStringWithPort(), stringerror());
}
#endif
}
- SBind(fd, sourceAddr);
+ SBind(fd, d_config.sourceAddr);
}
try {
- SConnect(fd, remote);
+ SConnect(fd, d_config.remote);
if (sockets.size() > 1) {
(*mplexer.lock())->addReadFD(fd, [](int, boost::any) {});
}
connected = true;
}
catch(const std::runtime_error& error) {
- infolog("Error connecting to new server with address %s: %s", remote.toStringWithPort(), error.what());
+ infolog("Error connecting to new server with address %s: %s", d_config.remote.toStringWithPort(), error.what());
connected = false;
break;
}
void DownstreamState::hash()
{
- vinfolog("Computing hashes for id=%s and weight=%d", id, weight);
- auto w = weight;
+ vinfolog("Computing hashes for id=%s and weight=%d", *d_config.id, d_config.d_weight);
+ auto w = d_config.d_weight;
+ auto idStr = boost::str(boost::format("%s") % *d_config.id);
auto lockedHashes = hashes.write_lock();
lockedHashes->clear();
lockedHashes->reserve(w);
while (w > 0) {
- std::string uuid = boost::str(boost::format("%s-%d") % id % w);
+ std::string uuid = boost::str(boost::format("%s-%d") % idStr % w);
unsigned int wshash = burtleCI(reinterpret_cast<const unsigned char*>(uuid.c_str()), uuid.size(), g_hashperturb);
lockedHashes->push_back(wshash);
--w;
void DownstreamState::setId(const boost::uuids::uuid& newId)
{
- id = newId;
+ d_config.id = newId;
// compute hashes only if already done
if (hashesComputed) {
hash();
errlog("Error setting server's weight: downstream weight value must be greater than 0.");
return ;
}
- weight = newWeight;
+
+ d_config.d_weight = newWeight;
+
if (hashesComputed) {
hash();
}
}
-DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_, const std::string& sourceItfName_): remote(remote_), sourceAddr(sourceAddr_), sourceItfName(sourceItfName_), name(remote_.toStringWithPort()), nameWithAddr(remote_.toStringWithPort()), sourceItf(sourceItf_)
+DownstreamState::DownstreamState(DownstreamState::Config&& config, std::shared_ptr<TLSCtx> tlsCtx, bool connect): d_config(std::move(config)), d_tlsCtx(std::move(tlsCtx))
{
- id = getUniqueID();
threadStarted.clear();
+ if (d_config.d_qpsLimit > 0) {
+ qps = QPSLimiter(d_config.d_qpsLimit, d_config.d_qpsLimit);
+ }
+
+ if (d_config.id) {
+ setId(*d_config.id);
+ }
+ else {
+ d_config.id = getUniqueID();
+ }
+
+ if (d_config.d_weight > 0) {
+ setWeight(d_config.d_weight);
+ }
+
+ setName(d_config.name);
+
+ if (d_tlsCtx) {
+ if (!d_config.d_dohPath.empty()) {
+#ifdef HAVE_NGHTTP2
+ setupDoHClientProtocolNegotiation(d_tlsCtx);
+
+ if (g_configurationDone && g_outgoingDoHWorkerThreads && *g_outgoingDoHWorkerThreads == 0) {
+ throw std::runtime_error("Error: setOutgoingDoHWorkerThreads() is set to 0 so no outgoing DoH worker thread is available to serve queries");
+ }
+
+ if (!g_outgoingDoHWorkerThreads || *g_outgoingDoHWorkerThreads == 0) {
+ g_outgoingDoHWorkerThreads = 1;
+ }
+#endif /* HAVE_NGHTTP2 */
+ }
+ else {
+ setupDoTProtocolNegotiation(d_tlsCtx);
+ }
+ }
+
+ if (connect && !isTCPOnly()) {
+ if (!IsAnyAddress(d_config.remote)) {
+ connectUDPSockets();
+ }
+ }
+
sw.start();
}
-void DownstreamState::connectUDPSockets(size_t numberOfSockets)
+
+void DownstreamState::start()
+{
+ if (connected && !threadStarted.test_and_set()) {
+ tid = std::thread(responderThread, shared_from_this());
+
+ if (!d_config.d_cpus.empty()) {
+ mapThreadToCPUList(tid.native_handle(), d_config.d_cpus);
+ }
+
+ tid.detach();
+ }
+}
+
+void DownstreamState::connectUDPSockets()
{
if (s_randomizeIDs) {
idStates.clear();
else {
idStates.resize(g_maxOutstanding);
}
- sockets.resize(numberOfSockets);
+ sockets.resize(d_config.d_numberOfSockets);
if (sockets.size() > 1) {
*(mplexer.lock()) = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
fd = -1;
}
}
-
- // we need to either detach or join the thread before it
- // is destroyed
- if (threadStarted.test_and_set()) {
- tid.detach();
- }
}
void DownstreamState::incCurrentConnectionsCount()
--outstanding;
++g_stats.downstreamTimeouts; // this is an 'actively' discovered timeout
vinfolog("Had a downstream timeout from %s (%s) for query for %s|%s from %s",
- remote.toStringWithPort(), getName(),
+ d_config.remote.toStringWithPort(), getName(),
ids.qname.toLogString(), QType(ids.qtype).toString(), ids.origRemote.toStringWithPort());
struct timespec ts;
memset(&fake, 0, sizeof(fake));
fake.id = ids.origID;
- g_rings.insertResponse(ts, ids.origRemote, ids.qname, ids.qtype, std::numeric_limits<unsigned int>::max(), 0, fake, remote, getProtocol());
+ g_rings.insertResponse(ts, ids.origRemote, ids.qname, ids.qtype, std::numeric_limits<unsigned int>::max(), 0, fake, d_config.remote, getProtocol());
}
void DownstreamState::handleTimeouts()
newServers->emplace_back(++count, server);
/* we need to reorder based on the server 'order' */
std::stable_sort(newServers->begin(), newServers->end(), [](const std::pair<unsigned int,std::shared_ptr<DownstreamState> >& a, const std::pair<unsigned int,std::shared_ptr<DownstreamState> >& b) {
- return a.second->order < b.second->order;
+ return a.second->d_config.order < b.second->d_config.order;
});
/* and now we need to renumber for Lua (custom policies) */
size_t idx = 1;
if (!dss->upStatus) {
/* we were marked as down */
dss->consecutiveSuccessfulChecks++;
- if (dss->consecutiveSuccessfulChecks < dss->minRiseSuccesses) {
+ if (dss->consecutiveSuccessfulChecks < dss->d_config.minRiseSuccesses) {
/* if we need more than one successful check to rise
and we didn't reach the threshold yet,
let's stay down */
if (dss->upStatus) {
/* we are currently up */
dss->currentCheckFailures++;
- if (dss->currentCheckFailures < dss->maxCheckFailures) {
+ if (dss->currentCheckFailures < dss->d_config.maxCheckFailures) {
/* we need more than one failure to be marked as down,
and we did not reach the threshold yet, let's stay up */
newState = true;
if (newState != dss->upStatus) {
warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
- if (newState && !dss->isTCPOnly() && (!dss->connected || dss->reconnectOnUp)) {
+ if (newState && !dss->isTCPOnly() && (!dss->connected || dss->d_config.reconnectOnUp)) {
newState = dss->reconnect();
-
- if (dss->connected && !dss->threadStarted.test_and_set()) {
- dss->tid = std::thread(responderThread, dss);
- }
+ dss->start();
}
dss->setUpStatus(newState);
return false;
}
- if (ds->mustResolve && (responseHeader->rcode == RCode::NXDomain || responseHeader->rcode == RCode::Refused)) {
+ if (ds->d_config.mustResolve && (responseHeader->rcode == RCode::NXDomain || responseHeader->rcode == RCode::Refused)) {
if (g_verboseHealthChecks) {
infolog("Backend %s responded to health check with %s while mustResolve is set", ds->getNameWithAddr(), responseHeader->rcode == RCode::NXDomain ? "NXDomain" : "Refused");
}
data->d_mplexer.removeReadFD(fd);
ComboAddress from;
- from.sin4.sin_family = data->d_ds->remote.sin4.sin_family;
+ from.sin4.sin_family = data->d_ds->d_config.remote.sin4.sin_family;
auto fromlen = from.getSocklen();
data->d_buffer.resize(512);
auto got = recvfrom(data->d_udpSocket.getHandle(), &data->d_buffer.at(0), data->d_buffer.size(), 0, reinterpret_cast<sockaddr *>(&from), &fromlen);
if (got < 0) {
if (g_verboseHealthChecks) {
- infolog("Error receiving health check response from %s: %s", data->d_ds->remote.toStringWithPort(), stringerror());
+ infolog("Error receiving health check response from %s: %s", data->d_ds->d_config.remote.toStringWithPort(), stringerror());
}
updateHealthCheckResult(data->d_ds, data->d_initial, false);
}
/* we are using a connected socket but hey.. */
- if (from != data->d_ds->remote) {
+ if (from != data->d_ds->d_config.remote) {
if (g_verboseHealthChecks) {
- infolog("Invalid health check response received from %s, expecting one from %s", from.toStringWithPort(), data->d_ds->remote.toStringWithPort());
+ infolog("Invalid health check response received from %s, expecting one from %s", from.toStringWithPort(), data->d_ds->d_config.remote.toStringWithPort());
}
updateHealthCheckResult(data->d_ds, data->d_initial, false);
}
try
{
uint16_t queryID = dnsdist::getRandomDNSID();
- DNSName checkName = ds->checkName;
- uint16_t checkType = ds->checkType.getCode();
- uint16_t checkClass = ds->checkClass;
+ DNSName checkName = ds->d_config.checkName;
+ uint16_t checkType = ds->d_config.checkType.getCode();
+ uint16_t checkClass = ds->d_config.checkClass;
dnsheader checkHeader;
memset(&checkHeader, 0, sizeof(checkHeader));
checkHeader.id = queryID;
checkHeader.rd = true;
- if (ds->setCD) {
+ if (ds->d_config.setCD) {
checkHeader.cd = true;
}
- if (ds->checkFunction) {
+ if (ds->d_config.checkFunction) {
auto lock = g_lua.lock();
- auto ret = ds->checkFunction(checkName, checkType, checkClass, &checkHeader);
+ auto ret = ds->d_config.checkFunction(checkName, checkType, checkClass, &checkHeader);
checkName = std::get<0>(ret);
checkType = std::get<1>(ret);
checkClass = std::get<2>(ret);
uint16_t packetSize = packet.size();
std::string proxyProtocolPayload;
size_t proxyProtocolPayloadSize = 0;
- if (ds->useProxyProtocol) {
+ if (ds->d_config.useProxyProtocol) {
proxyProtocolPayload = makeLocalProxyHeader();
proxyProtocolPayloadSize = proxyProtocolPayload.size();
if (!ds->isDoH()) {
}
}
- Socket sock(ds->remote.sin4.sin_family, ds->doHealthcheckOverTCP() ? SOCK_STREAM : SOCK_DGRAM);
+ Socket sock(ds->d_config.remote.sin4.sin_family, ds->doHealthcheckOverTCP() ? SOCK_STREAM : SOCK_DGRAM);
sock.setNonBlocking();
- if (!IsAnyAddress(ds->sourceAddr)) {
+ if (!IsAnyAddress(ds->d_config.sourceAddr)) {
sock.setReuseAddr();
#ifdef IP_BIND_ADDRESS_NO_PORT
- if (ds->ipBindAddrNoPort) {
+ if (ds->d_config.ipBindAddrNoPort) {
SSetsockopt(sock.getHandle(), SOL_IP, IP_BIND_ADDRESS_NO_PORT, 1);
}
#endif
- if (!ds->sourceItfName.empty()) {
+ if (!ds->d_config.sourceItfName.empty()) {
#ifdef SO_BINDTODEVICE
- int res = setsockopt(sock.getHandle(), SOL_SOCKET, SO_BINDTODEVICE, ds->sourceItfName.c_str(), ds->sourceItfName.length());
+ int res = setsockopt(sock.getHandle(), SOL_SOCKET, SO_BINDTODEVICE, ds->d_config.sourceItfName.c_str(), ds->d_config.sourceItfName.length());
if (res != 0 && g_verboseHealthChecks) {
infolog("Error setting SO_BINDTODEVICE on the health check socket for backend '%s': %s", ds->getNameWithAddr(), stringerror());
}
#endif
}
- sock.bind(ds->sourceAddr);
+ sock.bind(ds->d_config.sourceAddr);
}
auto data = std::make_shared<HealthCheckData>(*mplexer, ds, std::move(checkName), checkType, checkClass, queryID);
data->d_initial = initialCheck;
gettimeofday(&data->d_ttd, nullptr);
- data->d_ttd.tv_sec += ds->checkTimeout / 1000; /* ms to seconds */
- data->d_ttd.tv_usec += (ds->checkTimeout % 1000) * 1000; /* remaining ms to us */
+ data->d_ttd.tv_sec += ds->d_config.checkTimeout / 1000; /* ms to seconds */
+ data->d_ttd.tv_usec += (ds->d_config.checkTimeout % 1000) * 1000; /* remaining ms to us */
if (data->d_ttd.tv_usec > 1000000) {
++data->d_ttd.tv_sec;
data->d_ttd.tv_usec -= 1000000;
}
if (!ds->doHealthcheckOverTCP()) {
- sock.connect(ds->remote);
+ sock.connect(ds->d_config.remote);
data->d_udpSocket = std::move(sock);
ssize_t sent = udpClientSendRequestToBackend(ds, data->d_udpSocket.getHandle(), packet, true);
if (sent < 0) {
}
else {
time_t now = time(nullptr);
- data->d_tcpHandler = std::make_unique<TCPIOHandler>(ds->d_tlsSubjectName, sock.releaseHandle(), timeval{ds->checkTimeout,0}, ds->d_tlsCtx, now);
+ data->d_tcpHandler = std::make_unique<TCPIOHandler>(ds->d_config.d_tlsSubjectName, sock.releaseHandle(), timeval{ds->d_config.checkTimeout,0}, ds->d_tlsCtx, now);
data->d_ioState = std::make_unique<IOStateHandler>(*mplexer, data->d_tcpHandler->getDescriptor());
if (ds->d_tlsCtx) {
try {
vinfolog("Unable to restore a TLS session for the DoT healthcheck: %s", e.what());
}
}
- data->d_tcpHandler->tryConnect(ds->tcpFastOpen, ds->remote);
+ data->d_tcpHandler->tryConnect(ds->d_config.tcpFastOpen, ds->d_config.remote);
const uint8_t sizeBytes[] = { static_cast<uint8_t>(packetSize / 256), static_cast<uint8_t>(packetSize % 256) };
packet.insert(packet.begin() + proxyProtocolPayloadSize, sizeBytes, sizeBytes + 2);
size_t position = 0;
for(const auto& d : servers) {
if(d.second->isUp()) {
- poss.emplace_back(std::make_tuple(d.second->outstanding.load(), d.second->order, d.second->latencyUsec), position);
+ poss.emplace_back(std::make_tuple(d.second->outstanding.load(), d.second->d_config.order, d.second->latencyUsec), position);
}
++position;
}
for (const auto& pair : servers) {
if (pair.second->isUp()) {
currentLoad += pair.second->outstanding;
- totalWeight += pair.second->weight;
+ totalWeight += pair.second->d_config.d_weight;
}
}
}
for (const auto& d : servers) { // w=1, w=10 -> 1, 11
- if (d.second->isUp() && (g_weightedBalancingFactor == 0 || (d.second->outstanding <= (targetLoad * d.second->weight)))) {
+ if (d.second->isUp() && (g_weightedBalancingFactor == 0 || (d.second->outstanding <= (targetLoad * d.second->d_config.d_weight)))) {
// Don't overflow sum when adding high weights
- if (d.second->weight > max - sum) {
+ if (d.second->d_config.d_weight > max - sum) {
sum = max;
} else {
- sum += d.second->weight;
+ sum += d.second->d_config.d_weight;
}
poss.emplace_back(sum, d.first);
for (const auto& pair : servers) {
if (pair.second->isUp()) {
currentLoad += pair.second->outstanding;
- totalWeight += pair.second->weight;
+ totalWeight += pair.second->d_config.d_weight;
}
}
}
for (const auto& d: servers) {
- if (d.second->isUp() && (g_consistentHashBalancingFactor == 0 || d.second->outstanding <= (targetLoad * d.second->weight))) {
+ if (d.second->isUp() && (g_consistentHashBalancingFactor == 0 || d.second->outstanding <= (targetLoad * d.second->d_config.d_weight))) {
// make sure hashes have been computed
if (!d.second->hashesComputed) {
d.second->hash();
int dnsdist_ffi_server_get_weight(const dnsdist_ffi_server_t* server)
{
- return server->server->weight;
+ return server->server->d_config.d_weight;
}
int dnsdist_ffi_server_get_order(const dnsdist_ffi_server_t* server)
{
- return server->server->order;
+ return server->server->d_config.order;
}
double dnsdist_ffi_server_get_latency(const dnsdist_ffi_server_t* server)
{
auto payloadSize = std::to_string(query.d_buffer.size());
- bool addXForwarded = d_ds->d_addXForwardedHeaders;
+ bool addXForwarded = d_ds->d_config.d_addXForwardedHeaders;
/* We use nghttp2_nv_flag.NGHTTP2_NV_FLAG_NO_COPY_NAME and nghttp2_nv_flag.NGHTTP2_NV_FLAG_NO_COPY_VALUE
to avoid a copy and lowercasing but we need to make sure that the data will outlive the request (nghttp2_on_frame_send_callback called), and that it is already lowercased. */
/* Pseudo-headers need to come first (rfc7540 8.1.2.1) */
addStaticHeader(headers, "method-name", "method-value");
addStaticHeader(headers, "scheme-name", "scheme-value");
- addDynamicHeader(headers, "authority-name", d_ds->d_tlsSubjectName);
- addDynamicHeader(headers, "path-name", d_ds->d_dohPath);
+ addDynamicHeader(headers, "authority-name", d_ds->d_config.d_tlsSubjectName);
+ addDynamicHeader(headers, "path-name", d_ds->d_config.d_dohPath);
addStaticHeader(headers, "accept-name", "accept-value");
addStaticHeader(headers, "content-type-name", "content-type-value");
addStaticHeader(headers, "user-agent-name", "user-agent-value");
conn->d_currentStreams.erase(stream->first);
// cerr<<"Query has "<<request.d_query.d_downstreamFailures<<" failures, backend limit is "<<conn->d_ds->d_retries<<endl;
- if (request.d_query.d_downstreamFailures < conn->d_ds->d_retries) {
+ if (request.d_query.d_downstreamFailures < conn->d_ds->d_config.d_retries) {
// cerr<<"in "<<__PRETTY_FUNCTION__<<", looking for a connection to send a query of size "<<request.d_query.d_buffer.size()<<endl;
++request.d_query.d_downstreamFailures;
auto downstream = t_downstreamDoHConnectionsManager.getConnectionToDownstream(conn->d_mplexer, conn->d_ds, now, std::string(conn->d_proxyProtocolPayload));
DEBUGLOG("Opening TCP connection to backend "<<d_ds->getNameWithAddr());
++d_ds->tcpNewConnections;
try {
- auto socket = std::make_unique<Socket>(d_ds->remote.sin4.sin_family, SOCK_STREAM, 0);
+ auto socket = std::make_unique<Socket>(d_ds->d_config.remote.sin4.sin_family, SOCK_STREAM, 0);
DEBUGLOG("result of socket() is "<<socket->getHandle());
- if (!IsAnyAddress(d_ds->sourceAddr)) {
+ if (!IsAnyAddress(d_ds->d_config.sourceAddr)) {
SSetsockopt(socket->getHandle(), SOL_SOCKET, SO_REUSEADDR, 1);
#ifdef IP_BIND_ADDRESS_NO_PORT
- if (d_ds->ipBindAddrNoPort) {
+ if (d_ds->d_config.ipBindAddrNoPort) {
SSetsockopt(socket->getHandle(), SOL_IP, IP_BIND_ADDRESS_NO_PORT, 1);
}
#endif
#ifdef SO_BINDTODEVICE
- if (!d_ds->sourceItfName.empty()) {
- int res = setsockopt(socket->getHandle(), SOL_SOCKET, SO_BINDTODEVICE, d_ds->sourceItfName.c_str(), d_ds->sourceItfName.length());
+ if (!d_ds->d_config.sourceItfName.empty()) {
+ int res = setsockopt(socket->getHandle(), SOL_SOCKET, SO_BINDTODEVICE, d_ds->d_config.sourceItfName.c_str(), d_ds->d_config.sourceItfName.length());
if (res != 0) {
vinfolog("Error setting up the interface on backend TCP socket '%s': %s", d_ds->getNameWithAddr(), stringerror());
}
}
#endif
- socket->bind(d_ds->sourceAddr, false);
+ socket->bind(d_ds->d_config.sourceAddr, false);
}
socket->setNonBlocking();
gettimeofday(&d_connectionStartTime, nullptr);
- auto handler = std::make_unique<TCPIOHandler>(d_ds->d_tlsSubjectName, socket->releaseHandle(), timeval{0,0}, d_ds->d_tlsCtx, d_connectionStartTime.tv_sec);
+ auto handler = std::make_unique<TCPIOHandler>(d_ds->d_config.d_tlsSubjectName, socket->releaseHandle(), timeval{0,0}, d_ds->d_tlsCtx, d_connectionStartTime.tv_sec);
if (!tlsSession && d_ds->d_tlsCtx) {
tlsSession = g_sessionCache.getSession(d_ds->getID(), d_connectionStartTime.tv_sec);
}
if (tlsSession) {
handler->setTLSSession(tlsSession);
}
- handler->tryConnect(d_ds->tcpFastOpen && isFastOpenEnabled(), d_ds->remote);
+ handler->tryConnect(d_ds->d_config.tcpFastOpen && isFastOpenEnabled(), d_ds->d_config.remote);
d_queries = 0;
d_handler = std::move(handler);
catch (const std::runtime_error& e) {
vinfolog("Connection to downstream server %s failed: %s", d_ds->getName(), e.what());
d_downstreamFailures++;
- if (d_downstreamFailures >= d_ds->d_retries) {
+ if (d_downstreamFailures >= d_ds->d_config.d_retries) {
throw;
}
}
}
- while (d_downstreamFailures < d_ds->d_retries);
+ while (d_downstreamFailures < d_ds->d_config.d_retries);
return false;
}
}
}
-void TCPConnectionToBackend::release()
-{
+void TCPConnectionToBackend::release(){
d_ds->outstanding -= d_pendingResponses.size();
d_pendingResponses.clear();
DEBUGLOG("connection died, number of failures is "<<conn->d_downstreamFailures<<", retries is "<<conn->d_ds->d_retries);
- if (conn->d_downstreamFailures < conn->d_ds->d_retries) {
+ if (conn->d_downstreamFailures < conn->d_ds->d_config.d_retries) {
conn->d_ioState.reset();
ioGuard.release();
class ConnectionToBackend : public std::enable_shared_from_this<ConnectionToBackend>
{
public:
- ConnectionToBackend(const std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDMultiplexer>& mplexer, const struct timeval& now): d_connectionStartTime(now), d_lastDataReceivedTime(now), d_ds(ds), d_mplexer(mplexer), d_enableFastOpen(ds->tcpFastOpen)
+ ConnectionToBackend(const std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDMultiplexer>& mplexer, const struct timeval& now): d_connectionStartTime(now), d_lastDataReceivedTime(now), d_ds(ds), d_mplexer(mplexer), d_enableFastOpen(ds->d_config.tcpFastOpen)
{
reconnect();
}
const ComboAddress& getRemote() const
{
- return d_ds->remote;
+ return d_ds->d_config.remote;
}
const std::string& getBackendName() const
- it cannot be reused for a different client
- we might have different TLV values for each query
*/
- if (d_ds && d_ds->useProxyProtocol == true && !sameClient) {
+ if (d_ds && d_ds->d_config.useProxyProtocol == true && !sameClient) {
return false;
}
return false;
}
- if (d_ds && d_ds->useProxyProtocol == true) {
+ if (d_ds && d_ds->d_config.useProxyProtocol == true) {
return sameClient;
}
if (d_ds == nullptr) {
throw std::runtime_error("getBackendReadTTD() without any backend selected");
}
- if (d_ds->checkTimeout == 0) {
+ if (d_ds->d_config.checkTimeout == 0) {
return boost::none;
}
struct timeval res = now;
- res.tv_sec += d_ds->checkTimeout / 1000; /* ms to s */
- res.tv_usec += (d_ds->checkTimeout % 1000) / 1000; /* remaining ms to µs */
+ res.tv_sec += d_ds->d_config.checkTimeout / 1000; /* ms to s */
+ res.tv_usec += (d_ds->d_config.checkTimeout % 1000) / 1000; /* remaining ms to µs */
return res;
}
if (d_ds == nullptr) {
throw std::runtime_error("getBackendReadTTD() without any backend selected");
}
- if (d_ds->tcpRecvTimeout == 0) {
+ if (d_ds->d_config.tcpRecvTimeout == 0) {
return boost::none;
}
struct timeval res = now;
- res.tv_sec += d_ds->tcpRecvTimeout;
+ res.tv_sec += d_ds->d_config.tcpRecvTimeout;
return res;
}
if (d_ds == nullptr) {
throw std::runtime_error("getBackendWriteTTD() called without any backend selected");
}
- if (d_ds->tcpSendTimeout == 0) {
+ if (d_ds->d_config.tcpSendTimeout == 0) {
return boost::none;
}
struct timeval res = now;
- res.tv_sec += d_ds->tcpSendTimeout;
+ res.tv_sec += d_ds->d_config.tcpSendTimeout;
return res;
}
if (d_ds == nullptr) {
throw std::runtime_error("getBackendConnectTTD() called without any backend selected");
}
- if (d_ds->tcpConnectTimeout == 0) {
+ if (d_ds->d_config.tcpConnectTimeout == 0) {
return boost::none;
}
struct timeval res = now;
- res.tv_sec += d_ds->tcpConnectTimeout;
+ res.tv_sec += d_ds->d_config.tcpConnectTimeout;
return res;
}
bool reachedMaxConcurrentQueries() const override
{
const size_t concurrent = d_pendingQueries.size() + d_pendingResponses.size();
- if (concurrent > 0 && concurrent >= d_ds->d_maxInFlightQueriesPerConn) {
+ if (concurrent > 0 && concurrent >= d_ds->d_config.d_maxInFlightQueriesPerConn) {
return true;
}
return false;
void notifyAllQueriesFailed(const struct timeval& now, FailureReason reason);
bool needProxyProtocolPayload() const
{
- return !d_proxyProtocolPayloadSent && (d_ds && d_ds->useProxyProtocol);
+ return !d_proxyProtocolPayloadSent && (d_ds && d_ds->d_config.useProxyProtocol);
}
class PendingRequest
cleanupClosedConnections(now);
- const bool haveProxyProtocol = ds->useProxyProtocol || !proxyProtocolPayload.empty();
+ const bool haveProxyProtocol = ds->d_config.useProxyProtocol || !proxyProtocolPayload.empty();
if (!haveProxyProtocol) {
const auto& it = d_downstreamConnections.find(backendId);
if (it != d_downstreamConnections.end()) {
}
double udiff = du->ids.sentTime.udiff();
- vinfolog("Got answer from %s, relayed to %s (https), took %f usec", du->downstream->remote.toStringWithPort(), du->ids.origRemote.toStringWithPort(), udiff);
+ vinfolog("Got answer from %s, relayed to %s (https), took %f usec", du->downstream->d_config.remote.toStringWithPort(), du->ids.origRemote.toStringWithPort(), udiff);
- handleResponseSent(du->ids, udiff, *dr.remote, du->downstream->remote, du->response.size(), cleartextDH, du->downstream->getProtocol());
+ handleResponseSent(du->ids, udiff, *dr.remote, du->downstream->d_config.remote, du->response.size(), cleartextDH, du->downstream->getProtocol());
++g_stats.responses;
if (du->ids.cs) {
std::string proxyProtocolPayload;
/* we need to do this _before_ creating the cross protocol query because
after that the buffer will have been moved */
- if (du->downstream->useProxyProtocol) {
+ if (du->downstream->d_config.useProxyProtocol) {
proxyProtocolPayload = getProxyProtocolPayload(dq);
}
ids->destHarvested = false;
}
- if (du->downstream->useProxyProtocol) {
+ if (du->downstream->d_config.useProxyProtocol) {
size_t payloadSize = 0;
if (addProxyProtocol(dq)) {
du->proxyProtocolPayloadSize = payloadSize;
}
double udiff = du->ids.sentTime.udiff();
- vinfolog("Got answer from %s, relayed to %s (https), took %f usec", du->downstream->remote.toStringWithPort(), du->ids.origRemote.toStringWithPort(), udiff);
+ vinfolog("Got answer from %s, relayed to %s (https), took %f usec", du->downstream->d_config.remote.toStringWithPort(), du->ids.origRemote.toStringWithPort(), udiff);
- handleResponseSent(du->ids, udiff, *dr.remote, du->downstream->remote, du->response.size(), cleartextDH, du->downstream->getProtocol());
+ handleResponseSent(du->ids, udiff, *dr.remote, du->downstream->d_config.remote, du->response.size(), cleartextDH, du->downstream->getProtocol());
++g_stats.responses;
if (du->ids.cs) {
return DNSAction::Action::None;
}
+bool setupDoTProtocolNegotiation(std::shared_ptr<TLSCtx>&)
+{
+ return true;
+}
+
+void responderThread(std::shared_ptr<DownstreamState> dss)
+{
+}
+
string g_outputBuffer;
+std::atomic<bool> g_configurationDone{false};
static DNSQuestion getDQ(const DNSName* providedName = nullptr)
{
we need to keep them ordered!).
However the first server has a QPS limit at 10 qps, so any query above that should be routed
to the second server. */
- servers.at(0).second->order = 1;
- servers.at(1).second->order = 2;
+ servers.at(0).second->d_config.order = 1;
+ servers.at(1).second->d_config.order = 2;
servers.at(0).second->qps = QPSLimiter(qpsLimit, qpsLimit);
/* mark the servers as 'up' */
servers.at(0).second->setUp();
/* reset */
for (auto& entry : serversMap) {
entry.second = 0;
- BOOST_CHECK_EQUAL(entry.first->weight, 1);
+ BOOST_CHECK_EQUAL(entry.first->d_config.d_weight, 1);
}
/* reset */
for (auto& entry : serversMap) {
entry.second = 0;
- BOOST_CHECK_EQUAL(entry.first->weight, 1);
+ BOOST_CHECK_EQUAL(entry.first->d_config.d_weight, 1);
}
/* change the weight of the last server to 100, default is 1 */
- servers.at(servers.size()-1).second->weight = 100;
+ servers.at(servers.size()-1).second->d_config.d_weight = 100;
for (size_t idx = 0; idx < 1000; idx++) {
auto server = pol.getSelectedBackend(servers, dq);
uint64_t totalW = 0;
for (const auto& entry : serversMap) {
total += entry.second;
- totalW += entry.first->weight;
+ totalW += entry.first->d_config.d_weight;
}
BOOST_CHECK_EQUAL(total, 1000U);
auto last = servers.at(servers.size()-1).second;
const auto got = serversMap[last];
- float expected = (1000 * 1.0 * last->weight) / totalW;
+ float expected = (1000 * 1.0 * last->d_config.d_weight) / totalW;
BOOST_CHECK_GT(got, expected / 2);
BOOST_CHECK_LT(got, expected * 2);
}
/* reset */
for (auto& entry : serversMap) {
entry.second = 0;
- BOOST_CHECK_EQUAL(entry.first->weight, 1);
+ BOOST_CHECK_EQUAL(entry.first->d_config.d_weight, 1);
}
/* request 1000 times the same name, we should go to the same server every time */
/* reset */
for (auto& entry : serversMap) {
entry.second = 0;
- BOOST_CHECK_EQUAL(entry.first->weight, 1);
+ BOOST_CHECK_EQUAL(entry.first->d_config.d_weight, 1);
}
/* change the weight of the last server to 100, default is 1 */
servers.at(servers.size()-1).second->setWeight(100);
uint64_t totalW = 0;
for (const auto& entry : serversMap) {
total += entry.second;
- totalW += entry.first->weight;
+ totalW += entry.first->d_config.d_weight;
}
BOOST_CHECK_EQUAL(total, names.size());
auto last = servers.at(servers.size()-1).second;
const auto got = serversMap[last];
- float expected = (names.size() * 1.0 * last->weight) / totalW;
+ float expected = (names.size() * 1.0 * last->d_config.d_weight) / totalW;
BOOST_CHECK_GT(got, expected / 2);
BOOST_CHECK_LT(got, expected * 2);
}
/* reset */
for (auto& entry : serversMap) {
entry.second = 0;
- BOOST_CHECK_EQUAL(entry.first->weight, 1000);
+ BOOST_CHECK_EQUAL(entry.first->d_config.d_weight, 1000);
}
/* request 1000 times the same name, we should go to the same server every time */
/* reset */
for (auto& entry : serversMap) {
entry.second = 0;
- BOOST_CHECK_EQUAL(entry.first->weight, 1000);
+ BOOST_CHECK_EQUAL(entry.first->d_config.d_weight, 1000);
}
/* change the weight of the last server to 100000, others stay at 1000 */
servers.at(servers.size()-1).second->setWeight(100000);
uint64_t totalW = 0;
for (const auto& entry : serversMap) {
total += entry.second;
- totalW += entry.first->weight;
+ totalW += entry.first->d_config.d_weight;
}
BOOST_CHECK_EQUAL(total, names.size());
auto last = servers.at(servers.size()-1).second;
const auto got = serversMap[last];
- float expected = (names.size() * 1.0 * last->weight) / totalW;
+ float expected = (names.size() * 1.0 * last->d_config.d_weight) / totalW;
BOOST_CHECK_GT(got, expected / 2);
BOOST_CHECK_LT(got, expected * 2);
auto backend = std::make_shared<DownstreamState>(getBackendAddress("42", 53));
backend->d_tlsCtx = tlsCtx;
- backend->d_tlsSubjectName = "backend.powerdns.com";
- backend->d_dohPath = "/dns-query";
- backend->d_addXForwardedHeaders = true;
+ backend->d_config.d_tlsSubjectName = "backend.powerdns.com";
+ backend->d_config.d_dohPath = "/dns-query";
+ backend->d_config.d_addXForwardedHeaders = true;
auto sender = std::make_shared<MockupQuerySender>();
sender->d_id = counter;
auto backend = std::make_shared<DownstreamState>(getBackendAddress("42", 53));
backend->d_tlsCtx = tlsCtx;
- backend->d_tlsSubjectName = "backend.powerdns.com";
- backend->d_dohPath = "/dns-query";
- backend->d_addXForwardedHeaders = true;
+ backend->d_config.d_tlsSubjectName = "backend.powerdns.com";
+ backend->d_config.d_dohPath = "/dns-query";
+ backend->d_config.d_addXForwardedHeaders = true;
size_t numberOfQueries = 2;
std::vector<std::pair<std::shared_ptr<MockupQuerySender>, InternalQuery>> queries;
auto backend = std::make_shared<DownstreamState>(getBackendAddress("42", 53));
backend->d_tlsCtx = tlsCtx;
- backend->d_tlsSubjectName = "backend.powerdns.com";
- backend->d_dohPath = "/dns-query";
- backend->d_addXForwardedHeaders = true;
+ backend->d_config.d_tlsSubjectName = "backend.powerdns.com";
+ backend->d_config.d_dohPath = "/dns-query";
+ backend->d_config.d_addXForwardedHeaders = true;
size_t numberOfQueries = 2;
std::vector<std::pair<std::shared_ptr<MockupQuerySender>, InternalQuery>> queries;
auto backend = std::make_shared<DownstreamState>(getBackendAddress("42", 53));
backend->d_tlsCtx = tlsCtx;
- backend->d_tlsSubjectName = "backend.powerdns.com";
- backend->d_dohPath = "/dns-query";
- backend->d_addXForwardedHeaders = true;
+ backend->d_config.d_tlsSubjectName = "backend.powerdns.com";
+ backend->d_config.d_dohPath = "/dns-query";
+ backend->d_config.d_addXForwardedHeaders = true;
auto sender = std::make_shared<MockupQuerySender>();
sender->d_id = counter;
auto backend = std::make_shared<DownstreamState>(getBackendAddress("42", 53));
backend->d_tlsCtx = tlsCtx;
- backend->d_tlsSubjectName = "backend.powerdns.com";
- backend->d_dohPath = "/dns-query";
- backend->d_addXForwardedHeaders = true;
+ backend->d_config.d_tlsSubjectName = "backend.powerdns.com";
+ backend->d_config.d_dohPath = "/dns-query";
+ backend->d_config.d_addXForwardedHeaders = true;
size_t numberOfQueries = 2;
std::vector<std::pair<std::shared_ptr<MockupQuerySender>, InternalQuery>> queries;
}
struct timeval later = now;
- later.tv_sec += backend->tcpSendTimeout + 1;
+ later.tv_sec += backend->d_config.tcpSendTimeout + 1;
auto expiredConns = handleH2Timeouts(*s_mplexer, later);
BOOST_CHECK_EQUAL(expiredConns, 1U);
auto backend = std::make_shared<DownstreamState>(getBackendAddress("42", 53));
backend->d_tlsCtx = tlsCtx;
- backend->d_tlsSubjectName = "backend.powerdns.com";
- backend->d_dohPath = "/dns-query";
- backend->d_addXForwardedHeaders = true;
+ backend->d_config.d_tlsSubjectName = "backend.powerdns.com";
+ backend->d_config.d_dohPath = "/dns-query";
+ backend->d_config.d_addXForwardedHeaders = true;
size_t numberOfQueries = 2;
std::vector<std::pair<std::shared_ptr<MockupQuerySender>, InternalQuery>> queries;
}
struct timeval later = now;
- later.tv_sec += backend->tcpRecvTimeout + 1;
+ later.tv_sec += backend->d_config.tcpRecvTimeout + 1;
auto expiredConns = handleH2Timeouts(*s_mplexer, later);
BOOST_CHECK_EQUAL(expiredConns, 1U);
auto backend = std::make_shared<DownstreamState>(getBackendAddress("42", 53));
backend->d_tlsCtx = tlsCtx;
- backend->d_tlsSubjectName = "backend.powerdns.com";
- backend->d_dohPath = "/dns-query";
- backend->d_addXForwardedHeaders = true;
+ backend->d_config.d_tlsSubjectName = "backend.powerdns.com";
+ backend->d_config.d_dohPath = "/dns-query";
+ backend->d_config.d_addXForwardedHeaders = true;
size_t numberOfQueries = 2;
std::vector<std::pair<std::shared_ptr<MockupQuerySender>, InternalQuery>> queries;
auto backend = std::make_shared<DownstreamState>(getBackendAddress("42", 53));
backend->d_tlsCtx = tlsCtx;
- backend->d_tlsSubjectName = "backend.powerdns.com";
- backend->d_dohPath = "/dns-query";
- backend->d_addXForwardedHeaders = true;
+ backend->d_config.d_tlsSubjectName = "backend.powerdns.com";
+ backend->d_config.d_dohPath = "/dns-query";
+ backend->d_config.d_addXForwardedHeaders = true;
size_t numberOfQueries = 2;
std::vector<std::pair<std::shared_ptr<MockupQuerySender>, InternalQuery>> queries;
auto backend = std::make_shared<DownstreamState>(getBackendAddress("42", 53));
backend->d_tlsCtx = tlsCtx;
- backend->d_tlsSubjectName = "backend.powerdns.com";
- backend->d_dohPath = "/dns-query";
- backend->d_addXForwardedHeaders = true;
+ backend->d_config.d_tlsSubjectName = "backend.powerdns.com";
+ backend->d_config.d_dohPath = "/dns-query";
+ backend->d_config.d_addXForwardedHeaders = true;
size_t numberOfQueries = 2;
std::vector<std::pair<std::shared_ptr<MockupQuerySender>, InternalQuery>> queries;
auto backend = std::make_shared<DownstreamState>(getBackendAddress("42", 53));
backend->d_tlsCtx = tlsCtx;
- backend->d_tlsSubjectName = "backend.powerdns.com";
- backend->d_dohPath = "/dns-query";
- backend->d_addXForwardedHeaders = true;
+ backend->d_config.d_tlsSubjectName = "backend.powerdns.com";
+ backend->d_config.d_dohPath = "/dns-query";
+ backend->d_config.d_addXForwardedHeaders = true;
size_t numberOfQueries = 2;
std::vector<std::pair<std::shared_ptr<MockupQuerySender>, InternalQuery>> queries;
auto backend = std::make_shared<DownstreamState>(getBackendAddress("42", 53));
backend->d_tlsCtx = tlsCtx;
- backend->d_tlsSubjectName = "backend.powerdns.com";
- backend->d_dohPath = "/dns-query";
- backend->d_addXForwardedHeaders = true;
+ backend->d_config.d_tlsSubjectName = "backend.powerdns.com";
+ backend->d_config.d_dohPath = "/dns-query";
+ backend->d_config.d_addXForwardedHeaders = true;
/* set the number of reconnection attempts to a low value to not waste time */
- backend->d_retries = 1;
+ backend->d_config.d_retries = 1;
size_t numberOfQueries = 2;
std::vector<std::pair<std::shared_ptr<MockupQuerySender>, InternalQuery>> queries;
auto backend = std::make_shared<DownstreamState>(getBackendAddress("42", 53));
backend->d_tlsCtx = tlsCtx;
- backend->d_tlsSubjectName = "backend.powerdns.com";
- backend->d_dohPath = "/dns-query";
- backend->d_addXForwardedHeaders = true;
+ backend->d_config.d_tlsSubjectName = "backend.powerdns.com";
+ backend->d_config.d_dohPath = "/dns-query";
+ backend->d_config.d_addXForwardedHeaders = true;
size_t numberOfQueries = 2;
std::vector<std::pair<std::shared_ptr<MockupQuerySender>, InternalQuery>> queries;
auto backend = std::make_shared<DownstreamState>(getBackendAddress("42", 53));
backend->d_tlsCtx = tlsCtx;
- backend->d_tlsSubjectName = "backend.powerdns.com";
- backend->d_dohPath = "/dns-query";
- backend->d_addXForwardedHeaders = true;
+ backend->d_config.d_tlsSubjectName = "backend.powerdns.com";
+ backend->d_config.d_dohPath = "/dns-query";
+ backend->d_config.d_addXForwardedHeaders = true;
size_t numberOfQueries = 2;
std::vector<std::pair<std::shared_ptr<MockupQuerySender>, InternalQuery>> queries;
}
struct timeval later = now;
- later.tv_sec += backend->tcpRecvTimeout + 1;
+ later.tv_sec += backend->d_config.tcpRecvTimeout + 1;
auto expiredConns = handleH2Timeouts(*s_mplexer, later);
BOOST_CHECK_EQUAL(expiredConns, 1U);
auto backend = std::make_shared<DownstreamState>(getBackendAddress("42", 53));
backend->d_tlsCtx = tlsCtx;
- backend->d_tlsSubjectName = "backend.powerdns.com";
- backend->d_dohPath = "/dns-query";
- backend->d_addXForwardedHeaders = true;
- backend->useProxyProtocol = true;
+ backend->d_config.d_tlsSubjectName = "backend.powerdns.com";
+ backend->d_config.d_dohPath = "/dns-query";
+ backend->d_config.d_addXForwardedHeaders = true;
+ backend->d_config.useProxyProtocol = true;
size_t numberOfQueries = 2;
std::vector<std::pair<std::shared_ptr<MockupQuerySender>, InternalQuery>> queries;
auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now);
IncomingTCPConnectionState::handleIO(state, now);
struct timeval later = now;
- later.tv_sec += backend->tcpSendTimeout + 1;
+ later.tv_sec += backend->d_config.tcpSendTimeout + 1;
auto expiredWriteConns = threadData.mplexer->getTimeouts(later, true);
BOOST_CHECK_EQUAL(expiredWriteConns.size(), 1U);
for (const auto& cbData : expiredWriteConns) {
auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now);
IncomingTCPConnectionState::handleIO(state, now);
struct timeval later = now;
- later.tv_sec += backend->tcpRecvTimeout + 1;
+ later.tv_sec += backend->d_config.tcpRecvTimeout + 1;
auto expiredConns = threadData.mplexer->getTimeouts(later, false);
BOOST_CHECK_EQUAL(expiredConns.size(), 1U);
for (const auto& cbData : expiredConns) {
auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now);
IncomingTCPConnectionState::handleIO(state, now);
BOOST_CHECK_EQUAL(s_writeBuffer.size(), 0U);
- BOOST_CHECK_EQUAL(s_backendWriteBuffer.size(), query.size() * backend->d_retries);
+ BOOST_CHECK_EQUAL(s_backendWriteBuffer.size(), query.size() * backend->d_config.d_retries);
BOOST_CHECK_EQUAL(backend->outstanding.load(), 0U);
/* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */
IncomingTCPConnectionState::handleIO(state, now);
BOOST_CHECK_EQUAL(s_writeBuffer.size(), query.size());
BOOST_CHECK(s_writeBuffer == query);
- BOOST_CHECK_EQUAL(s_backendWriteBuffer.size(), query.size() * backend->d_retries);
+ BOOST_CHECK_EQUAL(s_backendWriteBuffer.size(), query.size() * backend->d_config.d_retries);
BOOST_CHECK_EQUAL(backend->outstanding.load(), 0U);
/* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */
auto backend = std::make_shared<DownstreamState>(getBackendAddress("42", 53));
backend->d_tlsCtx = tlsCtx;
/* enable out-of-order on the backend side as well */
- backend->d_maxInFlightQueriesPerConn = 65536;
+ backend->d_config.d_maxInFlightQueriesPerConn = 65536;
/* shorter than the client one */
- backend->tcpRecvTimeout = 1;
+ backend->d_config.tcpRecvTimeout = 1;
TCPClientThreadData threadData;
threadData.mplexer = std::make_unique<MockupFDMultiplexer>();
}
struct timeval later = now;
- later.tv_sec += backend->tcpRecvTimeout + 1;
+ later.tv_sec += backend->d_config.tcpRecvTimeout + 1;
auto expiredConns = threadData.mplexer->getTimeouts(later, false);
BOOST_CHECK_EQUAL(expiredConns.size(), 1U);
for (const auto& cbData : expiredConns) {
}
struct timeval later = now;
- later.tv_sec += backend->tcpRecvTimeout + 1;
+ later.tv_sec += backend->d_config.tcpRecvTimeout + 1;
auto expiredConns = threadData.mplexer->getTimeouts(later, false);
BOOST_CHECK_EQUAL(expiredConns.size(), 1U);
for (const auto& cbData : expiredConns) {
appendPayloadEditingID(expectedWriteBuffer, responses.at(4), 4);
/* make sure that the backend's timeout is longer than the client's */
- backend->tcpRecvTimeout = 30;
+ backend->d_config.tcpRecvTimeout = 30;
bool timeout = false;
int backendDescriptor = -1;
auto proxyEnabledBackend = std::make_shared<DownstreamState>(getBackendAddress("42", 53));
proxyEnabledBackend->d_tlsCtx = tlsCtx;
/* enable out-of-order on the backend side as well */
- proxyEnabledBackend->d_maxInFlightQueriesPerConn = 65536;
- proxyEnabledBackend->useProxyProtocol = true;
+ proxyEnabledBackend->d_config.d_maxInFlightQueriesPerConn = 65536;
+ proxyEnabledBackend->d_config.useProxyProtocol = true;
expectedBackendWriteBuffer.insert(expectedBackendWriteBuffer.end(), proxyPayload.begin(), proxyPayload.end());
appendPayloadEditingID(expectedBackendWriteBuffer, queries.at(0), 0);
auto proxyEnabledBackend = std::make_shared<DownstreamState>(getBackendAddress("42", 53));
proxyEnabledBackend->d_tlsCtx = tlsCtx;
/* enable out-of-order on the backend side as well */
- proxyEnabledBackend->d_maxInFlightQueriesPerConn = 65536;
- proxyEnabledBackend->useProxyProtocol = true;
+ proxyEnabledBackend->d_config.d_maxInFlightQueriesPerConn = 65536;
+ proxyEnabledBackend->d_config.useProxyProtocol = true;
expectedBackendWriteBuffer.insert(expectedBackendWriteBuffer.end(), proxyPayload.begin(), proxyPayload.end());
appendPayloadEditingID(expectedBackendWriteBuffer, queries.at(0), 0);
s_readBuffer.insert(s_readBuffer.end(), queries.at(2).begin(), queries.at(2).end());
/* make sure that the backend's timeout is shorter than the client's */
- backend->tcpConnectTimeout = 1;
+ backend->d_config.tcpConnectTimeout = 1;
g_tcpRecvTimeout = 5;
bool timeout = false;
}
struct timeval later = now;
- later.tv_sec += backend->tcpConnectTimeout + 1;
+ later.tv_sec += backend->d_config.tcpConnectTimeout + 1;
auto expiredConns = threadData.mplexer->getTimeouts(later, true);
BOOST_CHECK_EQUAL(expiredConns.size(), 1U);
for (const auto& cbData : expiredConns) {
BOOST_CHECK_EQUAL(backend->outstanding.load(), 0U);
/* restore */
- backend->tcpSendTimeout = 30;
+ backend->d_config.tcpSendTimeout = 30;
g_tcpRecvTimeout = 2;
/* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */
auto backend1 = std::make_shared<DownstreamState>(getBackendAddress("42", 53));
backend1->d_tlsCtx = tlsCtx;
/* only two queries in flight! */
- backend1->d_maxInFlightQueriesPerConn = 2;
+ backend1->d_config.d_maxInFlightQueriesPerConn = 2;
int backend1Desc = -1;
int backend2Desc = -1;
auto backend = std::make_shared<DownstreamState>(getBackendAddress("42", 53));
backend->d_tlsCtx = tlsCtx;
/* shorter than the client one */
- backend->tcpRecvTimeout = 1;
+ backend->d_config.tcpRecvTimeout = 1;
TCPClientThreadData threadData;
threadData.mplexer = std::make_unique<MockupFDMultiplexer>();