From: Remi Gacogne Date: Thu, 17 Mar 2022 16:02:05 +0000 (+0100) Subject: dnsdist: Implement Channels for communication between threads X-Git-Tag: rec-5.0.0-alpha1~161^2~24 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=3fc6d61ca79cd59d3a620badb2255429a04e6155;p=thirdparty%2Fpdns.git dnsdist: Implement Channels for communication between threads Internally this uses the same mechanism as before, passing pointers over a pipe, but the new classes make that easier to use: - clear separation between sender and receiver - clear ownership of the descriptor - less code duplication --- diff --git a/pdns/dnsdistdist/Makefile.am b/pdns/dnsdistdist/Makefile.am index 439ca398ab..44487953ff 100644 --- a/pdns/dnsdistdist/Makefile.am +++ b/pdns/dnsdistdist/Makefile.am @@ -130,6 +130,7 @@ dnsdist_SOURCES = \ burtle.hh \ cachecleaner.hh \ capabilities.cc capabilities.hh \ + channel.cc channel.hh \ circular_buffer.hh \ connection-management.hh \ credentials.cc credentials.hh \ @@ -242,6 +243,7 @@ testrunner_SOURCES = \ base64.hh \ bpf-filter.cc bpf-filter.hh \ cachecleaner.hh \ + channel.cc channel.hh \ circular_buffer.hh \ connection-management.hh \ credentials.cc credentials.hh \ diff --git a/pdns/dnsdistdist/channel.cc b/pdns/dnsdistdist/channel.cc new file mode 100644 index 0000000000..dfa3369765 --- /dev/null +++ b/pdns/dnsdistdist/channel.cc @@ -0,0 +1,109 @@ +/* + * This file is part of PowerDNS or dnsdist. + * Copyright -- PowerDNS.COM B.V. and its contributors + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of version 2 of the GNU General Public License as + * published by the Free Software Foundation. + * + * In addition, for the avoidance of any doubt, permission is granted to + * link this program with OpenSSL and to (re)distribute the binaries + * produced as the result of such linking. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "channel.hh" + +namespace pdns +{ +namespace channel +{ + + Notifier::Notifier(FDWrapper&& fd) : + d_fd(std::move(fd)) + { + } + + bool Notifier::notify() const + { + char data = 'a'; + auto sent = write(d_fd.getHandle(), &data, sizeof(data)); + if (sent != sizeof(data)) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return false; + } + else { + throw std::runtime_error("Unable to write to channel notifier pipe: " + stringerror()); + } + } + return true; + } + + Waiter::Waiter(FDWrapper&& fd) : + d_fd(std::move(fd)) + { + } + + void Waiter::clear() const + { + ssize_t got; + do { + char data; + got = read(d_fd.getHandle(), &data, sizeof(data)); + if (got == 0) { + throw std::runtime_error("EOF while clearing channel notifier pipe"); + } + else if (got == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + break; + } + throw std::runtime_error("Error while clearing channel notifier pipe: " + stringerror()); + } + } while (got); + } + + int Waiter::getDescriptor() const + { + return d_fd.getHandle(); + } + + std::pair createNotificationQueue(bool nonBlocking, size_t pipeBufferSize) + { + int fds[2] = { -1, -1}; + if (pipe(fds) < 0) { + throw std::runtime_error("Error creating notification channel pipe: " + stringerror()); + } + + if (nonBlocking && !setNonBlocking(fds[0])) { + int err = errno; + close(fds[0]); + close(fds[1]); + throw std::runtime_error("Error making notification channel pipe non-blocking: " + stringerror(err)); + } + + if (nonBlocking && !setNonBlocking(fds[1])) { + int err = errno; + close(fds[0]); + close(fds[1]); + throw std::runtime_error("Error making notification channel pipe non-blocking: " + stringerror(err)); + } + + if (pipeBufferSize > 0 && getPipeBufferSize(fds[0]) < pipeBufferSize) { + setPipeBufferSize(fds[0], pipeBufferSize); + } + + FDWrapper sender(fds[1]); + FDWrapper receiver(fds[0]); + + return std::pair(Notifier(std::move(sender)), Waiter(std::move(receiver))); + } +} +} diff --git a/pdns/dnsdistdist/channel.hh b/pdns/dnsdistdist/channel.hh new file mode 100644 index 0000000000..247264c5d4 --- /dev/null +++ b/pdns/dnsdistdist/channel.hh @@ -0,0 +1,261 @@ +/* + * This file is part of PowerDNS or dnsdist. + * Copyright -- PowerDNS.COM B.V. and its contributors + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of version 2 of the GNU General Public License as + * published by the Free Software Foundation. + * + * In addition, for the avoidance of any doubt, permission is granted to + * link this program with OpenSSL and to (re)distribute the binaries + * produced as the result of such linking. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#pragma once +#include +#include + +#include "misc.hh" + +namespace pdns +{ +namespace channel +{ + /** + * The sender's end of a channel used to pass objects between threads. + * + * A sender can be used by several threads in a safe way. + */ + template + class Sender + { + public: + Sender() + { + } + Sender(FDWrapper&& fd) : + d_fd(std::move(fd)) + { + } + Sender(const Sender&) = delete; + Sender& operator=(const Sender&) = delete; + Sender(Sender&&) = default; + Sender& operator=(Sender&&) = default; + /** + * \brief Try to send the supplied object to the other end of that channel. Might block if the channel was created in blocking mode. + * + * \return True if the object was properly sent, False if the channel is full. + * + * \throw runtime_error if the channel is broken, for example if the other end has been closed. + */ + bool send(std::unique_ptr&&) const; + + private: + FDWrapper d_fd; + }; + + /** + * The receiver's end of a channel used to pass objects between threads. + * + * A receiver can be used by several threads in a safe way, but in that case spurious wake up might happen. + */ + template + class Receiver + { + public: + Receiver() + { + } + Receiver(FDWrapper&& fd) : + d_fd(std::move(fd)) + { + } + Receiver(const Receiver&) = delete; + Receiver& operator=(const Receiver&) = delete; + Receiver(Receiver&&) = default; + Receiver& operator=(Receiver&&) = default; + /** + * \brief Try to read an object sent by the other end of that channel. Might block if the channel was created in blocking mode. + * + * \return An object if one was available, and std::nullopt otherwise. + * + * \throw runtime_error if the channel is broken, for example if the other end has been closed. + */ + std::optional> receive() const; + + /** + * \brief Get a descriptor that can be used with an I/O multiplexer to wait for an object to become available. + * + * \return A valid descriptor or -1 if the Receiver was not properly initialized. + */ + int getDescriptor() const + { + return d_fd.getHandle(); + } + + private: + FDWrapper d_fd; + }; + + /** + * \brief Create a channel to pass objects between threads, accepting multiple senders and receivers. + * + * \return A pair of Sender and Receiver objects. + * + * \throw runtime_error if the channel creation failed. + */ + template + std::pair, Receiver> createObjectQueue(bool nonBlocking = true, size_t pipeBufferSize = 0); + + /** + * The notifier's end of a channel used to communicate between threads. + * + * A notifier can be used by several threads in a safe way. + */ + class Notifier + { + public: + Notifier() + { + } + Notifier(FDWrapper&&); + Notifier(const Notifier&) = delete; + Notifier& operator=(const Notifier&) = delete; + Notifier(Notifier&&) = default; + Notifier& operator=(Notifier&&) = default; + + /** + * \brief Queue a notification to wake up the other end of the channel. + * + * \return True if the notification was properly sent, False if the channel is full. + * + * \throw runtime_error if the channel is broken, for example if the other end has been closed. + */ + bool notify() const; + + private: + FDWrapper d_fd; + }; + + /** + * The waiter's end of a channel used to communicate between threads. + * + * A waiter can be used by several threads in a safe way, but in that case spurious wake up might happen. + */ + class Waiter + { + public: + Waiter(FDWrapper&&); + Waiter(const Waiter&) = delete; + Waiter& operator=(const Waiter&) = delete; + Waiter(Waiter&&) = default; + Waiter& operator=(Waiter&&) = default; + + /** + * \brief Clear all notifications queued on that channel, if any. + */ + void clear() const; + /** + * \brief Get a descriptor that can be used with an I/O multiplexer to wait for a notification to arrive. + * + * \return A valid descriptor or -1 if the Waiter was not properly initialized. + */ + int getDescriptor() const; + + private: + FDWrapper d_fd; + }; + + /** + * \brief Create a channel to notify one thread from another one, accepting multiple senders and receivers. + * + * \return A pair of Notifier and Sender objects. + * + * \throw runtime_error if the channel creation failed. + */ + std::pair createNotificationQueue(bool nonBlocking = true, size_t pipeBufferSize = 0); + + template + bool Sender::send(std::unique_ptr&& object) const + { + auto ptr = object.release(); + static_assert(sizeof(ptr) <= PIPE_BUF, "Writes up to PIPE_BUF are guaranted not to interleaved and to either fully succeed or fail"); + ssize_t sent = write(d_fd.getHandle(), &ptr, sizeof(ptr)); + + if (sent != sizeof(ptr)) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return false; + } + else { + throw std::runtime_error("Unable to write to channel:" + stringerror()); + } + delete ptr; + } + + return true; + } + + template + std::optional> Receiver::receive() const + { + std::optional> result; + T* obj{nullptr}; + ssize_t got = read(d_fd.getHandle(), &obj, sizeof(obj)); + if (got == sizeof(obj)) { + return std::unique_ptr(obj); + } + else if (got == 0) { + throw std::runtime_error("EOF while reading from Channel receiver"); + } + else if (got == -1) { + if (errno == EAGAIN || errno == EINTR) { + return result; + } + throw std::runtime_error("Error while reading from Channel receiver: " + stringerror()); + } + else { + throw std::runtime_error("Partial read from Channel receiver"); + } + } + + template + std::pair, Receiver> createObjectQueue(bool nonBlocking, size_t pipeBufferSize) + { + int fds[2] = { -1, -1}; + if (pipe(fds) < 0) { + throw std::runtime_error("Error creating channel pipe: " + stringerror()); + } + + if (nonBlocking && !setNonBlocking(fds[0])) { + int err = errno; + close(fds[0]); + close(fds[1]); + throw std::runtime_error("Error making channel pipe non-blocking: " + stringerror(err)); + } + + if (nonBlocking && !setNonBlocking(fds[1])) { + int err = errno; + close(fds[0]); + close(fds[1]); + throw std::runtime_error("Error making channel pipe non-blocking: " + stringerror(err)); + } + + if (pipeBufferSize > 0 && getPipeBufferSize(fds[0]) < pipeBufferSize) { + setPipeBufferSize(fds[0], pipeBufferSize); + } + + FDWrapper sender(fds[1]); + FDWrapper receiver(fds[0]); + + return std::pair(Sender(std::move(sender)), Receiver(std::move(receiver))); + } +} +}