]> git.ipfire.org Git - thirdparty/systemd.git/blob - src/basic/barrier.c
util-lib: split out fd-related operations into fd-util.[ch]
[thirdparty/systemd.git] / src / basic / barrier.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4 This file is part of systemd.
5
6 Copyright 2014 David Herrmann <dh.herrmann@gmail.com>
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <errno.h>
23 #include <fcntl.h>
24 #include <poll.h>
25 #include <stdbool.h>
26 #include <stdint.h>
27 #include <stdlib.h>
28 #include <sys/eventfd.h>
29 #include <sys/types.h>
30 #include <unistd.h>
31
32 #include "barrier.h"
33 #include "fd-util.h"
34 #include "macro.h"
35 #include "util.h"
36
37 /**
38 * Barriers
39 * This barrier implementation provides a simple synchronization method based
40 * on file-descriptors that can safely be used between threads and processes. A
41 * barrier object contains 2 shared counters based on eventfd. Both processes
42 * can now place barriers and wait for the other end to reach a random or
43 * specific barrier.
44 * Barriers are numbered, so you can either wait for the other end to reach any
45 * barrier or the last barrier that you placed. This way, you can use barriers
46 * for one-way *and* full synchronization. Note that even-though barriers are
47 * numbered, these numbers are internal and recycled once both sides reached the
48 * same barrier (implemented as a simple signed counter). It is thus not
49 * possible to address barriers by their ID.
50 *
51 * Barrier-API: Both ends can place as many barriers via barrier_place() as
52 * they want and each pair of barriers on both sides will be implicitly linked.
53 * Each side can use the barrier_wait/sync_*() family of calls to wait for the
54 * other side to place a specific barrier. barrier_wait_next() waits until the
55 * other side calls barrier_place(). No links between the barriers are
56 * considered and this simply serves as most basic asynchronous barrier.
57 * barrier_sync_next() is like barrier_wait_next() and waits for the other side
58 * to place their next barrier via barrier_place(). However, it only waits for
59 * barriers that are linked to a barrier we already placed. If the other side
60 * already placed more barriers than we did, barrier_sync_next() returns
61 * immediately.
62 * barrier_sync() extends barrier_sync_next() and waits until the other end
63 * placed as many barriers via barrier_place() as we did. If they already placed
64 * as many as we did (or more), it returns immediately.
65 *
66 * Additionally to basic barriers, an abortion event is available.
67 * barrier_abort() places an abortion event that cannot be undone. An abortion
68 * immediately cancels all placed barriers and replaces them. Any running and
69 * following wait/sync call besides barrier_wait_abortion() will immediately
70 * return false on both sides (otherwise, they always return true).
71 * barrier_abort() can be called multiple times on both ends and will be a
72 * no-op if already called on this side.
73 * barrier_wait_abortion() can be used to wait for the other side to call
74 * barrier_abort() and is the only wait/sync call that does not return
75 * immediately if we aborted outself. It only returns once the other side
76 * called barrier_abort().
77 *
78 * Barriers can be used for in-process and inter-process synchronization.
79 * However, for in-process synchronization you could just use mutexes.
80 * Therefore, main target is IPC and we require both sides to *not* share the FD
81 * table. If that's given, barriers provide target tracking: If the remote side
82 * exit()s, an abortion event is implicitly queued on the other side. This way,
83 * a sync/wait call will be woken up if the remote side crashed or exited
84 * unexpectedly. However, note that these abortion events are only queued if the
85 * barrier-queue has been drained. Therefore, it is safe to place a barrier and
86 * exit. The other side can safely wait on the barrier even though the exit
87 * queued an abortion event. Usually, the abortion event would overwrite the
88 * barrier, however, that's not true for exit-abortion events. Those are only
89 * queued if the barrier-queue is drained (thus, the receiving side has placed
90 * more barriers than the remote side).
91 */
92
93 /**
94 * barrier_create() - Initialize a barrier object
95 * @obj: barrier to initialize
96 *
97 * This initializes a barrier object. The caller is responsible of allocating
98 * the memory and keeping it valid. The memory does not have to be zeroed
99 * beforehand.
100 * Two eventfd objects are allocated for each barrier. If allocation fails, an
101 * error is returned.
102 *
103 * If this function fails, the barrier is reset to an invalid state so it is
104 * safe to call barrier_destroy() on the object regardless whether the
105 * initialization succeeded or not.
106 *
107 * The caller is responsible to destroy the object via barrier_destroy() before
108 * releasing the underlying memory.
109 *
110 * Returns: 0 on success, negative error code on failure.
111 */
112 int barrier_create(Barrier *b) {
113 _cleanup_(barrier_destroyp) Barrier *staging = b;
114 int r;
115
116 assert(b);
117
118 b->me = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
119 if (b->me < 0)
120 return -errno;
121
122 b->them = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
123 if (b->them < 0)
124 return -errno;
125
126 r = pipe2(b->pipe, O_CLOEXEC | O_NONBLOCK);
127 if (r < 0)
128 return -errno;
129
130 staging = NULL;
131 return 0;
132 }
133
134 /**
135 * barrier_destroy() - Destroy a barrier object
136 * @b: barrier to destroy or NULL
137 *
138 * This destroys a barrier object that has previously been passed to
139 * barrier_create(). The object is released and reset to invalid
140 * state. Therefore, it is safe to call barrier_destroy() multiple
141 * times or even if barrier_create() failed. However, barrier must be
142 * always initialized with BARRIER_NULL.
143 *
144 * If @b is NULL, this is a no-op.
145 */
146 void barrier_destroy(Barrier *b) {
147 if (!b)
148 return;
149
150 b->me = safe_close(b->me);
151 b->them = safe_close(b->them);
152 safe_close_pair(b->pipe);
153 b->barriers = 0;
154 }
155
156 /**
157 * barrier_set_role() - Set the local role of the barrier
158 * @b: barrier to operate on
159 * @role: role to set on the barrier
160 *
161 * This sets the roles on a barrier object. This is needed to know
162 * which side of the barrier you're on. Usually, the parent creates
163 * the barrier via barrier_create() and then calls fork() or clone().
164 * Therefore, the FDs are duplicated and the child retains the same
165 * barrier object.
166 *
167 * Both sides need to call barrier_set_role() after fork() or clone()
168 * are done. If this is not done, barriers will not work correctly.
169 *
170 * Note that barriers could be supported without fork() or clone(). However,
171 * this is currently not needed so it hasn't been implemented.
172 */
173 void barrier_set_role(Barrier *b, unsigned int role) {
174 int fd;
175
176 assert(b);
177 assert(role == BARRIER_PARENT || role == BARRIER_CHILD);
178 /* make sure this is only called once */
179 assert(b->pipe[0] >= 0 && b->pipe[1] >= 0);
180
181 if (role == BARRIER_PARENT)
182 b->pipe[1] = safe_close(b->pipe[1]);
183 else {
184 b->pipe[0] = safe_close(b->pipe[0]);
185
186 /* swap me/them for children */
187 fd = b->me;
188 b->me = b->them;
189 b->them = fd;
190 }
191 }
192
193 /* places barrier; returns false if we aborted, otherwise true */
194 static bool barrier_write(Barrier *b, uint64_t buf) {
195 ssize_t len;
196
197 /* prevent new sync-points if we already aborted */
198 if (barrier_i_aborted(b))
199 return false;
200
201 do {
202 len = write(b->me, &buf, sizeof(buf));
203 } while (len < 0 && IN_SET(errno, EAGAIN, EINTR));
204
205 if (len != sizeof(buf))
206 goto error;
207
208 /* lock if we aborted */
209 if (buf >= (uint64_t)BARRIER_ABORTION) {
210 if (barrier_they_aborted(b))
211 b->barriers = BARRIER_WE_ABORTED;
212 else
213 b->barriers = BARRIER_I_ABORTED;
214 } else if (!barrier_is_aborted(b))
215 b->barriers += buf;
216
217 return !barrier_i_aborted(b);
218
219 error:
220 /* If there is an unexpected error, we have to make this fatal. There
221 * is no way we can recover from sync-errors. Therefore, we close the
222 * pipe-ends and treat this as abortion. The other end will notice the
223 * pipe-close and treat it as abortion, too. */
224
225 safe_close_pair(b->pipe);
226 b->barriers = BARRIER_WE_ABORTED;
227 return false;
228 }
229
230 /* waits for barriers; returns false if they aborted, otherwise true */
231 static bool barrier_read(Barrier *b, int64_t comp) {
232 if (barrier_they_aborted(b))
233 return false;
234
235 while (b->barriers > comp) {
236 struct pollfd pfd[2] = {
237 { .fd = b->pipe[0] >= 0 ? b->pipe[0] : b->pipe[1],
238 .events = POLLHUP },
239 { .fd = b->them,
240 .events = POLLIN }};
241 uint64_t buf;
242 int r;
243
244 r = poll(pfd, 2, -1);
245 if (r < 0 && IN_SET(errno, EAGAIN, EINTR))
246 continue;
247 else if (r < 0)
248 goto error;
249
250 if (pfd[1].revents) {
251 ssize_t len;
252
253 /* events on @them signal new data for us */
254 len = read(b->them, &buf, sizeof(buf));
255 if (len < 0 && IN_SET(errno, EAGAIN, EINTR))
256 continue;
257
258 if (len != sizeof(buf))
259 goto error;
260 } else if (pfd[0].revents & (POLLHUP | POLLERR | POLLNVAL))
261 /* POLLHUP on the pipe tells us the other side exited.
262 * We treat this as implicit abortion. But we only
263 * handle it if there's no event on the eventfd. This
264 * guarantees that exit-abortions do not overwrite real
265 * barriers. */
266 buf = BARRIER_ABORTION;
267 else
268 continue;
269
270 /* lock if they aborted */
271 if (buf >= (uint64_t)BARRIER_ABORTION) {
272 if (barrier_i_aborted(b))
273 b->barriers = BARRIER_WE_ABORTED;
274 else
275 b->barriers = BARRIER_THEY_ABORTED;
276 } else if (!barrier_is_aborted(b))
277 b->barriers -= buf;
278 }
279
280 return !barrier_they_aborted(b);
281
282 error:
283 /* If there is an unexpected error, we have to make this fatal. There
284 * is no way we can recover from sync-errors. Therefore, we close the
285 * pipe-ends and treat this as abortion. The other end will notice the
286 * pipe-close and treat it as abortion, too. */
287
288 safe_close_pair(b->pipe);
289 b->barriers = BARRIER_WE_ABORTED;
290 return false;
291 }
292
293 /**
294 * barrier_place() - Place a new barrier
295 * @b: barrier object
296 *
297 * This places a new barrier on the barrier object. If either side already
298 * aborted, this is a no-op and returns "false". Otherwise, the barrier is
299 * placed and this returns "true".
300 *
301 * Returns: true if barrier was placed, false if either side aborted.
302 */
303 bool barrier_place(Barrier *b) {
304 assert(b);
305
306 if (barrier_is_aborted(b))
307 return false;
308
309 barrier_write(b, BARRIER_SINGLE);
310 return true;
311 }
312
313 /**
314 * barrier_abort() - Abort the synchronization
315 * @b: barrier object to abort
316 *
317 * This aborts the barrier-synchronization. If barrier_abort() was already
318 * called on this side, this is a no-op. Otherwise, the barrier is put into the
319 * ABORT-state and will stay there. The other side is notified about the
320 * abortion. Any following attempt to place normal barriers or to wait on normal
321 * barriers will return immediately as "false".
322 *
323 * You can wait for the other side to call barrier_abort(), too. Use
324 * barrier_wait_abortion() for that.
325 *
326 * Returns: false if the other side already aborted, true otherwise.
327 */
328 bool barrier_abort(Barrier *b) {
329 assert(b);
330
331 barrier_write(b, BARRIER_ABORTION);
332 return !barrier_they_aborted(b);
333 }
334
335 /**
336 * barrier_wait_next() - Wait for the next barrier of the other side
337 * @b: barrier to operate on
338 *
339 * This waits until the other side places its next barrier. This is independent
340 * of any barrier-links and just waits for any next barrier of the other side.
341 *
342 * If either side aborted, this returns false.
343 *
344 * Returns: false if either side aborted, true otherwise.
345 */
346 bool barrier_wait_next(Barrier *b) {
347 assert(b);
348
349 if (barrier_is_aborted(b))
350 return false;
351
352 barrier_read(b, b->barriers - 1);
353 return !barrier_is_aborted(b);
354 }
355
356 /**
357 * barrier_wait_abortion() - Wait for the other side to abort
358 * @b: barrier to operate on
359 *
360 * This waits until the other side called barrier_abort(). This can be called
361 * regardless whether the local side already called barrier_abort() or not.
362 *
363 * If the other side has already aborted, this returns immediately.
364 *
365 * Returns: false if the local side aborted, true otherwise.
366 */
367 bool barrier_wait_abortion(Barrier *b) {
368 assert(b);
369
370 barrier_read(b, BARRIER_THEY_ABORTED);
371 return !barrier_i_aborted(b);
372 }
373
374 /**
375 * barrier_sync_next() - Wait for the other side to place a next linked barrier
376 * @b: barrier to operate on
377 *
378 * This is like barrier_wait_next() and waits for the other side to call
379 * barrier_place(). However, this only waits for linked barriers. That means, if
380 * the other side already placed more barriers than (or as much as) we did, this
381 * returns immediately instead of waiting.
382 *
383 * If either side aborted, this returns false.
384 *
385 * Returns: false if either side aborted, true otherwise.
386 */
387 bool barrier_sync_next(Barrier *b) {
388 assert(b);
389
390 if (barrier_is_aborted(b))
391 return false;
392
393 barrier_read(b, MAX((int64_t)0, b->barriers - 1));
394 return !barrier_is_aborted(b);
395 }
396
397 /**
398 * barrier_sync() - Wait for the other side to place as many barriers as we did
399 * @b: barrier to operate on
400 *
401 * This is like barrier_sync_next() but waits for the other side to call
402 * barrier_place() as often as we did (in total). If they already placed as much
403 * as we did (or more), this returns immediately instead of waiting.
404 *
405 * If either side aborted, this returns false.
406 *
407 * Returns: false if either side aborted, true otherwise.
408 */
409 bool barrier_sync(Barrier *b) {
410 assert(b);
411
412 if (barrier_is_aborted(b))
413 return false;
414
415 barrier_read(b, 0);
416 return !barrier_is_aborted(b);
417 }