unsigned int RecThreadInfo::s_numWorkerThreads;
thread_local unsigned int RecThreadInfo::t_id;
-static std::map<unsigned int, std::set<int>> parseCPUMap()
+static std::map<unsigned int, std::set<int>> parseCPUMap(std::shared_ptr<Logr::Logger>& log)
{
std::map<unsigned int, std::set<int>> result;
const std::string value = ::arg()["cpu-map"];
if (!value.empty() && !isSettingThreadCPUAffinitySupported()) {
- g_log << Logger::Warning << "CPU mapping requested but not supported, skipping" << endl;
+ SLOG(g_log << Logger::Warning << "CPU mapping requested but not supported, skipping" << endl,
+ log->info(Logr::Warning, "CPU mapping requested but not supported, skipping"));
return result;
}
}
}
catch (const std::exception& e) {
- g_log << Logger::Error << "Error parsing cpu-map entry '" << part << "': " << e.what() << endl;
+ SLOG(g_log << Logger::Error << "Error parsing cpu-map entry '" << part << "': " << e.what() << endl,
+ log->error(Logr::Error, e.what(), "Error parsing cpu-map entry", "entry", Logging::Loggable(part)));
}
}
return result;
}
-static void setCPUMap(const std::map<unsigned int, std::set<int>>& cpusMap, unsigned int n, pthread_t tid)
+static void setCPUMap(const std::map<unsigned int, std::set<int>>& cpusMap, unsigned int n, pthread_t tid, std::shared_ptr<Logr::Logger>& log)
{
const auto& cpuMapping = cpusMap.find(n);
if (cpuMapping == cpusMap.cend()) {
}
int rc = mapThreadToCPUList(tid, cpuMapping->second);
if (rc == 0) {
- g_log << Logger::Info << "CPU affinity for thread " << n << " has been set to CPU map:";
- for (const auto cpu : cpuMapping->second) {
- g_log << Logger::Info << " " << cpu;
+ if (!g_slogStructured) {
+ g_log << Logger::Info << "CPU affinity for thread " << n << " has been set to CPU map:";
+ for (const auto cpu : cpuMapping->second) {
+ g_log << Logger::Info << " " << cpu;
+ }
+ g_log << Logger::Info << endl;
+ }
+ else {
+ log->info(Logr::Info, "CPU affinity has been set", "thread", Logging::Loggable(n), "cpumap", Logging::Loggable(cpuMapping->second));
}
- g_log << Logger::Info << endl;
}
else {
- g_log << Logger::Warning << "Error setting CPU affinity for thread " << n << " to CPU map:";
- for (const auto cpu : cpuMapping->second) {
- g_log << Logger::Info << " " << cpu;
+ if (!g_slogStructured) {
+ g_log << Logger::Warning << "Error setting CPU affinity for thread " << n << " to CPU map:";
+ for (const auto cpu : cpuMapping->second) {
+ g_log << Logger::Info << " " << cpu;
+ }
+ g_log << Logger::Info << ' ' << strerror(rc) << endl;
+ }
+ else {
+ log->info(Logr::Warning, "Error setting CPU affinity", "thread", Logging::Loggable(n), "cpumap", Logging::Loggable(cpuMapping->second));
}
- g_log << Logger::Info << ' ' << strerror(rc) << endl;
}
}
static void recursorThread();
-void RecThreadInfo::start(unsigned int id, const string& tname, const std::map<unsigned int, std::set<int>>& cpusMap)
+void RecThreadInfo::start(unsigned int id, const string& tname, const std::map<unsigned int, std::set<int>>& cpusMap, std::shared_ptr<Logr::Logger>& log)
{
name = tname;
thread = std::thread([id, tname] {
setThreadName(threadPrefix + tname);
recursorThread();
});
- setCPUMap(cpusMap, id, thread.native_handle());
+ setCPUMap(cpusMap, id, thread.native_handle(), log);
}
int RecThreadInfo::runThreads(std::shared_ptr<Logr::Logger>& log)
{
int ret = EXIT_SUCCESS;
unsigned int currentThreadId = 1;
- const auto cpusMap = parseCPUMap();
+ const auto cpusMap = parseCPUMap(log);
if (RecThreadInfo::numDistributors() + RecThreadInfo::numWorkers() == 1) {
SLOG(g_log << Logger::Warning << "Operating with single distributor/worker thread" << endl,
/* This thread handles the web server, carbon, statistics and the control channel */
auto& handlerInfo = RecThreadInfo::info(0);
handlerInfo.setHandler();
- handlerInfo.start(0, "web+stat", cpusMap);
+ handlerInfo.start(0, "web+stat", cpusMap, log);
auto& taskInfo = RecThreadInfo::info(2);
taskInfo.setTaskThread();
- taskInfo.start(2, "taskThread", cpusMap);
+ taskInfo.start(2, "taskThread", cpusMap, log);
auto& info = RecThreadInfo::info(currentThreadId);
info.setListener();
log->info(Logr::Notice, "Launching distributor threads", "count", Logging::Loggable(RecThreadInfo::numDistributors())));
for (unsigned int n = 0; n < RecThreadInfo::numDistributors(); ++n) {
auto& info = RecThreadInfo::info(currentThreadId);
- info.start(currentThreadId++, "distr", cpusMap);
+ info.start(currentThreadId++, "distr", cpusMap, log);
}
}
SLOG(g_log << Logger::Warning << "Launching " << RecThreadInfo::numWorkers() << " worker threads" << endl,
for (unsigned int n = 0; n < RecThreadInfo::numWorkers(); ++n) {
auto& info = RecThreadInfo::info(currentThreadId);
- info.start(currentThreadId++, "worker", cpusMap);
+ info.start(currentThreadId++, "worker", cpusMap, log);
}
for (unsigned int n = 0; n < RecThreadInfo::numTaskThreads(); ++n) {
auto& info = RecThreadInfo::info(currentThreadId);
- info.start(currentThreadId++, "taskThread", cpusMap);
+ info.start(currentThreadId++, "taskThread", cpusMap, log);
}
/* This thread handles the web server, carbon, statistics and the control channel */
auto& info = RecThreadInfo::info(0);
info.setHandler();
- info.start(0, "web+stat", cpusMap);
+ info.start(0, "web+stat", cpusMap, log);
for (auto& ti : RecThreadInfo::infos()) {
ti.thread.join();
return ret;
}
-void RecThreadInfo::makeThreadPipes()
+void RecThreadInfo::makeThreadPipes(std::shared_ptr<Logr::Logger>& log)
{
auto pipeBufferSize = ::arg().asNum("distribution-pipe-buffer-size");
if (pipeBufferSize > 0) {
- g_log << Logger::Info << "Resizing the buffer of the distribution pipe to " << pipeBufferSize << endl;
+ SLOG(g_log << Logger::Info << "Resizing the buffer of the distribution pipe to " << pipeBufferSize << endl,
+ log->info(Logr::Info, "Resizing the buffer of the distribution pipe", "size", Logging::Loggable(pipeBufferSize)));
}
/* thread 0 is the handler / SNMP, worker threads start at 1 */
if (pipeBufferSize > 0) {
if (!setPipeBufferSize(threadInfo.pipes.writeQueriesToThread, pipeBufferSize)) {
int err = errno;
- g_log << Logger::Warning << "Error resizing the buffer of the distribution pipe for thread " << n << " to " << pipeBufferSize << ": " << strerror(err) << endl;
+ SLOG(g_log << Logger::Warning << "Error resizing the buffer of the distribution pipe for thread " << n << " to " << pipeBufferSize << ": " << strerror(err) << endl,
+ log->error(Logr::Warning, err, "Error resizing the buffer of the distribution pipe for thread", "thread", Logging::Loggable(n), "size", Logging::Loggable(pipeBufferSize)));
auto existingSize = getPipeBufferSize(threadInfo.pipes.writeQueriesToThread);
if (existingSize > 0) {
- g_log << Logger::Warning << "The current size of the distribution pipe's buffer for thread " << n << " is " << existingSize << endl;
+ SLOG(g_log << Logger::Warning << "The current size of the distribution pipe's buffer for thread " << n << " is " << existingSize << endl,
+ log->info(Logr::Warning, "The current size of the distribution pipe's buffer for thread", "thread", Logging::Loggable(n), "size", Logging::Loggable(existingSize)));
}
}
}
startLuaConfigDelayedThreads(delayedLuaThreads, g_luaconfs.getCopy().generation);
delayedLuaThreads.rpzPrimaryThreads.clear(); // no longer needed
- RecThreadInfo::makeThreadPipes();
+ RecThreadInfo::makeThreadPipes(log);
g_tcpTimeout = ::arg().asNum("client-tcp-timeout");
g_maxTCPPerClient = ::arg().asNum("max-tcp-per-client");