class Sender
{
public:
- Sender()
- {
- }
+ Sender() = default;
Sender(FDWrapper&& descriptor) :
d_fd(std::move(descriptor))
{
Sender& operator=(const Sender&) = delete;
Sender(Sender&&) = default;
Sender& operator=(Sender&&) = default;
+ ~Sender() = default;
/**
* \brief Try to send the supplied object to the other end of that channel. Might block if the channel was created in blocking mode.
*
class Receiver
{
public:
- Receiver()
- {
- }
+ Receiver() = default;
Receiver(FDWrapper&& descriptor, bool throwOnEOF = true) :
d_fd(std::move(descriptor)), d_throwOnEOF(throwOnEOF)
{
Receiver& operator=(const Receiver&) = delete;
Receiver(Receiver&&) = default;
Receiver& operator=(Receiver&&) = default;
+ ~Receiver() = default;
/**
* \brief Try to read an object sent by the other end of that channel. Might block if the channel was created in blocking mode.
*
class Notifier
{
public:
- Notifier()
- {
- }
+ Notifier() = default;
Notifier(FDWrapper&&);
Notifier(const Notifier&) = delete;
Notifier& operator=(const Notifier&) = delete;
Notifier(Notifier&&) = default;
Notifier& operator=(Notifier&&) = default;
+ ~Notifier() = default;
/**
* \brief Queue a notification to wake up the other end of the channel.
class Waiter
{
public:
+ Waiter() = default;
Waiter(FDWrapper&&, bool throwOnEOF = true);
Waiter(const Waiter&) = delete;
Waiter& operator=(const Waiter&) = delete;
Waiter(Waiter&&) = default;
Waiter& operator=(Waiter&&) = default;
+ ~Waiter() = default;
/**
* \brief Clear all notifications queued on that channel, if any.
namespace dnsdist
{
-AsynchronousHolder::AsynchronousHolder(bool failOpen) :
- d_data(std::make_shared<Data>())
+AsynchronousHolder::Data::Data(bool failOpen) :
+ d_failOpen(failOpen)
{
- d_data->d_failOpen = failOpen;
-
- int fds[2] = {-1, -1};
- if (pipe(fds) < 0) {
- throw std::runtime_error("Error creating the AsynchronousHolder pipe: " + stringerror());
- }
-
- for (size_t idx = 0; idx < (sizeof(fds) / sizeof(*fds)); idx++) {
- if (!setNonBlocking(fds[idx])) {
- int err = errno;
- close(fds[0]);
- close(fds[1]);
- throw std::runtime_error("Error setting the AsynchronousHolder pipe non-blocking: " + stringerror(err));
- }
- }
-
- d_data->d_notifyPipe = FDWrapper(fds[1]);
- d_data->d_watchPipe = FDWrapper(fds[0]);
+ auto [notifier, waiter] = pdns::channel::createNotificationQueue(true);
+ d_waiter = std::move(waiter);
+ d_notifier = std::move(notifier);
+}
+AsynchronousHolder::AsynchronousHolder(bool failOpen) :
+ d_data(std::make_shared<Data>(failOpen))
+{
std::thread main([data = this->d_data] { mainThread(data); });
main.detach();
}
bool AsynchronousHolder::notify() const
{
- const char data = 0;
- bool failed = false;
- do {
- auto written = write(d_data->d_notifyPipe.getHandle(), &data, sizeof(data));
- if (written == 0) {
- break;
- }
- if (written > 0 && static_cast<size_t>(written) == sizeof(data)) {
- return true;
- }
- if (errno != EINTR) {
- failed = true;
- }
- } while (!failed);
-
- return false;
+ return d_data->d_notifier.notify();
}
-bool AsynchronousHolder::wait(const AsynchronousHolder::Data& data, FDMultiplexer& mplexer, std::vector<int>& readyFDs, int atMostMs)
+bool AsynchronousHolder::wait(AsynchronousHolder::Data& data, FDMultiplexer& mplexer, std::vector<int>& readyFDs, int atMostMs)
{
readyFDs.clear();
mplexer.getAvailableFDs(readyFDs, atMostMs);
return true;
}
- while (true) {
- /* we might have been notified several times, let's read
- as much as possible before returning */
- char dummy = 0;
- auto got = read(data.d_watchPipe.getHandle(), &dummy, sizeof(dummy));
- if (got == 0) {
- break;
- }
- if (got > 0 && static_cast<size_t>(got) != sizeof(dummy)) {
- continue;
- }
- if (got == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
- break;
- }
- }
-
+ data.d_waiter.clear();
return false;
}
std::list<std::pair<uint16_t, std::unique_ptr<CrossProtocolQuery>>> expiredEvents;
auto mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent(1));
- mplexer->addReadFD(data->d_watchPipe.getHandle(), [](int, FDMultiplexer::funcparam_t&) {});
+ mplexer->addReadFD(data->d_waiter.getDescriptor(), [](int, FDMultiplexer::funcparam_t&) {});
std::vector<int> readyFDs;
while (true) {
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/key_extractors.hpp>
+#include "channel.hh"
#include "dnsdist-tcp.hh"
namespace dnsdist
struct Data
{
+ Data(bool failOpen);
+ Data(const Data&) = delete;
+ Data(Data&&) = delete;
+ Data& operator=(const Data&) = delete;
+ Data& operator=(Data&&) = delete;
+ ~Data() = default;
+
LockGuarded<content_t> d_content;
- FDWrapper d_notifyPipe;
- FDWrapper d_watchPipe;
+ pdns::channel::Notifier d_notifier;
+ pdns::channel::Waiter d_waiter;
bool d_failOpen{true};
bool d_done{false};
};
std::shared_ptr<Data> d_data{nullptr};
static void mainThread(std::shared_ptr<Data> data);
- static bool wait(const Data& data, FDMultiplexer& mplexer, std::vector<int>& readyFDs, int atMostMs);
+ static bool wait(Data& data, FDMultiplexer& mplexer, std::vector<int>& readyFDs, int atMostMs);
bool notify() const;
};