bool Notifier::notify() const
{
char data = 'a';
- auto sent = write(d_fd.getHandle(), &data, sizeof(data));
- if (sent != sizeof(data)) {
- if (errno == EAGAIN || errno == EWOULDBLOCK) {
- return false;
- }
- else {
- throw std::runtime_error("Unable to write to channel notifier pipe: " + stringerror());
+ while (true) {
+ auto sent = write(d_fd.getHandle(), &data, sizeof(data));
+ if (sent != sizeof(data)) {
+ if (errno == EINTR) {
+ continue;
+ }
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ return false;
+ }
+ else {
+ throw std::runtime_error("Unable to write to channel notifier pipe: " + stringerror());
+ }
}
+ return true;
}
- return true;
}
- Waiter::Waiter(FDWrapper&& fd) :
- d_fd(std::move(fd))
+ Waiter::Waiter(FDWrapper&& fd, bool throwOnEOF) :
+ d_fd(std::move(fd)), d_throwOnEOF(throwOnEOF)
{
}
char data;
got = read(d_fd.getHandle(), &data, sizeof(data));
if (got == 0) {
+ if (!d_throwOnEOF) {
+ return;
+ }
throw std::runtime_error("EOF while clearing channel notifier pipe");
}
else if (got == -1) {
+ if (errno == EINTR) {
+ continue;
+ }
if (errno == EAGAIN || errno == EWOULDBLOCK) {
break;
}
return d_fd.getHandle();
}
- std::pair<Notifier, Waiter> createNotificationQueue(bool nonBlocking, size_t pipeBufferSize)
+ std::pair<Notifier, Waiter> createNotificationQueue(bool nonBlocking, size_t pipeBufferSize, bool throwOnEOF)
{
int fds[2] = {-1, -1};
if (pipe(fds) < 0) {
setPipeBufferSize(receiver.getHandle(), pipeBufferSize);
}
- return std::pair(Notifier(std::move(sender)), Waiter(std::move(receiver)));
+ return std::pair(Notifier(std::move(sender)), Waiter(std::move(receiver), throwOnEOF));
}
}
}
* \throw runtime_error if the channel is broken, for example if the other end has been closed.
*/
bool send(std::unique_ptr<T, D>&&) const;
+ void close();
private:
FDWrapper d_fd;
Receiver()
{
}
- Receiver(FDWrapper&& fd) :
- d_fd(std::move(fd))
+ Receiver(FDWrapper&& fd, bool throwOnEOF = true) :
+ d_fd(std::move(fd)), d_throwOnEOF(throwOnEOF)
{
}
Receiver(const Receiver&) = delete;
*
* \throw runtime_error if the channel is broken, for example if the other end has been closed.
*/
- std::optional<std::unique_ptr<T, D>> receive() const;
- std::optional<std::unique_ptr<T, D>> receive(D deleter) const;
+ std::optional<std::unique_ptr<T, D>> receive();
+ std::optional<std::unique_ptr<T, D>> receive(D deleter);
/**
* \brief Get a descriptor that can be used with an I/O multiplexer to wait for an object to become available.
{
return d_fd.getHandle();
}
+ /**
+ * \brief Whether the remote end has closed the channel.
+ */
+ bool isClosed() const
+ {
+ return d_closed;
+ }
private:
FDWrapper d_fd;
+ bool d_closed{false};
+ bool d_throwOnEOF{true};
};
/**
* \throw runtime_error if the channel creation failed.
*/
template <typename T, typename D = std::default_delete<T>>
- std::pair<Sender<T, D>, Receiver<T, D>> createObjectQueue(bool nonBlocking = true, size_t pipeBufferSize = 0);
+ std::pair<Sender<T, D>, Receiver<T, D>> createObjectQueue(bool sendNonBlocking = true, bool writeNonBlocking = true, size_t pipeBufferSize = 0, bool throwOnEOF = true);
/**
* The notifier's end of a channel used to communicate between threads.
class Waiter
{
public:
- Waiter(FDWrapper&&);
+ Waiter(FDWrapper&&, bool throwOnEOF = true);
Waiter(const Waiter&) = delete;
Waiter& operator=(const Waiter&) = delete;
Waiter(Waiter&&) = default;
* \return A valid descriptor or -1 if the Waiter was not properly initialized.
*/
int getDescriptor() const;
+ /**
+ * \brief Whether the remote end has closed the channel.
+ */
+ bool isClosed() const
+ {
+ return d_closed;
+ }
private:
FDWrapper d_fd;
+ bool d_closed{false};
+ bool d_throwOnEOF{true};
};
/**
*
* \throw runtime_error if the channel creation failed.
*/
- std::pair<Notifier, Waiter> createNotificationQueue(bool nonBlocking = true, size_t pipeBufferSize = 0);
+ std::pair<Notifier, Waiter> createNotificationQueue(bool nonBlocking = true, size_t pipeBufferSize = 0, bool throwOnEOF = true);
template <typename T, typename D>
bool Sender<T, D>::send(std::unique_ptr<T, D>&& object) const
// we do not release right away because we might need the custom deleter later
auto ptr = object.get();
static_assert(sizeof(ptr) <= PIPE_BUF, "Writes up to PIPE_BUF are guaranted not to interleaved and to either fully succeed or fail");
- ssize_t sent = write(d_fd.getHandle(), &ptr, sizeof(ptr));
+ while (true) {
+ ssize_t sent = write(d_fd.getHandle(), &ptr, sizeof(ptr));
- if (sent == sizeof(ptr)) {
- // we cannot touch it anymore
- object.release();
- }
- else {
- if (errno == EAGAIN || errno == EWOULDBLOCK) {
- return false;
+ if (sent == sizeof(ptr)) {
+ // we cannot touch it anymore
+ object.release();
+ return true;
}
else {
- throw std::runtime_error("Unable to write to channel:" + stringerror());
+ if (errno == EINTR) {
+ continue;
+ }
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ return false;
+ }
+ else {
+ throw std::runtime_error("Unable to write to channel:" + stringerror());
+ }
}
}
+ }
- return true;
+ template <typename T, typename D>
+ void Sender<T, D>::close()
+ {
+ d_fd.reset();
}
template <typename T, typename D>
- std::optional<std::unique_ptr<T, D>> Receiver<T, D>::receive() const
+ std::optional<std::unique_ptr<T, D>> Receiver<T, D>::receive()
{
- std::optional<std::unique_ptr<T, D>> result;
- T* obj{nullptr};
- ssize_t got = read(d_fd.getHandle(), &obj, sizeof(obj));
- if (got == sizeof(obj)) {
- return std::unique_ptr<T, D>(obj);
- }
- else if (got == 0) {
- throw std::runtime_error("EOF while reading from Channel receiver");
- }
- else if (got == -1) {
- if (errno == EAGAIN || errno == EINTR) {
- return result;
- }
- throw std::runtime_error("Error while reading from Channel receiver: " + stringerror());
- }
- else {
- throw std::runtime_error("Partial read from Channel receiver");
- }
+ return receive(std::default_delete<T>());
}
template <typename T, typename D>
- std::optional<std::unique_ptr<T, D>> Receiver<T, D>::receive(D deleter) const
+ std::optional<std::unique_ptr<T, D>> Receiver<T, D>::receive(D deleter)
{
- std::optional<std::unique_ptr<T, D>> result;
- T* obj{nullptr};
- ssize_t got = read(d_fd.getHandle(), &obj, sizeof(obj));
- if (got == sizeof(obj)) {
- return std::unique_ptr<T, D>(obj, deleter);
- }
- else if (got == 0) {
- throw std::runtime_error("EOF while reading from Channel receiver");
- }
- else if (got == -1) {
- if (errno == EAGAIN || errno == EINTR) {
- return result;
+ while (true) {
+ std::optional<std::unique_ptr<T, D>> result;
+ T* obj{nullptr};
+ ssize_t got = read(d_fd.getHandle(), &obj, sizeof(obj));
+ if (got == sizeof(obj)) {
+ return std::unique_ptr<T, D>(obj, deleter);
+ }
+ else if (got == 0) {
+ d_closed = true;
+ if (!d_throwOnEOF) {
+ return result;
+ }
+ throw std::runtime_error("EOF while reading from Channel receiver");
+ }
+ else if (got == -1) {
+ if (errno == EINTR) {
+ continue;
+ }
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ return result;
+ }
+ throw std::runtime_error("Error while reading from Channel receiver: " + stringerror());
+ }
+ else {
+ throw std::runtime_error("Partial read from Channel receiver");
}
- throw std::runtime_error("Error while reading from Channel receiver: " + stringerror());
- }
- else {
- throw std::runtime_error("Partial read from Channel receiver");
}
}
template <typename T, typename D>
- std::pair<Sender<T, D>, Receiver<T, D>> createObjectQueue(bool nonBlocking, size_t pipeBufferSize)
+ std::pair<Sender<T, D>, Receiver<T, D>> createObjectQueue(bool sendNonBlocking, bool receiveNonBlocking, size_t pipeBufferSize, bool throwOnEOF)
{
int fds[2] = {-1, -1};
if (pipe(fds) < 0) {
FDWrapper sender(fds[1]);
FDWrapper receiver(fds[0]);
- if (nonBlocking && !setNonBlocking(receiver.getHandle())) {
+ if (receiveNonBlocking && !setNonBlocking(receiver.getHandle())) {
int err = errno;
throw std::runtime_error("Error making channel pipe non-blocking: " + stringerror(err));
}
- if (nonBlocking && !setNonBlocking(sender.getHandle())) {
+ if (sendNonBlocking && !setNonBlocking(sender.getHandle())) {
int err = errno;
throw std::runtime_error("Error making channel pipe non-blocking: " + stringerror(err));
}
setPipeBufferSize(receiver.getHandle(), pipeBufferSize);
}
- return std::pair(Sender<T, D>(std::move(sender)), Receiver<T, D>(std::move(receiver)));
+ return std::pair(Sender<T, D>(std::move(sender)), Receiver<T, D>(std::move(receiver), throwOnEOF));
}
}
}
template<class T>
ObjectPipe<T>::ObjectPipe()
{
- if(pipe(d_fds))
- unixDie("pipe");
-}
-
-template<class T>
-ObjectPipe<T>::~ObjectPipe()
-{
- ::close(d_fds[0]);
- if(d_fds[1] >= 0)
- ::close(d_fds[1]);
+ auto [sender, receiver] = pdns::channel::createObjectQueue<T>(false, true, 0, false);
+ d_sender = std::move(sender);
+ d_receiver = std::move(receiver);
}
template<class T>
void ObjectPipe<T>::close()
{
- if(d_fds[1] < 0)
- return;
- ::close(d_fds[1]); // the writing side
- d_fds[1]=-1;
+ d_sender.close();
}
template<class T>
void ObjectPipe<T>::write(T& t)
{
- auto ptr = new T(t);
- if(::write(d_fds[1], &ptr, sizeof(ptr)) != sizeof(ptr)) {
- delete ptr;
- unixDie("write");
+ auto ptr = std::make_unique<T>(t);
+ if (!d_sender.send(std::move(ptr))) {
+ unixDie("writing to the DelayPipe");
}
}
int ObjectPipe<T>::readTimeout(T* t, double msec)
{
while (true) {
- int ret = waitForData(d_fds[0], 0, 1000*msec);
+ int ret = waitForData(d_receiver.getDescriptor(), 0, 1000*msec);
if (ret < 0) {
if (errno == EINTR) {
continue;
return -1;
}
- T* ptr = nullptr;
- ret = ::read(d_fds[0], &ptr, sizeof(ptr)); // this is BLOCKING!
-
- if (ret < 0) {
- if (errno == EINTR) {
+ try {
+ auto tmp = d_receiver.receive();
+ if (!tmp) {
+ if (d_receiver.isClosed()) {
+ return 0;
+ }
continue;
}
- unixDie("read");
- }
- else if (ret == 0) {
- return false;
- }
- if (ret != sizeof(ptr)) {
- throw std::runtime_error("Partial read, should not happen 2");
+ *t = **tmp;
+ return 1;
+ }
+ catch (const std::exception& e) {
+ throw std::runtime_error("reading from the delay pipe: " + std::string(e.what()));
}
-
- *t = *ptr;
- delete ptr;
- return 1;
}
}
The other special case is that the first we have to do.. is in the past, so we need to do it
immediately. */
-
+
double delay=-1; // infinite
struct timespec now;
if(!d_work.empty()) {
}
}
if(delay != 0 ) {
- int ret = d_pipe.readTimeout(&c, delay);
+ int ret = d_pipe.readTimeout(&c, delay);
if(ret > 0) { // we got an object
d_work.emplace(c.when, c.what);
}