]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
rec: Allow worker threads to send tasks to handler thread
authorKevin P. Fleming <kevin@km6g.us>
Fri, 24 Sep 2021 20:52:03 +0000 (16:52 -0400)
committerKevin P. Fleming <kevin@km6g.us>
Thu, 11 Nov 2021 14:46:34 +0000 (09:46 -0500)
Extend the ThreadMSG mechanism to allow worker threads to submit
tasks to be executed by the handler thread (one-way only, no
answers can be returned).

pdns/pdns_recursor.cc

index edc77dbb1b87896736ab11252d5b8214c15ec74b..9f00634dcee9e31ebf12323516495d3b8c014647 100644 (file)
@@ -281,6 +281,12 @@ GlobalStateHolder<SuffixMatchNode> g_DoTToAuthNames;
 #define BAD_NETS   "0.0.0.0/8, 192.0.0.0/24, 192.0.2.0/24, 198.51.100.0/24, 203.0.113.0/24, 240.0.0.0/4, ::/96, ::ffff:0:0/96, 100::/64, 2001:db8::/32"
 #define DONT_QUERY LOCAL_NETS ", " BAD_NETS
 
+struct ThreadMSG
+{
+  pipefunc_t func;
+  bool wantAnswer;
+};
+
 //! used to send information to a newborn mthread
 struct DNSComboWriter {
   DNSComboWriter(const std::string& query, const struct timeval& now): d_mdp(true, query), d_now(now), d_query(query)
@@ -3917,8 +3923,8 @@ static void makeThreadPipes()
     g_log<<Logger::Info<<"Resizing the buffer of the distribution pipe to "<<pipeBufferSize<<endl;
   }
 
-  /* thread 0 is the handler / SNMP, we start at 1 */
-  for(unsigned int n = 1; n <= (g_numWorkerThreads + g_numDistributorThreads); ++n) {
+  /* thread 0 is the handler / SNMP, worker threads start at 1 */
+  for(unsigned int n = 0; n <= (g_numWorkerThreads + g_numDistributorThreads); ++n) {
     auto& threadInfos = s_threadInfos.at(n);
 
     int fd[2];
@@ -3928,6 +3934,11 @@ static void makeThreadPipes()
     threadInfos.pipes.readToThread = fd[0];
     threadInfos.pipes.writeToThread = fd[1];
 
+    // handler thread only gets first pipe, not the others
+    if(n==0) {
+      continue;
+    }
+
     if(pipe(fd) < 0)
       unixDie("Creating pipe for inter-thread communications");
 
@@ -3957,12 +3968,6 @@ static void makeThreadPipes()
   }
 }
 
-struct ThreadMSG
-{
-  pipefunc_t func;
-  bool wantAnswer;
-};
-
 void broadcastFunction(const pipefunc_t& func)
 {
   /* This function might be called by the worker with t_id 0 during startup
@@ -5596,7 +5601,9 @@ try
   t_fdm=getMultiplexer();
 
   RecursorWebServer *rws = nullptr;
-  
+
+  t_fdm->addReadFD(threadInfo.pipes.readToThread, handlePipeRequest);
+
   if(threadInfo.isHandler) {
     if(::arg().mustDo("webserver")) {
       g_log<<Logger::Warning << "Enabling web server" << endl;
@@ -5611,8 +5618,6 @@ try
     g_log<<Logger::Info<<"Enabled '"<< t_fdm->getName() << "' multiplexer"<<endl;
   }
   else {
-
-    t_fdm->addReadFD(threadInfo.pipes.readToThread, handlePipeRequest);
     t_fdm->addReadFD(threadInfo.pipes.readQueriesToThread, handlePipeRequest);
 
     if (threadInfo.isListener) {