]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Implement the AsynchronousHolder with the new channel feature
authorRemi Gacogne <remi.gacogne@powerdns.com>
Fri, 23 Jun 2023 13:01:05 +0000 (15:01 +0200)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Fri, 23 Jun 2023 13:08:27 +0000 (15:08 +0200)
pdns/channel.hh
pdns/dnsdistdist/dnsdist-async.cc
pdns/dnsdistdist/dnsdist-async.hh

index 7beaa198d190cbeea7a8d151f7e6461f00f5a9ab..6947d3b26d4b6c083a1dc20504e51d7611e6b48b 100644 (file)
@@ -52,9 +52,7 @@ namespace channel
   class Sender
   {
   public:
-    Sender()
-    {
-    }
+    Sender() = default;
     Sender(FDWrapper&& descriptor) :
       d_fd(std::move(descriptor))
     {
@@ -63,6 +61,7 @@ namespace channel
     Sender& operator=(const Sender&) = delete;
     Sender(Sender&&) = default;
     Sender& operator=(Sender&&) = default;
+    ~Sender() = default;
     /**
      * \brief Try to send the supplied object to the other end of that channel. Might block if the channel was created in blocking mode.
      *
@@ -86,9 +85,7 @@ namespace channel
   class Receiver
   {
   public:
-    Receiver()
-    {
-    }
+    Receiver() = default;
     Receiver(FDWrapper&& descriptor, bool throwOnEOF = true) :
       d_fd(std::move(descriptor)), d_throwOnEOF(throwOnEOF)
     {
@@ -97,6 +94,7 @@ namespace channel
     Receiver& operator=(const Receiver&) = delete;
     Receiver(Receiver&&) = default;
     Receiver& operator=(Receiver&&) = default;
+    ~Receiver() = default;
     /**
      * \brief Try to read an object sent by the other end of that channel. Might block if the channel was created in blocking mode.
      *
@@ -148,14 +146,13 @@ namespace channel
   class Notifier
   {
   public:
-    Notifier()
-    {
-    }
+    Notifier() = default;
     Notifier(FDWrapper&&);
     Notifier(const Notifier&) = delete;
     Notifier& operator=(const Notifier&) = delete;
     Notifier(Notifier&&) = default;
     Notifier& operator=(Notifier&&) = default;
+    ~Notifier() = default;
 
     /**
      * \brief Queue a notification to wake up the other end of the channel.
@@ -178,11 +175,13 @@ namespace channel
   class Waiter
   {
   public:
+    Waiter() = default;
     Waiter(FDWrapper&&, bool throwOnEOF = true);
     Waiter(const Waiter&) = delete;
     Waiter& operator=(const Waiter&) = delete;
     Waiter(Waiter&&) = default;
     Waiter& operator=(Waiter&&) = default;
+    ~Waiter() = default;
 
     /**
      * \brief Clear all notifications queued on that channel, if any.
index e1acef87a4392b99f5e487a7d612c04292314649..8c26b88449e16806334871c00cd7145a17cd93ac 100644 (file)
 namespace dnsdist
 {
 
-AsynchronousHolder::AsynchronousHolder(bool failOpen) :
-  d_data(std::make_shared<Data>())
+AsynchronousHolder::Data::Data(bool failOpen) :
+  d_failOpen(failOpen)
 {
-  d_data->d_failOpen = failOpen;
-
-  int fds[2] = {-1, -1};
-  if (pipe(fds) < 0) {
-    throw std::runtime_error("Error creating the AsynchronousHolder pipe: " + stringerror());
-  }
-
-  for (size_t idx = 0; idx < (sizeof(fds) / sizeof(*fds)); idx++) {
-    if (!setNonBlocking(fds[idx])) {
-      int err = errno;
-      close(fds[0]);
-      close(fds[1]);
-      throw std::runtime_error("Error setting the AsynchronousHolder pipe non-blocking: " + stringerror(err));
-    }
-  }
-
-  d_data->d_notifyPipe = FDWrapper(fds[1]);
-  d_data->d_watchPipe = FDWrapper(fds[0]);
+  auto [notifier, waiter] = pdns::channel::createNotificationQueue(true);
+  d_waiter = std::move(waiter);
+  d_notifier = std::move(notifier);
+}
 
+AsynchronousHolder::AsynchronousHolder(bool failOpen) :
+  d_data(std::make_shared<Data>(failOpen))
+{
   std::thread main([data = this->d_data] { mainThread(data); });
   main.detach();
 }
@@ -64,25 +53,10 @@ AsynchronousHolder::~AsynchronousHolder()
 
 bool AsynchronousHolder::notify() const
 {
-  const char data = 0;
-  bool failed = false;
-  do {
-    auto written = write(d_data->d_notifyPipe.getHandle(), &data, sizeof(data));
-    if (written == 0) {
-      break;
-    }
-    if (written > 0 && static_cast<size_t>(written) == sizeof(data)) {
-      return true;
-    }
-    if (errno != EINTR) {
-      failed = true;
-    }
-  } while (!failed);
-
-  return false;
+  return d_data->d_notifier.notify();
 }
 
-bool AsynchronousHolder::wait(const AsynchronousHolder::Data& data, FDMultiplexer& mplexer, std::vector<int>& readyFDs, int atMostMs)
+bool AsynchronousHolder::wait(AsynchronousHolder::Data& data, FDMultiplexer& mplexer, std::vector<int>& readyFDs, int atMostMs)
 {
   readyFDs.clear();
   mplexer.getAvailableFDs(readyFDs, atMostMs);
@@ -91,22 +65,7 @@ bool AsynchronousHolder::wait(const AsynchronousHolder::Data& data, FDMultiplexe
     return true;
   }
 
-  while (true) {
-    /* we might have been notified several times, let's read
-       as much as possible before returning */
-    char dummy = 0;
-    auto got = read(data.d_watchPipe.getHandle(), &dummy, sizeof(dummy));
-    if (got == 0) {
-      break;
-    }
-    if (got > 0 && static_cast<size_t>(got) != sizeof(dummy)) {
-      continue;
-    }
-    if (got == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
-      break;
-    }
-  }
-
+  data.d_waiter.clear();
   return false;
 }
 
