From 592d7ade3824a43b20f3b6ad39692415edcbd508 Mon Sep 17 00:00:00 2001 From: Remi Gacogne Date: Thu, 17 Jan 2019 10:41:33 +0100 Subject: [PATCH] rec: Try another worker before failing if the first pipe was full When the recursor is configured to distribute incoming queries to its worker threads itself, and in the unlikely case that the pipe to the selected thread is full, randomly select another worker to handle the query. If this one fails too, we give up. --- pdns/pdns_recursor.cc | 50 ++++++++++++++++++++++++++++++------------- 1 file changed, 35 insertions(+), 15 deletions(-) diff --git a/pdns/pdns_recursor.cc b/pdns/pdns_recursor.cc index ca40f1bb92..643b218832 100644 --- a/pdns/pdns_recursor.cc +++ b/pdns/pdns_recursor.cc @@ -2806,17 +2806,8 @@ void broadcastFunction(const pipefunc_t& func) } } -// This function is only called by the distributor threads, when pdns-distributes-queries is set -void distributeAsyncFunction(const string& packet, const pipefunc_t& func) +static bool trySendingQueryToWorker(unsigned int target, ThreadMSG* tmsg) { - if (!isDistributorThread()) { - g_log<func = func; - tmsg->wantAnswer = false; ssize_t written = write(tps.writeQueriesToThread, &tmsg, sizeof(tmsg)); if (written > 0) { @@ -2837,13 +2825,45 @@ void distributeAsyncFunction(const string& packet, const pipefunc_t& func) } else { int error = errno; - delete tmsg; if (error == EAGAIN || error == EWOULDBLOCK) { - g_stats.queryPipeFullDrops++; + return false; } else { + delete tmsg; unixDie("write to thread pipe returned wrong size or error:" + std::to_string(error)); } } + + return true; +} + +// This function is only called by the distributor threads, when pdns-distributes-queries is set +void distributeAsyncFunction(const string& packet, const pipefunc_t& func) +{ + if (!isDistributorThread()) { + g_log<func = func; + tmsg->wantAnswer = false; + + if (!trySendingQueryToWorker(target, tmsg)) { + /* if this function failed but did not raise an exception, it means that the pipe + was full, let's try another one */ + unsigned int newTarget = 0; + do { + newTarget = /* skip handler */ 1 + g_numDistributorThreads + dns_random(g_numWorkerThreads); + } while (newTarget == target); + + if (!trySendingQueryToWorker(newTarget, tmsg)) { + g_stats.queryPipeFullDrops++; + delete tmsg; + } + } } static void handlePipeRequest(int fd, FDMultiplexer::funcparam_t& var) -- 2.47.2