helper threads like SNMP might have t_id == 0 as well)
then the distributor threads if any
and finally the workers */
-std::vector<RecThreadInfo> g_threadInfos;
+std::vector<RecThreadInfo> RecThreadInfo::s_threadInfos;
bool RecThreadInfo::s_weDistributeQueries; // if true, 1 or more threads listen on the incoming query sockets and distribute them to workers
unsigned int RecThreadInfo::s_numDistributorThreads;
}
/* thread 0 is the handler / SNMP, worker threads start at 1 */
- for (unsigned int n = 0; n <= (RecThreadInfo::s_numWorkerThreads + RecThreadInfo::s_numDistributorThreads); ++n) {
- auto& threadInfos = g_threadInfos.at(n);
+ for (unsigned int n = 0; n <= RecThreadInfo::numThreads(); ++n) {
+ auto& threadInfo = RecThreadInfo::info(n);
int fd[2];
if (pipe(fd) < 0)
unixDie("Creating pipe for inter-thread communications");
- threadInfos.pipes.readToThread = fd[0];
- threadInfos.pipes.writeToThread = fd[1];
+ threadInfo.pipes.readToThread = fd[0];
+ threadInfo.pipes.writeToThread = fd[1];
// handler thread only gets first pipe, not the others
if (n == 0) {
if (pipe(fd) < 0)
unixDie("Creating pipe for inter-thread communications");
- threadInfos.pipes.readFromThread = fd[0];
- threadInfos.pipes.writeFromThread = fd[1];
+ threadInfo.pipes.readFromThread = fd[0];
+ threadInfo.pipes.writeFromThread = fd[1];
if (pipe(fd) < 0)
unixDie("Creating pipe for inter-thread communications");
- threadInfos.pipes.readQueriesToThread = fd[0];
- threadInfos.pipes.writeQueriesToThread = fd[1];
+ threadInfo.pipes.readQueriesToThread = fd[0];
+ threadInfo.pipes.writeQueriesToThread = fd[1];
if (pipeBufferSize > 0) {
- if (!setPipeBufferSize(threadInfos.pipes.writeQueriesToThread, pipeBufferSize)) {
+ if (!setPipeBufferSize(threadInfo.pipes.writeQueriesToThread, pipeBufferSize)) {
int err = errno;
g_log << Logger::Warning << "Error resizing the buffer of the distribution pipe for thread " << n << " to " << pipeBufferSize << ": " << strerror(err) << endl;
- auto existingSize = getPipeBufferSize(threadInfos.pipes.writeQueriesToThread);
+ auto existingSize = getPipeBufferSize(threadInfo.pipes.writeQueriesToThread);
if (existingSize > 0) {
g_log << Logger::Warning << "The current size of the distribution pipe's buffer for thread " << n << " is " << existingSize << endl;
}
}
}
- if (!setNonBlocking(threadInfos.pipes.writeQueriesToThread)) {
+ if (!setNonBlocking(threadInfo.pipes.writeQueriesToThread)) {
unixDie("Making pipe for inter-thread communications non-blocking");
}
}
g_log << Logger::Notice << "stats: " << pcSize << " packet cache entries, " << ratePercentage(pcHits, SyncRes::s_queries) << "% packet cache hits" << endl;
size_t idx = 0;
- for (const auto& threadInfo : g_threadInfos) {
+ for (const auto& threadInfo : RecThreadInfo::infos()) {
if (threadInfo.isWorker()) {
g_log << Logger::Notice << "stats: thread " << idx << " has been distributed " << threadInfo.numberOfDistributedQueries << " queries" << endl;
++idx;
for the initialization of ACLs and domain maps. After that it should only
be called by the handler. */
- if (g_threadInfos.empty() && RecThreadInfo::id() == 0) {
+ if (RecThreadInfo::infos().empty() && RecThreadInfo::id() == 0) {
/* the handler and distributors will call themselves below, but
during startup we get called while g_threadInfos has not been
populated yet to update the ACL or domain maps, so we need to
}
unsigned int n = 0;
- for (const auto& threadInfo : g_threadInfos) {
+ for (const auto& threadInfo : RecThreadInfo::infos()) {
if (n++ == RecThreadInfo::id()) {
func(); // don't write to ourselves!
continue;
unsigned int n = 0;
T ret = T();
- for (const auto& threadInfo : g_threadInfos) {
+ for (const auto& threadInfo : RecThreadInfo::infos()) {
if (n++ == RecThreadInfo::id()) {
continue;
}
g_reusePort = ::arg().mustDo("reuseport");
#endif
- g_threadInfos.resize(RecThreadInfo::s_numDistributorThreads + RecThreadInfo::s_numWorkerThreads + /* handler */ 1);
+ RecThreadInfo::infos().resize(RecThreadInfo::numThreads() + /* handler */ 1);
if (g_reusePort) {
if (RecThreadInfo::s_weDistributeQueries) {
/* first thread is the handler, then distributors */
for (unsigned int threadId = 1; threadId <= RecThreadInfo::s_numDistributorThreads; threadId++) {
- auto& deferredAdds = g_threadInfos.at(threadId).deferredAdds;
- auto& tcpSockets = g_threadInfos.at(threadId).tcpSockets;
+ auto& info = RecThreadInfo::info(threadId);
+ auto& deferredAdds = info.deferredAdds;
+ auto& tcpSockets = info.tcpSockets;
makeUDPServerSockets(deferredAdds);
makeTCPServerSockets(deferredAdds, tcpSockets);
}
else {
/* first thread is the handler, there is no distributor here and workers are accepting queries */
for (unsigned int threadId = 1; threadId <= RecThreadInfo::s_numWorkerThreads; threadId++) {
- auto& deferredAdds = g_threadInfos.at(threadId).deferredAdds;
- auto& tcpSockets = g_threadInfos.at(threadId).tcpSockets;
+ auto& info = RecThreadInfo::info(threadId);
+ auto& deferredAdds = info.deferredAdds;
+ auto& tcpSockets = info.tcpSockets;
makeUDPServerSockets(deferredAdds);
makeTCPServerSockets(deferredAdds, tcpSockets);
}
if (RecThreadInfo::s_weDistributeQueries) {
/* first thread is the handler, then distributors */
for (unsigned int threadId = 1; threadId <= RecThreadInfo::s_numDistributorThreads; threadId++) {
- g_threadInfos.at(threadId).tcpSockets = tcpSockets;
+ RecThreadInfo::info(threadId).tcpSockets = tcpSockets;
}
}
else {
/* first thread is the handler, there is no distributor here and workers are accepting queries */
for (unsigned int threadId = 1; threadId <= RecThreadInfo::s_numWorkerThreads; threadId++) {
- g_threadInfos.at(threadId).tcpSockets = tcpSockets;
+ RecThreadInfo::info(threadId).tcpSockets = tcpSockets;
}
}
}
#endif
/* This thread handles the web server, carbon, statistics and the control channel */
- auto& handlerInfo = g_threadInfos.at(0);
+ auto& handlerInfo = RecThreadInfo::info(0);
handlerInfo.setHandler();
handlerInfo.start(0, "web+stat");
setCPUMap(cpusMap, currentThreadId, pthread_self());
- auto& info = g_threadInfos.at(currentThreadId);
+ auto& info = RecThreadInfo::info(currentThreadId);
info.setListener();
info.setWorker();
info.setThreadId(currentThreadId++);
else {
if (RecThreadInfo::s_weDistributeQueries) {
for (unsigned int n = 0; n < RecThreadInfo::s_numDistributorThreads; ++n) {
- g_threadInfos.at(currentThreadId + n).setListener();
+ RecThreadInfo::info(currentThreadId + n).setListener();
}
}
for (unsigned int n = 0; n < RecThreadInfo::s_numWorkerThreads; ++n) {
- auto& info = g_threadInfos.at(currentThreadId + (RecThreadInfo::s_weDistributeQueries ? RecThreadInfo::s_numDistributorThreads : 0) + n);
+ auto& info = RecThreadInfo::info(currentThreadId + (RecThreadInfo::s_weDistributeQueries ? RecThreadInfo::s_numDistributorThreads : 0) + n);
info.setListener(!RecThreadInfo::s_weDistributeQueries);
info.setWorker();
}
if (RecThreadInfo::s_weDistributeQueries) {
g_log << Logger::Warning << "Launching " << RecThreadInfo::s_numDistributorThreads << " distributor threads" << endl;
for (unsigned int n = 0; n < RecThreadInfo::s_numDistributorThreads; ++n) {
- auto& info = g_threadInfos.at(currentThreadId);
+ auto& info = RecThreadInfo::info(currentThreadId);
info.start(currentThreadId++, "distr");
- setCPUMap(cpusMap, currentThreadId, info.thread.native_handle());
+ setCPUMap(cpusMap, currentThreadId, info.thread.native_handle()); // XXX off by one?
}
}
g_log << Logger::Warning << "Launching " << RecThreadInfo::s_numWorkerThreads << " worker threads" << endl;
for (unsigned int n = 0; n < RecThreadInfo::s_numWorkerThreads; ++n) {
- auto& info = g_threadInfos.at(currentThreadId);
+ auto& info = RecThreadInfo::info(currentThreadId);
info.start(currentThreadId++, "worker");
- setCPUMap(cpusMap, currentThreadId, info.thread.native_handle());
+ setCPUMap(cpusMap, currentThreadId, info.thread.native_handle()); // XXX off by one?
}
#ifdef HAVE_SYSTEMD
#endif
/* This thread handles the web server, carbon, statistics and the control channel */
- auto& info = g_threadInfos.at(0);
+ auto& info = RecThreadInfo::info(0);
info.setHandler();
info.start(0, "web+stat");
- for (auto& ti : g_threadInfos) {
+ for (auto& ti : RecThreadInfo::infos()) {
ti.thread.join();
if (ti.exitCode != 0) {
ret = ti.exitCode;
return hadError;
}
-struct RecThreadInfo;
-/* first we have the handler thread, t_id == 0 (some other
- helper threads like SNMP might have t_id == 0 as well)
- then the distributor threads if any
- and finally the workers */
-extern std::vector<RecThreadInfo> g_threadInfos;
void* recursorThread();
-// for communicating with our threads
-// effectively readonly after startup
+// For communicating with our threads effectively readonly after
+// startup.
+// First we have the handler thread, t_id == 0 (some other helper
+// threads like SNMP might have t_id == 0 as well) then the
+// distributor threads if any and finally the workers
struct RecThreadInfo
{
struct ThreadPipeSet
public:
static RecThreadInfo& self()
{
- return g_threadInfos.at(t_id);
+ return s_threadInfos.at(t_id);
+ }
+
+ static RecThreadInfo& info(unsigned int i)
+ {
+ return s_threadInfos.at(i);
+ }
+
+ static vector<RecThreadInfo>& infos()
+ {
+ return s_threadInfos;
}
bool isDistributor() const
setThreadName(threadPrefix + name);
recursorThread();
});
- sleep(1);
}
static unsigned int id()
t_id = id;
}
- /* FD corresponding to TCP sockets this thread is listening
- on.
- These FDs are also in deferredAdds when we have one
- socket per listener, and in g_deferredAdds instead. */
+ // FD corresponding to TCP sockets this thread is listening on.
+ // These FDs are also in deferredAdds when we have one socket per
+ // listener, and in g_deferredAdds instead.
std::set<int> tcpSockets;
- /* FD corresponding to listening sockets if we have one socket per
- listener (with reuseport), otherwise all listeners share the
- same FD and g_deferredAdds is then used instead */
+ // FD corresponding to listening sockets if we have one socket per
+ // listener (with reuseport), otherwise all listeners share the
+ // same FD and g_deferredAdds is then used instead
deferredAdd_t deferredAdds;
+
struct ThreadPipeSet pipes;
std::thread thread;
MT_t* mt{nullptr};
/* process queries */
bool worker{false};
static thread_local unsigned int t_id;
+ static std::vector<RecThreadInfo> s_threadInfos;
};
struct ThreadMSG