]>
Commit | Line | Data |
---|---|---|
7f98f4ba RG |
1 | /* |
2 | * This file is part of PowerDNS or dnsdist. | |
3 | * Copyright -- PowerDNS.COM B.V. and its contributors | |
4 | * | |
5 | * This program is free software; you can redistribute it and/or modify | |
6 | * it under the terms of version 2 of the GNU General Public License as | |
7 | * published by the Free Software Foundation. | |
8 | * | |
9 | * In addition, for the avoidance of any doubt, permission is granted to | |
10 | * link this program with OpenSSL and to (re)distribute the binaries | |
11 | * produced as the result of such linking. | |
12 | * | |
13 | * This program is distributed in the hope that it will be useful, | |
14 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | |
15 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
16 | * GNU General Public License for more details. | |
17 | * | |
18 | * You should have received a copy of the GNU General Public License | |
19 | * along with this program; if not, write to the Free Software | |
20 | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | |
21 | */ | |
22 | #pragma once | |
23 | #include <memory> | |
24 | #include <optional> | |
25 | ||
26 | #include "misc.hh" | |
27 | ||
91bf355b RG |
28 | /* g++ defines __SANITIZE_THREAD__ |
29 | clang++ supports the nice __has_feature(thread_sanitizer), | |
30 | let's merge them */ | |
31 | #if defined(__has_feature) | |
32 | #if __has_feature(thread_sanitizer) | |
33 | #define __SANITIZE_THREAD__ 1 | |
34 | #endif | |
35 | #endif | |
36 | ||
37 | #if __SANITIZE_THREAD__ | |
04856fed RG |
38 | #if defined __has_include |
39 | #if __has_include(<sanitizer/tsan_interface.h>) | |
40 | #include <sanitizer/tsan_interface.h> | |
41 | #else /* __has_include(<sanitizer/tsan_interface.h>) */ | |
eaf6fce1 RG |
42 | extern "C" void __tsan_acquire(void* addr); |
43 | extern "C" void __tsan_release(void* addr); | |
04856fed RG |
44 | #endif /* __has_include(<sanitizer/tsan_interface.h>) */ |
45 | #else /* defined __has_include */ | |
46 | extern "C" void __tsan_acquire(void* addr); | |
47 | extern "C" void __tsan_release(void* addr); | |
48 | #endif /* defined __has_include */ | |
49 | #endif /* __SANITIZE_THREAD__ */ | |
91bf355b | 50 | |
7f98f4ba RG |
51 | namespace pdns |
52 | { | |
53 | namespace channel | |
54 | { | |
c1d76521 RG |
55 | enum class SenderBlockingMode |
56 | { | |
57 | SenderNonBlocking, | |
58 | SenderBlocking | |
59 | }; | |
60 | enum class ReceiverBlockingMode | |
61 | { | |
62 | ReceiverNonBlocking, | |
63 | ReceiverBlocking | |
64 | }; | |
65 | ||
7f98f4ba RG |
66 | /** |
67 | * The sender's end of a channel used to pass objects between threads. | |
68 | * | |
69 | * A sender can be used by several threads in a safe way. | |
70 | */ | |
71 | template <typename T, typename D = std::default_delete<T>> | |
72 | class Sender | |
73 | { | |
74 | public: | |
38a9c77a | 75 | Sender() = default; |
98769adc RG |
76 | Sender(FDWrapper&& descriptor) : |
77 | d_fd(std::move(descriptor)) | |
7f98f4ba RG |
78 | { |
79 | } | |
80 | Sender(const Sender&) = delete; | |
81 | Sender& operator=(const Sender&) = delete; | |
82 | Sender(Sender&&) = default; | |
83 | Sender& operator=(Sender&&) = default; | |
38a9c77a | 84 | ~Sender() = default; |
7f98f4ba RG |
85 | /** |
86 | * \brief Try to send the supplied object to the other end of that channel. Might block if the channel was created in blocking mode. | |
87 | * | |
88 | * \return True if the object was properly sent, False if the channel is full. | |
89 | * | |
90 | * \throw runtime_error if the channel is broken, for example if the other end has been closed. | |
91 | */ | |
92 | bool send(std::unique_ptr<T, D>&&) const; | |
5841c6fd | 93 | void close(); |
7f98f4ba RG |
94 | |
95 | private: | |
96 | FDWrapper d_fd; | |
97 | }; | |
98 | ||
99 | /** | |
100 | * The receiver's end of a channel used to pass objects between threads. | |
101 | * | |
102 | * A receiver can be used by several threads in a safe way, but in that case spurious wake up might happen. | |
103 | */ | |
104 | template <typename T, typename D = std::default_delete<T>> | |
105 | class Receiver | |
106 | { | |
107 | public: | |
38a9c77a | 108 | Receiver() = default; |
98769adc RG |
109 | Receiver(FDWrapper&& descriptor, bool throwOnEOF = true) : |
110 | d_fd(std::move(descriptor)), d_throwOnEOF(throwOnEOF) | |
7f98f4ba RG |
111 | { |
112 | } | |
113 | Receiver(const Receiver&) = delete; | |
114 | Receiver& operator=(const Receiver&) = delete; | |
115 | Receiver(Receiver&&) = default; | |
116 | Receiver& operator=(Receiver&&) = default; | |
38a9c77a | 117 | ~Receiver() = default; |
7f98f4ba RG |
118 | /** |
119 | * \brief Try to read an object sent by the other end of that channel. Might block if the channel was created in blocking mode. | |
120 | * | |
121 | * \return An object if one was available, and std::nullopt otherwise. | |
122 | * | |
123 | * \throw runtime_error if the channel is broken, for example if the other end has been closed. | |
124 | */ | |
5841c6fd RG |
125 | std::optional<std::unique_ptr<T, D>> receive(); |
126 | std::optional<std::unique_ptr<T, D>> receive(D deleter); | |
7f98f4ba RG |
127 | |
128 | /** | |
129 | * \brief Get a descriptor that can be used with an I/O multiplexer to wait for an object to become available. | |
130 | * | |
131 | * \return A valid descriptor or -1 if the Receiver was not properly initialized. | |
132 | */ | |
133 | int getDescriptor() const | |
134 | { | |
135 | return d_fd.getHandle(); | |
136 | } | |
5841c6fd RG |
137 | /** |
138 | * \brief Whether the remote end has closed the channel. | |
139 | */ | |
140 | bool isClosed() const | |
141 | { | |
142 | return d_closed; | |
143 | } | |
7f98f4ba RG |
144 | |
145 | private: | |
146 | FDWrapper d_fd; | |
5841c6fd RG |
147 | bool d_closed{false}; |
148 | bool d_throwOnEOF{true}; | |
7f98f4ba RG |
149 | }; |
150 | ||
151 | /** | |
152 | * \brief Create a channel to pass objects between threads, accepting multiple senders and receivers. | |
153 | * | |
154 | * \return A pair of Sender and Receiver objects. | |
155 | * | |
156 | * \throw runtime_error if the channel creation failed. | |
157 | */ | |
158 | template <typename T, typename D = std::default_delete<T>> | |
c1d76521 | 159 | std::pair<Sender<T, D>, Receiver<T, D>> createObjectQueue(SenderBlockingMode senderBlockingMode = SenderBlockingMode::SenderNonBlocking, ReceiverBlockingMode receiverBlockingMode = ReceiverBlockingMode::ReceiverNonBlocking, size_t pipeBufferSize = 0, bool throwOnEOF = true); |
7f98f4ba RG |
160 | |
161 | /** | |
162 | * The notifier's end of a channel used to communicate between threads. | |
163 | * | |
164 | * A notifier can be used by several threads in a safe way. | |
165 | */ | |
166 | class Notifier | |
167 | { | |
168 | public: | |
38a9c77a | 169 | Notifier() = default; |
7f98f4ba RG |
170 | Notifier(FDWrapper&&); |
171 | Notifier(const Notifier&) = delete; | |
172 | Notifier& operator=(const Notifier&) = delete; | |
173 | Notifier(Notifier&&) = default; | |
174 | Notifier& operator=(Notifier&&) = default; | |
38a9c77a | 175 | ~Notifier() = default; |
7f98f4ba RG |
176 | |
177 | /** | |
178 | * \brief Queue a notification to wake up the other end of the channel. | |
179 | * | |
180 | * \return True if the notification was properly sent, False if the channel is full. | |
181 | * | |
182 | * \throw runtime_error if the channel is broken, for example if the other end has been closed. | |
183 | */ | |
184 | bool notify() const; | |
185 | ||
186 | private: | |
187 | FDWrapper d_fd; | |
188 | }; | |
189 | ||
190 | /** | |
191 | * The waiter's end of a channel used to communicate between threads. | |
192 | * | |
193 | * A waiter can be used by several threads in a safe way, but in that case spurious wake up might happen. | |
194 | */ | |
195 | class Waiter | |
196 | { | |
197 | public: | |
38a9c77a | 198 | Waiter() = default; |
5841c6fd | 199 | Waiter(FDWrapper&&, bool throwOnEOF = true); |
7f98f4ba RG |
200 | Waiter(const Waiter&) = delete; |
201 | Waiter& operator=(const Waiter&) = delete; | |
202 | Waiter(Waiter&&) = default; | |
203 | Waiter& operator=(Waiter&&) = default; | |
38a9c77a | 204 | ~Waiter() = default; |
7f98f4ba RG |
205 | |
206 | /** | |
207 | * \brief Clear all notifications queued on that channel, if any. | |
208 | */ | |
cd93bea9 | 209 | void clear(); |
7f98f4ba RG |
210 | /** |
211 | * \brief Get a descriptor that can be used with an I/O multiplexer to wait for a notification to arrive. | |
212 | * | |
213 | * \return A valid descriptor or -1 if the Waiter was not properly initialized. | |
214 | */ | |
215 | int getDescriptor() const; | |
5841c6fd RG |
216 | /** |
217 | * \brief Whether the remote end has closed the channel. | |
218 | */ | |
219 | bool isClosed() const | |
220 | { | |
221 | return d_closed; | |
222 | } | |
7f98f4ba RG |
223 | |
224 | private: | |
225 | FDWrapper d_fd; | |
5841c6fd RG |
226 | bool d_closed{false}; |
227 | bool d_throwOnEOF{true}; | |
7f98f4ba RG |
228 | }; |
229 | ||
230 | /** | |
231 | * \brief Create a channel to notify one thread from another one, accepting multiple senders and receivers. | |
232 | * | |
233 | * \return A pair of Notifier and Sender objects. | |
234 | * | |
235 | * \throw runtime_error if the channel creation failed. | |
236 | */ | |
5841c6fd | 237 | std::pair<Notifier, Waiter> createNotificationQueue(bool nonBlocking = true, size_t pipeBufferSize = 0, bool throwOnEOF = true); |
7f98f4ba RG |
238 | |
239 | template <typename T, typename D> | |
240 | bool Sender<T, D>::send(std::unique_ptr<T, D>&& object) const | |
241 | { | |
465d1e15 RG |
242 | /* we cannot touch the initial unique pointer after writing to the pipe, |
243 | not even to release it, so let's transfer it to a local object */ | |
244 | auto localObj = std::move(object); | |
245 | auto ptr = localObj.get(); | |
7f98f4ba | 246 | static_assert(sizeof(ptr) <= PIPE_BUF, "Writes up to PIPE_BUF are guaranted not to interleaved and to either fully succeed or fail"); |
5841c6fd | 247 | while (true) { |
91bf355b RG |
248 | #if __SANITIZE_THREAD__ |
249 | __tsan_release(ptr); | |
250 | #endif /* __SANITIZE_THREAD__ */ | |
5841c6fd | 251 | ssize_t sent = write(d_fd.getHandle(), &ptr, sizeof(ptr)); |
7f98f4ba | 252 | |
5841c6fd | 253 | if (sent == sizeof(ptr)) { |
773cd531 | 254 | // coverity[leaked_storage] |
465d1e15 | 255 | localObj.release(); |
5841c6fd | 256 | return true; |
7f98f4ba | 257 | } |
43ec3a21 | 258 | else if (sent == 0) { |
465d1e15 RG |
259 | #if __SANITIZE_THREAD__ |
260 | __tsan_acquire(ptr); | |
261 | #endif /* __SANITIZE_THREAD__ */ | |
43ec3a21 RG |
262 | throw std::runtime_error("Unable to write to channel: remote end has been closed"); |
263 | } | |
7f98f4ba | 264 | else { |
91bf355b | 265 | #if __SANITIZE_THREAD__ |
eaf6fce1 | 266 | __tsan_acquire(ptr); |
91bf355b | 267 | #endif /* __SANITIZE_THREAD__ */ |
5841c6fd RG |
268 | if (errno == EINTR) { |
269 | continue; | |
270 | } | |
271 | if (errno == EAGAIN || errno == EWOULDBLOCK) { | |
465d1e15 | 272 | object = std::move(localObj); |
5841c6fd RG |
273 | return false; |
274 | } | |
275 | else { | |
276 | throw std::runtime_error("Unable to write to channel:" + stringerror()); | |
277 | } | |
7f98f4ba RG |
278 | } |
279 | } | |
5841c6fd | 280 | } |
7f98f4ba | 281 | |
5841c6fd RG |
282 | template <typename T, typename D> |
283 | void Sender<T, D>::close() | |
284 | { | |
285 | d_fd.reset(); | |
7f98f4ba RG |
286 | } |
287 | ||
288 | template <typename T, typename D> | |
5841c6fd | 289 | std::optional<std::unique_ptr<T, D>> Receiver<T, D>::receive() |
7f98f4ba | 290 | { |
c3be1471 | 291 | return receive(D()); |
7f98f4ba RG |
292 | } |
293 | ||
294 | template <typename T, typename D> | |
5841c6fd | 295 | std::optional<std::unique_ptr<T, D>> Receiver<T, D>::receive(D deleter) |
7f98f4ba | 296 | { |
5841c6fd RG |
297 | while (true) { |
298 | std::optional<std::unique_ptr<T, D>> result; | |
f6abb8ae RG |
299 | T* objPtr{nullptr}; |
300 | ssize_t got = read(d_fd.getHandle(), &objPtr, sizeof(objPtr)); | |
301 | if (got == sizeof(objPtr)) { | |
91bf355b | 302 | #if __SANITIZE_THREAD__ |
f6abb8ae | 303 | __tsan_acquire(objPtr); |
91bf355b | 304 | #endif /* __SANITIZE_THREAD__ */ |
f6abb8ae | 305 | return std::unique_ptr<T, D>(objPtr, deleter); |
5841c6fd RG |
306 | } |
307 | else if (got == 0) { | |
308 | d_closed = true; | |
309 | if (!d_throwOnEOF) { | |
310 | return result; | |
311 | } | |
312 | throw std::runtime_error("EOF while reading from Channel receiver"); | |
313 | } | |
314 | else if (got == -1) { | |
315 | if (errno == EINTR) { | |
316 | continue; | |
317 | } | |
318 | if (errno == EAGAIN || errno == EWOULDBLOCK) { | |
319 | return result; | |
320 | } | |
321 | throw std::runtime_error("Error while reading from Channel receiver: " + stringerror()); | |
322 | } | |
323 | else { | |
324 | throw std::runtime_error("Partial read from Channel receiver"); | |
7f98f4ba | 325 | } |
7f98f4ba RG |
326 | } |
327 | } | |
328 | ||
329 | template <typename T, typename D> | |
c1d76521 | 330 | std::pair<Sender<T, D>, Receiver<T, D>> createObjectQueue(SenderBlockingMode senderBlockingMode, ReceiverBlockingMode receiverBlockingMode, size_t pipeBufferSize, bool throwOnEOF) |
7f98f4ba RG |
331 | { |
332 | int fds[2] = {-1, -1}; | |
333 | if (pipe(fds) < 0) { | |
334 | throw std::runtime_error("Error creating channel pipe: " + stringerror()); | |
335 | } | |
336 | ||
337 | FDWrapper sender(fds[1]); | |
338 | FDWrapper receiver(fds[0]); | |
c1d76521 | 339 | if (receiverBlockingMode == ReceiverBlockingMode::ReceiverNonBlocking && !setNonBlocking(receiver.getHandle())) { |
7f98f4ba RG |
340 | int err = errno; |
341 | throw std::runtime_error("Error making channel pipe non-blocking: " + stringerror(err)); | |
342 | } | |
343 | ||
c1d76521 | 344 | if (senderBlockingMode == SenderBlockingMode::SenderNonBlocking && !setNonBlocking(sender.getHandle())) { |
7f98f4ba RG |
345 | int err = errno; |
346 | throw std::runtime_error("Error making channel pipe non-blocking: " + stringerror(err)); | |
347 | } | |
348 | ||
349 | if (pipeBufferSize > 0 && getPipeBufferSize(receiver.getHandle()) < pipeBufferSize) { | |
350 | setPipeBufferSize(receiver.getHandle(), pipeBufferSize); | |
351 | } | |
352 | ||
0b0882f5 | 353 | return {Sender<T, D>(std::move(sender)), Receiver<T, D>(std::move(receiver), throwOnEOF)}; |
7f98f4ba RG |
354 | } |
355 | } | |
356 | } |