/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/

/***
  This file is part of systemd.

  Copyright 2013 Lennart Poettering

  systemd is free software; you can redistribute it and/or modify it
  under the terms of the GNU Lesser General Public License as published by
  the Free Software Foundation; either version 2.1 of the License, or
  (at your option) any later version.

  systemd is distributed in the hope that it will be useful, but
  WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  Lesser General Public License for more details.

  You should have received a copy of the GNU Lesser General Public License
  along with systemd; If not, see <http://www.gnu.org/licenses/>.
***/

#include <sys/epoll.h>
#include <sys/timerfd.h>
#include <sys/wait.h>

#include "macro.h"
#include "prioq.h"
#include "hashmap.h"
#include "util.h"
#include "time-util.h"
#include "sd-id128.h"

#include "sd-event.h"

#define EPOLL_QUEUE_MAX 64
#define DEFAULT_ACCURACY_USEC (250 * USEC_PER_MSEC)

typedef enum EventSourceType {
        SOURCE_IO,
        SOURCE_MONOTONIC,
        SOURCE_REALTIME,
        SOURCE_SIGNAL,
        SOURCE_CHILD,
        SOURCE_DEFER,
        SOURCE_QUIT
} EventSourceType;

struct sd_event_source {
        unsigned n_ref;

        sd_event *event;
        void *userdata;
        sd_prepare_handler_t prepare;

        EventSourceType type:4;
        int enabled:3;
        bool pending:1;

        int priority;
        unsigned pending_index;
        unsigned prepare_index;
        unsigned pending_iteration;
        unsigned prepare_iteration;

        union {
                struct {
                        sd_io_handler_t callback;
                        int fd;
                        uint32_t events;
                        uint32_t revents;
                        bool registered:1;
                } io;
                struct {
                        sd_time_handler_t callback;
                        usec_t next, accuracy;
                        unsigned earliest_index;
                        unsigned latest_index;
                } time;
                struct {
                        sd_signal_handler_t callback;
                        struct signalfd_siginfo siginfo;
                        int sig;
                } signal;
                struct {
                        sd_child_handler_t callback;
                        siginfo_t siginfo;
                        pid_t pid;
                        int options;
                } child;
                struct {
                        sd_defer_handler_t callback;
                } defer;
                struct {
                        sd_quit_handler_t callback;
                        unsigned prioq_index;
                } quit;
        };
};

struct sd_event {
        unsigned n_ref;

        int epoll_fd;
        int signal_fd;
        int realtime_fd;
        int monotonic_fd;

        Prioq *pending;
        Prioq *prepare;

        /* For both clocks we maintain two priority queues each, one
         * ordered for the earliest times the events may be
         * dispatched, and one ordered by the latest times they must
         * have been dispatched. The range between the top entries in
         * the two prioqs is the time window we can freely schedule
         * wakeups in */
        Prioq *monotonic_earliest;
        Prioq *monotonic_latest;
        Prioq *realtime_earliest;
        Prioq *realtime_latest;

        usec_t realtime_next, monotonic_next;
        usec_t perturb;

        sigset_t sigset;
        sd_event_source **signal_sources;

        Hashmap *child_sources;
        unsigned n_enabled_child_sources;

        Prioq *quit;

        pid_t original_pid;

        unsigned iteration;
        dual_timestamp timestamp;
        int state;

        bool quit_requested:1;
        bool need_process_child:1;
};
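
/* As a worked example of the window described above (illustrative
 * numbers): a monotonic timer with .next = 1000ms and .accuracy =
 * 250ms may fire anywhere in [1000ms, 1250ms]. If the tops of the
 * earliest/latest prioqs are 1000ms and 1250ms respectively, any
 * wakeup chosen in that range satisfies both constraints, and
 * sleep_between() below picks a spot in it that many timers across
 * the system are likely to share. */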

static int pending_prioq_compare(const void *a, const void *b) {
        const sd_event_source *x = a, *y = b;

        assert(x->pending);
        assert(y->pending);

        /* Enabled ones first */
        if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
                return -1;
        if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
                return 1;

        /* Lower priority values first */
        if (x->priority < y->priority)
                return -1;
        if (x->priority > y->priority)
                return 1;

        /* Older entries first */
        if (x->pending_iteration < y->pending_iteration)
                return -1;
        if (x->pending_iteration > y->pending_iteration)
                return 1;

        /* Stability for the rest */
        if (x < y)
                return -1;
        if (x > y)
                return 1;

        return 0;
}

static int prepare_prioq_compare(const void *a, const void *b) {
        const sd_event_source *x = a, *y = b;

        assert(x->prepare);
        assert(y->prepare);

        /* Move most recently prepared ones last, so that we can stop
         * preparing as soon as we hit one that has already been
         * prepared in the current iteration */
        if (x->prepare_iteration < y->prepare_iteration)
                return -1;
        if (x->prepare_iteration > y->prepare_iteration)
                return 1;

        /* Enabled ones first */
        if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
                return -1;
        if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
                return 1;

        /* Lower priority values first */
        if (x->priority < y->priority)
                return -1;
        if (x->priority > y->priority)
                return 1;

        /* Stability for the rest */
        if (x < y)
                return -1;
        if (x > y)
                return 1;

        return 0;
}

static int earliest_time_prioq_compare(const void *a, const void *b) {
        const sd_event_source *x = a, *y = b;

        assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME);
        assert(y->type == SOURCE_MONOTONIC || y->type == SOURCE_REALTIME);

        /* Enabled ones first */
        if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
                return -1;
        if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
                return 1;

        /* Move the pending ones to the end */
        if (!x->pending && y->pending)
                return -1;
        if (x->pending && !y->pending)
                return 1;

        /* Order by time */
        if (x->time.next < y->time.next)
                return -1;
        if (x->time.next > y->time.next)
                return 1;

        /* Stability for the rest */
        if (x < y)
                return -1;
        if (x > y)
                return 1;

        return 0;
}

static int latest_time_prioq_compare(const void *a, const void *b) {
        const sd_event_source *x = a, *y = b;

        assert((x->type == SOURCE_MONOTONIC && y->type == SOURCE_MONOTONIC) ||
               (x->type == SOURCE_REALTIME && y->type == SOURCE_REALTIME));

        /* Enabled ones first */
        if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
                return -1;
        if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
                return 1;

        /* Move the pending ones to the end */
        if (!x->pending && y->pending)
                return -1;
        if (x->pending && !y->pending)
                return 1;

        /* Order by time */
        if (x->time.next + x->time.accuracy < y->time.next + y->time.accuracy)
                return -1;
        if (x->time.next + x->time.accuracy > y->time.next + y->time.accuracy)
                return 1;

        /* Stability for the rest */
        if (x < y)
                return -1;
        if (x > y)
                return 1;

        return 0;
}

static int quit_prioq_compare(const void *a, const void *b) {
        const sd_event_source *x = a, *y = b;

        assert(x->type == SOURCE_QUIT);
        assert(y->type == SOURCE_QUIT);

        /* Enabled ones first */
        if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
                return -1;
        if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
                return 1;

        /* Lower priority values first */
        if (x->priority < y->priority)
                return -1;
        if (x->priority > y->priority)
                return 1;

        /* Stability for the rest */
        if (x < y)
                return -1;
        if (x > y)
                return 1;

        return 0;
}

static void event_free(sd_event *e) {
        assert(e);

        if (e->epoll_fd >= 0)
                close_nointr_nofail(e->epoll_fd);

        if (e->signal_fd >= 0)
                close_nointr_nofail(e->signal_fd);

        if (e->realtime_fd >= 0)
                close_nointr_nofail(e->realtime_fd);

        if (e->monotonic_fd >= 0)
                close_nointr_nofail(e->monotonic_fd);

        prioq_free(e->pending);
        prioq_free(e->prepare);
        prioq_free(e->monotonic_earliest);
        prioq_free(e->monotonic_latest);
        prioq_free(e->realtime_earliest);
        prioq_free(e->realtime_latest);
        prioq_free(e->quit);

        free(e->signal_sources);

        hashmap_free(e->child_sources);
        free(e);
}

int sd_event_new(sd_event** ret) {
        sd_event *e;
        int r;

        assert_return(ret, -EINVAL);

        e = new0(sd_event, 1);
        if (!e)
                return -ENOMEM;

        e->n_ref = 1;
        e->signal_fd = e->realtime_fd = e->monotonic_fd = e->epoll_fd = -1;
        e->realtime_next = e->monotonic_next = (usec_t) -1;
        e->original_pid = getpid();

        assert_se(sigemptyset(&e->sigset) == 0);

        e->pending = prioq_new(pending_prioq_compare);
        if (!e->pending) {
                r = -ENOMEM;
                goto fail;
        }

        e->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
        if (e->epoll_fd < 0) {
                r = -errno;
                goto fail;
        }

        *ret = e;
        return 0;

fail:
        event_free(e);
        return r;
}
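
/* Illustrative usage sketch (not part of this file): errors are
 * returned as negative errno-style codes, and the loop is released
 * with sd_event_unref() when done.
 *
 *         sd_event *e = NULL;
 *         int r;
 *
 *         r = sd_event_new(&e);
 *         if (r < 0)
 *                 return r;
 *
 *         (add sources here, then run with sd_event_loop(e))
 *
 *         sd_event_unref(e);
 */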

sd_event* sd_event_ref(sd_event *e) {
        assert_return(e, NULL);

        assert(e->n_ref >= 1);
        e->n_ref++;

        return e;
}

sd_event* sd_event_unref(sd_event *e) {
        assert_return(e, NULL);

        assert(e->n_ref >= 1);
        e->n_ref--;

        if (e->n_ref <= 0)
                event_free(e);

        return NULL;
}

static bool event_pid_changed(sd_event *e) {
        assert(e);

        /* We don't support people creating an event loop and keeping
         * it around over a fork(). Let's complain. */

        return e->original_pid != getpid();
}

static int source_io_unregister(sd_event_source *s) {
        int r;

        assert(s);
        assert(s->type == SOURCE_IO);

        if (!s->io.registered)
                return 0;

        r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, s->io.fd, NULL);
        if (r < 0)
                return -errno;

        s->io.registered = false;
        return 0;
}

static int source_io_register(
                sd_event_source *s,
                int enabled,
                uint32_t events) {

        struct epoll_event ev = {};
        int r;

        assert(s);
        assert(s->type == SOURCE_IO);
        assert(enabled != SD_EVENT_OFF);

        ev.events = events;
        ev.data.ptr = s;

        if (enabled == SD_EVENT_ONESHOT)
                ev.events |= EPOLLONESHOT;

        if (s->io.registered)
                r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_MOD, s->io.fd, &ev);
        else
                r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_ADD, s->io.fd, &ev);

        if (r < 0)
                return -errno;

        s->io.registered = true;

        return 0;
}

static void source_free(sd_event_source *s) {
        assert(s);

        if (s->event) {
                switch (s->type) {

                case SOURCE_IO:
                        if (s->io.fd >= 0)
                                source_io_unregister(s);

                        break;

                case SOURCE_MONOTONIC:
                        prioq_remove(s->event->monotonic_earliest, s, &s->time.earliest_index);
                        prioq_remove(s->event->monotonic_latest, s, &s->time.latest_index);
                        break;

                case SOURCE_REALTIME:
                        prioq_remove(s->event->realtime_earliest, s, &s->time.earliest_index);
                        prioq_remove(s->event->realtime_latest, s, &s->time.latest_index);
                        break;

                case SOURCE_SIGNAL:
                        if (s->signal.sig > 0) {
                                if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0)
                                        assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);

                                if (s->event->signal_sources)
                                        s->event->signal_sources[s->signal.sig] = NULL;
                        }

                        break;

                case SOURCE_CHILD:
                        if (s->child.pid > 0) {
                                if (s->enabled != SD_EVENT_OFF) {
                                        assert(s->event->n_enabled_child_sources > 0);
                                        s->event->n_enabled_child_sources--;
                                }

                                if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD])
                                        assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);

                                hashmap_remove(s->event->child_sources, INT_TO_PTR(s->child.pid));
                        }

                        break;

                case SOURCE_DEFER:
                        /* nothing */
                        break;

                case SOURCE_QUIT:
                        prioq_remove(s->event->quit, s, &s->quit.prioq_index);
                        break;
                }

                if (s->pending)
                        prioq_remove(s->event->pending, s, &s->pending_index);

                if (s->prepare)
                        prioq_remove(s->event->prepare, s, &s->prepare_index);

                sd_event_unref(s->event);
        }

        free(s);
}

static int source_set_pending(sd_event_source *s, bool b) {
        int r;

        assert(s);
        assert(s->type != SOURCE_QUIT);

        if (s->pending == b)
                return 0;

        s->pending = b;

        if (b) {
                s->pending_iteration = s->event->iteration;

                r = prioq_put(s->event->pending, s, &s->pending_index);
                if (r < 0) {
                        s->pending = false;
                        return r;
                }
        } else
                assert_se(prioq_remove(s->event->pending, s, &s->pending_index));

        return 0;
}

static sd_event_source *source_new(sd_event *e, EventSourceType type) {
        sd_event_source *s;

        assert(e);

        s = new0(sd_event_source, 1);
        if (!s)
                return NULL;

        s->n_ref = 1;
        s->event = sd_event_ref(e);
        s->type = type;
        s->pending_index = s->prepare_index = PRIOQ_IDX_NULL;

        return s;
}

int sd_event_add_io(
                sd_event *e,
                int fd,
                uint32_t events,
                sd_io_handler_t callback,
                void *userdata,
                sd_event_source **ret) {

        sd_event_source *s;
        int r;

        assert_return(e, -EINVAL);
        assert_return(fd >= 0, -EINVAL);
        assert_return(!(events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP)), -EINVAL);
        assert_return(callback, -EINVAL);
        assert_return(ret, -EINVAL);
        assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(e), -ECHILD);

        s = source_new(e, SOURCE_IO);
        if (!s)
                return -ENOMEM;

        s->io.fd = fd;
        s->io.events = events;
        s->io.callback = callback;
        s->userdata = userdata;
        s->enabled = SD_EVENT_ON;

        r = source_io_register(s, s->enabled, events);
        if (r < 0) {
                source_free(s);
                return r;
        }

        *ret = s;
        return 0;
}
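
/* Illustrative sketch of adding an I/O source (hypothetical names,
 * not part of this file): the handler receives the fd and the
 * triggered epoll events, and a negative errno-style return aborts
 * the loop iteration.
 *
 *         static int my_io_handler(sd_event_source *s, int fd,
 *                                  uint32_t revents, void *userdata) {
 *                 if (revents & EPOLLIN) {
 *                         char buf[256];
 *                         (void) read(fd, buf, sizeof(buf));
 *                 }
 *                 return 0;
 *         }
 *
 *         sd_event_source *source = NULL;
 *         r = sd_event_add_io(e, fd, EPOLLIN, my_io_handler, NULL, &source);
 */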

static int event_setup_timer_fd(
                sd_event *e,
                EventSourceType type,
                int *timer_fd,
                clockid_t id) {

        struct epoll_event ev = {};
        int r, fd;
        sd_id128_t bootid;

        assert(e);
        assert(timer_fd);

        if (_likely_(*timer_fd >= 0))
                return 0;

        fd = timerfd_create(id, TFD_NONBLOCK|TFD_CLOEXEC);
        if (fd < 0)
                return -errno;

        ev.events = EPOLLIN;
        ev.data.ptr = INT_TO_PTR(type);

        r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
        if (r < 0) {
                close_nointr_nofail(fd);
                return -errno;
        }

        /* When we sleep for longer, we try to realign the wakeup to
           the same time within each second, so that events all across
           the system can be coalesced into a single CPU
           wakeup. However, let's take some system-specific randomness
           for this value, so that in a network of systems with synced
           clocks timer events are distributed a bit. Here, we
           calculate a perturbation usec offset from the boot ID. */

        if (sd_id128_get_boot(&bootid) >= 0)
                e->perturb = (bootid.qwords[0] ^ bootid.qwords[1]) % USEC_PER_SEC;

        *timer_fd = fd;
        return 0;
}

static int event_add_time_internal(
                sd_event *e,
                EventSourceType type,
                int *timer_fd,
                clockid_t id,
                Prioq **earliest,
                Prioq **latest,
                uint64_t usec,
                uint64_t accuracy,
                sd_time_handler_t callback,
                void *userdata,
                sd_event_source **ret) {

        sd_event_source *s;
        int r;

        assert_return(e, -EINVAL);
        assert_return(callback, -EINVAL);
        assert_return(ret, -EINVAL);
        assert_return(usec != (uint64_t) -1, -EINVAL);
        assert_return(accuracy != (uint64_t) -1, -EINVAL);
        assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(e), -ECHILD);

        assert(timer_fd);
        assert(earliest);
        assert(latest);

        if (!*earliest) {
                *earliest = prioq_new(earliest_time_prioq_compare);
                if (!*earliest)
                        return -ENOMEM;
        }

        if (!*latest) {
                *latest = prioq_new(latest_time_prioq_compare);
                if (!*latest)
                        return -ENOMEM;
        }

        if (*timer_fd < 0) {
                r = event_setup_timer_fd(e, type, timer_fd, id);
                if (r < 0)
                        return r;
        }

        s = source_new(e, type);
        if (!s)
                return -ENOMEM;

        s->time.next = usec;
        s->time.accuracy = accuracy == 0 ? DEFAULT_ACCURACY_USEC : accuracy;
        s->time.callback = callback;
        s->time.earliest_index = s->time.latest_index = PRIOQ_IDX_NULL;
        s->userdata = userdata;
        s->enabled = SD_EVENT_ONESHOT;

        r = prioq_put(*earliest, s, &s->time.earliest_index);
        if (r < 0)
                goto fail;

        r = prioq_put(*latest, s, &s->time.latest_index);
        if (r < 0)
                goto fail;

        *ret = s;
        return 0;

fail:
        source_free(s);
        return r;
}

int sd_event_add_monotonic(sd_event *e, uint64_t usec, uint64_t accuracy, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
        return event_add_time_internal(e, SOURCE_MONOTONIC, &e->monotonic_fd, CLOCK_MONOTONIC, &e->monotonic_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
}

int sd_event_add_realtime(sd_event *e, uint64_t usec, uint64_t accuracy, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
        return event_add_time_internal(e, SOURCE_REALTIME, &e->realtime_fd, CLOCK_REALTIME, &e->realtime_earliest, &e->realtime_latest, usec, accuracy, callback, userdata, ret);
}
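
/* Illustrative sketch of a relative monotonic timer (hypothetical
 * names, not part of this file): usec is an absolute CLOCK_MONOTONIC
 * time, so a delay is expressed as "now plus offset". An accuracy of
 * 0 selects DEFAULT_ACCURACY_USEC (250ms), widening the coalescing
 * window described above.
 *
 *         static int my_time_handler(sd_event_source *s, uint64_t usec,
 *                                    void *userdata) {
 *                 return 0;
 *         }
 *
 *         sd_event_source *t = NULL;
 *         r = sd_event_add_monotonic(e, now(CLOCK_MONOTONIC) + 5 * USEC_PER_SEC,
 *                                    0, my_time_handler, NULL, &t);
 */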

static int event_update_signal_fd(sd_event *e) {
        struct epoll_event ev = {};
        bool add_to_epoll;
        int r;

        assert(e);

        add_to_epoll = e->signal_fd < 0;

        r = signalfd(e->signal_fd, &e->sigset, SFD_NONBLOCK|SFD_CLOEXEC);
        if (r < 0)
                return -errno;

        e->signal_fd = r;

        if (!add_to_epoll)
                return 0;

        ev.events = EPOLLIN;
        ev.data.ptr = INT_TO_PTR(SOURCE_SIGNAL);

        r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, e->signal_fd, &ev);
        if (r < 0) {
                close_nointr_nofail(e->signal_fd);
                e->signal_fd = -1;

                return -errno;
        }

        return 0;
}

int sd_event_add_signal(
                sd_event *e,
                int sig,
                sd_signal_handler_t callback,
                void *userdata,
                sd_event_source **ret) {

        sd_event_source *s;
        int r;

        assert_return(e, -EINVAL);
        assert_return(sig > 0, -EINVAL);
        assert_return(sig < _NSIG, -EINVAL);
        assert_return(callback, -EINVAL);
        assert_return(ret, -EINVAL);
        assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(e), -ECHILD);

        if (!e->signal_sources) {
                e->signal_sources = new0(sd_event_source*, _NSIG);
                if (!e->signal_sources)
                        return -ENOMEM;
        } else if (e->signal_sources[sig])
                return -EBUSY;

        s = source_new(e, SOURCE_SIGNAL);
        if (!s)
                return -ENOMEM;

        s->signal.sig = sig;
        s->signal.callback = callback;
        s->userdata = userdata;
        s->enabled = SD_EVENT_ON;

        e->signal_sources[sig] = s;
        assert_se(sigaddset(&e->sigset, sig) == 0);

        if (sig != SIGCHLD || e->n_enabled_child_sources == 0) {
                r = event_update_signal_fd(e);
                if (r < 0) {
                        source_free(s);
                        return r;
                }
        }

        *ret = s;
        return 0;
}
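
/* Illustrative sketch (hypothetical names, not part of this file):
 * signalfd only delivers signals that are blocked for the process,
 * and this file never calls sigprocmask() itself, so the caller is
 * expected to block the signal before adding the source.
 *
 *         static int my_signal_handler(sd_event_source *s,
 *                                      const struct signalfd_siginfo *si,
 *                                      void *userdata) {
 *                 return sd_event_request_quit(sd_event_get(s));
 *         }
 *
 *         sigset_t mask;
 *         sigemptyset(&mask);
 *         sigaddset(&mask, SIGTERM);
 *         sigprocmask(SIG_BLOCK, &mask, NULL);
 *
 *         sd_event_source *sig = NULL;
 *         r = sd_event_add_signal(e, SIGTERM, my_signal_handler, NULL, &sig);
 */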

int sd_event_add_child(
                sd_event *e,
                pid_t pid,
                int options,
                sd_child_handler_t callback,
                void *userdata,
                sd_event_source **ret) {

        sd_event_source *s;
        int r;

        assert_return(e, -EINVAL);
        assert_return(pid > 1, -EINVAL);
        assert_return(!(options & ~(WEXITED|WSTOPPED|WCONTINUED)), -EINVAL);
        assert_return(options != 0, -EINVAL);
        assert_return(callback, -EINVAL);
        assert_return(ret, -EINVAL);
        assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(e), -ECHILD);

        r = hashmap_ensure_allocated(&e->child_sources, trivial_hash_func, trivial_compare_func);
        if (r < 0)
                return r;

        if (hashmap_contains(e->child_sources, INT_TO_PTR(pid)))
                return -EBUSY;

        s = source_new(e, SOURCE_CHILD);
        if (!s)
                return -ENOMEM;

        s->child.pid = pid;
        s->child.options = options;
        s->child.callback = callback;
        s->userdata = userdata;
        s->enabled = SD_EVENT_ONESHOT;

        r = hashmap_put(e->child_sources, INT_TO_PTR(pid), s);
        if (r < 0) {
                source_free(s);
                return r;
        }

        e->n_enabled_child_sources++;

        assert_se(sigaddset(&e->sigset, SIGCHLD) == 0);

        if (!e->signal_sources || !e->signal_sources[SIGCHLD]) {
                r = event_update_signal_fd(e);
                if (r < 0) {
                        source_free(s);
                        return r;
                }
        }

        e->need_process_child = true;

        *ret = s;
        return 0;
}
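
/* Illustrative sketch (hypothetical names, not part of this file):
 * like SIGTERM above, SIGCHLD must be blocked by the caller for the
 * signalfd-based child handling to see it. The handler receives the
 * siginfo_t filled in by waitid().
 *
 *         static int my_child_handler(sd_event_source *s,
 *                                     const siginfo_t *si,
 *                                     void *userdata) {
 *                 return 0;
 *         }
 *
 *         pid_t pid = fork();
 *         if (pid == 0)
 *                 _exit(0);
 *
 *         sd_event_source *c = NULL;
 *         r = sd_event_add_child(e, pid, WEXITED, my_child_handler, NULL, &c);
 */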

int sd_event_add_defer(
                sd_event *e,
                sd_defer_handler_t callback,
                void *userdata,
                sd_event_source **ret) {

        sd_event_source *s;
        int r;

        assert_return(e, -EINVAL);
        assert_return(callback, -EINVAL);
        assert_return(ret, -EINVAL);
        assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(e), -ECHILD);

        s = source_new(e, SOURCE_DEFER);
        if (!s)
                return -ENOMEM;

        s->defer.callback = callback;
        s->userdata = userdata;
        s->enabled = SD_EVENT_ONESHOT;

        r = source_set_pending(s, true);
        if (r < 0) {
                source_free(s);
                return r;
        }

        *ret = s;
        return 0;
}

int sd_event_add_quit(
                sd_event *e,
                sd_quit_handler_t callback,
                void *userdata,
                sd_event_source **ret) {

        sd_event_source *s;
        int r;

        assert_return(e, -EINVAL);
        assert_return(callback, -EINVAL);
        assert_return(ret, -EINVAL);
        assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(e), -ECHILD);

        if (!e->quit) {
                e->quit = prioq_new(quit_prioq_compare);
                if (!e->quit)
                        return -ENOMEM;
        }

        s = source_new(e, SOURCE_QUIT);
        if (!s)
                return -ENOMEM;

        s->quit.callback = callback;
        s->userdata = userdata;
        s->quit.prioq_index = PRIOQ_IDX_NULL;
        s->enabled = SD_EVENT_ONESHOT;

        r = prioq_put(s->event->quit, s, &s->quit.prioq_index);
        if (r < 0) {
                source_free(s);
                return r;
        }

        *ret = s;
        return 0;
}
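
/* Illustrative sketch (hypothetical names, not part of this file): a
 * defer source runs on the next loop iteration (ONESHOT by default),
 * while a quit source runs only once sd_event_request_quit() has
 * been called and the loop is shutting down.
 *
 *         static int my_defer_handler(sd_event_source *s, void *userdata) {
 *                 return 0;
 *         }
 *
 *         static int my_quit_handler(sd_event_source *s, void *userdata) {
 *                 return 0;
 *         }
 *
 *         sd_event_source *d = NULL, *q = NULL;
 *         r = sd_event_add_defer(e, my_defer_handler, NULL, &d);
 *         r = sd_event_add_quit(e, my_quit_handler, NULL, &q);
 */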

sd_event_source* sd_event_source_ref(sd_event_source *s) {
        assert_return(s, NULL);

        assert(s->n_ref >= 1);
        s->n_ref++;

        return s;
}

sd_event_source* sd_event_source_unref(sd_event_source *s) {
        assert_return(s, NULL);

        assert(s->n_ref >= 1);
        s->n_ref--;

        if (s->n_ref <= 0)
                source_free(s);

        return NULL;
}

sd_event *sd_event_get(sd_event_source *s) {
        assert_return(s, NULL);

        return s->event;
}

int sd_event_source_get_pending(sd_event_source *s) {
        assert_return(s, -EINVAL);
        assert_return(s->type != SOURCE_QUIT, -EDOM);
        assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        return s->pending;
}

int sd_event_source_get_io_fd(sd_event_source *s) {
        assert_return(s, -EINVAL);
        assert_return(s->type == SOURCE_IO, -EDOM);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        return s->io.fd;
}

int sd_event_source_get_io_events(sd_event_source *s, uint32_t* events) {
        assert_return(s, -EINVAL);
        assert_return(events, -EINVAL);
        assert_return(s->type == SOURCE_IO, -EDOM);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        *events = s->io.events;
        return 0;
}

int sd_event_source_set_io_events(sd_event_source *s, uint32_t events) {
        int r;

        assert_return(s, -EINVAL);
        assert_return(s->type == SOURCE_IO, -EDOM);
        assert_return(!(events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP)), -EINVAL);
        assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        if (s->io.events == events)
                return 0;

        if (s->enabled != SD_EVENT_OFF) {
                r = source_io_register(s, s->enabled, events);
                if (r < 0)
                        return r;
        }

        s->io.events = events;

        return 0;
}

int sd_event_source_get_io_revents(sd_event_source *s, uint32_t* revents) {
        assert_return(s, -EINVAL);
        assert_return(revents, -EINVAL);
        assert_return(s->type == SOURCE_IO, -EDOM);
        assert_return(s->pending, -ENODATA);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        *revents = s->io.revents;
        return 0;
}

int sd_event_source_get_signal(sd_event_source *s) {
        assert_return(s, -EINVAL);
        assert_return(s->type == SOURCE_SIGNAL, -EDOM);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        return s->signal.sig;
}

int sd_event_source_get_priority(sd_event_source *s, int *priority) {
        assert_return(s, -EINVAL);
        assert_return(priority, -EINVAL);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        *priority = s->priority;
        return 0;
}

int sd_event_source_set_priority(sd_event_source *s, int priority) {
        assert_return(s, -EINVAL);
        assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        if (s->priority == priority)
                return 0;

        s->priority = priority;

        if (s->pending)
                prioq_reshuffle(s->event->pending, s, &s->pending_index);

        if (s->prepare)
                prioq_reshuffle(s->event->prepare, s, &s->prepare_index);

        if (s->type == SOURCE_QUIT)
                prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);

        return 0;
}

int sd_event_source_get_enabled(sd_event_source *s, int *m) {
        assert_return(s, -EINVAL);
        assert_return(m, -EINVAL);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        *m = s->enabled;
        return 0;
}

int sd_event_source_set_enabled(sd_event_source *s, int m) {
        int r;

        assert_return(s, -EINVAL);
        assert_return(m == SD_EVENT_OFF || m == SD_EVENT_ON || m == SD_EVENT_ONESHOT, -EINVAL);
        assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        if (s->enabled == m)
                return 0;

        if (m == SD_EVENT_OFF) {

                switch (s->type) {

                case SOURCE_IO:
                        r = source_io_unregister(s);
                        if (r < 0)
                                return r;

                        s->enabled = m;
                        break;

                case SOURCE_MONOTONIC:
                        s->enabled = m;
                        prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
                        prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
                        break;

                case SOURCE_REALTIME:
                        s->enabled = m;
                        prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
                        prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
                        break;

                case SOURCE_SIGNAL:
                        s->enabled = m;
                        if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0) {
                                assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
                                event_update_signal_fd(s->event);
                        }

                        break;

                case SOURCE_CHILD:
                        s->enabled = m;

                        assert(s->event->n_enabled_child_sources > 0);
                        s->event->n_enabled_child_sources--;

                        if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
                                assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
                                event_update_signal_fd(s->event);
                        }

                        break;

                case SOURCE_QUIT:
                        s->enabled = m;
                        prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
                        break;

                case SOURCE_DEFER:
                        s->enabled = m;
                        break;
                }

        } else {
                switch (s->type) {

                case SOURCE_IO:
                        r = source_io_register(s, m, s->io.events);
                        if (r < 0)
                                return r;

                        s->enabled = m;
                        break;

                case SOURCE_MONOTONIC:
                        s->enabled = m;
                        prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
                        prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
                        break;

                case SOURCE_REALTIME:
                        s->enabled = m;
                        prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
                        prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
                        break;

                case SOURCE_SIGNAL:
                        s->enabled = m;

                        if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0) {
                                assert_se(sigaddset(&s->event->sigset, s->signal.sig) == 0);
                                event_update_signal_fd(s->event);
                        }
                        break;

                case SOURCE_CHILD:
                        /* Test the previous state before overwriting it: the
                         * source only counts as newly enabled if it was off
                         * before. */
                        if (s->enabled == SD_EVENT_OFF) {
                                s->event->n_enabled_child_sources++;

                                if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
                                        assert_se(sigaddset(&s->event->sigset, SIGCHLD) == 0);
                                        event_update_signal_fd(s->event);
                                }
                        }

                        s->enabled = m;
                        break;

                case SOURCE_QUIT:
                        s->enabled = m;
                        prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
                        break;

                case SOURCE_DEFER:
                        s->enabled = m;
                        break;
                }
        }

        if (s->pending)
                prioq_reshuffle(s->event->pending, s, &s->pending_index);

        if (s->prepare)
                prioq_reshuffle(s->event->prepare, s, &s->prepare_index);

        return 0;
}

int sd_event_source_get_time(sd_event_source *s, uint64_t *usec) {
        assert_return(s, -EINVAL);
        assert_return(usec, -EINVAL);
        assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        *usec = s->time.next;
        return 0;
}

int sd_event_source_set_time(sd_event_source *s, uint64_t usec) {
        assert_return(s, -EINVAL);
        assert_return(usec != (uint64_t) -1, -EINVAL);
        assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
        assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        if (s->time.next == usec)
                return 0;

        s->time.next = usec;

        if (s->type == SOURCE_REALTIME) {
                prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
                prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
        } else {
                prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
                prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
        }

        return 0;
}

int sd_event_source_get_time_accuracy(sd_event_source *s, uint64_t *usec) {
        assert_return(s, -EINVAL);
        assert_return(usec, -EINVAL);
        assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        *usec = s->time.accuracy;
        return 0;
}

int sd_event_source_set_time_accuracy(sd_event_source *s, uint64_t usec) {
        assert_return(s, -EINVAL);
        assert_return(usec != (uint64_t) -1, -EINVAL);
        assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
        assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        if (usec == 0)
                usec = DEFAULT_ACCURACY_USEC;

        if (s->time.accuracy == usec)
                return 0;

        s->time.accuracy = usec;

        if (s->type == SOURCE_REALTIME)
                prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
        else
                prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);

        return 0;
}

int sd_event_source_get_child_pid(sd_event_source *s, pid_t *pid) {
        assert_return(s, -EINVAL);
        assert_return(pid, -EINVAL);
        assert_return(s->type == SOURCE_CHILD, -EDOM);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        *pid = s->child.pid;
        return 0;
}

int sd_event_source_set_prepare(sd_event_source *s, sd_prepare_handler_t callback) {
        int r;

        assert_return(s, -EINVAL);
        assert_return(s->type != SOURCE_QUIT, -EDOM);
        assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        if (s->prepare == callback)
                return 0;

        if (callback && s->prepare) {
                s->prepare = callback;
                return 0;
        }

        r = prioq_ensure_allocated(&s->event->prepare, prepare_prioq_compare);
        if (r < 0)
                return r;

        s->prepare = callback;

        if (callback) {
                r = prioq_put(s->event->prepare, s, &s->prepare_index);
                if (r < 0)
                        return r;
        } else
                prioq_remove(s->event->prepare, s, &s->prepare_index);

        return 0;
}

void* sd_event_source_get_userdata(sd_event_source *s) {
        assert_return(s, NULL);

        return s->userdata;
}

static usec_t sleep_between(sd_event *e, usec_t a, usec_t b) {
        usec_t c;
        assert(e);
        assert(a <= b);

        if (a <= 0)
                return 0;

        if (b <= a + 1)
                return a;

        /*
          Find a good time to wake up again between times a and b. We
          have two goals here:

          a) We want to wake up as seldom as possible, hence prefer
             later times over earlier times.

          b) But if we have to wake up, then let's make sure to
             dispatch as much as possible on the entire system.

          We implement this by waking up everywhere at the same time
          within any given second if we can, synchronised via the
          perturbation value determined from the boot ID. If we can't,
          then we try to find the same spot in every 250ms
          step. Otherwise, we pick the last possible time to wake up.
        */

        c = (b / USEC_PER_SEC) * USEC_PER_SEC + e->perturb;
        if (c >= b) {
                if (_unlikely_(c < USEC_PER_SEC))
                        return b;

                c -= USEC_PER_SEC;
        }

        if (c >= a)
                return c;

        c = (b / (USEC_PER_MSEC*250)) * (USEC_PER_MSEC*250) + (e->perturb % (USEC_PER_MSEC*250));
        if (c >= b) {
                if (_unlikely_(c < USEC_PER_MSEC*250))
                        return b;

                c -= USEC_PER_MSEC*250;
        }

        if (c >= a)
                return c;

        return b;
}
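
/* A worked example of the above (illustrative numbers): with
 * perturb = 300ms, a = 4.2s and b = 5.5s, the first candidate is
 * (5.5s / 1s) * 1s + 0.3s = 5.3s, which is below b and at least a,
 * so every machine with the same boot ID wakes at the x.3s mark. Had
 * that candidate missed [a, b], the same alignment would be retried
 * on a 250ms grid before falling back to b itself. */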

static int event_arm_timer(
                sd_event *e,
                int timer_fd,
                Prioq *earliest,
                Prioq *latest,
                usec_t *next) {

        struct itimerspec its = {};
        sd_event_source *a, *b;
        usec_t t;
        int r;

        assert_se(e);
        assert_se(next);

        a = prioq_peek(earliest);
        if (!a || a->enabled == SD_EVENT_OFF) {

                if (*next == (usec_t) -1)
                        return 0;

                /* disarm */
                r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
                if (r < 0)
                        return r;

                *next = (usec_t) -1;

                return 0;
        }

        b = prioq_peek(latest);
        assert_se(b && b->enabled != SD_EVENT_OFF);

        t = sleep_between(e, a->time.next, b->time.next + b->time.accuracy);
        if (*next == t)
                return 0;

        assert_se(timer_fd >= 0);

        if (t == 0) {
                /* We don't want to disarm here, just mean some time looooong ago. */
                its.it_value.tv_sec = 0;
                its.it_value.tv_nsec = 1;
        } else
                timespec_store(&its.it_value, t);

        r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
        if (r < 0)
                return r;

        *next = t;
        return 0;
}

static int process_io(sd_event *e, sd_event_source *s, uint32_t events) {
        assert(e);
        assert(s);
        assert(s->type == SOURCE_IO);

        s->io.revents = events;

        return source_set_pending(s, true);
}

static int flush_timer(sd_event *e, int fd, uint32_t events, usec_t *next) {
        uint64_t x;
        ssize_t ss;

        assert(e);
        assert(fd >= 0);
        assert(next);

        assert_return(events == EPOLLIN, -EIO);

        ss = read(fd, &x, sizeof(x));
        if (ss < 0) {
                if (errno == EAGAIN || errno == EINTR)
                        return 0;

                return -errno;
        }

        if (ss != sizeof(x))
                return -EIO;

        *next = (usec_t) -1;

        return 0;
}

static int process_timer(
                sd_event *e,
                usec_t n,
                Prioq *earliest,
                Prioq *latest) {

        sd_event_source *s;
        int r;

        assert(e);

        for (;;) {
                s = prioq_peek(earliest);
                if (!s ||
                    s->time.next > n ||
                    s->enabled == SD_EVENT_OFF ||
                    s->pending)
                        break;

                r = source_set_pending(s, true);
                if (r < 0)
                        return r;

                prioq_reshuffle(earliest, s, &s->time.earliest_index);
                prioq_reshuffle(latest, s, &s->time.latest_index);
        }

        return 0;
}

static int process_child(sd_event *e) {
        sd_event_source *s;
        Iterator i;
        int r;

        assert(e);

        e->need_process_child = false;

        /*
          So, this is ugly. We iteratively invoke waitid() with P_PID
          + WNOHANG for each PID we wait for, instead of using
          P_ALL. This is because we only want to get child
          information of very specific child processes, and not all
          of them. We might not have processed the SIGCHLD event of a
          previous invocation and we don't want to maintain an
          unbounded *per-child* event queue, hence we really don't
          want anything flushed out of the kernel's queue that we
          don't care about. Since this is O(n) this means that if you
          have a lot of processes you probably want to handle SIGCHLD
          yourself.
        */

        HASHMAP_FOREACH(s, e->child_sources, i) {
                assert(s->type == SOURCE_CHILD);

                if (s->pending)
                        continue;

                if (s->enabled == SD_EVENT_OFF)
                        continue;

                zero(s->child.siginfo);
                r = waitid(P_PID, s->child.pid, &s->child.siginfo, WNOHANG|s->child.options);
                if (r < 0)
                        return -errno;

                if (s->child.siginfo.si_pid != 0) {
                        r = source_set_pending(s, true);
                        if (r < 0)
                                return r;
                }
        }

        return 0;
}

static int process_signal(sd_event *e, uint32_t events) {
        bool read_one = false;
        int r;

        assert(e);
        assert(e->signal_sources);

        assert_return(events == EPOLLIN, -EIO);

        for (;;) {
                struct signalfd_siginfo si;
                ssize_t ss;
                sd_event_source *s;

                ss = read(e->signal_fd, &si, sizeof(si));
                if (ss < 0) {
                        if (errno == EAGAIN || errno == EINTR)
                                return read_one;

                        return -errno;
                }

                if (ss != sizeof(si))
                        return -EIO;

                read_one = true;

                s = e->signal_sources[si.ssi_signo];
                if (si.ssi_signo == SIGCHLD) {
                        r = process_child(e);
                        if (r < 0)
                                return r;
                        if (r > 0 || !s)
                                continue;
                } else
                        if (!s)
                                return -EIO;

                s->signal.siginfo = si;
                r = source_set_pending(s, true);
                if (r < 0)
                        return r;
        }

        return 0;
}

static int source_dispatch(sd_event_source *s) {
        int r = 0;

        assert(s);
        assert(s->pending || s->type == SOURCE_QUIT);

        if (s->type != SOURCE_DEFER && s->type != SOURCE_QUIT) {
                r = source_set_pending(s, false);
                if (r < 0)
                        return r;
        }

        if (s->enabled == SD_EVENT_ONESHOT) {
                r = sd_event_source_set_enabled(s, SD_EVENT_OFF);
                if (r < 0)
                        return r;
        }

        switch (s->type) {

        case SOURCE_IO:
                r = s->io.callback(s, s->io.fd, s->io.revents, s->userdata);
                break;

        case SOURCE_MONOTONIC:
                r = s->time.callback(s, s->time.next, s->userdata);
                break;

        case SOURCE_REALTIME:
                r = s->time.callback(s, s->time.next, s->userdata);
                break;

        case SOURCE_SIGNAL:
                r = s->signal.callback(s, &s->signal.siginfo, s->userdata);
                break;

        case SOURCE_CHILD:
                r = s->child.callback(s, &s->child.siginfo, s->userdata);
                break;

        case SOURCE_DEFER:
                r = s->defer.callback(s, s->userdata);
                break;

        case SOURCE_QUIT:
                r = s->quit.callback(s, s->userdata);
                break;
        }

        return r;
}

static int event_prepare(sd_event *e) {
        int r;

        assert(e);

        for (;;) {
                sd_event_source *s;

                s = prioq_peek(e->prepare);
                if (!s || s->prepare_iteration == e->iteration || s->enabled == SD_EVENT_OFF)
                        break;

                s->prepare_iteration = e->iteration;
                r = prioq_reshuffle(e->prepare, s, &s->prepare_index);
                if (r < 0)
                        return r;

                assert(s->prepare);
                r = s->prepare(s, s->userdata);
                if (r < 0)
                        return r;
        }

        return 0;
}

static int dispatch_quit(sd_event *e) {
        sd_event_source *p;
        int r;

        assert(e);

        p = prioq_peek(e->quit);
        if (!p || p->enabled == SD_EVENT_OFF) {
                e->state = SD_EVENT_FINISHED;
                return 0;
        }

        sd_event_ref(e);
        e->iteration++;
        e->state = SD_EVENT_QUITTING;

        r = source_dispatch(p);

        e->state = SD_EVENT_PASSIVE;
        sd_event_unref(e);

        return r;
}

static sd_event_source* event_next_pending(sd_event *e) {
        sd_event_source *p;

        assert(e);

        p = prioq_peek(e->pending);
        if (!p)
                return NULL;

        if (p->enabled == SD_EVENT_OFF)
                return NULL;

        return p;
}

int sd_event_run(sd_event *e, uint64_t timeout) {
        struct epoll_event ev_queue[EPOLL_QUEUE_MAX];
        sd_event_source *p;
        int r, i, m;

        assert_return(e, -EINVAL);
        assert_return(!event_pid_changed(e), -ECHILD);
        assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);

        if (e->quit_requested)
                return dispatch_quit(e);

        sd_event_ref(e);
        e->iteration++;
        e->state = SD_EVENT_RUNNING;

        r = event_prepare(e);
        if (r < 0)
                goto finish;

        if (event_next_pending(e) || e->need_process_child)
                timeout = 0;

        if (timeout > 0) {
                r = event_arm_timer(e, e->monotonic_fd, e->monotonic_earliest, e->monotonic_latest, &e->monotonic_next);
                if (r < 0)
                        goto finish;

                r = event_arm_timer(e, e->realtime_fd, e->realtime_earliest, e->realtime_latest, &e->realtime_next);
                if (r < 0)
                        goto finish;
        }

        m = epoll_wait(e->epoll_fd, ev_queue, EPOLL_QUEUE_MAX,
                       timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC));
        if (m < 0) {
                r = errno == EAGAIN || errno == EINTR ? 0 : -errno;
                goto finish;
        }

        dual_timestamp_get(&e->timestamp);

        for (i = 0; i < m; i++) {

                if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_MONOTONIC))
                        r = flush_timer(e, e->monotonic_fd, ev_queue[i].events, &e->monotonic_next);
                else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_REALTIME))
                        r = flush_timer(e, e->realtime_fd, ev_queue[i].events, &e->realtime_next);
                else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_SIGNAL))
                        r = process_signal(e, ev_queue[i].events);
                else
                        r = process_io(e, ev_queue[i].data.ptr, ev_queue[i].events);

                if (r < 0)
                        goto finish;
        }

        r = process_timer(e, e->timestamp.monotonic, e->monotonic_earliest, e->monotonic_latest);
        if (r < 0)
                goto finish;

        r = process_timer(e, e->timestamp.realtime, e->realtime_earliest, e->realtime_latest);
        if (r < 0)
                goto finish;

        if (e->need_process_child) {
                r = process_child(e);
                if (r < 0)
                        goto finish;
        }

        p = event_next_pending(e);
        if (!p) {
                r = 0;
                goto finish;
        }

        r = source_dispatch(p);

finish:
        e->state = SD_EVENT_PASSIVE;
        sd_event_unref(e);

        return r;
}

int sd_event_loop(sd_event *e) {
        int r;

        assert_return(e, -EINVAL);
        assert_return(!event_pid_changed(e), -ECHILD);
        assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);

        sd_event_ref(e);

        while (e->state != SD_EVENT_FINISHED) {
                r = sd_event_run(e, (uint64_t) -1);
                if (r < 0)
                        goto finish;
        }

        r = 0;

finish:
        sd_event_unref(e);
        return r;
}
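
/* Putting it together, a minimal program using this API might look
 * like the following (illustrative sketch with hypothetical handler
 * names; error handling trimmed for brevity). The handler requests
 * quit so the loop actually terminates:
 *
 *         static int my_exit_handler(sd_event_source *s, uint64_t usec,
 *                                    void *userdata) {
 *                 return sd_event_request_quit(sd_event_get(s));
 *         }
 *
 *         int main(void) {
 *                 sd_event *e = NULL;
 *                 sd_event_source *t = NULL;
 *
 *                 sd_event_new(&e);
 *                 sd_event_add_monotonic(e, now(CLOCK_MONOTONIC) + USEC_PER_SEC,
 *                                        0, my_exit_handler, NULL, &t);
 *
 *                 sd_event_loop(e);
 *
 *                 sd_event_source_unref(t);
 *                 sd_event_unref(e);
 *                 return 0;
 *         }
 */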

int sd_event_get_state(sd_event *e) {
        assert_return(e, -EINVAL);
        assert_return(!event_pid_changed(e), -ECHILD);

        return e->state;
}

int sd_event_get_quit(sd_event *e) {
        assert_return(e, -EINVAL);
        assert_return(!event_pid_changed(e), -ECHILD);

        return e->quit_requested;
}

int sd_event_request_quit(sd_event *e) {
        assert_return(e, -EINVAL);
        assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(e), -ECHILD);

        e->quit_requested = true;
        return 0;
}

int sd_event_get_now_realtime(sd_event *e, uint64_t *usec) {
        assert_return(e, -EINVAL);
        assert_return(usec, -EINVAL);
        assert_return(dual_timestamp_is_set(&e->timestamp), -ENODATA);
        assert_return(!event_pid_changed(e), -ECHILD);

        *usec = e->timestamp.realtime;
        return 0;
}

int sd_event_get_now_monotonic(sd_event *e, uint64_t *usec) {
        assert_return(e, -EINVAL);
        assert_return(usec, -EINVAL);
        assert_return(dual_timestamp_is_set(&e->timestamp), -ENODATA);
        assert_return(!event_pid_changed(e), -ECHILD);

        *usec = e->timestamp.monotonic;
        return 0;
}