]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Move the DelayPipe to pdns::channel
authorRemi Gacogne <remi.gacogne@powerdns.com>
Fri, 21 Oct 2022 18:12:01 +0000 (20:12 +0200)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Tue, 13 Jun 2023 07:59:38 +0000 (09:59 +0200)
pdns/channel.cc
pdns/channel.hh
pdns/delaypipe.cc
pdns/delaypipe.hh
pdns/dnsdist-tcp.cc
pdns/dnsdistdist/dnsdist-nghttp2.cc
pdns/dnsdistdist/doh.cc
pdns/dnsdistdist/test-delaypipe_hh.cc
pdns/misc.hh
pdns/snmp-agent.cc

index dd25c6d34eeef1a21502659733cb23fa134292a6..bebbdf4380b4aff48e041b05a75eed7eafa4ad2e 100644 (file)
@@ -35,20 +35,25 @@ namespace channel
   bool Notifier::notify() const
   {
     char data = 'a';
-    auto sent = write(d_fd.getHandle(), &data, sizeof(data));
-    if (sent != sizeof(data)) {
-      if (errno == EAGAIN || errno == EWOULDBLOCK) {
-        return false;
-      }
-      else {
-        throw std::runtime_error("Unable to write to channel notifier pipe: " + stringerror());
+    while (true) {
+      auto sent = write(d_fd.getHandle(), &data, sizeof(data));
+      if (sent != sizeof(data)) {
+        if (errno == EINTR) {
+          continue;
+        }
+        if (errno == EAGAIN || errno == EWOULDBLOCK) {
+          return false;
+        }
+        else {
+          throw std::runtime_error("Unable to write to channel notifier pipe: " + stringerror());
+        }
       }
+      return true;
     }
-    return true;
   }
 
-  Waiter::Waiter(FDWrapper&& fd) :
-    d_fd(std::move(fd))
+  Waiter::Waiter(FDWrapper&& fd, bool throwOnEOF) :
+    d_fd(std::move(fd)), d_throwOnEOF(throwOnEOF)
   {
   }
 
@@ -59,9 +64,15 @@ namespace channel
       char data;
       got = read(d_fd.getHandle(), &data, sizeof(data));
       if (got == 0) {
+        if (!d_throwOnEOF) {
+          return;
+        }
         throw std::runtime_error("EOF while clearing channel notifier pipe");
       }
       else if (got == -1) {
+        if (errno == EINTR) {
+          continue;
+        }
         if (errno == EAGAIN || errno == EWOULDBLOCK) {
           break;
         }
@@ -75,7 +86,7 @@ namespace channel
     return d_fd.getHandle();
   }
 
-  std::pair<Notifier, Waiter> createNotificationQueue(bool nonBlocking, size_t pipeBufferSize)
+  std::pair<Notifier, Waiter> createNotificationQueue(bool nonBlocking, size_t pipeBufferSize, bool throwOnEOF)
   {
     int fds[2] = {-1, -1};
     if (pipe(fds) < 0) {
@@ -99,7 +110,7 @@ namespace channel
       setPipeBufferSize(receiver.getHandle(), pipeBufferSize);
     }
 
-    return std::pair(Notifier(std::move(sender)), Waiter(std::move(receiver)));
+    return std::pair(Notifier(std::move(sender)), Waiter(std::move(receiver), throwOnEOF));
   }
 }
 }
index 5c352870ed6051363ad53d86432833b0707bff66..fd82ca1970339d232f746ff8e49834699d969751 100644 (file)
@@ -57,6 +57,7 @@ namespace channel
      * \throw runtime_error if the channel is broken, for example if the other end has been closed.
      */
     bool send(std::unique_ptr<T, D>&&) const;
+    void close();
 
   private:
     FDWrapper d_fd;
@@ -74,8 +75,8 @@ namespace channel
     Receiver()
     {
     }
