setLuaNoSideEffect();
boost::format fmt("%-10d %-10d %-10d %-10d\n");
g_outputBuffer += (fmt % "Clients" % "MaxClients" % "Queued" % "MaxQueued").str();
- g_outputBuffer += (fmt % g_tcpclientthreads->d_numthreads % g_maxTCPClientThreads % g_tcpclientthreads->d_queued % g_maxTCPQueuedConnections).str();
+ g_outputBuffer += (fmt % g_tcpclientthreads->getThreadsCount() % g_maxTCPClientThreads % g_tcpclientthreads->getQueuedCount() % g_maxTCPQueuedConnections).str();
});
g_lua.writeFunction("setCacheCleaningDelay", [](uint32_t delay) { g_cacheCleaningDelay = delay; });
uint64_t g_maxTCPQueuedConnections{1000};
void* tcpClientThread(int pipefd);
-// Should not be called simultaneously!
void TCPClientCollection::addTCPClientThread()
{
- if (d_numthreads >= d_tcpclientthreads.capacity()) {
- warnlog("Adding a new TCP client thread would exceed the vector capacity (%d/%d), skipping", d_numthreads.load(), d_tcpclientthreads.capacity());
- return;
- }
-
vinfolog("Adding TCP Client thread");
int pipefds[2] = { -1, -1};
return;
}
- d_tcpclientthreads.push_back(pipefds[1]);
+ {
+ std::lock_guard<std::mutex> lock(d_mutex);
+
+ if (d_numthreads >= d_tcpclientthreads.capacity()) {
+ warnlog("Adding a new TCP client thread would exceed the vector capacity (%d/%d), skipping", d_numthreads.load(), d_tcpclientthreads.capacity());
+ close(pipefds[0]);
+ close(pipefds[1]);
+ return;
+ }
+
+ d_tcpclientthreads.push_back(pipefds[1]);
+ }
+
++d_numthreads;
}
throw std::runtime_error("Error reading from TCP acceptor pipe (" + std::to_string(pipefd) + ") in " + std::string(isNonBlocking(pipefd) ? "non-blocking" : "blocking") + " mode: " + e.what());
}
- --g_tcpclientthreads->d_queued;
+ g_tcpclientthreads->decrementQueuedCount();
ci=*citmp;
delete citmp;
continue;
}
- if(g_maxTCPQueuedConnections > 0 && g_tcpclientthreads->d_queued >= g_maxTCPQueuedConnections) {
+ if(g_maxTCPQueuedConnections > 0 && g_tcpclientthreads->getQueuedCount() >= g_maxTCPQueuedConnections) {
close(ci->fd);
delete ci;
ci=nullptr;
writen2WithTimeout(pipe, &ci, sizeof(ci), 0);
}
else {
- --g_tcpclientthreads->d_queued;
+ g_tcpclientthreads->decrementQueuedCount();
queuedCounterIncremented = false;
close(ci->fd);
delete ci;
delete ci;
ci = nullptr;
if (queuedCounterIncremented) {
- --g_tcpclientthreads->d_queued;
+ g_tcpclientthreads->decrementQueuedCount();
}
}
catch(...){}
class TCPClientCollection {
std::vector<int> d_tcpclientthreads;
+ std::atomic<uint64_t> d_numthreads{0};
std::atomic<uint64_t> d_pos{0};
-public:
- std::atomic<uint64_t> d_queued{0}, d_numthreads{0};
+ std::atomic<uint64_t> d_queued{0};
uint64_t d_maxthreads{0};
+ std::mutex d_mutex;
+public:
TCPClientCollection(size_t maxThreads)
{
d_maxthreads = maxThreads;
d_tcpclientthreads.reserve(maxThreads);
}
-
int getThread()
{
uint64_t pos = d_pos++;
++d_queued;
return d_tcpclientthreads[pos % d_numthreads];
}
+ bool hasReachedMaxThreads() const
+ {
+ return d_numthreads >= d_maxthreads;
+ }
+ uint64_t getThreadsCount() const
+ {
+ return d_numthreads;
+ }
+ uint64_t getQueuedCount() const
+ {
+ return d_queued;
+ }
+ void decrementQueuedCount()
+ {
+ --d_queued;
+ }
void addTCPClientThread();
};