]> git.ipfire.org Git - thirdparty/systemd.git/blame - src/libsystemd-bus/sd-event.c
bus: add minimal event loop API
[thirdparty/systemd.git] / src / libsystemd-bus / sd-event.c
CommitLineData
fd38203a
LP
1/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3/***
4 This file is part of systemd.
5
6 Copyright 2013 Lennart Poettering
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20***/
21
#include <limits.h>
#include <sys/epoll.h>
#include <sys/timerfd.h>
#include <sys/wait.h>

#include "macro.h"
#include "refcnt.h"
#include "prioq.h"
#include "hashmap.h"
#include "util.h"
#include "time-util.h"

#include "sd-event.h"
34
/* Maximum number of epoll events fetched by one epoll_wait() call in
 * sd_event_run(). */
#define EPOLL_QUEUE_MAX 64

/* Kind of an event source; selects the valid member of the union in
 * struct sd_event_source. SOURCE_MONOTONIC, SOURCE_REALTIME and
 * SOURCE_SIGNAL are additionally stored via INT_TO_PTR() as epoll
 * data.ptr markers for the loop's own timerfds and signalfd, which
 * sd_event_run() compares against before treating data.ptr as an
 * sd_event_source pointer. */
typedef enum EventSourceType {
        SOURCE_IO,
        SOURCE_MONOTONIC,
        SOURCE_REALTIME,
        SOURCE_SIGNAL,
        SOURCE_CHILD,
        SOURCE_DEFER
} EventSourceType;
45
/* One event source registered with an sd_event loop. Each source holds a
 * reference on its loop (taken in source_new(), dropped in source_free()).
 * The anonymous union carries the per-type state; 'type' selects which
 * member is valid. */
struct sd_event_source {
        RefCount n_ref;

        sd_event *event;
        void *userdata;
        /* Optional callback invoked by event_prepare() before polling */
        sd_prepare_handler_t prepare;

        EventSourceType type:4;
        sd_event_mute_t mute:3;
        /* True while the source is queued in event->pending */
        bool pending:1;

        /* Lower values are dispatched first (see pending_prioq_compare()) */
        int priority;
        /* Positions inside the event->pending / event->prepare queues */
        unsigned pending_index;
        unsigned prepare_index;
        /* Loop iteration in which the source became pending / was last prepared */
        unsigned pending_iteration;
        unsigned prepare_iteration;

        union {
                struct {
                        sd_io_handler_t callback;
                        int fd;
                        /* Requested epoll events, and the ones last reported */
                        uint32_t events;
                        uint32_t revents;
                        /* Whether fd is currently added to the epoll instance */
                        bool registered:1;
                } io;
                struct {
                        sd_time_handler_t callback;
                        /* Absolute elapse time in usec on the source's clock
                         * (armed with TFD_TIMER_ABSTIME in event_arm_timer()) */
                        usec_t next;
                        /* Position inside event->monotonic or event->realtime */
                        unsigned prioq_index;
                } time;
                struct {
                        sd_signal_handler_t callback;
                        /* Last siginfo read from the signalfd for this signal */
                        struct signalfd_siginfo siginfo;
                        int sig;
                } signal;
                struct {
                        sd_child_handler_t callback;
                        /* Filled by waitid() in process_child() */
                        siginfo_t siginfo;
                        pid_t pid;
                        /* WEXITED/WSTOPPED/WCONTINUED mask passed to waitid() */
                        int options;
                } child;
                struct {
                        sd_defer_handler_t callback;
                } defer;
        };
};
92
/* The event loop object. All file descriptors are -1 until created; the
 * timerfds and the signalfd are created lazily when the first source that
 * needs them is added. */
struct sd_event {
        RefCount n_ref;

        int epoll_fd;
        /* signalfd watching the signals in 'sigset' */
        int signal_fd;
        /* timerfds, armed by event_arm_timer() from the head of the matching queue */
        int realtime_fd;
        int monotonic_fd;

        /* Sources ready for dispatch, ordered by pending_prioq_compare() */
        Prioq *pending;
        /* Sources with a prepare callback, ordered by prepare_prioq_compare() */
        Prioq *prepare;
        /* Timer sources per clock, ordered by time_prioq_compare() */
        Prioq *monotonic;
        Prioq *realtime;

        /* Signal mask handed to signalfd() */
        sigset_t sigset;
        /* _NSIG entries indexed by signal number; allocated by the first
         * sd_event_add_signal() call, NULL before that */
        sd_event_source **signal_sources;

        /* Child sources keyed by PID */
        Hashmap *child_sources;
        /* Number of child sources that are not muted; consulted to decide
         * whether SIGCHLD must stay in 'sigset' */
        unsigned n_unmuted_child_sources;

        /* Incremented at the start of every sd_event_run() */
        unsigned iteration;
        /* Value of 'iteration' when process_child() last completed */
        unsigned processed_children;

        /* Time each timerfd is currently armed for; (usec_t) -1 initially */
        usec_t realtime_next, monotonic_next;