-    Receiver(FDWrapper&& fd) :
-      d_fd(std::move(fd))
+    Receiver(FDWrapper&& fd, bool throwOnEOF = true) :
+      d_fd(std::move(fd)), d_throwOnEOF(throwOnEOF)
     {
     }
     Receiver(const Receiver&) = delete;
@@ -89,8 +90,8 @@ namespace channel
      *
      * \throw runtime_error if the channel is broken, for example if the other end has been closed.
      */
-    std::optional<std::unique_ptr<T, D>> receive() const;
-    std::optional<std::unique_ptr<T, D>> receive(D deleter) const;
+    std::optional<std::unique_ptr<T, D>> receive();
+    std::optional<std::unique_ptr<T, D>> receive(D deleter);
 
     /**
      * \brief Get a descriptor that can be used with an I/O multiplexer to wait for an object to become available.
@@ -101,9 +102,18 @@ namespace channel
     {
       return d_fd.getHandle();
     }
+    /**
+     * \brief Whether the remote end has closed the channel.
+     */
+    bool isClosed() const
+    {
+      return d_closed;
+    }
 
   private:
     FDWrapper d_fd;
+    bool d_closed{false};
+    bool d_throwOnEOF{true};
   };
 
   /**
@@ -114,7 +124,7 @@ namespace channel
    * \throw runtime_error if the channel creation failed.
    */
   template <typename T, typename D = std::default_delete<T>>
-  std::pair<Sender<T, D>, Receiver<T, D>> createObjectQueue(bool nonBlocking = true, size_t pipeBufferSize = 0);
+  std::pair<Sender<T, D>, Receiver<T, D>> createObjectQueue(bool sendNonBlocking = true, bool writeNonBlocking = true, size_t pipeBufferSize = 0, bool throwOnEOF = true);
 
   /**
    * The notifier's end of a channel used to communicate between threads.
@@ -154,7 +164,7 @@ namespace channel
   class Waiter
   {
   public:
-    Waiter(FDWrapper&&);
+    Waiter(FDWrapper&&, bool throwOnEOF = true);
     Waiter(const Waiter&) = delete;
     Waiter& operator=(const Waiter&) = delete;
     Waiter(Waiter&&) = default;
@@ -170,9 +180,18 @@ namespace channel
      * \return A valid descriptor or -1 if the Waiter was not properly initialized.
      */
     int getDescriptor() const;
+    /**
+     * \brief Whether the remote end has closed the channel.
+     */
+    bool isClosed() const
+    {
+      return d_closed;
+    }
 
   private:
     FDWrapper d_fd;
+    bool d_closed{false};
+    bool d_throwOnEOF{true};
   };
 
   /**
@@ -182,7 +201,7 @@ namespace channel
    *
    * \throw runtime_error if the channel creation failed.
    */
-  std::pair<Notifier, Waiter> createNotificationQueue(bool nonBlocking = true, size_t pipeBufferSize = 0);
+  std::pair<Notifier, Waiter> createNotificationQueue(bool nonBlocking = true, size_t pipeBufferSize = 0, bool throwOnEOF = true);
 
   template <typename T, typename D>
   bool Sender<T, D>::send(std::unique_ptr<T, D>&& object) const
@@ -190,72 +209,74 @@ namespace channel
     // we do not release right away because we might need the custom deleter later
     auto ptr = object.get();
     static_assert(sizeof(ptr) <= PIPE_BUF, "Writes up to PIPE_BUF are guaranted not to interleaved and to either fully succeed or fail");
