]>
Commit | Line | Data |
---|---|---|
7f98f4ba RG |
1 | /* |
2 | * This file is part of PowerDNS or dnsdist. | |
3 | * Copyright -- PowerDNS.COM B.V. and its contributors | |
4 | * | |
5 | * This program is free software; you can redistribute it and/or modify | |
6 | * it under the terms of version 2 of the GNU General Public License as | |
7 | * published by the Free Software Foundation. | |
8 | * | |
9 | * In addition, for the avoidance of any doubt, permission is granted to | |
10 | * link this program with OpenSSL and to (re)distribute the binaries | |
11 | * produced as the result of such linking. | |
12 | * | |
13 | * This program is distributed in the hope that it will be useful, | |
14 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | |
15 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
16 | * GNU General Public License for more details. | |
17 | * | |
18 | * You should have received a copy of the GNU General Public License | |
19 | * along with this program; if not, write to the Free Software | |
20 | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | |
21 | */ | |
22 | #pragma once | |
23 | #include <memory> | |
24 | #include <optional> | |
25 | ||
26 | #include "misc.hh" | |
27 | ||
91bf355b RG |
28 | /* g++ defines __SANITIZE_THREAD__ |
29 | clang++ supports the nice __has_feature(thread_sanitizer), | |
30 | let's merge them */ | |
31 | #if defined(__has_feature) | |
32 | #if __has_feature(thread_sanitizer) | |
33 | #define __SANITIZE_THREAD__ 1 | |
34 | #endif | |
35 | #endif | |
36 | ||
37 | #if __SANITIZE_THREAD__ | |
eaf6fce1 RG |
38 | extern "C" void __tsan_acquire(void* addr); |
39 | extern "C" void __tsan_release(void* addr); | |
91bf355b RG |
40 | #endif |
41 | ||
7f98f4ba RG |
42 | namespace pdns |
43 | { | |
44 | namespace channel | |
45 | { | |
46 | /** | |
47 | * The sender's end of a channel used to pass objects between threads. | |
48 | * | |
49 | * A sender can be used by several threads in a safe way. | |
50 | */ | |
51 | template <typename T, typename D = std::default_delete<T>> | |
52 | class Sender | |
53 | { | |
54 | public: | |
55 | Sender() | |
56 | { | |
57 | } | |
58 | Sender(FDWrapper&& fd) : | |
59 | d_fd(std::move(fd)) | |
60 | { | |
61 | } | |
62 | Sender(const Sender&) = delete; | |
63 | Sender& operator=(const Sender&) = delete; | |
64 | Sender(Sender&&) = default; | |
65 | Sender& operator=(Sender&&) = default; | |
66 | /** | |
67 | * \brief Try to send the supplied object to the other end of that channel. Might block if the channel was created in blocking mode. | |
68 | * | |
69 | * \return True if the object was properly sent, False if the channel is full. | |
70 | * | |
71 | * \throw runtime_error if the channel is broken, for example if the other end has been closed. | |
72 | */ | |
73 | bool send(std::unique_ptr<T, D>&&) const; | |
5841c6fd | 74 | void close(); |
7f98f4ba RG |
75 | |
76 | private: | |
77 | FDWrapper d_fd; | |
78 | }; | |
79 | ||
80 | /** | |
81 | * The receiver's end of a channel used to pass objects between threads. | |
82 | * | |
83 | * A receiver can be used by several threads in a safe way, but in that case spurious wake up might happen. | |
84 | */ | |
85 | template <typename T, typename D = std::default_delete<T>> | |
86 | class Receiver | |
87 | { | |
88 | public: | |
89 | Receiver() | |
90 | { | |
91 | } | |
5841c6fd RG |
92 | Receiver(FDWrapper&& fd, bool throwOnEOF = true) : |
93 | d_fd(std::move(fd)), d_throwOnEOF(throwOnEOF) | |
7f98f4ba RG |
94 | { |
95 | } | |
96 | Receiver(const Receiver&) = delete; | |
97 | Receiver& operator=(const Receiver&) = delete; | |
98 | Receiver(Receiver&&) = default; | |
99 | Receiver& operator=(Receiver&&) = default; | |
100 | /** | |
101 | * \brief Try to read an object sent by the other end of that channel. Might block if the channel was created in blocking mode. | |
102 | * | |
103 | * \return An object if one was available, and std::nullopt otherwise. | |
104 | * | |
105 | * \throw runtime_error if the channel is broken, for example if the other end has been closed. | |
106 | */ | |
5841c6fd RG |
107 | std::optional<std::unique_ptr<T, D>> receive(); |
108 | std::optional<std::unique_ptr<T, D>> receive(D deleter); | |
7f98f4ba RG |
109 | |
110 | /** | |
111 | * \brief Get a descriptor that can be used with an I/O multiplexer to wait for an object to become available. | |
112 | * | |
113 | * \return A valid descriptor or -1 if the Receiver was not properly initialized. | |
114 | */ | |
115 | int getDescriptor() const | |
116 | { | |
117 | return d_fd.getHandle(); | |
118 | } | |
5841c6fd RG |
119 | /** |
120 | * \brief Whether the remote end has closed the channel. | |
121 | */ | |
122 | bool isClosed() const | |
123 | { | |
124 | return d_closed; | |
125 | } | |
7f98f4ba RG |
126 | |
127 | private: | |
128 | FDWrapper d_fd; | |
5841c6fd RG |
129 | bool d_closed{false}; |
130 | bool d_throwOnEOF{true}; | |
7f98f4ba RG |
131 | }; |
132 | ||
133 | /** | |
134 | * \brief Create a channel to pass objects between threads, accepting multiple senders and receivers. | |
135 | * | |
136 | * \return A pair of Sender and Receiver objects. | |
137 | * | |
138 | * \throw runtime_error if the channel creation failed. | |
139 | */ | |
140 | template <typename T, typename D = std::default_delete<T>> | |
1adbac74 | 141 | std::pair<Sender<T, D>, Receiver<T, D>> createObjectQueue(bool sendNonBlocking = true, bool receiveNonBlocking = true, size_t pipeBufferSize = 0, bool throwOnEOF = true); |
7f98f4ba RG |
142 | |
143 | /** | |
144 | * The notifier's end of a channel used to communicate between threads. | |
145 | * | |
146 | * A notifier can be used by several threads in a safe way. | |
147 | */ | |
148 | class Notifier | |
149 | { | |
150 | public: | |
151 | Notifier() | |
152 | { | |
153 | } | |
154 | Notifier(FDWrapper&&); | |
155 | Notifier(const Notifier&) = delete; | |
156 | Notifier& operator=(const Notifier&) = delete; | |
157 | Notifier(Notifier&&) = default; | |
158 | Notifier& operator=(Notifier&&) = default; | |
159 | ||
160 | /** | |
161 | * \brief Queue a notification to wake up the other end of the channel. | |
162 | * | |
163 | * \return True if the notification was properly sent, False if the channel is full. | |
164 | * | |
165 | * \throw runtime_error if the channel is broken, for example if the other end has been closed. | |
166 | */ | |
167 | bool notify() const; | |
168 | ||
169 | private: | |
170 | FDWrapper d_fd; | |
171 | }; | |
172 | ||
173 | /** | |
174 | * The waiter's end of a channel used to communicate between threads. | |
175 | * | |
176 | * A waiter can be used by several threads in a safe way, but in that case spurious wake up might happen. | |
177 | */ | |
178 | class Waiter | |
179 | { | |
180 | public: | |
5841c6fd | 181 | Waiter(FDWrapper&&, bool throwOnEOF = true); |
7f98f4ba RG |
182 | Waiter(const Waiter&) = delete; |
183 | Waiter& operator=(const Waiter&) = delete; | |
184 | Waiter(Waiter&&) = default; | |
185 | Waiter& operator=(Waiter&&) = default; | |
186 | ||
187 | /** | |
188 | * \brief Clear all notifications queued on that channel, if any. | |
189 | */ | |
190 | void clear() const; | |
191 | /** | |
192 | * \brief Get a descriptor that can be used with an I/O multiplexer to wait for a notification to arrive. | |
193 | * | |
194 | * \return A valid descriptor or -1 if the Waiter was not properly initialized. | |
195 | */ | |
196 | int getDescriptor() const; | |
5841c6fd RG |
197 | /** |
198 | * \brief Whether the remote end has closed the channel. | |
199 | */ | |
200 | bool isClosed() const | |
201 | { | |
202 | return d_closed; | |
203 | } | |
7f98f4ba RG |
204 | |
205 | private: | |
206 | FDWrapper d_fd; | |
5841c6fd RG |
207 | bool d_closed{false}; |
208 | bool d_throwOnEOF{true}; | |
7f98f4ba RG |
209 | }; |
210 | ||
211 | /** | |
212 | * \brief Create a channel to notify one thread from another one, accepting multiple senders and receivers. | |
213 | * | |
214 | * \return A pair of Notifier and Sender objects. | |
215 | * | |
216 | * \throw runtime_error if the channel creation failed. | |
217 | */ | |
5841c6fd | 218 | std::pair<Notifier, Waiter> createNotificationQueue(bool nonBlocking = true, size_t pipeBufferSize = 0, bool throwOnEOF = true); |
7f98f4ba RG |
219 | |
220 | template <typename T, typename D> | |
221 | bool Sender<T, D>::send(std::unique_ptr<T, D>&& object) const | |
222 | { | |
223 | // we do not release right away because we might need the custom deleter later | |
224 | auto ptr = object.get(); | |
225 | static_assert(sizeof(ptr) <= PIPE_BUF, "Writes up to PIPE_BUF are guaranted not to interleaved and to either fully succeed or fail"); | |
5841c6fd | 226 | while (true) { |
91bf355b RG |
227 | #if __SANITIZE_THREAD__ |
228 | __tsan_release(ptr); | |
229 | #endif /* __SANITIZE_THREAD__ */ | |
5841c6fd | 230 | ssize_t sent = write(d_fd.getHandle(), &ptr, sizeof(ptr)); |
7f98f4ba | 231 | |
5841c6fd RG |
232 | if (sent == sizeof(ptr)) { |
233 | // we cannot touch it anymore | |
234 | object.release(); | |
235 | return true; | |
7f98f4ba | 236 | } |
43ec3a21 RG |
237 | else if (sent == 0) { |
238 | throw std::runtime_error("Unable to write to channel: remote end has been closed"); | |
239 | } | |
7f98f4ba | 240 | else { |
91bf355b | 241 | #if __SANITIZE_THREAD__ |
eaf6fce1 | 242 | __tsan_acquire(ptr); |
91bf355b | 243 | #endif /* __SANITIZE_THREAD__ */ |
5841c6fd RG |
244 | if (errno == EINTR) { |
245 | continue; | |
246 | } | |
247 | if (errno == EAGAIN || errno == EWOULDBLOCK) { | |
248 | return false; | |
249 | } | |
250 | else { | |
251 | throw std::runtime_error("Unable to write to channel:" + stringerror()); | |
252 | } | |
7f98f4ba RG |
253 | } |
254 | } | |
5841c6fd | 255 | } |
7f98f4ba | 256 | |
5841c6fd RG |
257 | template <typename T, typename D> |
258 | void Sender<T, D>::close() | |
259 | { | |
260 | d_fd.reset(); | |
7f98f4ba RG |
261 | } |
262 | ||
263 | template <typename T, typename D> | |
5841c6fd | 264 | std::optional<std::unique_ptr<T, D>> Receiver<T, D>::receive() |
7f98f4ba | 265 | { |
c3be1471 | 266 | return receive(D()); |
7f98f4ba RG |
267 | } |
268 | ||
269 | template <typename T, typename D> | |
5841c6fd | 270 | std::optional<std::unique_ptr<T, D>> Receiver<T, D>::receive(D deleter) |
7f98f4ba | 271 | { |
5841c6fd RG |
272 | while (true) { |
273 | std::optional<std::unique_ptr<T, D>> result; | |
f6abb8ae RG |
274 | T* objPtr{nullptr}; |
275 | ssize_t got = read(d_fd.getHandle(), &objPtr, sizeof(objPtr)); | |
276 | if (got == sizeof(objPtr)) { | |
91bf355b | 277 | #if __SANITIZE_THREAD__ |
f6abb8ae | 278 | __tsan_acquire(objPtr); |
91bf355b | 279 | #endif /* __SANITIZE_THREAD__ */ |
f6abb8ae | 280 | return std::unique_ptr<T, D>(objPtr, deleter); |
5841c6fd RG |
281 | } |
282 | else if (got == 0) { | |
283 | d_closed = true; | |
284 | if (!d_throwOnEOF) { | |
285 | return result; | |
286 | } | |
287 | throw std::runtime_error("EOF while reading from Channel receiver"); | |
288 | } | |
289 | else if (got == -1) { | |
290 | if (errno == EINTR) { | |
291 | continue; | |
292 | } | |
293 | if (errno == EAGAIN || errno == EWOULDBLOCK) { | |
294 | return result; | |
295 | } | |
296 | throw std::runtime_error("Error while reading from Channel receiver: " + stringerror()); | |
297 | } | |
298 | else { | |
299 | throw std::runtime_error("Partial read from Channel receiver"); | |
7f98f4ba | 300 | } |
7f98f4ba RG |
301 | } |
302 | } | |
303 | ||
304 | template <typename T, typename D> | |
5841c6fd | 305 | std::pair<Sender<T, D>, Receiver<T, D>> createObjectQueue(bool sendNonBlocking, bool receiveNonBlocking, size_t pipeBufferSize, bool throwOnEOF) |
7f98f4ba RG |
306 | { |
307 | int fds[2] = {-1, -1}; | |
308 | if (pipe(fds) < 0) { | |
309 | throw std::runtime_error("Error creating channel pipe: " + stringerror()); | |
310 | } | |
311 | ||
312 | FDWrapper sender(fds[1]); | |
313 | FDWrapper receiver(fds[0]); | |
314 | ||
5841c6fd | 315 | if (receiveNonBlocking && !setNonBlocking(receiver.getHandle())) { |
7f98f4ba RG |
316 | int err = errno; |
317 | throw std::runtime_error("Error making channel pipe non-blocking: " + stringerror(err)); | |
318 | } | |
319 | ||
5841c6fd | 320 | if (sendNonBlocking && !setNonBlocking(sender.getHandle())) { |
7f98f4ba RG |
321 | int err = errno; |
322 | throw std::runtime_error("Error making channel pipe non-blocking: " + stringerror(err)); | |
323 | } | |
324 | ||
325 | if (pipeBufferSize > 0 && getPipeBufferSize(receiver.getHandle()) < pipeBufferSize) { | |
326 | setPipeBufferSize(receiver.getHandle(), pipeBufferSize); | |
327 | } | |
328 | ||
5841c6fd | 329 | return std::pair(Sender<T, D>(std::move(sender)), Receiver<T, D>(std::move(receiver), throwOnEOF)); |
7f98f4ba RG |
330 | } |
331 | } | |
332 | } |