        /* Set by sd_event_request_quit(); ends sd_event_loop() */
        bool quit;
};
119
120static int pending_prioq_compare(const void *a, const void *b) {
121 const sd_event_source *x = a, *y = b;
122
123 assert(x->pending);
124 assert(y->pending);
125
126 /* Unmuted ones first */
127 if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED)
128 return -1;
129 if (x->mute == SD_EVENT_MUTED && y->mute != SD_EVENT_MUTED)
130 return 1;
131
132 /* Lower priority values first */
133 if (x->priority < y->priority)
134 return -1;
135 if (x->priority > y->priority)
136 return 1;
137
138 /* Older entries first */
139 if (x->pending_iteration < y->pending_iteration)
140 return -1;
141 if (x->pending_iteration > y->pending_iteration)
142 return 1;
143
144 /* Stability for the rest */
145 if (x < y)
146 return -1;
147 if (y > x)
148 return 1;
149
150 return 0;
151}
152
153static int prepare_prioq_compare(const void *a, const void *b) {
154 const sd_event_source *x = a, *y = b;
155
156 assert(x->prepare);
157 assert(y->prepare);
158
159 /* Move most recently prepared ones last, so that we can stop
160 * preparing as soon as we hit one that has already been
161 * prepared in the current iteration */
162 if (x->prepare_iteration < y->prepare_iteration)
163 return -1;
164 if (x->prepare_iteration > y->prepare_iteration)
165 return 1;
166
167 /* Unmuted ones first */
168 if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED)
169 return -1;
170 if (x->mute == SD_EVENT_MUTED && y->mute != SD_EVENT_MUTED)
171 return 1;
172
173 /* Lower priority values first */
174 if (x->priority < y->priority)
175 return -1;
176 if (x->priority > y->priority)
177 return 1;
178
179 /* Stability for the rest */
180 if (x < y)
181 return -1;
182 if (y > x)
183 return 1;
184
185 return 0;
186}
187
188static int time_prioq_compare(const void *a, const void *b) {
189 const sd_event_source *x = a, *y = b;
190
191 assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME);
192 assert(y->type == SOURCE_MONOTONIC || y->type == SOURCE_REALTIME);
193
194 /* Unmuted ones first */
195 if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED)
196 return -1;
197 if (x->mute == SD_EVENT_MUTED && y->mute != SD_EVENT_MUTED)
198 return 1;
199
200 /* Move the pending ones to the end */
201 if (!x->pending && y->pending)
202 return -1;
203 if (x->pending && !y->pending)
204 return 1;
205
206 /* Order by time */
207 if (x->time.next < y->time.next)
208 return -1;
209 if (x->time.next > y->time.next)
210 return -1;
211
212 /* Stability for the rest */
213 if (x < y)
214 return -1;
215 if (y > x)
216 return 1;
217
218 return 0;
219}
220
221static void event_free(sd_event *e) {
222 assert(e);
223
224 if (e->epoll_fd >= 0)
225 close_nointr_nofail(e->epoll_fd);
226
227 if (e->signal_fd >= 0)
228 close_nointr_nofail(e->signal_fd);
229
230 if (e->realtime_fd >= 0)
231 close_nointr_nofail(e->realtime_fd);
232
233 if (e->monotonic_fd >= 0)
234 close_nointr_nofail(e->monotonic_fd);
235
236 prioq_free(e->pending);
237 prioq_free(e->prepare);
238 prioq_free(e->monotonic);
239 prioq_free(e->realtime);
240
241 free(e->signal_sources);
242
243 hashmap_free(e->child_sources);
244 free(e);
245}
246
247int sd_event_new(sd_event** ret) {
248 sd_event *e;
249 int r;
250
251 if (!ret)
252 return -EINVAL;
253
254 e = new0(sd_event, 1);
255 if (!e)
256 return -ENOMEM;
257
258 e->n_ref = REFCNT_INIT;
259 e->signal_fd = e->realtime_fd = e->monotonic_fd = e->epoll_fd = -1;
260 e->realtime_next = e->monotonic_next = (usec_t) -1;
261
262 assert_se(sigemptyset(&e->sigset) == 0);
263
264 e->pending = prioq_new(pending_prioq_compare);
265 if (!e->pending) {
266 r = -ENOMEM;
267 goto fail;
268 }
269
270 e->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
271 if (e->epoll_fd < 0) {
272 r = -errno;
273 goto fail;
274 }
275
276 *ret = e;
277 return 0;
278
279fail:
280 event_free(e);
281 return r;
282}
283
284sd_event* sd_event_ref(sd_event *e) {
285 if (!e)
286 return NULL;
287
288 assert_se(REFCNT_INC(e->n_ref) >= 2);
289
290 return e;
291}
292
293sd_event* sd_event_unref(sd_event *e) {
294 if (!e)
295 return NULL;
296
297 if (REFCNT_DEC(e->n_ref) <= 0)
298 event_free(e);
299
300 return NULL;
301}
302
303static int source_io_unregister(sd_event_source *s) {
304 int r;
305
306 assert(s);
307 assert(s->type == SOURCE_IO);
308
309 if (!s->io.registered)
310 return 0;
311
312 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, s->io.fd, NULL);
313 if (r < 0)
314 return -errno;
315
316 s->io.registered = false;
317 return 0;
318}
319
320static int source_io_register(sd_event_source *s, sd_event_mute_t m, uint32_t events) {
321 struct epoll_event ev = {};
322 int r;
323
324 assert(s);
325 assert(s->type == SOURCE_IO);
326 assert(m != SD_EVENT_MUTED);
327
328 ev.events = events;
329 ev.data.ptr = s;
330
331 if (m == SD_EVENT_ONESHOT)
332 ev.events |= EPOLLONESHOT;
333
334 if (s->io.registered)
335 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_MOD, s->io.fd, &ev);
336 else
337 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_ADD, s->io.fd, &ev);
338
339 if (r < 0)
340 return -errno;
341
342 s->io.registered = true;
343
344 return 0;
345}
346
347static void source_free(sd_event_source *s) {
348 assert(s);
349
350 if (s->event) {
351 switch (s->type) {
352
353 case SOURCE_IO:
354 if (s->io.fd >= 0)
355 source_io_unregister(s);
356
357 break;
358
359 case SOURCE_MONOTONIC:
360 prioq_remove(s->event->monotonic, s, &s->time.prioq_index);
361 break;
362
363 case SOURCE_REALTIME:
364 prioq_remove(s->event->realtime, s, &s->time.prioq_index);
365 break;
366
367 case SOURCE_SIGNAL:
368 if (s->signal.sig > 0) {
369 if (s->signal.sig != SIGCHLD || s->event->n_unmuted_child_sources == 0)
370 assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
371
372 if (s->event->signal_sources)
373 s->event->signal_sources[s->signal.sig] = NULL;
374 }
375
376 break;
377
378 case SOURCE_CHILD:
379 if (s->child.pid > 0) {
380 if (s->mute != SD_EVENT_MUTED) {
381 assert(s->event->n_unmuted_child_sources > 0);
382 s->event->n_unmuted_child_sources--;
383 }
384
385 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD])
386 assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
387
388 hashmap_remove(s->event->child_sources, INT_TO_PTR(s->child.pid));
389 }
390
391 break;
392 }
393
394 if (s->pending)
395 prioq_remove(s->event->pending, s, &s->pending_index);
396
397 if (s->prepare)
398 prioq_remove(s->event->prepare, s, &s->prepare_index);
399
400 sd_event_unref(s->event);
401 }
402
403 free(s);
404}
405
406static int source_set_pending(sd_event_source *s, bool b) {
407 int r;
408
409 assert(s);
410
411 if (s->pending == b)
412 return 0;
413
414 s->pending = b;
415
416 if (b) {
417 s->pending_iteration = s->event->iteration;
418
419 r = prioq_put(s->event->pending, s, &s->pending_index);
420 if (r < 0) {
421 s->pending = false;
422 return r;
423 }
424 } else
425 assert_se(prioq_remove(s->event->pending, s, &s->pending_index));
426
427 return 0;
428}
429
430static sd_event_source *source_new(sd_event *e, EventSourceType type) {
431 sd_event_source *s;
432
433 assert(e);
434
435 s = new0(sd_event_source, 1);
436 if (!s)
437 return NULL;
438
439 s->n_ref = REFCNT_INIT;
440 s->event = sd_event_ref(e);
441 s->type = type;
442 s->mute = SD_EVENT_UNMUTED;
443 s->pending_index = s->prepare_index = PRIOQ_IDX_NULL;
444
445 return s;
446}
447
448int sd_event_add_io(
449 sd_event *e,
450 int fd,
451 uint32_t events,
452 sd_io_handler_t callback,
453 void *userdata,
454 sd_event_source **ret) {
455
456 sd_event_source *s;
457 int r;
458
459 if (!e)
460 return -EINVAL;
461 if (fd < 0)
462 return -EINVAL;
463 if (events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP))
464 return -EINVAL;
465 if (!callback)
466 return -EINVAL;
467 if (!ret)
468 return -EINVAL;
469
470 s = source_new(e, SOURCE_IO);
471 if (!s)
472 return -ENOMEM;
473
474 s->io.fd = fd;
475 s->io.events = events;
476 s->io.callback = callback;
477 s->userdata = userdata;
478
479 r = source_io_register(s, s->mute, events);
480 if (r < 0) {
481 source_free(s);
482 return -errno;
483 }
484
485 *ret = s;
486 return 0;
487}
488
489static int event_setup_timer_fd(
490 sd_event *e,
491 EventSourceType type,
492 int *timer_fd,
493 clockid_t id) {
494
495 struct epoll_event ev = {};
496 int r, fd;
497
498 assert(e);
499 assert(timer_fd);
500
501 if (_likely_(*timer_fd >= 0))
502 return 0;
503
504 fd = timerfd_create(id, TFD_NONBLOCK|TFD_CLOEXEC);
505 if (fd < 0)
506 return -errno;
507
508 ev.events = EPOLLIN;
509 ev.data.ptr = INT_TO_PTR(type);
510
511 r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
512 if (r < 0) {
513 close_nointr_nofail(fd);
514 return -errno;
515 }
516
517 *timer_fd = fd;
518 return 0;
519}
520
521static int event_add_time_internal(
522 sd_event *e,
523 EventSourceType type,
524 int *timer_fd,
525 clockid_t id,
526 Prioq **prioq,
527 uint64_t usec,
528 sd_time_handler_t callback,
529 void *userdata,
530 sd_event_source **ret) {
531
532 sd_event_source *s;
533 int r;
534
535 if (!e)
536 return -EINVAL;
537 if (!callback)
538 return -EINVAL;
539 if (!ret)
540 return -EINVAL;
541
542 assert(timer_fd);
543 assert(prioq);
544
545 if (!*prioq) {
546 *prioq = prioq_new(time_prioq_compare);
547 if (!*prioq)
548 return -ENOMEM;
549 }
550
551 if (*timer_fd < 0) {
552 r = event_setup_timer_fd(e, type, timer_fd, id);
553 if (r < 0)
554 return r;
555 }
556
557 s = source_new(e, type);
558 if (!s)
559 return -ENOMEM;
560
561 s->time.next = usec;
562 s->time.callback = callback;
563 s->time.prioq_index = PRIOQ_IDX_NULL;
564 s->userdata = userdata;
565
566 r = prioq_put(*prioq, s, &s->time.prioq_index);
567 if (r < 0) {
568 source_free(s);
569 return r;
570 }
571
572 *ret = s;
573 return 0;
574}
575
576int sd_event_add_monotonic(sd_event *e, uint64_t usec, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
577 return event_add_time_internal(e, SOURCE_MONOTONIC, &e->monotonic_fd, CLOCK_MONOTONIC, &e->monotonic, usec, callback, userdata, ret);
578}
579
580int sd_event_add_realtime(sd_event *e, uint64_t usec, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
581 return event_add_time_internal(e, SOURCE_REALTIME, &e->realtime_fd, CLOCK_REALTIME, &e->realtime, usec, callback, userdata, ret);
582}
583
584static int event_update_signal_fd(sd_event *e) {
585 struct epoll_event ev = {};
586 bool add_to_epoll;
587 int r;
588
589 assert(e);
590
591 add_to_epoll = e->signal_fd < 0;
592
593 r = signalfd(e->signal_fd, &e->sigset, SFD_NONBLOCK|SFD_CLOEXEC);
594 if (r < 0)
595 return -errno;
596
597 e->signal_fd = r;
598
599 if (!add_to_epoll)
600 return 0;
601
602 ev.events = EPOLLIN;
603 ev.data.ptr = INT_TO_PTR(SOURCE_SIGNAL);
604
605 r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, e->signal_fd, &ev);
606 if (r < 0) {
607 close_nointr_nofail(e->signal_fd);
608 e->signal_fd = -1;
609
610 return -errno;
611 }
612
613 return 0;
614}
615
616int sd_event_add_signal(sd_event *e, int sig, sd_signal_handler_t callback, void *userdata, sd_event_source **ret) {
617 sd_event_source *s;
618 int r;
619
620 if (!e)
621 return -EINVAL;
622 if (sig <= 0)
623 return -EINVAL;
624 if (sig >= _NSIG)
625 return -EINVAL;
626 if (!callback)
627 return -EINVAL;
628 if (!ret)
629 return -EINVAL;
630
631 if (!e->signal_sources) {
632 e->signal_sources = new0(sd_event_source*, _NSIG);
633 if (!e->signal_sources)
634 return -ENOMEM;
635 } else if (e->signal_sources[sig])
636 return -EBUSY;
637
638 s = source_new(e, SOURCE_SIGNAL);
639 if (!s)
640 return -ENOMEM;
641
642 s->signal.sig = sig;
643 s->signal.callback = callback;
644 s->userdata = userdata;
645
646 e->signal_sources[sig] = s;
647 assert_se(sigaddset(&e->sigset, sig) == 0);
648
649 if (sig != SIGCHLD || e->n_unmuted_child_sources == 0) {
650 r = event_update_signal_fd(e);
651 if (r < 0) {
652 source_free(s);
653 return r;
654 }
655 }
656
657 *ret = s;
658 return 0;
659}
660
661int sd_event_add_child(sd_event *e, pid_t pid, int options, sd_child_handler_t callback, void *userdata, sd_event_source **ret) {
662 sd_event_source *s;
663 int r;
664
665 if (!e)
666 return -EINVAL;
667 if (pid <= 1)
668 return -EINVAL;
669 if (options & ~(WEXITED|WSTOPPED|WCONTINUED))
670 return -EINVAL;
671 if (!callback)
672 return -EINVAL;
673 if (!ret)
674 return -EINVAL;
675
676 r = hashmap_ensure_allocated(&e->child_sources, trivial_hash_func, trivial_compare_func);
677 if (r < 0)
678 return r;
679
680 if (hashmap_contains(e->child_sources, INT_TO_PTR(pid)))
681 return -EBUSY;
682
683 s = source_new(e, SOURCE_CHILD);
684 if (!s)
685 return -ENOMEM;
686
687 s->child.pid = pid;
688 s->child.options = options;
689 s->child.callback = callback;
690 s->userdata = userdata;
691
692 r = hashmap_put(e->child_sources, INT_TO_PTR(pid), s);
693 if (r < 0) {
694 source_free(s);
695 return r;
696 }
697
698 e->n_unmuted_child_sources ++;
699
700 assert_se(sigaddset(&e->sigset, SIGCHLD) == 0);
701
702 if (!e->signal_sources || !e->signal_sources[SIGCHLD]) {
703 r = event_update_signal_fd(e);
704 if (r < 0) {
705 source_free(s);
706 return -errno;
707 }
708 }
709
710 *ret = s;
711 return 0;
712}
713
714int sd_event_add_defer(sd_event *e, sd_defer_handler_t callback, void *userdata, sd_event_source **ret) {
715 sd_event_source *s;
716 int r;
717
718 if (!e)
719 return -EINVAL;
720 if (!ret)
721 return -EINVAL;
722
723 s = source_new(e, SOURCE_DEFER);
724 if (!s)
725 return -ENOMEM;
726
727 s->defer.callback = callback;
728 s->userdata = userdata;
729
730 r = source_set_pending(s, true);
731 if (r < 0) {
732 source_free(s);
733 return r;
734 }
735
736 *ret = s;
737 return 0;
738}
739
740sd_event_source* sd_event_source_ref(sd_event_source *s) {
741 if (!s)
742 return NULL;
743
744 assert_se(REFCNT_INC(s->n_ref) >= 2);
745
746 return s;
747}
748
749sd_event_source* sd_event_source_unref(sd_event_source *s) {
750 if (!s)
751 return NULL;
752
753 if (REFCNT_DEC(s->n_ref) <= 0)
754 source_free(s);
755
756 return NULL;
757}
758
759int sd_event_source_get_pending(sd_event_source *s) {
760 if (!s)
761 return -EINVAL;
762
763 return s->pending;
764}
765
766int sd_event_source_get_io_fd(sd_event_source *s) {
767 if (!s)
768 return -EINVAL;
769 if (s->type != SOURCE_IO)
770 return -EDOM;
771
772 return s->io.fd;
773}
774
775int sd_event_source_get_io_events(sd_event_source *s, uint32_t* events) {
776 if (!s)
777 return -EINVAL;
778 if (s->type != SOURCE_IO)
779 return -EDOM;
780 if (!events)
781 return -EINVAL;
782
783 *events = s->io.events;
784 return 0;
785}
786
787int sd_event_source_set_io_events(sd_event_source *s, uint32_t events) {
788 int r;
789
790 if (!s)
791 return -EINVAL;
792 if (!s->type != SOURCE_IO)
793 return -EDOM;
794 if (events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP))
795 return -EINVAL;
796
797 if (s->io.events == events)
798 return 0;
799
800 if (s->mute != SD_EVENT_MUTED) {
801 r = source_io_register(s, s->io.events, events);
802 if (r < 0)
803 return r;
804 }
805
806 s->io.events = events;
807
808 return 0;
809}
810
811int sd_event_source_get_io_revents(sd_event_source *s, uint32_t* revents) {
812 if (!s)
813 return -EINVAL;
814 if (s->type != SOURCE_IO)
815 return -EDOM;
816 if (!revents)
817 return -EINVAL;
818 if (!s->pending)
819 return -ENODATA;
820
821 *revents = s->io.revents;
822 return 0;
823}
824
825int sd_event_source_get_signal(sd_event_source *s) {
826 if (!s)
827 return -EINVAL;
828 if (s->type != SOURCE_SIGNAL)
829 return -EDOM;
830
831 return s->signal.sig;
832}
833
834int sd_event_source_get_priority(sd_event_source *s, int *priority) {
835 if (!s)
836 return -EINVAL;
837
838 return s->priority;
839}
840
841int sd_event_source_set_priority(sd_event_source *s, int priority) {
842 if (!s)
843 return -EINVAL;
844
845 if (s->priority == priority)
846 return 0;
847
848 s->priority = priority;
849
850 if (s->pending)
851 assert_se(prioq_reshuffle(s->event->pending, s, &s->pending_index) == 0);
852
853 if (s->prepare)
854 assert_se(prioq_reshuffle(s->event->prepare, s, &s->prepare_index) == 0);
855
856 return 0;
857}
858
859int sd_event_source_get_mute(sd_event_source *s, sd_event_mute_t *m) {
860 if (!s)
861 return -EINVAL;
862 if (!m)
863 return -EINVAL;
864
865 *m = s->mute;
866 return 0;
867}
868
869int sd_event_source_set_mute(sd_event_source *s, sd_event_mute_t m) {
870 int r;
871
872 if (!s)
873 return -EINVAL;
874 if (m != SD_EVENT_MUTED && m != SD_EVENT_UNMUTED && !SD_EVENT_ONESHOT)
875 return -EINVAL;
876
877 if (s->mute == m)
878 return 0;
879
880 if (m == SD_EVENT_MUTED) {
881
882 switch (s->type) {
883
884 case SOURCE_IO:
885 r = source_io_unregister(s);
886 if (r < 0)
887 return r;
888
889 s->mute = m;
890 break;
891
892 case SOURCE_MONOTONIC:
893 s->mute = m;
894 prioq_reshuffle(s->event->monotonic, s, &s->time.prioq_index);
895 break;
896
897 case SOURCE_REALTIME:
898 s->mute = m;
899 prioq_reshuffle(s->event->realtime, s, &s->time.prioq_index);
900 break;
901
902 case SOURCE_SIGNAL:
903 s->mute = m;
904 if (s->signal.sig != SIGCHLD || s->event->n_unmuted_child_sources == 0) {
905 assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
906 event_update_signal_fd(s->event);
907 }
908
909 break;
910
911 case SOURCE_CHILD:
912 s->mute = m;
913
914 assert(s->event->n_unmuted_child_sources > 0);
915 s->event->n_unmuted_child_sources--;
916
917 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
918 assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
919 event_update_signal_fd(s->event);
920 }
921
922 break;
923
924 case SOURCE_DEFER:
925 s->mute = m;
926 break;
927 }
928
929 } else {
930 switch (s->type) {
931
932 case SOURCE_IO:
933 r = source_io_register(s, m, s->io.events);
934 if (r < 0)
935 return r;
936
937 s->mute = m;
938 break;
939
940 case SOURCE_MONOTONIC:
941 s->mute = m;
942 prioq_reshuffle(s->event->monotonic, s, &s->time.prioq_index);
943 break;
944
945 case SOURCE_REALTIME:
946 s->mute = m;
947 prioq_reshuffle(s->event->realtime, s, &s->time.prioq_index);
948 break;
949
950 case SOURCE_SIGNAL:
951 s->mute = m;
952
953 if (s->signal.sig != SIGCHLD || s->event->n_unmuted_child_sources == 0) {
954 assert_se(sigaddset(&s->event->sigset, s->signal.sig) == 0);
955 event_update_signal_fd(s->event);
956 }
957 break;
958
959 case SOURCE_CHILD:
960 s->mute = m;
961
962 if (s->mute == SD_EVENT_MUTED) {
963 s->event->n_unmuted_child_sources++;
964
965 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
966 assert_se(sigaddset(&s->event->sigset, SIGCHLD) == 0);
967 event_update_signal_fd(s->event);
968 }
969 }
970 break;
971
972 case SOURCE_DEFER:
973 s->mute = m;
974 break;
975 }
976 }
977
978 if (s->pending)
979 prioq_reshuffle(s->event->pending, s, &s->pending_index);
980
981 if (s->prepare)
982 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
983
984 return 0;
985}
986
987int sd_event_source_get_time(sd_event_source *s, uint64_t *usec) {
988 if (!s)
989 return -EINVAL;
990 if (!usec)
991 return -EINVAL;
992 if (s->type != SOURCE_REALTIME && s->type != SOURCE_MONOTONIC)
993 return -EDOM;
994
995 *usec = s->time.next;
996 return 0;
997}
998
999int sd_event_source_set_time(sd_event_source *s, uint64_t usec) {
1000 if (!s)
1001 return -EINVAL;
1002 if (s->type != SOURCE_REALTIME && s->type != SOURCE_MONOTONIC)
1003 return -EDOM;
1004
1005 if (s->time.next == usec)
1006 return 0;
1007
1008 s->time.next = usec;
1009
1010 if (s->type == SOURCE_REALTIME)
1011 prioq_reshuffle(s->event->realtime, s, &s->time.prioq_index);
1012 else
1013 prioq_reshuffle(s->event->monotonic, s, &s->time.prioq_index);
1014
1015 return 0;
1016}
1017
1018int sd_event_source_set_prepare(sd_event_source *s, sd_prepare_handler_t callback) {
1019 int r;
1020
1021 if (!s)
1022 return -EINVAL;
1023
1024 if (s->prepare == callback)
1025 return 0;
1026
1027 if (callback && s->prepare) {
1028 s->prepare = callback;
1029 return 0;
1030 }
1031
1032 r = prioq_ensure_allocated(&s->event->prepare, prepare_prioq_compare);
1033 if (r < 0)
1034 return r;
1035
1036 s->prepare = callback;
1037
1038 if (callback) {
1039 r = prioq_put(s->event->prepare, s, &s->prepare_index);
1040 if (r < 0)
1041 return r;
1042 } else
1043 prioq_remove(s->event->prepare, s, &s->prepare_index);
1044
1045 return 0;
1046}
1047
1048void* sd_event_source_get_userdata(sd_event_source *s) {
1049 if (!s)
1050 return NULL;
1051
1052 return s->userdata;
1053}
1054
1055static int event_arm_timer(
1056 sd_event *e,
1057 int timer_fd,
1058 Prioq *prioq,
1059 usec_t *next) {
1060
1061 struct itimerspec its = {};
1062 sd_event_source *s;
1063 int r;
1064
1065 assert_se(e);
1066 assert_se(next);
1067
1068 s = prioq_peek(prioq);
1069 if (!s || s->mute == SD_EVENT_MUTED)
1070 return 0;
1071
1072 if (*next == s->time.next)
1073 return 0;
1074
1075 assert_se(timer_fd >= 0);
1076
1077 if (s->time.next == 0) {
1078 /* We don' want to disarm here, just mean some time looooong ago. */
1079 its.it_value.tv_sec = 0;
1080 its.it_value.tv_nsec = 1;
1081 } else
1082 timespec_store(&its.it_value, s->time.next);
1083
1084 r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1085 if (r < 0)
1086 return r;
1087
1088 *next = s->time.next;
1089 return 0;
1090}
1091
1092static int process_io(sd_event *e, sd_event_source *s, uint32_t events) {
1093 assert(e);
1094 assert(s);
1095 assert(s->type == SOURCE_IO);
1096
1097 s->io.revents = events;
1098
1099 /*
1100 If this is a oneshot event source, then we added it to the
1101 epoll with EPOLLONESHOT, hence we know it's not registered
1102 anymore. We can save a syscall here...
1103 */
1104
1105 if (s->mute == SD_EVENT_ONESHOT)
1106 s->io.registered = false;
1107
1108 return source_set_pending(s, true);
1109}
1110
1111static int flush_timer(sd_event *e, int fd, uint32_t events) {
1112 uint64_t x;
1113 ssize_t ss;
1114
1115 assert(e);
1116
1117 if (events != EPOLLIN)
1118 return -EIO;
1119
1120 ss = read(fd, &x, sizeof(x));
1121 if (ss < 0) {
1122 if (errno == EAGAIN || errno == EINTR)
1123 return 0;
1124
1125 return -errno;
1126 }
1127
1128 if (ss != sizeof(x))
1129 return -EIO;
1130
1131 return 0;
1132}
1133
1134static int process_timer(sd_event *e, usec_t n, Prioq *prioq) {
1135 sd_event_source *s;
1136 int r;
1137
1138 assert(e);
1139
1140 for (;;) {
1141 s = prioq_peek(prioq);
1142 if (!s ||
1143 s->time.next > n ||
1144 s->mute == SD_EVENT_MUTED ||
1145 s->pending)
1146 break;
1147
1148 r = source_set_pending(s, true);
1149 if (r < 0)
1150 return r;
1151
1152 r = prioq_reshuffle(prioq, s, &s->time.prioq_index);
1153 if (r < 0)
1154 return r;
1155 }
1156
1157 return 0;
1158}
1159
1160static int process_child(sd_event *e) {
1161 sd_event_source *s;
1162 Iterator i;
1163 int r;
1164
1165 assert(e);
1166
1167 /*
1168 So, this is ugly. We iteratively invoke waitid() with P_PID
1169 + WNOHANG for each PID we wait for, instead of using
1170 P_ALL. This is because we only want to get child
1171 information of very specific child processes, and not all
1172 of them. We might not have processed the SIGCHLD even of a
1173 previous invocation and we don't want to maintain a
1174 unbounded *per-child* event queue, hence we really don't
1175 want anything flushed out of the kernel's queue that we
1176 don't care about. Since this is O(n) this means that if you
1177 have a lot of processes you probably want to handle SIGCHLD
1178 yourself.
1179 */
1180
1181 HASHMAP_FOREACH(s, e->child_sources, i) {
1182 assert(s->type == SOURCE_CHILD);
1183
1184 if (s->pending)
1185 continue;
1186
1187 if (s->mute == SD_EVENT_MUTED)
1188 continue;
1189
1190 zero(s->child.siginfo);
1191 r = waitid(P_PID, s->child.pid, &s->child.siginfo, WNOHANG|s->child.options);
1192 if (r < 0)
1193 return -errno;
1194
1195 if (s->child.siginfo.si_pid != 0) {
1196 r = source_set_pending(s, true);
1197 if (r < 0)
1198 return r;
1199 }
1200 }
1201
1202 e->processed_children = e->iteration;
1203 return 0;
1204}
1205
1206static int process_signal(sd_event *e, uint32_t events) {
1207 struct signalfd_siginfo si;
1208 bool read_one = false;
1209 ssize_t ss;
1210 int r;
1211
1212 if (events != EPOLLIN)
1213 return -EIO;
1214
1215 for (;;) {
1216 sd_event_source *s;
1217
1218 ss = read(e->signal_fd, &si, sizeof(si));
1219 if (ss < 0) {
1220 if (errno == EAGAIN || errno == EINTR)
1221 return read_one;
1222
1223 return -errno;
1224 }
1225
1226 if (ss != sizeof(si))
1227 return -EIO;
1228
1229 read_one = true;
1230
1231 if (si.ssi_signo == SIGCHLD) {
1232 r = process_child(e);
1233 if (r < 0)
1234 return r;
1235 if (r > 0 || !e->signal_sources[si.ssi_signo])
1236 continue;
1237 } else {
1238 s = e->signal_sources[si.ssi_signo];
1239 if (!s)
1240 return -EIO;
1241 }
1242
1243 s->signal.siginfo = si;
1244 r = source_set_pending(s, true);
1245 if (r < 0)
1246 return r;
1247 }
1248
1249
1250 return 0;
1251}
1252
1253static int source_dispatch(sd_event_source *s) {
1254 int r;
1255
1256 assert(s);
1257 assert(s->pending);
1258
1259 r = source_set_pending(s, false);
1260 if (r < 0)
1261 return r;
1262
1263 if (s->mute == SD_EVENT_ONESHOT) {
1264 r = sd_event_source_set_mute(s, SD_EVENT_MUTED);
1265 if (r < 0)
1266 return r;
1267 }
1268
1269 switch (s->type) {
1270
1271 case SOURCE_IO:
1272 r = s->io.callback(s, s->io.fd, s->io.revents, s->userdata);
1273 break;
1274
1275 case SOURCE_MONOTONIC:
1276 r = s->time.callback(s, s->time.next, s->userdata);
1277 break;
1278
1279 case SOURCE_REALTIME:
1280 r = s->time.callback(s, s->time.next, s->userdata);
1281 break;
1282
1283 case SOURCE_SIGNAL:
1284 r = s->signal.callback(s, &s->signal.siginfo, s->userdata);
1285 break;
1286
1287 case SOURCE_CHILD:
1288 r = s->child.callback(s, &s->child.siginfo, s->userdata);
1289 break;
1290
1291 case SOURCE_DEFER:
1292 r = s->defer.callback(s, s->userdata);
1293 break;
1294 }
1295
1296 return r;
1297}
1298
1299static int event_prepare(sd_event *e) {
1300 int r;
1301
1302 assert(e);
1303
1304 for (;;) {
1305 sd_event_source *s;
1306
1307 s = prioq_peek(e->prepare);
1308 if (!s || s->prepare_iteration == e->iteration || s->mute == SD_EVENT_MUTED)
1309 break;
1310
1311 s->prepare_iteration = e->iteration;
1312 r = prioq_reshuffle(e->prepare, s, &s->prepare_index);
1313 if (r < 0)
1314 return r;
1315
1316 assert(s->prepare);
1317 r = s->prepare(s, s->userdata);
1318 if (r < 0)
1319 return r;
1320
1321 }
1322
1323 return 0;
1324}
1325
1326int sd_event_run(sd_event *e, uint64_t timeout) {
1327 struct epoll_event ev_queue[EPOLL_QUEUE_MAX];
1328 sd_event_source *p;
1329 int r, i, m;
1330 dual_timestamp n;
1331
1332 if (!e)
1333 return -EINVAL;
1334 if (e->quit)
1335 return -ESTALE;
1336
1337 e->iteration++;
1338
1339 r = event_prepare(e);
1340 if (r < 0)
1341 return r;
1342
1343 r = event_arm_timer(e, e->monotonic_fd, e->monotonic, &e->monotonic_next);
1344 if (r < 0)
1345 return r;
1346
1347 r = event_arm_timer(e, e->realtime_fd, e->realtime, &e->realtime_next);
1348 if (r < 0)
1349 return r;
1350
1351 if (e->iteration == 1 && !hashmap_isempty(e->child_sources))
1352 /* On the first iteration, there might be already some
1353 * zombies for us to care for, hence, don't wait */
1354 timeout = 0;
1355 else {
1356 p = prioq_peek(e->pending);
1357 if (p && p->mute != SD_EVENT_MUTED)
1358 timeout = 0;
1359 }
1360
1361 m = epoll_wait(e->epoll_fd, ev_queue, EPOLL_QUEUE_MAX, timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC));
1362 if (m < 0)
1363 return m;
1364
1365 dual_timestamp_get(&n);
1366
1367 for (i = 0; i < m; i++) {
1368
1369 if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_MONOTONIC))
1370 r = flush_timer(e, e->monotonic_fd, ev_queue[i].events);
1371 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_REALTIME))
1372 r = flush_timer(e, e->realtime_fd, ev_queue[i].events);
1373 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_SIGNAL))
1374 r = process_signal(e, ev_queue[i].events);
1375 else
1376 r = process_io(e, ev_queue[i].data.ptr, ev_queue[i].events);
1377
1378 if (r < 0)
1379 return r;
1380 }
1381
1382 r = process_timer(e, n.monotonic, e->monotonic);
1383 if (r < 0)
1384 return r;
1385
1386 r = process_timer(e, n.realtime, e->realtime);
1387 if (r < 0)
1388 return r;
1389
1390 if (e->iteration == 1 && e->processed_children != 1) {
1391 /* On the first iteration, make sure we really process
1392 * all children which might already be zombies. */
1393 r = process_child(e);
1394 if (r < 0)
1395 return r;
1396 }
1397
1398 p = prioq_peek(e->pending);
1399 if (!p || p->mute == SD_EVENT_MUTED)
1400 return 0;
1401
1402 return source_dispatch(p);
1403}
1404
1405int sd_event_loop(sd_event *e) {
1406 int r;
1407
1408 if (!e)
1409 return -EINVAL;
1410
1411 while (!e->quit) {
1412 r = sd_event_run(e, (uint64_t) -1);
1413 if (r < 0)
1414 return r;
1415 }
1416
1417 return 0;
1418}
1419
1420int sd_event_quit(sd_event *e) {
1421 if (!e)
1422 return EINVAL;
1423
1424 return e->quit;
1425}
1426
1427int sd_event_request_quit(sd_event *e) {
1428 if (!e)
1429 return -EINVAL;
1430
1431 e->quit = true;
1432 return 0;
1433}
1434
1435sd_event *sd_event_get(sd_event_source *s) {
1436 if (!s)
1437 return NULL;
1438
1439 return s->event;
1440}