]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
rec: Try another worker before failing if the first pipe was full 7377/head
authorRemi Gacogne <remi.gacogne@powerdns.com>
Wed, 16 Jan 2019 14:19:17 +0000 (15:19 +0100)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Wed, 16 Jan 2019 14:19:17 +0000 (15:19 +0100)
pdns/pdns_recursor.cc

index 9ca5461626e9edb835927efc280728923096253f..bc37565e81fef9c5b95768d9c89cf95064390229 100644 (file)
@@ -2310,27 +2310,14 @@ void broadcastFunction(const pipefunc_t& func)
   }
 }
 
-// This function is only called by the distributor thread, when pdns-distributes-queries is set
-void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
+static bool trySendingQueryToWorker(unsigned int target, ThreadMSG* tmsg)
 {
-  if (t_id != s_distributorThreadID) {
-    L<<Logger::Error<<"distributeAsyncFunction() has been called by a worker ("<<t_id<<")"<<endl;
-    exit(1);
-  }
-
-  unsigned int hash = hashQuestion(packet.c_str(), packet.length(), g_disthashseed);
-  unsigned int target = 1 + (hash % (g_pipes.size()-1));
-
   if(target == static_cast<unsigned int>(s_distributorThreadID)) {
     L<<Logger::Error<<"distributeAsyncFunction() tried to assign a query to the distributor"<<endl;
     exit(1);
   }
 
   ThreadPipeSet& tps = g_pipes[target];
-  ThreadMSG* tmsg = new ThreadMSG();
-  tmsg->func = func;
-  tmsg->wantAnswer = false;
-
   ssize_t written = write(tps.writeQueriesToThread, &tmsg, sizeof(tmsg));
   if (written > 0) {
     if (static_cast<size_t>(written) != sizeof(tmsg)) {
@@ -2340,13 +2327,46 @@ void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
   }
   else {
     int error = errno;
-    delete tmsg;
     if (error == EAGAIN || error == EWOULDBLOCK) {
-      g_stats.queryPipeFullDrops++;
+      /* the pipe is full, sorry */
+      return false;
     } else {
+      delete tmsg;
       unixDie("write to thread pipe returned wrong size or error:" + std::to_string(error));
     }
   }
+
+  return true;
+}
+
+// This function is only called by the distributor thread, when pdns-distributes-queries is set
+void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
+{
+  if (t_id != s_distributorThreadID) {
+    L<<Logger::Error<<"distributeAsyncFunction() has been called by a worker ("<<t_id<<")"<<endl;
+    exit(1);
+  }
+
+  unsigned int hash = hashQuestion(packet.c_str(), packet.length(), g_disthashseed);
+  unsigned int target = 1 + (hash % (g_pipes.size()-1));
+
+  ThreadMSG* tmsg = new ThreadMSG();
+  tmsg->func = func;
+  tmsg->wantAnswer = false;
+
+  if (!trySendingQueryToWorker(target, tmsg)) {
+    /* if this function failed but did not raise an exception, it means that the pipe
+       was full, let's try another one */
+    unsigned int newTarget = 0;
+    do {
+      newTarget = 1 + dns_random(g_pipes.size()-1);
+    } while (newTarget == target);
+
+    if (!trySendingQueryToWorker(newTarget, tmsg)) {
+      g_stats.queryPipeFullDrops++;
+      delete tmsg;
+    }
+  }
 }
 
 static void handlePipeRequest(int fd, FDMultiplexer::funcparam_t& var)