*
* A sender can be used by several threads in a safe way.
*/
- template <typename T>
+ template <typename T, typename D = std::default_delete<T>>
class Sender
{
public:
*
* \throw runtime_error if the channel is broken, for example if the other end has been closed.
*/
- bool send(std::unique_ptr<T>&&) const;
+ bool send(std::unique_ptr<T, D>&&) const;
private:
FDWrapper d_fd;
*
* A receiver can be used by several threads in a safe way, but in that case spurious wake up might happen.
*/
- template <typename T>
+ template <typename T, typename D = std::default_delete<T>>
class Receiver
{
public:
*
* \throw runtime_error if the channel is broken, for example if the other end has been closed.
*/
- std::optional<std::unique_ptr<T>> receive() const;
+ std::optional<std::unique_ptr<T, D>> receive() const;
+ std::optional<std::unique_ptr<T, D>> receive(D deleter) const;
/**
* \brief Get a descriptor that can be used with an I/O multiplexer to wait for an object to become available.
*
* \throw runtime_error if the channel creation failed.
*/
- template <typename T>
- std::pair<Sender<T>, Receiver<T>> createObjectQueue(bool nonBlocking = true, size_t pipeBufferSize = 0);
+ template <typename T, typename D = std::default_delete<T>>
+ std::pair<Sender<T, D>, Receiver<T, D>> createObjectQueue(bool nonBlocking = true, size_t pipeBufferSize = 0);
/**
* The notifier's end of a channel used to communicate between threads.
*/
std::pair<Notifier, Waiter> createNotificationQueue(bool nonBlocking = true, size_t pipeBufferSize = 0);
- template <typename T>
- bool Sender<T>::send(std::unique_ptr<T>&& object) const
+ template <typename T, typename D>
+ bool Sender<T, D>::send(std::unique_ptr<T, D>&& object) const
{
- auto ptr = object.release();
+ // we do not release right away because we might need the custom deleter later
+ auto ptr = object.get();
static_assert(sizeof(ptr) <= PIPE_BUF, "Writes up to PIPE_BUF are guaranted not to interleaved and to either fully succeed or fail");
ssize_t sent = write(d_fd.getHandle(), &ptr, sizeof(ptr));
- if (sent != sizeof(ptr)) {
- delete ptr;
+ if (sent == sizeof(ptr)) {
+ // we cannot touch it anymore
+ object.release();
+ }
+ else {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return false;
}
return true;
}
- template <typename T>
- std::optional<std::unique_ptr<T>> Receiver<T>::receive() const
+ template <typename T, typename D>
+ std::optional<std::unique_ptr<T, D>> Receiver<T, D>::receive() const
+ {
+ std::optional<std::unique_ptr<T, D>> result;
+ T* obj{nullptr};
+ ssize_t got = read(d_fd.getHandle(), &obj, sizeof(obj));
+ if (got == sizeof(obj)) {
+ return std::unique_ptr<T, D>(obj);
+ }
+ else if (got == 0) {
+ throw std::runtime_error("EOF while reading from Channel receiver");
+ }
+ else if (got == -1) {
+ if (errno == EAGAIN || errno == EINTR) {
+ return result;
+ }
+ throw std::runtime_error("Error while reading from Channel receiver: " + stringerror());
+ }
+ else {
+ throw std::runtime_error("Partial read from Channel receiver");
+ }
+ }
+
+ template <typename T, typename D>
+ std::optional<std::unique_ptr<T, D>> Receiver<T, D>::receive(D deleter) const
{
- std::optional<std::unique_ptr<T>> result;
+ std::optional<std::unique_ptr<T, D>> result;
T* obj{nullptr};
ssize_t got = read(d_fd.getHandle(), &obj, sizeof(obj));
if (got == sizeof(obj)) {
- return std::unique_ptr<T>(obj);
+ return std::unique_ptr<T, D>(obj, deleter);
}
else if (got == 0) {
throw std::runtime_error("EOF while reading from Channel receiver");
}
}
- template <typename T>
- std::pair<Sender<T>, Receiver<T>> createObjectQueue(bool nonBlocking, size_t pipeBufferSize)
+ template <typename T, typename D>
+ std::pair<Sender<T, D>, Receiver<T, D>> createObjectQueue(bool nonBlocking, size_t pipeBufferSize)
{
int fds[2] = {-1, -1};
if (pipe(fds) < 0) {
setPipeBufferSize(receiver.getHandle(), pipeBufferSize);
}
- return std::pair(Sender<T>(std::move(sender)), Receiver<T>(std::move(receiver)));
+ return std::pair(Sender<T, D>(std::move(sender)), Receiver<T, D>(std::move(receiver)));
}
}
}
return SNMP_ERR_NOERROR;
}
-bool SNMPAgent::sendTrap(int fd,
+bool SNMPAgent::sendTrap(pdns::channel::Sender<netsnmp_variable_list, void(*)(netsnmp_variable_list*)>& sender,
netsnmp_variable_list* varList)
{
- ssize_t written = write(fd, &varList, sizeof(varList));
-
- if (written != sizeof(varList)) {
- snmp_free_varbind(varList);
+ try {
+ auto obj = std::unique_ptr<netsnmp_variable_list, void(*)(netsnmp_variable_list*)>(varList, snmp_free_varbind);
+ return sender.send(std::move(obj));
+ }
+ catch (...) {
return false;
}
- return true;
}
void SNMPAgent::handleTrapsEvent()
{
- netsnmp_variable_list* varList = nullptr;
- ssize_t got = 0;
-
- do {
- got = read(d_trapPipe[0], &varList, sizeof(varList));
-
- if (got == sizeof(varList)) {
- send_v2trap(varList);
- snmp_free_varbind(varList);
+ try {
+ while (true) {
+ auto obj = d_receiver.receive(snmp_free_varbind);
+ if (!obj) {
+ break;
+ }
+ send_v2trap(obj->get());
}
}
- while (got > 0);
+ catch (const std::exception& e) {
+ }
}
void SNMPAgent::handleSNMPQueryEvent(int fd)
/* we want to be notified if a trap is waiting
to be sent */
- mplexer->addReadFD(d_trapPipe[0], &handleTrapsCB, this);
+ mplexer->addReadFD(d_receiver.getDescriptor(), &handleTrapsCB, this);
while(true) {
netsnmp_large_fd_set_init(&fdset, FD_SETSIZE);
init_snmp(name.c_str());
- if (pipe(d_trapPipe) < 0)
- unixDie("Creating pipe");
-
- if (!setNonBlocking(d_trapPipe[0])) {
- close(d_trapPipe[0]);
- close(d_trapPipe[1]);
- unixDie("Setting pipe non-blocking");
- }
-
- if (!setNonBlocking(d_trapPipe[1])) {
- close(d_trapPipe[0]);
- close(d_trapPipe[1]);
- unixDie("Setting pipe non-blocking");
- }
-
+ auto [sender, receiver] = pdns::channel::createObjectQueue<netsnmp_variable_list, void(*)(netsnmp_variable_list*)>(true);
+ d_sender = std::move(sender);
+ d_receiver = std::move(receiver);
#endif /* HAVE_NET_SNMP */
}
#endif /* HAVE_NET_SNMP */
#include "mplexer.hh"
+#include "channel.hh"
class SNMPAgent
{
SNMPAgent(const std::string& name, const std::string& daemonSocket);
virtual ~SNMPAgent()
{
-#ifdef HAVE_NET_SNMP
-
- close(d_trapPipe[0]);
- close(d_trapPipe[1]);
-#endif /* HAVE_NET_SNMP */
}
void run()
static const oid snmpTrapOID[];
static const size_t snmpTrapOIDLen;
- static bool sendTrap(int fd,
+ static bool sendTrap(pdns::channel::Sender<netsnmp_variable_list, void(*)(netsnmp_variable_list*)>& sender,
netsnmp_variable_list* varList);
- int d_trapPipe[2] = { -1, -1};
+ pdns::channel::Sender<netsnmp_variable_list, void(*)(netsnmp_variable_list*)> d_sender;
+ pdns::channel::Receiver<netsnmp_variable_list, void(*)(netsnmp_variable_list*)> d_receiver;
#endif /* HAVE_NET_SNMP */
private:
void worker();