{
namespace channel
{
+ enum class SenderBlockingMode
+ {
+ SenderNonBlocking,
+ SenderBlocking
+ };
+ enum class ReceiverBlockingMode
+ {
+ ReceiverNonBlocking,
+ ReceiverBlocking
+ };
+
/**
* The sender's end of a channel used to pass objects between threads.
*
* \throw runtime_error if the channel creation failed.
*/
template <typename T, typename D = std::default_delete<T>>
- std::pair<Sender<T, D>, Receiver<T, D>> createObjectQueue(bool sendNonBlocking = true, bool receiveNonBlocking = true, size_t pipeBufferSize = 0, bool throwOnEOF = true);
+ std::pair<Sender<T, D>, Receiver<T, D>> createObjectQueue(SenderBlockingMode senderBlockingMode = SenderBlockingMode::SenderNonBlocking, ReceiverBlockingMode receiverBlockingMode = ReceiverBlockingMode::ReceiverNonBlocking, size_t pipeBufferSize = 0, bool throwOnEOF = true);
/**
* The notifier's end of a channel used to communicate between threads.
}
template <typename T, typename D>
- std::pair<Sender<T, D>, Receiver<T, D>> createObjectQueue(bool sendNonBlocking, bool receiveNonBlocking, size_t pipeBufferSize, bool throwOnEOF)
+ std::pair<Sender<T, D>, Receiver<T, D>> createObjectQueue(SenderBlockingMode senderBlockingMode, ReceiverBlockingMode receiverBlockingMode, size_t pipeBufferSize, bool throwOnEOF)
{
int fds[2] = {-1, -1};
if (pipe(fds) < 0) {
FDWrapper sender(fds[1]);
FDWrapper receiver(fds[0]);
-
- if (receiveNonBlocking && !setNonBlocking(receiver.getHandle())) {
+ if (receiverBlockingMode == ReceiverBlockingMode::ReceiverNonBlocking && !setNonBlocking(receiver.getHandle())) {
int err = errno;
throw std::runtime_error("Error making channel pipe non-blocking: " + stringerror(err));
}
- if (sendNonBlocking && !setNonBlocking(sender.getHandle())) {
+ if (senderBlockingMode == SenderBlockingMode::SenderNonBlocking && !setNonBlocking(sender.getHandle())) {
int err = errno;
throw std::runtime_error("Error making channel pipe non-blocking: " + stringerror(err));
}
template<class T>
ObjectPipe<T>::ObjectPipe()
{
- auto [sender, receiver] = pdns::channel::createObjectQueue<T>(false, true, 0, false);
+ auto [sender, receiver] = pdns::channel::createObjectQueue<T>(pdns::channel::SenderBlockingMode::SenderBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, 0, false);
d_sender = std::move(sender);
d_receiver = std::move(receiver);
}
}
for (int distributorIdx = 0; distributorIdx < numberOfThreads; distributorIdx++) {
- auto [sender, receiver] = pdns::channel::createObjectQueue<QuestionData>(false, false);
+ auto [sender, receiver] = pdns::channel::createObjectQueue<QuestionData>(pdns::channel::SenderBlockingMode::SenderBlocking, pdns::channel::ReceiverBlockingMode::ReceiverBlocking);
d_senders.push_back(std::move(sender));
d_receivers.push_back(std::move(receiver));
}
void TCPClientCollection::addTCPClientThread(std::vector<ClientState*>& tcpAcceptStates)
{
try {
- auto [queryChannelSender, queryChannelReceiver] = pdns::channel::createObjectQueue<ConnectionInfo>(true, true, g_tcpInternalPipeBufferSize);
+ auto [queryChannelSender, queryChannelReceiver] = pdns::channel::createObjectQueue<ConnectionInfo>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, g_tcpInternalPipeBufferSize);
- auto [crossProtocolQueryChannelSender, crossProtocolQueryChannelReceiver] = pdns::channel::createObjectQueue<CrossProtocolQuery>(true, true, g_tcpInternalPipeBufferSize);
+ auto [crossProtocolQueryChannelSender, crossProtocolQueryChannelReceiver] = pdns::channel::createObjectQueue<CrossProtocolQuery>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, g_tcpInternalPipeBufferSize);
- auto [crossProtocolResponseChannelSender, crossProtocolResponseChannelReceiver] = pdns::channel::createObjectQueue<TCPCrossProtocolResponse>(true, true, g_tcpInternalPipeBufferSize);
+ auto [crossProtocolResponseChannelSender, crossProtocolResponseChannelReceiver] = pdns::channel::createObjectQueue<TCPCrossProtocolResponse>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, g_tcpInternalPipeBufferSize);
vinfolog("Adding TCP Client thread");
{
#ifdef HAVE_NGHTTP2
try {
- auto [sender, receiver] = pdns::channel::createObjectQueue<CrossProtocolQuery>(true, true, g_tcpInternalPipeBufferSize);
+ auto [sender, receiver] = pdns::channel::createObjectQueue<CrossProtocolQuery>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, g_tcpInternalPipeBufferSize);
vinfolog("Adding DoH Client thread");
std::lock_guard<std::mutex> lock(d_mutex);
{
#ifndef USE_SINGLE_ACCEPTOR_THREAD
{
- auto [sender, receiver] = pdns::channel::createObjectQueue<DOHUnit, void(*)(DOHUnit*)>(true, false, internalPipeBufferSize);
+ auto [sender, receiver] = pdns::channel::createObjectQueue<DOHUnit, void(*)(DOHUnit*)>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverBlocking, internalPipeBufferSize);
d_querySender = std::move(sender);
d_queryReceiver = std::move(receiver);
}
#endif /* USE_SINGLE_ACCEPTOR_THREAD */
{
- auto [sender, receiver] = pdns::channel::createObjectQueue<DOHUnit, void(*)(DOHUnit*)>(true, true, internalPipeBufferSize);
+ auto [sender, receiver] = pdns::channel::createObjectQueue<DOHUnit, void(*)(DOHUnit*)>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, internalPipeBufferSize);
d_responseSender = std::move(sender);
d_responseReceiver = std::move(receiver);
}
init_snmp(name.c_str());
- auto [sender, receiver] = pdns::channel::createObjectQueue<netsnmp_variable_list, void(*)(netsnmp_variable_list*)>(true, true);
+ auto [sender, receiver] = pdns::channel::createObjectQueue<netsnmp_variable_list, void(*)(netsnmp_variable_list*)>();
d_sender = std::move(sender);
d_receiver = std::move(receiver);
#endif /* HAVE_NET_SNMP */
BOOST_AUTO_TEST_CASE(test_object_queue_do_not_throw_on_eof)
{
- auto [sender, receiver] = pdns::channel::createObjectQueue<MyObject>(true, true, 0U, false);
+ auto [sender, receiver] = pdns::channel::createObjectQueue<MyObject>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, 0U, false);
sender.close();
auto got = receiver.receive();
BOOST_CHECK(got == std::nullopt);