@@ -127,7 +86,7 @@ void AsynchronousHolder::mainThread(std::shared_ptr<Data> data)
   std::list<std::pair<uint16_t, std::unique_ptr<CrossProtocolQuery>>> expiredEvents;
 
   auto mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent(1));
-  mplexer->addReadFD(data->d_watchPipe.getHandle(), [](int, FDMultiplexer::funcparam_t&) {});
+  mplexer->addReadFD(data->d_waiter.getDescriptor(), [](int, FDMultiplexer::funcparam_t&) {});
   std::vector<int> readyFDs;
 
   while (true) {
index 5a8c0908f59c20282b3bee1798e552fa2683ef5b..f6a0a97f0043fe453f855e82fdb773fae915cddf 100644 (file)
@@ -27,6 +27,7 @@
 #include <boost/multi_index/ordered_index.hpp>
 #include <boost/multi_index/key_extractors.hpp>
 
+#include "channel.hh"
 #include "dnsdist-tcp.hh"
 
 namespace dnsdist
@@ -75,16 +76,23 @@ private:
 
   struct Data
   {
+    Data(bool failOpen);
+    Data(const Data&) = delete;
+    Data(Data&&) = delete;
+    Data& operator=(const Data&) = delete;
+    Data& operator=(Data&&) = delete;
+    ~Data() = default;
+
     LockGuarded<content_t> d_content;
-    FDWrapper d_notifyPipe;
-    FDWrapper d_watchPipe;
+    pdns::channel::Notifier d_notifier;
+    pdns::channel::Waiter d_waiter;
     bool d_failOpen{true};
     bool d_done{false};
   };
   std::shared_ptr<Data> d_data{nullptr};
 
   static void mainThread(std::shared_ptr<Data> data);
-  static bool wait(const Data& data, FDMultiplexer& mplexer, std::vector<int>& readyFDs, int atMostMs);
+  static bool wait(Data& data, FDMultiplexer& mplexer, std::vector<int>& readyFDs, int atMostMs);
   bool notify() const;
 };