* `setCacheCleaningDelay(n)`: set the interval in seconds between two runs of the cache cleaning algorithm, removing expired entries
* `setCacheCleaningPercentage(n)`: set the percentage of the cache that the cache cleaning algorithm will try to free by removing expired entries. By default (100), all expired entries are removed
* `setStaleCacheEntriesTTL(n)`: allows using cache entries expired for at most `n` seconds when no backend available to answer for a query
+ * `setTCPDownstreamCleanupInterval(interval)`: minimum interval in seconds between two cleanups of the idle TCP downstream connections. Defaults to 60s
* `setTCPUseSinglePipe(bool)`: whether the incoming TCP connections should be put into a single queue instead of using per-thread queues. Defaults to false
* `setTCPRecvTimeout(n)`: set the read timeout on TCP connections from the client, in seconds
* `setTCPSendTimeout(n)`: set the write timeout on TCP connections from the client, in seconds
{ "setServerPolicy", true, "policy", "set server selection policy to that policy" },
{ "setServerPolicyLua", true, "name, function", "set server selection policy to one named 'name' and provided by 'function'" },
{ "setServFailWhenNoServer", true, "bool", "if set, return a ServFail when no servers are available, instead of the default behaviour of dropping the query" },
+ { "setTCPDownstreamCleanupInterval", true, "interval", "minimum interval in seconds between two cleanups of the idle TCP downstream connections" },
{ "setTCPUseSinglePipe", true, "bool", "whether the incoming TCP connections should be put into a single queue instead of using per-thread queues. Defaults to false" },
{ "setTCPRecvTimeout", true, "n", "set the read timeout on TCP connections from the client, in seconds" },
{ "setTCPSendTimeout", true, "n", "set the write timeout on TCP connections from the client, in seconds" },
g_outputBuffer=poolObj->policy->name+"\n";
}
});
+
+ g_lua.writeFunction("setTCPDownstreamCleanupInterval", [](uint16_t interval) {
+ setLuaSideEffect();
+ g_downstreamTCPCleanupInterval = interval;
+ });
}
static std::mutex tcpClientsCountMutex;
static std::map<ComboAddress,size_t,ComboAddress::addressOnlyLessThan> tcpClientsCount;
bool g_useTCPSinglePipe{false};
+std::atomic<uint16_t> g_downstreamTCPCleanupInterval{60};
void* tcpClientThread(int pipefd);
return false;
}
+void cleanupClosedTCPConnections(std::map<ComboAddress,int>& sockets)
+{
+ for(auto it = sockets.begin(); it != sockets.end(); ) {
+ if (isTCPSocketUsable(it->second)) {
+ ++it;
+ }
+ else {
+ close(it->second);
+ it = sockets.erase(it);
+ }
+ }
+}
+
std::shared_ptr<TCPClientCollection> g_tcpclientthreads;
void* tcpClientThread(int pipefd)
bool outstanding = false;
blockfilter_t blockFilter = 0;
+ time_t lastTCPCleanup = time(nullptr);
{
std::lock_guard<std::mutex> lock(g_luamutex);
--ds->outstanding;
}
decrementTCPClientCount(ci.remote);
+
+ if (g_downstreamTCPCleanupInterval > 0 && (connectionStartTime > (lastTCPCleanup + g_downstreamTCPCleanupInterval))) {
+ cleanupClosedTCPConnections(sockets);
+ lastTCPCleanup = time(nullptr);
+ }
}
return 0;
}
extern bool g_servFailOnNoPolicy;
extern uint32_t g_hashperturb;
extern bool g_useTCPSinglePipe;
+extern std::atomic<uint16_t> g_downstreamTCPCleanupInterval;
struct ConsoleKeyword {
std::string name;
return false;
}
+
+/* requires a non-blocking socket.
+ On Linux, we could use MSG_DONTWAIT on a blocking socket
+ but this is not portable.
+*/
+bool isTCPSocketUsable(int sock)
+{
+ int err = 0;
+ char buf = '\0';
+ size_t buf_size = sizeof(buf);
+
+ do {
+ ssize_t got = recv(sock, &buf, buf_size, MSG_PEEK);
+
+ if (got > 0) {
+ /* socket is usable, some data is even waiting to be read */
+ return true;
+ }
+ else if (got == 0) {
+ /* other end has closed the socket */
+ return false;
+ }
+ else {
+ int err = errno;
+
+ if (err == EAGAIN || err == EWOULDBLOCK) {
+ /* socket is usable, no data waiting */
+ return true;
+ }
+ else {
+ if (err != EINTR) {
+ /* something is wrong, could be ECONNRESET,
+ ENOTCONN, EPIPE, but anyway this socket is
+ not usable. */
+ return false;
+ }
+ }
+ }
+ } while (err == EINTR);
+
+ return false;
+}
ssize_t sendfromto(int sock, const char* data, size_t len, int flags, const ComboAddress& from, const ComboAddress& to);
ssize_t sendMsgWithTimeout(int fd, const char* buffer, size_t len, int timeout, ComboAddress& dest, const ComboAddress& local, unsigned int localItf);
bool sendSizeAndMsgWithTimeout(int sock, uint16_t bufferLen, const char* buffer, int idleTimeout, const ComboAddress* dest, const ComboAddress* local, unsigned int localItf, int totalTimeout, int flags);
+/* requires a non-blocking, connected TCP socket */
+bool isTCPSocketUsable(int sock);
extern template class NetmaskTree<bool>;