From 5841c6fdc698cc7012ae428c4643f9426d1392cc Mon Sep 17 00:00:00 2001 From: Remi Gacogne Date: Fri, 21 Oct 2022 20:12:01 +0200 Subject: [PATCH] dnsdist: Move the DelayPipe to pdns::channel --- pdns/channel.cc | 35 ++++--- pdns/channel.hh | 135 +++++++++++++++----------- pdns/delaypipe.cc | 58 ++++------- pdns/delaypipe.hh | 7 +- pdns/dnsdist-tcp.cc | 6 +- pdns/dnsdistdist/dnsdist-nghttp2.cc | 2 +- pdns/dnsdistdist/doh.cc | 4 +- pdns/dnsdistdist/test-delaypipe_hh.cc | 10 +- pdns/misc.hh | 13 ++- pdns/snmp-agent.cc | 2 +- 10 files changed, 146 insertions(+), 126 deletions(-) diff --git a/pdns/channel.cc b/pdns/channel.cc index dd25c6d34e..bebbdf4380 100644 --- a/pdns/channel.cc +++ b/pdns/channel.cc @@ -35,20 +35,25 @@ namespace channel bool Notifier::notify() const { char data = 'a'; - auto sent = write(d_fd.getHandle(), &data, sizeof(data)); - if (sent != sizeof(data)) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { - return false; - } - else { - throw std::runtime_error("Unable to write to channel notifier pipe: " + stringerror()); + while (true) { + auto sent = write(d_fd.getHandle(), &data, sizeof(data)); + if (sent != sizeof(data)) { + if (errno == EINTR) { + continue; + } + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return false; + } + else { + throw std::runtime_error("Unable to write to channel notifier pipe: " + stringerror()); + } } + return true; } - return true; } - Waiter::Waiter(FDWrapper&& fd) : - d_fd(std::move(fd)) + Waiter::Waiter(FDWrapper&& fd, bool throwOnEOF) : + d_fd(std::move(fd)), d_throwOnEOF(throwOnEOF) { } @@ -59,9 +64,15 @@ namespace channel char data; got = read(d_fd.getHandle(), &data, sizeof(data)); if (got == 0) { + if (!d_throwOnEOF) { + return; + } throw std::runtime_error("EOF while clearing channel notifier pipe"); } else if (got == -1) { + if (errno == EINTR) { + continue; + } if (errno == EAGAIN || errno == EWOULDBLOCK) { break; } @@ -75,7 +86,7 @@ namespace channel return d_fd.getHandle(); } - std::pair createNotificationQueue(bool nonBlocking, size_t pipeBufferSize) + std::pair createNotificationQueue(bool nonBlocking, size_t pipeBufferSize, bool throwOnEOF) { int fds[2] = {-1, -1}; if (pipe(fds) < 0) { @@ -99,7 +110,7 @@ namespace channel setPipeBufferSize(receiver.getHandle(), pipeBufferSize); } - return std::pair(Notifier(std::move(sender)), Waiter(std::move(receiver))); + return std::pair(Notifier(std::move(sender)), Waiter(std::move(receiver), throwOnEOF)); } } } diff --git a/pdns/channel.hh b/pdns/channel.hh index 5c352870ed..fd82ca1970 100644 --- a/pdns/channel.hh +++ b/pdns/channel.hh @@ -57,6 +57,7 @@ namespace channel * \throw runtime_error if the channel is broken, for example if the other end has been closed. */ bool send(std::unique_ptr&&) const; + void close(); private: FDWrapper d_fd; @@ -74,8 +75,8 @@ namespace channel Receiver() { } - Receiver(FDWrapper&& fd) : - d_fd(std::move(fd)) + Receiver(FDWrapper&& fd, bool throwOnEOF = true) : + d_fd(std::move(fd)), d_throwOnEOF(throwOnEOF) { } Receiver(const Receiver&) = delete; @@ -89,8 +90,8 @@ namespace channel * * \throw runtime_error if the channel is broken, for example if the other end has been closed. */ - std::optional> receive() const; - std::optional> receive(D deleter) const; + std::optional> receive(); + std::optional> receive(D deleter); /** * \brief Get a descriptor that can be used with an I/O multiplexer to wait for an object to become available. @@ -101,9 +102,18 @@ namespace channel { return d_fd.getHandle(); } + /** + * \brief Whether the remote end has closed the channel. + */ + bool isClosed() const + { + return d_closed; + } private: FDWrapper d_fd; + bool d_closed{false}; + bool d_throwOnEOF{true}; }; /** @@ -114,7 +124,7 @@ namespace channel * \throw runtime_error if the channel creation failed. */ template > - std::pair, Receiver> createObjectQueue(bool nonBlocking = true, size_t pipeBufferSize = 0); + std::pair, Receiver> createObjectQueue(bool sendNonBlocking = true, bool writeNonBlocking = true, size_t pipeBufferSize = 0, bool throwOnEOF = true); /** * The notifier's end of a channel used to communicate between threads. @@ -154,7 +164,7 @@ namespace channel class Waiter { public: - Waiter(FDWrapper&&); + Waiter(FDWrapper&&, bool throwOnEOF = true); Waiter(const Waiter&) = delete; Waiter& operator=(const Waiter&) = delete; Waiter(Waiter&&) = default; @@ -170,9 +180,18 @@ namespace channel * \return A valid descriptor or -1 if the Waiter was not properly initialized. */ int getDescriptor() const; + /** + * \brief Whether the remote end has closed the channel. + */ + bool isClosed() const + { + return d_closed; + } private: FDWrapper d_fd; + bool d_closed{false}; + bool d_throwOnEOF{true}; }; /** @@ -182,7 +201,7 @@ namespace channel * * \throw runtime_error if the channel creation failed. */ - std::pair createNotificationQueue(bool nonBlocking = true, size_t pipeBufferSize = 0); + std::pair createNotificationQueue(bool nonBlocking = true, size_t pipeBufferSize = 0, bool throwOnEOF = true); template bool Sender::send(std::unique_ptr&& object) const @@ -190,72 +209,74 @@ namespace channel // we do not release right away because we might need the custom deleter later auto ptr = object.get(); static_assert(sizeof(ptr) <= PIPE_BUF, "Writes up to PIPE_BUF are guaranted not to interleaved and to either fully succeed or fail"); - ssize_t sent = write(d_fd.getHandle(), &ptr, sizeof(ptr)); + while (true) { + ssize_t sent = write(d_fd.getHandle(), &ptr, sizeof(ptr)); - if (sent == sizeof(ptr)) { - // we cannot touch it anymore - object.release(); - } - else { - if (errno == EAGAIN || errno == EWOULDBLOCK) { - return false; + if (sent == sizeof(ptr)) { + // we cannot touch it anymore + object.release(); + return true; } else { - throw std::runtime_error("Unable to write to channel:" + stringerror()); + if (errno == EINTR) { + continue; + } + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return false; + } + else { + throw std::runtime_error("Unable to write to channel:" + stringerror()); + } } } + } - return true; + template + void Sender::close() + { + d_fd.reset(); } template - std::optional> Receiver::receive() const + std::optional> Receiver::receive() { - std::optional> result; - T* obj{nullptr}; - ssize_t got = read(d_fd.getHandle(), &obj, sizeof(obj)); - if (got == sizeof(obj)) { - return std::unique_ptr(obj); - } - else if (got == 0) { - throw std::runtime_error("EOF while reading from Channel receiver"); - } - else if (got == -1) { - if (errno == EAGAIN || errno == EINTR) { - return result; - } - throw std::runtime_error("Error while reading from Channel receiver: " + stringerror()); - } - else { - throw std::runtime_error("Partial read from Channel receiver"); - } + return receive(std::default_delete()); } template - std::optional> Receiver::receive(D deleter) const + std::optional> Receiver::receive(D deleter) { - std::optional> result; - T* obj{nullptr}; - ssize_t got = read(d_fd.getHandle(), &obj, sizeof(obj)); - if (got == sizeof(obj)) { - return std::unique_ptr(obj, deleter); - } - else if (got == 0) { - throw std::runtime_error("EOF while reading from Channel receiver"); - } - else if (got == -1) { - if (errno == EAGAIN || errno == EINTR) { - return result; + while (true) { + std::optional> result; + T* obj{nullptr}; + ssize_t got = read(d_fd.getHandle(), &obj, sizeof(obj)); + if (got == sizeof(obj)) { + return std::unique_ptr(obj, deleter); + } + else if (got == 0) { + d_closed = true; + if (!d_throwOnEOF) { + return result; + } + throw std::runtime_error("EOF while reading from Channel receiver"); + } + else if (got == -1) { + if (errno == EINTR) { + continue; + } + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return result; + } + throw std::runtime_error("Error while reading from Channel receiver: " + stringerror()); + } + else { + throw std::runtime_error("Partial read from Channel receiver"); } - throw std::runtime_error("Error while reading from Channel receiver: " + stringerror()); - } - else { - throw std::runtime_error("Partial read from Channel receiver"); } } template - std::pair, Receiver> createObjectQueue(bool nonBlocking, size_t pipeBufferSize) + std::pair, Receiver> createObjectQueue(bool sendNonBlocking, bool receiveNonBlocking, size_t pipeBufferSize, bool throwOnEOF) { int fds[2] = {-1, -1}; if (pipe(fds) < 0) { @@ -265,12 +286,12 @@ namespace channel FDWrapper sender(fds[1]); FDWrapper receiver(fds[0]); - if (nonBlocking && !setNonBlocking(receiver.getHandle())) { + if (receiveNonBlocking && !setNonBlocking(receiver.getHandle())) { int err = errno; throw std::runtime_error("Error making channel pipe non-blocking: " + stringerror(err)); } - if (nonBlocking && !setNonBlocking(sender.getHandle())) { + if (sendNonBlocking && !setNonBlocking(sender.getHandle())) { int err = errno; throw std::runtime_error("Error making channel pipe non-blocking: " + stringerror(err)); } @@ -279,7 +300,7 @@ namespace channel setPipeBufferSize(receiver.getHandle(), pipeBufferSize); } - return std::pair(Sender(std::move(sender)), Receiver(std::move(receiver))); + return std::pair(Sender(std::move(sender)), Receiver(std::move(receiver), throwOnEOF)); } } } diff --git a/pdns/delaypipe.cc b/pdns/delaypipe.cc index be363d6391..645bcf5610 100644 --- a/pdns/delaypipe.cc +++ b/pdns/delaypipe.cc @@ -28,34 +28,23 @@ template ObjectPipe::ObjectPipe() { - if(pipe(d_fds)) - unixDie("pipe"); -} - -template -ObjectPipe::~ObjectPipe() -{ - ::close(d_fds[0]); - if(d_fds[1] >= 0) - ::close(d_fds[1]); + auto [sender, receiver] = pdns::channel::createObjectQueue(false, true, 0, false); + d_sender = std::move(sender); + d_receiver = std::move(receiver); } template void ObjectPipe::close() { - if(d_fds[1] < 0) - return; - ::close(d_fds[1]); // the writing side - d_fds[1]=-1; + d_sender.close(); } template void ObjectPipe::write(T& t) { - auto ptr = new T(t); - if(::write(d_fds[1], &ptr, sizeof(ptr)) != sizeof(ptr)) { - delete ptr; - unixDie("write"); + auto ptr = std::make_unique(t); + if (!d_sender.send(std::move(ptr))) { + unixDie("writing to the DelayPipe"); } } @@ -63,7 +52,7 @@ template int ObjectPipe::readTimeout(T* t, double msec) { while (true) { - int ret = waitForData(d_fds[0], 0, 1000*msec); + int ret = waitForData(d_receiver.getDescriptor(), 0, 1000*msec); if (ret < 0) { if (errno == EINTR) { continue; @@ -74,26 +63,21 @@ int ObjectPipe::readTimeout(T* t, double msec) return -1; } - T* ptr = nullptr; - ret = ::read(d_fds[0], &ptr, sizeof(ptr)); // this is BLOCKING! - - if (ret < 0) { - if (errno == EINTR) { + try { + auto tmp = d_receiver.receive(); + if (!tmp) { + if (d_receiver.isClosed()) { + return 0; + } continue; } - unixDie("read"); - } - else if (ret == 0) { - return false; - } - if (ret != sizeof(ptr)) { - throw std::runtime_error("Partial read, should not happen 2"); + *t = **tmp; + return 1; + } + catch (const std::exception& e) { + throw std::runtime_error("reading from the delay pipe: " + std::string(e.what())); } - - *t = *ptr; - delete ptr; - return 1; } } @@ -149,7 +133,7 @@ void DelayPipe::worker() The other special case is that the first we have to do.. is in the past, so we need to do it immediately. */ - + double delay=-1; // infinite struct timespec now; if(!d_work.empty()) { @@ -160,7 +144,7 @@ void DelayPipe::worker() } } if(delay != 0 ) { - int ret = d_pipe.readTimeout(&c, delay); + int ret = d_pipe.readTimeout(&c, delay); if(ret > 0) { // we got an object d_work.emplace(c.when, c.what); } diff --git a/pdns/delaypipe.hh b/pdns/delaypipe.hh index ad1626a999..b12fd50eac 100644 --- a/pdns/delaypipe.hh +++ b/pdns/delaypipe.hh @@ -20,10 +20,11 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #pragma once -#include #include #include +#include "channel.hh" + /** General idea: many threads submit work to this class, but only one executes it. The work should therefore be entirely trivial. The implementation is that submitter threads create an object that represents the work, and it gets sent over a pipe @@ -41,12 +42,12 @@ class ObjectPipe { public: ObjectPipe(); - ~ObjectPipe(); void write(T& t); int readTimeout(T* t, double msec); //!< -1 is timeout, 0 is no data, 1 is data. msec<0 waits infinitely long. msec==0 = undefined void close(); private: - int d_fds[2]; + pdns::channel::Sender d_sender; + pdns::channel::Receiver d_receiver; }; template diff --git a/pdns/dnsdist-tcp.cc b/pdns/dnsdist-tcp.cc index 17309a4f66..b72f0e0f5f 100644 --- a/pdns/dnsdist-tcp.cc +++ b/pdns/dnsdist-tcp.cc @@ -130,11 +130,11 @@ TCPClientCollection::TCPClientCollection(size_t maxThreads, std::vector& tcpAcceptStates) { try { - auto [queryChannelSender, queryChannelReceiver] = pdns::channel::createObjectQueue(true, g_tcpInternalPipeBufferSize); + auto [queryChannelSender, queryChannelReceiver] = pdns::channel::createObjectQueue(true, true, g_tcpInternalPipeBufferSize); - auto [crossProtocolQueryChannelSender, crossProtocolQueryChannelReceiver] = pdns::channel::createObjectQueue(true, g_tcpInternalPipeBufferSize); + auto [crossProtocolQueryChannelSender, crossProtocolQueryChannelReceiver] = pdns::channel::createObjectQueue(true, true, g_tcpInternalPipeBufferSize); - auto [crossProtocolResponseChannelSender, crossProtocolResponseChannelReceiver] = pdns::channel::createObjectQueue(true, g_tcpInternalPipeBufferSize); + auto [crossProtocolResponseChannelSender, crossProtocolResponseChannelReceiver] = pdns::channel::createObjectQueue(true, true, g_tcpInternalPipeBufferSize); vinfolog("Adding TCP Client thread"); diff --git a/pdns/dnsdistdist/dnsdist-nghttp2.cc b/pdns/dnsdistdist/dnsdist-nghttp2.cc index 34e8abe5eb..52403b9b04 100644 --- a/pdns/dnsdistdist/dnsdist-nghttp2.cc +++ b/pdns/dnsdistdist/dnsdist-nghttp2.cc @@ -1015,7 +1015,7 @@ void DoHClientCollection::addThread() { #ifdef HAVE_NGHTTP2 try { - auto [sender, receiver] = pdns::channel::createObjectQueue(true, g_tcpInternalPipeBufferSize); + auto [sender, receiver] = pdns::channel::createObjectQueue(true, true, g_tcpInternalPipeBufferSize); vinfolog("Adding DoH Client thread"); std::lock_guard lock(d_mutex); diff --git a/pdns/dnsdistdist/doh.cc b/pdns/dnsdistdist/doh.cc index 9feca65218..57da686ee5 100644 --- a/pdns/dnsdistdist/doh.cc +++ b/pdns/dnsdistdist/doh.cc @@ -174,14 +174,14 @@ struct DOHServerConfig { #ifndef USE_SINGLE_ACCEPTOR_THREAD { - auto [sender, receiver] = pdns::channel::createObjectQueue(true, internalPipeBufferSize); + auto [sender, receiver] = pdns::channel::createObjectQueue(true, true, internalPipeBufferSize); d_querySender = std::move(sender); d_queryReceiver = std::move(receiver); } #endif /* USE_SINGLE_ACCEPTOR_THREAD */ { - auto [sender, receiver] = pdns::channel::createObjectQueue(true, internalPipeBufferSize); + auto [sender, receiver] = pdns::channel::createObjectQueue(true, true, internalPipeBufferSize); d_responseSender = std::move(sender); d_responseReceiver = std::move(receiver); } diff --git a/pdns/dnsdistdist/test-delaypipe_hh.cc b/pdns/dnsdistdist/test-delaypipe_hh.cc index 9e678ee40e..41040a906f 100644 --- a/pdns/dnsdistdist/test-delaypipe_hh.cc +++ b/pdns/dnsdistdist/test-delaypipe_hh.cc @@ -22,8 +22,7 @@ BOOST_AUTO_TEST_CASE(test_object_pipe) { op.close(); BOOST_CHECK_EQUAL(op.readTimeout(&i, 1), 0); - -}; +} std::atomic done = 0; BOOST_AUTO_TEST_CASE(test_delay_pipe_small) { @@ -53,9 +52,9 @@ BOOST_AUTO_TEST_CASE(test_delay_pipe_small) { sleep(1); BOOST_CHECK_EQUAL(done, n); -}; +} -BOOST_AUTO_TEST_CASE(test_delay_pipe_big) { +BOOST_AUTO_TEST_CASE(test_delay_pipe_big) { done=0; struct Work { @@ -74,7 +73,6 @@ BOOST_AUTO_TEST_CASE(test_delay_pipe_big) { sleep(1); BOOST_CHECK_EQUAL(done, n); -}; - +} BOOST_AUTO_TEST_SUITE_END(); diff --git a/pdns/misc.hh b/pdns/misc.hh index 5757cccc5c..1a40a14d2b 100644 --- a/pdns/misc.hh +++ b/pdns/misc.hh @@ -794,10 +794,7 @@ struct FDWrapper ~FDWrapper() { - if (d_fd != -1) { - close(d_fd); - d_fd = -1; - } + reset(); } FDWrapper(FDWrapper&& rhs) noexcept : d_fd(rhs.d_fd) @@ -825,6 +822,14 @@ struct FDWrapper return d_fd; } + void reset() + { + if (d_fd != -1) { + ::close(d_fd); + d_fd = -1; + } + } + private: int d_fd{-1}; }; diff --git a/pdns/snmp-agent.cc b/pdns/snmp-agent.cc index b70a5b280d..1ab0bd938f 100644 --- a/pdns/snmp-agent.cc +++ b/pdns/snmp-agent.cc @@ -186,7 +186,7 @@ SNMPAgent::SNMPAgent(const std::string& name, const std::string& daemonSocket) init_snmp(name.c_str()); - auto [sender, receiver] = pdns::channel::createObjectQueue(true); + auto [sender, receiver] = pdns::channel::createObjectQueue(true, true); d_sender = std::move(sender); d_receiver = std::move(receiver); #endif /* HAVE_NET_SNMP */ -- 2.47.2