-    ssize_t sent = write(d_fd.getHandle(), &ptr, sizeof(ptr));
+    while (true) {
+      ssize_t sent = write(d_fd.getHandle(), &ptr, sizeof(ptr));
 
-    if (sent == sizeof(ptr)) {
-      // we cannot touch it anymore
-      object.release();
-    }
-    else {
-      if (errno == EAGAIN || errno == EWOULDBLOCK) {
-        return false;
+      if (sent == sizeof(ptr)) {
+        // we cannot touch it anymore
+        object.release();
+        return true;
       }
       else {
-        throw std::runtime_error("Unable to write to channel:" + stringerror());
+        if (errno == EINTR) {
+          continue;
+        }
+        if (errno == EAGAIN || errno == EWOULDBLOCK) {
+          return false;
+        }
+        else {
+          throw std::runtime_error("Unable to write to channel:" + stringerror());
+        }
       }
     }
+  }
 
-    return true;
+  template <typename T, typename D>
+  void Sender<T, D>::close()
+  {
+    d_fd.reset();
   }
 
   template <typename T, typename D>
-  std::optional<std::unique_ptr<T, D>> Receiver<T, D>::receive() const
+  std::optional<std::unique_ptr<T, D>> Receiver<T, D>::receive()
   {
-    std::optional<std::unique_ptr<T, D>> result;
-    T* obj{nullptr};
-    ssize_t got = read(d_fd.getHandle(), &obj, sizeof(obj));
-    if (got == sizeof(obj)) {
-      return std::unique_ptr<T, D>(obj);
-    }
-    else if (got == 0) {
-      throw std::runtime_error("EOF while reading from Channel receiver");
-    }
-    else if (got == -1) {
-      if (errno == EAGAIN || errno == EINTR) {
-        return result;
-      }
-      throw std::runtime_error("Error while reading from Channel receiver: " + stringerror());
-    }
-    else {
-      throw std::runtime_error("Partial read from Channel receiver");
-    }
+    return receive(std::default_delete<T>());
   }
 
   template <typename T, typename D>
-  std::optional<std::unique_ptr<T, D>> Receiver<T, D>::receive(D deleter) const
+  std::optional<std::unique_ptr<T, D>> Receiver<T, D>::receive(D deleter)
   {
-    std::optional<std::unique_ptr<T, D>> result;
-    T* obj{nullptr};
-    ssize_t got = read(d_fd.getHandle(), &obj, sizeof(obj));
-    if (got == sizeof(obj)) {
-      return std::unique_ptr<T, D>(obj, deleter);
-    }
-    else if (got == 0) {
-      throw std::runtime_error("EOF while reading from Channel receiver");
-    }
-    else if (got == -1) {
-      if (errno == EAGAIN || errno == EINTR) {
-        return result;
+    while (true) {
+      std::optional<std::unique_ptr<T, D>> result;
+      T* obj{nullptr};
+      ssize_t got = read(d_fd.getHandle(), &obj, sizeof(obj));
+      if (got == sizeof(obj)) {
+        return std::unique_ptr<T, D>(obj, deleter);
+      }
+      else if (got == 0) {
+        d_closed = true;
+        if (!d_throwOnEOF) {
+          return result;
+        }
+        throw std::runtime_error("EOF while reading from Channel receiver");
+      }
+      else if (got == -1) {
+        if (errno == EINTR) {
+          continue;
+        }
+        if (errno == EAGAIN || errno == EWOULDBLOCK) {
+          return result;
+        }
+        throw std::runtime_error("Error while reading from Channel receiver: " + stringerror());
+      }
+      else {
+        throw std::runtime_error("Partial read from Channel receiver");
       }
-      throw std::runtime_error("Error while reading from Channel receiver: " + stringerror());
-    }
-    else {
-      throw std::runtime_error("Partial read from Channel receiver");
     }
   }
 
   template <typename T, typename D>
-  std::pair<Sender<T, D>, Receiver<T, D>> createObjectQueue(bool nonBlocking, size_t pipeBufferSize)
+  std::pair<Sender<T, D>, Receiver<T, D>> createObjectQueue(bool sendNonBlocking, bool receiveNonBlocking, size_t pipeBufferSize, bool throwOnEOF)
   {
     int fds[2] = {-1, -1};
     if (pipe(fds) < 0) {
@@ -265,12 +286,12 @@ namespace channel
     FDWrapper sender(fds[1]);
     FDWrapper receiver(fds[0]);
 
-    if (nonBlocking && !setNonBlocking(receiver.getHandle())) {
+    if (receiveNonBlocking && !setNonBlocking(receiver.getHandle())) {
       int err = errno;
       throw std::runtime_error("Error making channel pipe non-blocking: " + stringerror(err));
     }
 
-    if (nonBlocking && !setNonBlocking(sender.getHandle())) {
+    if (sendNonBlocking && !setNonBlocking(sender.getHandle())) {
       int err = errno;
       throw std::runtime_error("Error making channel pipe non-blocking: " + stringerror(err));
     }
@@ -279,7 +300,7 @@ namespace channel
       setPipeBufferSize(receiver.getHandle(), pipeBufferSize);
     }
 
-    return std::pair(Sender<T, D>(std::move(sender)), Receiver<T, D>(std::move(receiver)));
+    return std::pair(Sender<T, D>(std::move(sender)), Receiver<T, D>(std::move(receiver), throwOnEOF));
   }
 }
 }
index be363d639191c4851038304aa94eefa2e1d6c33d..645bcf56109f4c8d545a2a693d01d87ca06762a9 100644 (file)
 template<class T>
 ObjectPipe<T>::ObjectPipe()
 {
-  if(pipe(d_fds))
-    unixDie("pipe");
-}
-
-template<class T>
-ObjectPipe<T>::~ObjectPipe()
-{
-  ::close(d_fds[0]);
-  if(d_fds[1] >= 0)
-    ::close(d_fds[1]);
+  auto [sender, receiver] = pdns::channel::createObjectQueue<T>(false, true, 0, false);
+  d_sender = std::move(sender);
+  d_receiver = std::move(receiver);
 }
 
 template<class T>
 void ObjectPipe<T>::close()
 {
-  if(d_fds[1] < 0)
-    return;
-  ::close(d_fds[1]); // the writing side
-  d_fds[1]=-1;
+  d_sender.close();
 }
 
 template<class T>
 void ObjectPipe<T>::write(T& t)
 {
-  auto ptr = new T(t);
-  if(::write(d_fds[1], &ptr, sizeof(ptr)) != sizeof(ptr)) {
-    delete ptr;
-    unixDie("write");
+  auto ptr = std::make_unique<T>(t);
+  if (!d_sender.send(std::move(ptr))) {
+    unixDie("writing to the DelayPipe");
   }
 }
 
@@ -63,7 +52,7 @@ template<class T>
 int ObjectPipe<T>::readTimeout(T* t, double msec)
 {
   while (true) {
-    int ret = waitForData(d_fds[0], 0, 1000*msec);
+    int ret = waitForData(d_receiver.getDescriptor(), 0, 1000*msec);
     if (ret < 0) {
       if (errno == EINTR) {
         continue;
@@ -74,26 +63,21 @@ int ObjectPipe<T>::readTimeout(T* t, double msec)
       return -1;
     }
 
-    T* ptr = nullptr;
-    ret = ::read(d_fds[0], &ptr, sizeof(ptr)); // this is BLOCKING!
-
-    if (ret < 0) {
-      if (errno == EINTR) {
+    try {
+      auto tmp = d_receiver.receive();
+      if (!tmp) {
+        if (d_receiver.isClosed()) {
+          return 0;
+        }
         continue;
       }
-      unixDie("read");
-    }
-    else if (ret == 0) {
-      return false;
-    }
 
-    if (ret != sizeof(ptr)) {
-      throw std::runtime_error("Partial read, should not happen 2");
+      *t = **tmp;
+      return 1;
+    }
+    catch (const std::exception& e) {
+      throw std::runtime_error("reading from the delay pipe: " + std::string(e.what()));
     }
-
-    *t = *ptr;
-    delete ptr;
-    return 1;
   }
 }
 
@@ -149,7 +133,7 @@ void DelayPipe<T>::worker()
        The other special case is that the first we have to do.. is in the past, so we need to do it
        immediately. */
 
-       
+
     double delay=-1;  // infinite
     struct timespec now;
     if(!d_work.empty()) {
@@ -160,7 +144,7 @@ void DelayPipe<T>::worker()
       }
     }
     if(delay != 0 ) {
-      int ret = d_pipe.readTimeout(&c, delay); 
+      int ret = d_pipe.readTimeout(&c, delay);
       if(ret > 0) {  // we got an object
         d_work.emplace(c.when, c.what);
       }
index ad1626a9990eca6727aa5b3794a0e132089f7893..b12fd50eac1bb0d6fe49ad6164d6d377d7a84c9b 100644 (file)
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 #pragma once
-#include <map>
 #include <time.h>
 #include <thread>
 
+#include "channel.hh"
+
 /**
    General idea: many threads submit work to this class, but only one executes it. The work should therefore be entirely trivial.
    The implementation is that submitter threads create an object that represents the work, and it gets sent over a pipe 
@@ -41,12 +42,12 @@ class ObjectPipe
 {
 public:
   ObjectPipe();
-  ~ObjectPipe();
   void write(T& t);
   int readTimeout(T* t, double msec); //!< -1 is timeout, 0 is no data, 1 is data. msec<0 waits infinitely long. msec==0 = undefined
   void close(); 
 private:
-  int d_fds[2];
+  pdns::channel::Sender<T> d_sender;
+  pdns::channel::Receiver<T> d_receiver;
 };
 
 template<class T>
index 17309a4f664277f0d71588dd36e723b1d0ed65ad..b72f0e0f5f9ef11876ddba06d5c8f9fe88701a7a 100644 (file)
@@ -130,11 +130,11 @@ TCPClientCollection::TCPClientCollection(size_t maxThreads, std::vector<ClientSt
 void TCPClientCollection::addTCPClientThread(std::vector<ClientState*>& tcpAcceptStates)
 {
   try {
-    auto [queryChannelSender, queryChannelReceiver] = pdns::channel::createObjectQueue<ConnectionInfo>(true, g_tcpInternalPipeBufferSize);
+    auto [queryChannelSender, queryChannelReceiver] = pdns::channel::createObjectQueue<ConnectionInfo>(true, true, g_tcpInternalPipeBufferSize);
 
-    auto [crossProtocolQueryChannelSender, crossProtocolQueryChannelReceiver] = pdns::channel::createObjectQueue<CrossProtocolQuery>(true, g_tcpInternalPipeBufferSize);
+    auto [crossProtocolQueryChannelSender, crossProtocolQueryChannelReceiver] = pdns::channel::createObjectQueue<CrossProtocolQuery>(true, true, g_tcpInternalPipeBufferSize);
 
-    auto [crossProtocolResponseChannelSender, crossProtocolResponseChannelReceiver] = pdns::channel::createObjectQueue<TCPCrossProtocolResponse>(true, g_tcpInternalPipeBufferSize);
+    auto [crossProtocolResponseChannelSender, crossProtocolResponseChannelReceiver] = pdns::channel::createObjectQueue<TCPCrossProtocolResponse>(true, true, g_tcpInternalPipeBufferSize);
 
     vinfolog("Adding TCP Client thread");
 
index 34e8abe5eb43dc7bfe300320795d8a783b5420e3..52403b9b048f74753741708079ae15d1d7544420 100644 (file)
@@ -1015,7 +1015,7 @@ void DoHClientCollection::addThread()
 {
 #ifdef HAVE_NGHTTP2
   try {
-    auto [sender, receiver] = pdns::channel::createObjectQueue<CrossProtocolQuery>(true, g_tcpInternalPipeBufferSize);
+    auto [sender, receiver] = pdns::channel::createObjectQueue<CrossProtocolQuery>(true, true, g_tcpInternalPipeBufferSize);
 
     vinfolog("Adding DoH Client thread");
     std::lock_guard<std::mutex> lock(d_mutex);
index 9feca652187628e6254d9a81e99de31aca51c010..57da686ee53f6481a4cc41c98e99262a2ba9cbdb 100644 (file)
@@ -174,14 +174,14 @@ struct DOHServerConfig
   {
 #ifndef USE_SINGLE_ACCEPTOR_THREAD
     {
-      auto [sender, receiver] = pdns::channel::createObjectQueue<DOHUnit, void(*)(DOHUnit*)>(true, internalPipeBufferSize);
+      auto [sender, receiver] = pdns::channel::createObjectQueue<DOHUnit, void(*)(DOHUnit*)>(true, true, internalPipeBufferSize);
       d_querySender = std::move(sender);
       d_queryReceiver = std::move(receiver);
     }
 #endif /* USE_SINGLE_ACCEPTOR_THREAD */
 
     {
-      auto [sender, receiver] = pdns::channel::createObjectQueue<DOHUnit, void(*)(DOHUnit*)>(true, internalPipeBufferSize);
+      auto [sender, receiver] = pdns::channel::createObjectQueue<DOHUnit, void(*)(DOHUnit*)>(true, true, internalPipeBufferSize);
       d_responseSender = std::move(sender);
       d_responseReceiver = std::move(receiver);
     }
index 9e678ee40e47d87fec4504f73ce801f673ea8a3d..41040a906f3334d5ff3cd71128395a6768da6bb9 100644 (file)
@@ -22,8 +22,7 @@ BOOST_AUTO_TEST_CASE(test_object_pipe) {
 
   op.close();
   BOOST_CHECK_EQUAL(op.readTimeout(&i, 1), 0);
-
-};
+}
 
 std::atomic<int> done = 0;
 BOOST_AUTO_TEST_CASE(test_delay_pipe_small) {
@@ -53,9 +52,9 @@ BOOST_AUTO_TEST_CASE(test_delay_pipe_small) {
   sleep(1);
   BOOST_CHECK_EQUAL(done, n);
 
-};
+}
 
-BOOST_AUTO_TEST_CASE(test_delay_pipe_big) {  
+BOOST_AUTO_TEST_CASE(test_delay_pipe_big) {
   done=0;
   struct Work
   {
@@ -74,7 +73,6 @@ BOOST_AUTO_TEST_CASE(test_delay_pipe_big) {
 
   sleep(1);
   BOOST_CHECK_EQUAL(done, n);
-};
-
+}
 
 BOOST_AUTO_TEST_SUITE_END();
index 5757cccc5c27f589aac3ca06744ff474c5c290c7..1a40a14d2bcfb7ff40bfa4b6c7138f0647be55b2 100644 (file)
@@ -794,10 +794,7 @@ struct FDWrapper
 
   ~FDWrapper()
   {
-    if (d_fd != -1) {
-      close(d_fd);
-      d_fd = -1;
-    }
+    reset();
   }
 
   FDWrapper(FDWrapper&& rhs) noexcept : d_fd(rhs.d_fd)
@@ -825,6 +822,14 @@ struct FDWrapper
     return d_fd;
   }
 
+  void reset()
+  {
+    if (d_fd != -1) {
+      ::close(d_fd);
+      d_fd = -1;
+    }
+  }
+
 private:
   int d_fd{-1};
 };
index b70a5b280dc4ebea81b4c722616e6c920e68ba26..1ab0bd938fe4896e35e9985f03c5e4484e8081c5 100644 (file)
@@ -186,7 +186,7 @@ SNMPAgent::SNMPAgent(const std::string& name, const std::string& daemonSocket)
 
   init_snmp(name.c_str());
 
-  auto [sender, receiver] = pdns::channel::createObjectQueue<netsnmp_variable_list, void(*)(netsnmp_variable_list*)>(true);
+  auto [sender, receiver] = pdns::channel::createObjectQueue<netsnmp_variable_list, void(*)(netsnmp_variable_list*)>(true, true);
   d_sender = std::move(sender);
   d_receiver = std::move(receiver);
 #endif /* HAVE_NET_SNMP */