}
}
-// This function is only called by the distributor thread, when pdns-distributes-queries is set
-void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
+static bool trySendingQueryToWorker(unsigned int target, ThreadMSG* tmsg)
{
- if (t_id != s_distributorThreadID) {
- L<<Logger::Error<<"distributeAsyncFunction() has been called by a worker ("<<t_id<<")"<<endl;
- exit(1);
- }
-
- unsigned int hash = hashQuestion(packet.c_str(), packet.length(), g_disthashseed);
- unsigned int target = 1 + (hash % (g_pipes.size()-1));
-
if(target == static_cast<unsigned int>(s_distributorThreadID)) {
L<<Logger::Error<<"distributeAsyncFunction() tried to assign a query to the distributor"<<endl;
exit(1);
}
ThreadPipeSet& tps = g_pipes[target];
- ThreadMSG* tmsg = new ThreadMSG();
- tmsg->func = func;
- tmsg->wantAnswer = false;
-
ssize_t written = write(tps.writeQueriesToThread, &tmsg, sizeof(tmsg));
if (written > 0) {
if (static_cast<size_t>(written) != sizeof(tmsg)) {
}
else {
int error = errno;
- delete tmsg;
if (error == EAGAIN || error == EWOULDBLOCK) {
- g_stats.queryPipeFullDrops++;
+ /* the pipe is full, sorry */
+ return false;
} else {
+ delete tmsg;
unixDie("write to thread pipe returned wrong size or error:" + std::to_string(error));
}
}
+
+ return true;
+}
+
+// This function is only called by the distributor thread, when pdns-distributes-queries is set
+void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
+{
+ if (t_id != s_distributorThreadID) {
+ L<<Logger::Error<<"distributeAsyncFunction() has been called by a worker ("<<t_id<<")"<<endl;
+ exit(1);
+ }
+
+ unsigned int hash = hashQuestion(packet.c_str(), packet.length(), g_disthashseed);
+ unsigned int target = 1 + (hash % (g_pipes.size()-1));
+
+ ThreadMSG* tmsg = new ThreadMSG();
+ tmsg->func = func;
+ tmsg->wantAnswer = false;
+
+ if (!trySendingQueryToWorker(target, tmsg)) {
+ /* if this function failed but did not raise an exception, it means that the pipe
+ was full, let's try another one */
+ unsigned int newTarget = 0;
+ do {
+ newTarget = 1 + dns_random(g_pipes.size()-1);
+ } while (newTarget == target);
+
+ if (!trySendingQueryToWorker(newTarget, tmsg)) {
+ g_stats.queryPipeFullDrops++;
+ delete tmsg;
+ }
+ }
}
static void handlePipeRequest(int fd, FDMultiplexer::funcparam_t& var)