]> git.ipfire.org Git - thirdparty/pdns.git/blame - pdns/channel.hh
Merge pull request #14195 from rgacogne/ddist-no-assertions
[thirdparty/pdns.git] / pdns / channel.hh
CommitLineData
7f98f4ba
RG
1/*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22#pragma once
23#include <memory>
24#include <optional>
25
26#include "misc.hh"
27
91bf355b
RG
28/* g++ defines __SANITIZE_THREAD__
29 clang++ supports the nice __has_feature(thread_sanitizer),
30 let's merge them */
31#if defined(__has_feature)
32#if __has_feature(thread_sanitizer)
33#define __SANITIZE_THREAD__ 1
34#endif
35#endif
36
37#if __SANITIZE_THREAD__
04856fed
RG
38#if defined __has_include
39#if __has_include(<sanitizer/tsan_interface.h>)
40#include <sanitizer/tsan_interface.h>
41#else /* __has_include(<sanitizer/tsan_interface.h>) */
eaf6fce1
RG
42extern "C" void __tsan_acquire(void* addr);
43extern "C" void __tsan_release(void* addr);
04856fed
RG
44#endif /* __has_include(<sanitizer/tsan_interface.h>) */
45#else /* defined __has_include */
46extern "C" void __tsan_acquire(void* addr);
47extern "C" void __tsan_release(void* addr);
48#endif /* defined __has_include */
49#endif /* __SANITIZE_THREAD__ */
91bf355b 50
7f98f4ba
RG
51namespace pdns
52{
53namespace channel
54{
c1d76521
RG
55 enum class SenderBlockingMode
56 {
57 SenderNonBlocking,
58 SenderBlocking
59 };
60 enum class ReceiverBlockingMode
61 {
62 ReceiverNonBlocking,
63 ReceiverBlocking
64 };
65
7f98f4ba
RG
66 /**
67 * The sender's end of a channel used to pass objects between threads.
68 *
69 * A sender can be used by several threads in a safe way.
70 */
71 template <typename T, typename D = std::default_delete<T>>
72 class Sender
73 {
74 public:
38a9c77a 75 Sender() = default;
98769adc
RG
76 Sender(FDWrapper&& descriptor) :
77 d_fd(std::move(descriptor))
7f98f4ba
RG
78 {
79 }
80 Sender(const Sender&) = delete;
81 Sender& operator=(const Sender&) = delete;
82 Sender(Sender&&) = default;
83 Sender& operator=(Sender&&) = default;
38a9c77a 84 ~Sender() = default;
7f98f4ba
RG
85 /**
86 * \brief Try to send the supplied object to the other end of that channel. Might block if the channel was created in blocking mode.
87 *
88 * \return True if the object was properly sent, False if the channel is full.
89 *
90 * \throw runtime_error if the channel is broken, for example if the other end has been closed.
91 */
92 bool send(std::unique_ptr<T, D>&&) const;
5841c6fd 93 void close();
7f98f4ba
RG
94
95 private:
96 FDWrapper d_fd;
97 };
98
99 /**
100 * The receiver's end of a channel used to pass objects between threads.
101 *
102 * A receiver can be used by several threads in a safe way, but in that case spurious wake up might happen.
103 */
104 template <typename T, typename D = std::default_delete<T>>
105 class Receiver
106 {
107 public:
38a9c77a 108 Receiver() = default;
98769adc
RG
109 Receiver(FDWrapper&& descriptor, bool throwOnEOF = true) :
110 d_fd(std::move(descriptor)), d_throwOnEOF(throwOnEOF)
7f98f4ba
RG
111 {
112 }
113 Receiver(const Receiver&) = delete;
114 Receiver& operator=(const Receiver&) = delete;
115 Receiver(Receiver&&) = default;
116 Receiver& operator=(Receiver&&) = default;
38a9c77a 117 ~Receiver() = default;
7f98f4ba
RG
118 /**
119 * \brief Try to read an object sent by the other end of that channel. Might block if the channel was created in blocking mode.
120 *
121 * \return An object if one was available, and std::nullopt otherwise.
122 *
123 * \throw runtime_error if the channel is broken, for example if the other end has been closed.
124 */
5841c6fd
RG
125 std::optional<std::unique_ptr<T, D>> receive();
126 std::optional<std::unique_ptr<T, D>> receive(D deleter);
7f98f4ba
RG
127
128 /**
129 * \brief Get a descriptor that can be used with an I/O multiplexer to wait for an object to become available.
130 *
131 * \return A valid descriptor or -1 if the Receiver was not properly initialized.
132 */
133 int getDescriptor() const
134 {
135 return d_fd.getHandle();
136 }
5841c6fd
RG
137 /**
138 * \brief Whether the remote end has closed the channel.
139 */
140 bool isClosed() const
141 {
142 return d_closed;
143 }
7f98f4ba
RG
144
145 private:
146 FDWrapper d_fd;
5841c6fd
RG
147 bool d_closed{false};
148 bool d_throwOnEOF{true};
7f98f4ba
RG
149 };
150
151 /**
152 * \brief Create a channel to pass objects between threads, accepting multiple senders and receivers.
153 *
154 * \return A pair of Sender and Receiver objects.
155 *
156 * \throw runtime_error if the channel creation failed.
157 */
158 template <typename T, typename D = std::default_delete<T>>
c1d76521 159 std::pair<Sender<T, D>, Receiver<T, D>> createObjectQueue(SenderBlockingMode senderBlockingMode = SenderBlockingMode::SenderNonBlocking, ReceiverBlockingMode receiverBlockingMode = ReceiverBlockingMode::ReceiverNonBlocking, size_t pipeBufferSize = 0, bool throwOnEOF = true);
7f98f4ba
RG
160
161 /**
162 * The notifier's end of a channel used to communicate between threads.
163 *
164 * A notifier can be used by several threads in a safe way.
165 */
166 class Notifier
167 {
168 public:
38a9c77a 169 Notifier() = default;
7f98f4ba
RG
170 Notifier(FDWrapper&&);
171 Notifier(const Notifier&) = delete;
172 Notifier& operator=(const Notifier&) = delete;
173 Notifier(Notifier&&) = default;
174 Notifier& operator=(Notifier&&) = default;
38a9c77a 175 ~Notifier() = default;
7f98f4ba
RG
176
177 /**
178 * \brief Queue a notification to wake up the other end of the channel.
179 *
180 * \return True if the notification was properly sent, False if the channel is full.
181 *
182 * \throw runtime_error if the channel is broken, for example if the other end has been closed.
183 */
184 bool notify() const;
185
186 private:
187 FDWrapper d_fd;
188 };
189
190 /**
191 * The waiter's end of a channel used to communicate between threads.
192 *
193 * A waiter can be used by several threads in a safe way, but in that case spurious wake up might happen.
194 */
195 class Waiter
196 {
197 public:
38a9c77a 198 Waiter() = default;
5841c6fd 199 Waiter(FDWrapper&&, bool throwOnEOF = true);
7f98f4ba
RG
200 Waiter(const Waiter&) = delete;
201 Waiter& operator=(const Waiter&) = delete;
202 Waiter(Waiter&&) = default;
203 Waiter& operator=(Waiter&&) = default;
38a9c77a 204 ~Waiter() = default;
7f98f4ba
RG
205
206 /**
207 * \brief Clear all notifications queued on that channel, if any.
208 */
cd93bea9 209 void clear();
7f98f4ba
RG
210 /**
211 * \brief Get a descriptor that can be used with an I/O multiplexer to wait for a notification to arrive.
212 *
213 * \return A valid descriptor or -1 if the Waiter was not properly initialized.
214 */
215 int getDescriptor() const;
5841c6fd
RG
216 /**
217 * \brief Whether the remote end has closed the channel.
218 */
219 bool isClosed() const
220 {
221 return d_closed;
222 }
7f98f4ba
RG
223
224 private:
225 FDWrapper d_fd;
5841c6fd
RG
226 bool d_closed{false};
227 bool d_throwOnEOF{true};
7f98f4ba
RG
228 };
229
230 /**
231 * \brief Create a channel to notify one thread from another one, accepting multiple senders and receivers.
232 *
233 * \return A pair of Notifier and Sender objects.
234 *
235 * \throw runtime_error if the channel creation failed.
236 */
5841c6fd 237 std::pair<Notifier, Waiter> createNotificationQueue(bool nonBlocking = true, size_t pipeBufferSize = 0, bool throwOnEOF = true);
7f98f4ba
RG
238
239 template <typename T, typename D>
240 bool Sender<T, D>::send(std::unique_ptr<T, D>&& object) const
241 {
465d1e15
RG
242 /* we cannot touch the initial unique pointer after writing to the pipe,
243 not even to release it, so let's transfer it to a local object */
244 auto localObj = std::move(object);
245 auto ptr = localObj.get();
7f98f4ba 246 static_assert(sizeof(ptr) <= PIPE_BUF, "Writes up to PIPE_BUF are guaranted not to interleaved and to either fully succeed or fail");
5841c6fd 247 while (true) {
91bf355b
RG
248#if __SANITIZE_THREAD__
249 __tsan_release(ptr);
250#endif /* __SANITIZE_THREAD__ */
5841c6fd 251 ssize_t sent = write(d_fd.getHandle(), &ptr, sizeof(ptr));
7f98f4ba 252
5841c6fd 253 if (sent == sizeof(ptr)) {
773cd531 254 // coverity[leaked_storage]
465d1e15 255 localObj.release();
5841c6fd 256 return true;
7f98f4ba 257 }
43ec3a21 258 else if (sent == 0) {
465d1e15
RG
259#if __SANITIZE_THREAD__
260 __tsan_acquire(ptr);
261#endif /* __SANITIZE_THREAD__ */
43ec3a21
RG
262 throw std::runtime_error("Unable to write to channel: remote end has been closed");
263 }
7f98f4ba 264 else {
91bf355b 265#if __SANITIZE_THREAD__
eaf6fce1 266 __tsan_acquire(ptr);
91bf355b 267#endif /* __SANITIZE_THREAD__ */
5841c6fd
RG
268 if (errno == EINTR) {
269 continue;
270 }
271 if (errno == EAGAIN || errno == EWOULDBLOCK) {
465d1e15 272 object = std::move(localObj);
5841c6fd
RG
273 return false;
274 }
275 else {
276 throw std::runtime_error("Unable to write to channel:" + stringerror());
277 }
7f98f4ba
RG
278 }
279 }
5841c6fd 280 }
7f98f4ba 281
5841c6fd
RG
282 template <typename T, typename D>
283 void Sender<T, D>::close()
284 {
285 d_fd.reset();
7f98f4ba
RG
286 }
287
288 template <typename T, typename D>
5841c6fd 289 std::optional<std::unique_ptr<T, D>> Receiver<T, D>::receive()
7f98f4ba 290 {
c3be1471 291 return receive(D());
7f98f4ba
RG
292 }
293
294 template <typename T, typename D>
5841c6fd 295 std::optional<std::unique_ptr<T, D>> Receiver<T, D>::receive(D deleter)
7f98f4ba 296 {
5841c6fd
RG
297 while (true) {
298 std::optional<std::unique_ptr<T, D>> result;
f6abb8ae
RG
299 T* objPtr{nullptr};
300 ssize_t got = read(d_fd.getHandle(), &objPtr, sizeof(objPtr));
301 if (got == sizeof(objPtr)) {
91bf355b 302#if __SANITIZE_THREAD__
f6abb8ae 303 __tsan_acquire(objPtr);
91bf355b 304#endif /* __SANITIZE_THREAD__ */
f6abb8ae 305 return std::unique_ptr<T, D>(objPtr, deleter);
5841c6fd
RG
306 }
307 else if (got == 0) {
308 d_closed = true;
309 if (!d_throwOnEOF) {
310 return result;
311 }
312 throw std::runtime_error("EOF while reading from Channel receiver");
313 }
314 else if (got == -1) {
315 if (errno == EINTR) {
316 continue;
317 }
318 if (errno == EAGAIN || errno == EWOULDBLOCK) {
319 return result;
320 }
321 throw std::runtime_error("Error while reading from Channel receiver: " + stringerror());
322 }
323 else {
324 throw std::runtime_error("Partial read from Channel receiver");
7f98f4ba 325 }
7f98f4ba
RG
326 }
327 }
328
329 template <typename T, typename D>
c1d76521 330 std::pair<Sender<T, D>, Receiver<T, D>> createObjectQueue(SenderBlockingMode senderBlockingMode, ReceiverBlockingMode receiverBlockingMode, size_t pipeBufferSize, bool throwOnEOF)
7f98f4ba
RG
331 {
332 int fds[2] = {-1, -1};
333 if (pipe(fds) < 0) {
334 throw std::runtime_error("Error creating channel pipe: " + stringerror());
335 }
336
337 FDWrapper sender(fds[1]);
338 FDWrapper receiver(fds[0]);
c1d76521 339 if (receiverBlockingMode == ReceiverBlockingMode::ReceiverNonBlocking && !setNonBlocking(receiver.getHandle())) {
7f98f4ba
RG
340 int err = errno;
341 throw std::runtime_error("Error making channel pipe non-blocking: " + stringerror(err));
342 }
343
c1d76521 344 if (senderBlockingMode == SenderBlockingMode::SenderNonBlocking && !setNonBlocking(sender.getHandle())) {
7f98f4ba
RG
345 int err = errno;
346 throw std::runtime_error("Error making channel pipe non-blocking: " + stringerror(err));
347 }
348
349 if (pipeBufferSize > 0 && getPipeBufferSize(receiver.getHandle()) < pipeBufferSize) {
350 setPipeBufferSize(receiver.getHandle(), pipeBufferSize);
351 }
352
0b0882f5 353 return {Sender<T, D>(std::move(sender)), Receiver<T, D>(std::move(receiver), throwOnEOF)};
7f98f4ba
RG
354 }
355}
356}