2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
28 /* g++ defines __SANITIZE_THREAD__
29 clang++ supports the nice __has_feature(thread_sanitizer),
31 #if defined(__has_feature)
32 #if __has_feature(thread_sanitizer)
33 #define __SANITIZE_THREAD__ 1
37 #if __SANITIZE_THREAD__
38 #if defined __has_include
39 #if __has_include(<sanitizer/tsan_interface.h>)
40 #include <sanitizer/tsan_interface.h>
41 #else /* __has_include(<sanitizer/tsan_interface.h>) */
42 extern "C" void __tsan_acquire(void* addr);
43 extern "C" void __tsan_release(void* addr);
44 #endif /* __has_include(<sanitizer/tsan_interface.h>) */
45 #else /* defined __has_include */
46 extern "C" void __tsan_acquire(void* addr);
47 extern "C" void __tsan_release(void* addr);
48 #endif /* defined __has_include */
49 #endif /* __SANITIZE_THREAD__ */
55 enum class SenderBlockingMode
60 enum class ReceiverBlockingMode
67 * The sender's end of a channel used to pass objects between threads.
69 * A sender can be used by several threads in a safe way.
71 template <typename T, typename D = std::default_delete<T>>
76 Sender(FDWrapper&& descriptor) :
77 d_fd(std::move(descriptor))
80 Sender(const Sender&) = delete;
81 Sender& operator=(const Sender&) = delete;
82 Sender(Sender&&) = default;
83 Sender& operator=(Sender&&) = default;
86 * \brief Try to send the supplied object to the other end of that channel. Might block if the channel was created in blocking mode.
88 * \return True if the object was properly sent, False if the channel is full, in which case ownership of the object is handed back to the caller.
90 * \throw runtime_error if the channel is broken, for example if the other end has been closed.
92 bool send(std::unique_ptr<T, D>&&) const;
100 * The receiver's end of a channel used to pass objects between threads.
102 * A receiver can be used by several threads in a safe way, but in that case spurious wake-ups might happen.
104 template <typename T, typename D = std::default_delete<T>>
108 Receiver() = default;
109 Receiver(FDWrapper&& descriptor, bool throwOnEOF = true) :
110 d_fd(std::move(descriptor)), d_throwOnEOF(throwOnEOF)
113 Receiver(const Receiver&) = delete;
114 Receiver& operator=(const Receiver&) = delete;
115 Receiver(Receiver&&) = default;
116 Receiver& operator=(Receiver&&) = default;
117 ~Receiver() = default;
119 * \brief Try to read an object sent by the other end of that channel. Might block if the channel was created in blocking mode.
121 * \return An object if one was available, and std::nullopt otherwise.
123 * \throw runtime_error if the channel is broken, for example if the other end has been closed.
125 std::optional<std::unique_ptr<T, D>> receive();
126 std::optional<std::unique_ptr<T, D>> receive(D deleter);
129 * \brief Get a descriptor that can be used with an I/O multiplexer to wait for an object to become available.
131 * \return A valid descriptor or -1 if the Receiver was not properly initialized.
133 int getDescriptor() const
135 return d_fd.getHandle();
138 * \brief Whether the remote end has closed the channel.
140 bool isClosed() const
147 bool d_closed{false};
148 bool d_throwOnEOF{true};
152 * \brief Create a channel to pass objects between threads, accepting multiple senders and receivers.
154 * \return A pair of Sender and Receiver objects.
156 * \throw runtime_error if the channel creation failed.
158 template <typename T, typename D = std::default_delete<T>>
159 std::pair<Sender<T, D>, Receiver<T, D>> createObjectQueue(SenderBlockingMode senderBlockingMode = SenderBlockingMode::SenderNonBlocking, ReceiverBlockingMode receiverBlockingMode = ReceiverBlockingMode::ReceiverNonBlocking, size_t pipeBufferSize = 0, bool throwOnEOF = true);
162 * The notifier's end of a channel used to communicate between threads.
164 * A notifier can be used by several threads in a safe way.
169 Notifier() = default;
170 Notifier(FDWrapper&&);
171 Notifier(const Notifier&) = delete;
172 Notifier& operator=(const Notifier&) = delete;
173 Notifier(Notifier&&) = default;
174 Notifier& operator=(Notifier&&) = default;
175 ~Notifier() = default;
178 * \brief Queue a notification to wake up the other end of the channel.
180 * \return True if the notification was properly sent, False if the channel is full.
182 * \throw runtime_error if the channel is broken, for example if the other end has been closed.
191 * The waiter's end of a channel used to communicate between threads.
193 * A waiter can be used by several threads in a safe way, but in that case spurious wake-ups might happen.
199 Waiter(FDWrapper&&, bool throwOnEOF = true);
200 Waiter(const Waiter&) = delete;
201 Waiter& operator=(const Waiter&) = delete;
202 Waiter(Waiter&&) = default;
203 Waiter& operator=(Waiter&&) = default;
207 * \brief Clear all notifications queued on that channel, if any.
211 * \brief Get a descriptor that can be used with an I/O multiplexer to wait for a notification to arrive.
213 * \return A valid descriptor or -1 if the Waiter was not properly initialized.
215 int getDescriptor() const;
217 * \brief Whether the remote end has closed the channel.
219 bool isClosed() const
226 bool d_closed{false};
227 bool d_throwOnEOF{true};
231 * \brief Create a channel to notify one thread from another one, accepting multiple senders and receivers.
233 * \return A pair of Notifier and Waiter objects.
235 * \throw runtime_error if the channel creation failed.
237 std::pair<Notifier, Waiter> createNotificationQueue(bool nonBlocking = true, size_t pipeBufferSize = 0, bool throwOnEOF = true);
239 template <typename T, typename D>
240 bool Sender<T, D>::send(std::unique_ptr<T, D>&& object) const
242 /* we cannot touch the initial unique pointer after writing to the pipe,
243 not even to release it, so let's transfer it to a local object */
244 auto localObj = std::move(object);
245 auto ptr = localObj.get();
246 static_assert(sizeof(ptr) <= PIPE_BUF, "Writes up to PIPE_BUF are guaranted not to interleaved and to either fully succeed or fail");
248 #if __SANITIZE_THREAD__
250 #endif /* __SANITIZE_THREAD__ */
251 ssize_t sent = write(d_fd.getHandle(), &ptr, sizeof(ptr));
253 if (sent == sizeof(ptr)) {
254 // coverity[leaked_storage]
258 else if (sent == 0) {
259 #if __SANITIZE_THREAD__
261 #endif /* __SANITIZE_THREAD__ */
262 throw std::runtime_error("Unable to write to channel: remote end has been closed");
265 #if __SANITIZE_THREAD__
267 #endif /* __SANITIZE_THREAD__ */
268 if (errno == EINTR) {
271 if (errno == EAGAIN || errno == EWOULDBLOCK) {
272 object = std::move(localObj);
276 throw std::runtime_error("Unable to write to channel:" + stringerror());
282 template <typename T, typename D>
283 void Sender<T, D>::close()
288 template <typename T, typename D>
289 std::optional<std::unique_ptr<T, D>> Receiver<T, D>::receive()
294 template <typename T, typename D>
295 std::optional<std::unique_ptr<T, D>> Receiver<T, D>::receive(D deleter)
298 std::optional<std::unique_ptr<T, D>> result;
300 ssize_t got = read(d_fd.getHandle(), &objPtr, sizeof(objPtr));
301 if (got == sizeof(objPtr)) {
302 #if __SANITIZE_THREAD__
303 __tsan_acquire(objPtr);
304 #endif /* __SANITIZE_THREAD__ */
305 return std::unique_ptr<T, D>(objPtr, deleter);
312 throw std::runtime_error("EOF while reading from Channel receiver");
314 else if (got == -1) {
315 if (errno == EINTR) {
318 if (errno == EAGAIN || errno == EWOULDBLOCK) {
321 throw std::runtime_error("Error while reading from Channel receiver: " + stringerror());
324 throw std::runtime_error("Partial read from Channel receiver");
329 template <typename T, typename D>
330 std::pair<Sender<T, D>, Receiver<T, D>> createObjectQueue(SenderBlockingMode senderBlockingMode, ReceiverBlockingMode receiverBlockingMode, size_t pipeBufferSize, bool throwOnEOF)
332 int fds[2] = {-1, -1};
334 throw std::runtime_error("Error creating channel pipe: " + stringerror());
337 FDWrapper sender(fds[1]);
338 FDWrapper receiver(fds[0]);
339 if (receiverBlockingMode == ReceiverBlockingMode::ReceiverNonBlocking && !setNonBlocking(receiver.getHandle())) {
341 throw std::runtime_error("Error making channel pipe non-blocking: " + stringerror(err));
344 if (senderBlockingMode == SenderBlockingMode::SenderNonBlocking && !setNonBlocking(sender.getHandle())) {
346 throw std::runtime_error("Error making channel pipe non-blocking: " + stringerror(err));
349 if (pipeBufferSize > 0 && getPipeBufferSize(receiver.getHandle()) < pipeBufferSize) {
350 setPipeBufferSize(receiver.getHandle(), pipeBufferSize);
353 return {Sender<T, D>(std::move(sender)), Receiver<T, D>(std::move(receiver), throwOnEOF)};