]> git.ipfire.org Git - thirdparty/pdns.git/blame - pdns/channel.hh
dnsdist: Fix invalid parameter name in the channel
[thirdparty/pdns.git] / pdns / channel.hh
CommitLineData
7f98f4ba
RG
1/*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22#pragma once
23#include <memory>
24#include <optional>
25
26#include "misc.hh"
27
91bf355b
RG
28/* g++ defines __SANITIZE_THREAD__
29 clang++ supports the nice __has_feature(thread_sanitizer),
30 let's merge them */
31#if defined(__has_feature)
32#if __has_feature(thread_sanitizer)
33#define __SANITIZE_THREAD__ 1
34#endif
35#endif
36
37#if __SANITIZE_THREAD__
eaf6fce1
RG
38extern "C" void __tsan_acquire(void* addr);
39extern "C" void __tsan_release(void* addr);
91bf355b
RG
40#endif
41
7f98f4ba
RG
42namespace pdns
43{
44namespace channel
45{
46 /**
47 * The sender's end of a channel used to pass objects between threads.
48 *
49 * A sender can be used by several threads in a safe way.
50 */
51 template <typename T, typename D = std::default_delete<T>>
52 class Sender
53 {
54 public:
55 Sender()
56 {
57 }
58 Sender(FDWrapper&& fd) :
59 d_fd(std::move(fd))
60 {
61 }
62 Sender(const Sender&) = delete;
63 Sender& operator=(const Sender&) = delete;
64 Sender(Sender&&) = default;
65 Sender& operator=(Sender&&) = default;
66 /**
67 * \brief Try to send the supplied object to the other end of that channel. Might block if the channel was created in blocking mode.
68 *
69 * \return True if the object was properly sent, False if the channel is full.
70 *
71 * \throw runtime_error if the channel is broken, for example if the other end has been closed.
72 */
73 bool send(std::unique_ptr<T, D>&&) const;
5841c6fd 74 void close();
7f98f4ba
RG
75
76 private:
77 FDWrapper d_fd;
78 };
79
80 /**
81 * The receiver's end of a channel used to pass objects between threads.
82 *
83 * A receiver can be used by several threads in a safe way, but in that case spurious wake up might happen.
84 */
85 template <typename T, typename D = std::default_delete<T>>
86 class Receiver
87 {
88 public:
89 Receiver()
90 {
91 }
5841c6fd
RG
92 Receiver(FDWrapper&& fd, bool throwOnEOF = true) :
93 d_fd(std::move(fd)), d_throwOnEOF(throwOnEOF)
7f98f4ba
RG
94 {
95 }
96 Receiver(const Receiver&) = delete;
97 Receiver& operator=(const Receiver&) = delete;
98 Receiver(Receiver&&) = default;
99 Receiver& operator=(Receiver&&) = default;
100 /**
101 * \brief Try to read an object sent by the other end of that channel. Might block if the channel was created in blocking mode.
102 *
103 * \return An object if one was available, and std::nullopt otherwise.
104 *
105 * \throw runtime_error if the channel is broken, for example if the other end has been closed.
106 */
5841c6fd
RG
107 std::optional<std::unique_ptr<T, D>> receive();
108 std::optional<std::unique_ptr<T, D>> receive(D deleter);
7f98f4ba
RG
109
110 /**
111 * \brief Get a descriptor that can be used with an I/O multiplexer to wait for an object to become available.
112 *
113 * \return A valid descriptor or -1 if the Receiver was not properly initialized.
114 */
115 int getDescriptor() const
116 {
117 return d_fd.getHandle();
118 }
5841c6fd
RG
119 /**
120 * \brief Whether the remote end has closed the channel.
121 */
122 bool isClosed() const
123 {
124 return d_closed;
125 }
7f98f4ba
RG
126
127 private:
128 FDWrapper d_fd;
5841c6fd
RG
129 bool d_closed{false};
130 bool d_throwOnEOF{true};
7f98f4ba
RG
131 };
132
133 /**
134 * \brief Create a channel to pass objects between threads, accepting multiple senders and receivers.
135 *
136 * \return A pair of Sender and Receiver objects.
137 *
138 * \throw runtime_error if the channel creation failed.
139 */
140 template <typename T, typename D = std::default_delete<T>>
1adbac74 141 std::pair<Sender<T, D>, Receiver<T, D>> createObjectQueue(bool sendNonBlocking = true, bool receiveNonBlocking = true, size_t pipeBufferSize = 0, bool throwOnEOF = true);
7f98f4ba
RG
142
143 /**
144 * The notifier's end of a channel used to communicate between threads.
145 *
146 * A notifier can be used by several threads in a safe way.
147 */
148 class Notifier
149 {
150 public:
151 Notifier()
152 {
153 }
154 Notifier(FDWrapper&&);
155 Notifier(const Notifier&) = delete;
156 Notifier& operator=(const Notifier&) = delete;
157 Notifier(Notifier&&) = default;
158 Notifier& operator=(Notifier&&) = default;
159
160 /**
161 * \brief Queue a notification to wake up the other end of the channel.
162 *
163 * \return True if the notification was properly sent, False if the channel is full.
164 *
165 * \throw runtime_error if the channel is broken, for example if the other end has been closed.
166 */
167 bool notify() const;
168
169 private:
170 FDWrapper d_fd;
171 };
172
173 /**
174 * The waiter's end of a channel used to communicate between threads.
175 *
176 * A waiter can be used by several threads in a safe way, but in that case spurious wake up might happen.
177 */
178 class Waiter
179 {
180 public:
5841c6fd 181 Waiter(FDWrapper&&, bool throwOnEOF = true);
7f98f4ba
RG
182 Waiter(const Waiter&) = delete;
183 Waiter& operator=(const Waiter&) = delete;
184 Waiter(Waiter&&) = default;
185 Waiter& operator=(Waiter&&) = default;
186
187 /**
188 * \brief Clear all notifications queued on that channel, if any.
189 */
190 void clear() const;
191 /**
192 * \brief Get a descriptor that can be used with an I/O multiplexer to wait for a notification to arrive.
193 *
194 * \return A valid descriptor or -1 if the Waiter was not properly initialized.
195 */
196 int getDescriptor() const;
5841c6fd
RG
197 /**
198 * \brief Whether the remote end has closed the channel.
199 */
200 bool isClosed() const
201 {
202 return d_closed;
203 }
7f98f4ba
RG
204
205 private:
206 FDWrapper d_fd;
5841c6fd
RG
207 bool d_closed{false};
208 bool d_throwOnEOF{true};
7f98f4ba
RG
209 };
210
211 /**
212 * \brief Create a channel to notify one thread from another one, accepting multiple senders and receivers.
213 *
214 * \return A pair of Notifier and Sender objects.
215 *
216 * \throw runtime_error if the channel creation failed.
217 */
5841c6fd 218 std::pair<Notifier, Waiter> createNotificationQueue(bool nonBlocking = true, size_t pipeBufferSize = 0, bool throwOnEOF = true);
7f98f4ba
RG
219
220 template <typename T, typename D>
221 bool Sender<T, D>::send(std::unique_ptr<T, D>&& object) const
222 {
223 // we do not release right away because we might need the custom deleter later
224 auto ptr = object.get();
225 static_assert(sizeof(ptr) <= PIPE_BUF, "Writes up to PIPE_BUF are guaranted not to interleaved and to either fully succeed or fail");
5841c6fd 226 while (true) {
91bf355b
RG
227#if __SANITIZE_THREAD__
228 __tsan_release(ptr);
229#endif /* __SANITIZE_THREAD__ */
5841c6fd 230 ssize_t sent = write(d_fd.getHandle(), &ptr, sizeof(ptr));
7f98f4ba 231
5841c6fd
RG
232 if (sent == sizeof(ptr)) {
233 // we cannot touch it anymore
234 object.release();
235 return true;
7f98f4ba 236 }
43ec3a21
RG
237 else if (sent == 0) {
238 throw std::runtime_error("Unable to write to channel: remote end has been closed");
239 }
7f98f4ba 240 else {
91bf355b 241#if __SANITIZE_THREAD__
eaf6fce1 242 __tsan_acquire(ptr);
91bf355b 243#endif /* __SANITIZE_THREAD__ */
5841c6fd
RG
244 if (errno == EINTR) {
245 continue;
246 }
247 if (errno == EAGAIN || errno == EWOULDBLOCK) {
248 return false;
249 }
250 else {
251 throw std::runtime_error("Unable to write to channel:" + stringerror());
252 }
7f98f4ba
RG
253 }
254 }
5841c6fd 255 }
7f98f4ba 256
5841c6fd
RG
257 template <typename T, typename D>
258 void Sender<T, D>::close()
259 {
260 d_fd.reset();
7f98f4ba
RG
261 }
262
263 template <typename T, typename D>
5841c6fd 264 std::optional<std::unique_ptr<T, D>> Receiver<T, D>::receive()
7f98f4ba 265 {
c3be1471 266 return receive(D());
7f98f4ba
RG
267 }
268
269 template <typename T, typename D>
5841c6fd 270 std::optional<std::unique_ptr<T, D>> Receiver<T, D>::receive(D deleter)
7f98f4ba 271 {
5841c6fd
RG
272 while (true) {
273 std::optional<std::unique_ptr<T, D>> result;
f6abb8ae
RG
274 T* objPtr{nullptr};
275 ssize_t got = read(d_fd.getHandle(), &objPtr, sizeof(objPtr));
276 if (got == sizeof(objPtr)) {
91bf355b 277#if __SANITIZE_THREAD__
f6abb8ae 278 __tsan_acquire(objPtr);
91bf355b 279#endif /* __SANITIZE_THREAD__ */
f6abb8ae 280 return std::unique_ptr<T, D>(objPtr, deleter);
5841c6fd
RG
281 }
282 else if (got == 0) {
283 d_closed = true;
284 if (!d_throwOnEOF) {
285 return result;
286 }
287 throw std::runtime_error("EOF while reading from Channel receiver");
288 }
289 else if (got == -1) {
290 if (errno == EINTR) {
291 continue;
292 }
293 if (errno == EAGAIN || errno == EWOULDBLOCK) {
294 return result;
295 }
296 throw std::runtime_error("Error while reading from Channel receiver: " + stringerror());
297 }
298 else {
299 throw std::runtime_error("Partial read from Channel receiver");
7f98f4ba 300 }
7f98f4ba
RG
301 }
302 }
303
304 template <typename T, typename D>
5841c6fd 305 std::pair<Sender<T, D>, Receiver<T, D>> createObjectQueue(bool sendNonBlocking, bool receiveNonBlocking, size_t pipeBufferSize, bool throwOnEOF)
7f98f4ba
RG
306 {
307 int fds[2] = {-1, -1};
308 if (pipe(fds) < 0) {
309 throw std::runtime_error("Error creating channel pipe: " + stringerror());
310 }
311
312 FDWrapper sender(fds[1]);
313 FDWrapper receiver(fds[0]);
314
5841c6fd 315 if (receiveNonBlocking && !setNonBlocking(receiver.getHandle())) {
7f98f4ba
RG
316 int err = errno;
317 throw std::runtime_error("Error making channel pipe non-blocking: " + stringerror(err));
318 }
319
5841c6fd 320 if (sendNonBlocking && !setNonBlocking(sender.getHandle())) {
7f98f4ba
RG
321 int err = errno;
322 throw std::runtime_error("Error making channel pipe non-blocking: " + stringerror(err));
323 }
324
325 if (pipeBufferSize > 0 && getPipeBufferSize(receiver.getHandle()) < pipeBufferSize) {
326 setPipeBufferSize(receiver.getHandle(), pipeBufferSize);
327 }
328
5841c6fd 329 return std::pair(Sender<T, D>(std::move(sender)), Receiver<T, D>(std::move(receiver), throwOnEOF));
7f98f4ba
RG
330 }
331}
332}