The maximum number of threads in the TCP pool is controlled by the
`setMaxTCPClientThreads()` directive, and defaults to 10. This number can be
increased to handle a large number of simultaneous TCP connections.
+If all the TCP threads are busy, new TCP connections are queued while
+they wait to be picked up. The maximum number of queued connections
+can be configured with `setMaxTCPQueuedConnections()`. Any value other
+than 0 (the default, which means no limit) will cause new connections to be
+dropped once that many connections are already waiting in the queue.
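+
+For example, a configuration raising both limits could look like the following
+(the values are purely illustrative, not recommendations):
+
+```
+setMaxTCPClientThreads(20)       -- allow up to 20 TCP client threads instead of the default 10
+setMaxTCPQueuedConnections(1000) -- drop new TCP connections once 1000 are already queued
+```
+
+Both directives are read when the configuration is parsed and cannot be
+changed at runtime.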
When dispatching UDP queries to backend servers, `dnsdist` keeps track of at
most `n` outstanding queries for each backend. This number `n` can be tuned
with the `setMaxUDPOutstanding()` directive, listed below along with the other
tuning-related settings (see the configuration sketch after the list):
* `setTCPRecvTimeout(n)`: set the read timeout on TCP connections from the client, in seconds
* `setTCPSendTimeout(n)`: set the write timeout on TCP connections from the client, in seconds
* `setMaxTCPClientThreads(n)`: set the maximum number of TCP client threads handling TCP connections
+ * `setMaxTCPQueuedConnections(n)`: set the maximum number of TCP connections queued (waiting to be picked up by a client thread)
* `setMaxUDPOutstanding(n)`: set the maximum number of outstanding UDP queries to a given backend server. This can only be set at configuration time and defaults to 10240
* `setCacheCleaningDelay(n)`: set the interval in seconds between two runs of the cache cleaning algorithm, removing expired entries
* `setStaleCacheEntriesTTL(n)`: allows using cache entries that expired no more than `n` seconds ago when no backend is available to answer a query
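
As a sketch, these directives could appear together in the configuration file
as follows; the numeric values are placeholders chosen only for illustration:

```
setTCPRecvTimeout(10)        -- give a client 10 seconds to send its query over TCP
setTCPSendTimeout(10)        -- allow 10 seconds to write the response back over TCP
setMaxUDPOutstanding(20480)  -- per-backend limit on outstanding UDP queries, configuration time only
setCacheCleaningDelay(300)   -- scan the cache for expired entries every 5 minutes
setStaleCacheEntriesTTL(600) -- serve entries expired for up to 10 minutes when no backend is available
```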
cout<<endl;
}
}
+ close(fd);
}
void doConsole()
"QTypeRule(",
"setACL(", "setDNSSECPool(", "setECSOverride(",
"setECSSourcePrefixV4(", "setECSSourcePrefixV6(", "setKey(", "setLocal(",
- "setMaxTCPClientThreads(", "setMaxUDPOutstanding(", "setServerPolicy(", "setServerPolicyLua(",
+ "setMaxTCPClientThreads(", "setMaxTCPQueuedConnections(", "setMaxUDPOutstanding(", "setServerPolicy(",
+ "setServerPolicyLua(",
"setTCPRecvTimeout(", "setTCPSendTimeout(", "setVerboseHealthChecks(", "show(", "showACL()",
"showDNSCryptBinds()", "showDynBlocks()", "showResponseLatency()", "showRules()",
"showServerPolicy()", "showServers()", "shutdown()", "SpoofAction(",
g_lua.registerMember<bool (DNSQuestion::*)>("tcp", [](const DNSQuestion& dq) -> bool { return dq.tcp; }, [](DNSQuestion& dq, bool newTcp) { (void) newTcp; });
g_lua.registerMember<bool (DNSQuestion::*)>("skipCache", [](const DNSQuestion& dq) -> bool { return dq.skipCache; }, [](DNSQuestion& dq, bool newSkipCache) { dq.skipCache = newSkipCache; });
- g_lua.writeFunction("setMaxTCPClientThreads", [](uint64_t max) { g_maxTCPClientThreads = max; });
+ g_lua.writeFunction("setMaxTCPClientThreads", [](uint64_t max) {
+ if (!g_configurationDone) {
+ g_maxTCPClientThreads = max;
+ } else {
+ g_outputBuffer="Maximum TCP client threads count cannot be altered at runtime!\n";
+ }
+ });
+
+ g_lua.writeFunction("setMaxTCPQueuedConnections", [](uint64_t max) {
+ if (!g_configurationDone) {
+ g_maxTCPQueuedConnections = max;
+ } else {
+ g_outputBuffer="The maximum number of queued TCP connections cannot be altered at runtime!\n";
+ }
+ });
g_lua.writeFunction("setCacheCleaningDelay", [](uint32_t delay) { g_cacheCleaningDelay = delay; });
ClientState* cs;
};
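+// 0, the default, means no limit on the number of queued TCP connections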
+uint64_t g_maxTCPQueuedConnections{0};
void* tcpClientThread(int pipefd);
// Should not be called simultaneously!
void TCPClientCollection::addTCPClientThread()
{
if (d_numthreads >= d_tcpclientthreads.capacity()) {
- warnlog("Adding a new TCP client thread would exceed the vector capacity, skipping");
+ warnlog("Adding a new TCP client thread would exceed the vector capacity (%d/%d), skipping", d_numthreads.load(), d_tcpclientthreads.capacity());
return;
}
if(pipe(pipefds) < 0)
unixDie("Creating pipe");
- if (!setNonBlocking(pipefds[1]))
+ if (!setNonBlocking(pipefds[1])) {
+ close(pipefds[0]);
+ close(pipefds[1]);
unixDie("Setting pipe non-blocking");
+ }
d_tcpclientthreads.push_back(pipefds[1]);
+ ++d_numthreads;
thread t1(tcpClientThread, pipefds[0]);
t1.detach();
- ++d_numthreads;
}
static bool getNonBlockingMsgLen(int fd, uint16_t* len, int timeout)
continue;
}
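+ // Drop the connection if the limit on queued TCP connections has already been reached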
+ if(g_maxTCPQueuedConnections > 0 && g_tcpclientthreads->d_queued >= g_maxTCPQueuedConnections) {
+ close(ci->fd);
+ delete ci;
+ ci=nullptr;
+ vinfolog("Dropping TCP connection from %s because we have too many queued already", remote.toStringWithPort());
+ continue;
+ }
+
vinfolog("Got TCP connection from %s", remote.toStringWithPort());
ci->remote = remote;
writen2WithTimeout(pipe, &ci, sizeof(ci), 0);
}
else {
+ --g_tcpclientthreads->d_queued;
close(ci->fd);
delete ci;
}
return false;
}
-std::atomic<uint64_t> g_maxTCPClientThreads{10};
+uint64_t g_maxTCPClientThreads{10};
std::atomic<uint16_t> g_cacheCleaningDelay{60};
void* maintThread()
for(;;) {
sleep(interval);
- if(g_tcpclientthreads->d_queued > 1 && g_tcpclientthreads->d_numthreads < g_maxTCPClientThreads)
+ if(g_tcpclientthreads->d_queued > 1 && g_tcpclientthreads->d_numthreads < g_tcpclientthreads->d_maxthreads)
g_tcpclientthreads->addTCPClientThread();
for(auto& dss : g_dstates.getCopy()) { // this points to the actual shared_ptrs!
std::atomic<uint64_t> d_pos{0};
public:
std::atomic<uint64_t> d_queued{0}, d_numthreads{0};
+ uint64_t d_maxthreads{0};
TCPClientCollection(size_t maxThreads)
{
+ d_maxthreads = maxThreads;
d_tcpclientthreads.reserve(maxThreads);
}
int getThread()
{
- int pos = d_pos++;
+ uint64_t pos = d_pos++;
++d_queued;
return d_tcpclientthreads[pos % d_numthreads];
}
extern int g_tcpSendTimeout;
extern uint16_t g_maxOutstanding;
extern std::atomic<bool> g_configurationDone;
-extern std::atomic<uint64_t> g_maxTCPClientThreads;
+extern uint64_t g_maxTCPClientThreads;
+extern uint64_t g_maxTCPQueuedConnections;
extern std::atomic<uint16_t> g_cacheCleaningDelay;
extern uint16_t g_ECSSourcePrefixV4;
extern uint16_t g_ECSSourcePrefixV6;