]> git.ipfire.org Git - thirdparty/systemd.git/blob - src/libsystemd-bus/sd-event.c
96ba2ad269cc10cc02f4d3ff20704ba619c15952
[thirdparty/systemd.git] / src / libsystemd-bus / sd-event.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4 This file is part of systemd.
5
6 Copyright 2013 Lennart Poettering
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/epoll.h>
23 #include <sys/timerfd.h>
24 #include <sys/wait.h>
25
26 #include "macro.h"
27 #include "prioq.h"
28 #include "hashmap.h"
29 #include "util.h"
30 #include "time-util.h"
31 #include "sd-id128.h"
32
33 #include "sd-event.h"
34
35 #define EPOLL_QUEUE_MAX 64
36 #define DEFAULT_ACCURACY_USEC (250 * USEC_PER_MSEC)
37
/* Discriminates which member of the sd_event_source handler union is
 * valid, and which queues/fds the source participates in. */
typedef enum EventSourceType {
        SOURCE_IO,
        SOURCE_MONOTONIC,
        SOURCE_REALTIME,
        SOURCE_SIGNAL,
        SOURCE_CHILD,
        SOURCE_DEFER,
        SOURCE_QUIT
} EventSourceType;
47
struct sd_event_source {
        unsigned n_ref;                 /* reference count, starts at 1 */

        sd_event *event;                /* owning loop (holds a reference) */
        void *userdata;                 /* opaque pointer handed to the callback */
        sd_prepare_handler_t prepare;   /* optional pre-poll callback, may be NULL */

        EventSourceType type:4;
        int enabled:3;                  /* SD_EVENT_OFF / SD_EVENT_ON / SD_EVENT_ONESHOT */
        bool pending:1;                 /* currently queued in event->pending? */

        int priority;                   /* lower values dispatch first */
        unsigned pending_index;         /* slot in event->pending prioq */
        unsigned prepare_index;         /* slot in event->prepare prioq */
        unsigned pending_iteration;     /* loop iteration when marked pending */
        unsigned prepare_iteration;     /* loop iteration when last prepared */

        /* Per-type state; the member that is valid is selected by 'type'. */
        union {
                struct {
                        sd_io_handler_t callback;
                        int fd;
                        uint32_t events;        /* epoll event mask we registered */
                        uint32_t revents;       /* events last returned by epoll */
                        bool registered:1;      /* fd currently in the epoll set? */
                } io;
                struct {
                        sd_time_handler_t callback;
                        usec_t next, accuracy;  /* earliest wakeup + allowed slack */
                        unsigned earliest_index;
                        unsigned latest_index;
                } time;
                struct {
                        sd_signal_handler_t callback;
                        struct signalfd_siginfo siginfo;
                        int sig;
                } signal;
                struct {
                        sd_child_handler_t callback;
                        siginfo_t siginfo;
                        pid_t pid;
                        int options;            /* WEXITED|WSTOPPED|WCONTINUED */
                } child;
                struct {
                        sd_defer_handler_t callback;
                } defer;
                struct {
                        sd_quit_handler_t callback;
                        unsigned prioq_index;
                } quit;
        };
};
99
struct sd_event {
        unsigned n_ref;                 /* reference count, starts at 1 */

        int epoll_fd;                   /* main epoll instance, -1 when unset */
        int signal_fd;                  /* signalfd for all watched signals, -1 when unset */
        int realtime_fd;                /* CLOCK_REALTIME timerfd, created lazily */
        int monotonic_fd;               /* CLOCK_MONOTONIC timerfd, created lazily */

        Prioq *pending;                 /* sources ready to be dispatched */
        Prioq *prepare;                 /* sources with a prepare callback installed */

        /* For both clocks we maintain two priority queues each, one
         * ordered for the earliest times the events may be
         * dispatched, and one ordered by the latest times they must
         * have been dispatched. The range between the top entries in
         * the two prioqs is the time window we can freely schedule
         * wakeups in */
        Prioq *monotonic_earliest;
        Prioq *monotonic_latest;
        Prioq *realtime_earliest;
        Prioq *realtime_latest;

        usec_t realtime_next, monotonic_next;   /* currently armed timerfd deadlines, (usec_t) -1 = unarmed */
        usec_t perturb;                 /* per-boot wakeup offset, see event_setup_timer_fd() */

        sigset_t sigset;                /* set of signals the signalfd watches */
        sd_event_source **signal_sources;       /* _NSIG-sized array indexed by signal number, lazily allocated */

        Hashmap *child_sources;         /* pid -> sd_event_source, lazily allocated */
        unsigned n_enabled_child_sources;       /* how many child sources are not SD_EVENT_OFF */

        Prioq *quit;                    /* quit handlers, run at loop shutdown */

        pid_t original_pid;             /* pid at creation, to detect use across fork() */

        unsigned iteration;             /* monotonically increasing loop counter */
        dual_timestamp timestamp;       /* time the current iteration woke up */
        int state;                      /* SD_EVENT_* loop state machine */

        bool quit_requested:1;
        bool need_process_child:1;      /* SIGCHLD seen, waitid() sweep required */
};
142
143 static int pending_prioq_compare(const void *a, const void *b) {
144 const sd_event_source *x = a, *y = b;
145
146 assert(x->pending);
147 assert(y->pending);
148
149 /* Enabled ones first */
150 if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
151 return -1;
152 if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
153 return 1;
154
155 /* Lower priority values first */
156 if (x->priority < y->priority)
157 return -1;
158 if (x->priority > y->priority)
159 return 1;
160
161 /* Older entries first */
162 if (x->pending_iteration < y->pending_iteration)
163 return -1;
164 if (x->pending_iteration > y->pending_iteration)
165 return 1;
166
167 /* Stability for the rest */
168 if (x < y)
169 return -1;
170 if (x > y)
171 return 1;
172
173 return 0;
174 }
175
176 static int prepare_prioq_compare(const void *a, const void *b) {
177 const sd_event_source *x = a, *y = b;
178
179 assert(x->prepare);
180 assert(y->prepare);
181
182 /* Move most recently prepared ones last, so that we can stop
183 * preparing as soon as we hit one that has already been
184 * prepared in the current iteration */
185 if (x->prepare_iteration < y->prepare_iteration)
186 return -1;
187 if (x->prepare_iteration > y->prepare_iteration)
188 return 1;
189
190 /* Enabled ones first */
191 if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
192 return -1;
193 if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
194 return 1;
195
196 /* Lower priority values first */
197 if (x->priority < y->priority)
198 return -1;
199 if (x->priority > y->priority)
200 return 1;
201
202 /* Stability for the rest */
203 if (x < y)
204 return -1;
205 if (x > y)
206 return 1;
207
208 return 0;
209 }
210
211 static int earliest_time_prioq_compare(const void *a, const void *b) {
212 const sd_event_source *x = a, *y = b;
213
214 assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME);
215 assert(y->type == SOURCE_MONOTONIC || y->type == SOURCE_REALTIME);
216
217 /* Enabled ones first */
218 if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
219 return -1;
220 if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
221 return 1;
222
223 /* Move the pending ones to the end */
224 if (!x->pending && y->pending)
225 return -1;
226 if (x->pending && !y->pending)
227 return 1;
228
229 /* Order by time */
230 if (x->time.next < y->time.next)
231 return -1;
232 if (x->time.next > y->time.next)
233 return -1;
234
235 /* Stability for the rest */
236 if (x < y)
237 return -1;
238 if (x > y)
239 return 1;
240
241 return 0;
242 }
243
244 static int latest_time_prioq_compare(const void *a, const void *b) {
245 const sd_event_source *x = a, *y = b;
246
247 assert((x->type == SOURCE_MONOTONIC && y->type == SOURCE_MONOTONIC) ||
248 (x->type == SOURCE_REALTIME && y->type == SOURCE_REALTIME));
249
250 /* Enabled ones first */
251 if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
252 return -1;
253 if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
254 return 1;
255
256 /* Move the pending ones to the end */
257 if (!x->pending && y->pending)
258 return -1;
259 if (x->pending && !y->pending)
260 return 1;
261
262 /* Order by time */
263 if (x->time.next + x->time.accuracy < y->time.next + y->time.accuracy)
264 return -1;
265 if (x->time.next + x->time.accuracy > y->time.next + y->time.accuracy)
266 return -1;
267
268 /* Stability for the rest */
269 if (x < y)
270 return -1;
271 if (x > y)
272 return 1;
273
274 return 0;
275 }
276
277 static int quit_prioq_compare(const void *a, const void *b) {
278 const sd_event_source *x = a, *y = b;
279
280 assert(x->type == SOURCE_QUIT);
281 assert(y->type == SOURCE_QUIT);
282
283 /* Enabled ones first */
284 if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
285 return -1;
286 if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
287 return 1;
288
289 /* Lower priority values first */
290 if (x->priority < y->priority)
291 return -1;
292 if (x->priority > y->priority)
293 return 1;
294
295 /* Stability for the rest */
296 if (x < y)
297 return -1;
298 if (x > y)
299 return 1;
300
301 return 0;
302 }
303
/* Release all resources owned by the loop object: the epoll, signal
 * and timer fds, all priority queues, the per-signal source table and
 * the child-source hashmap, then the object itself. Called from
 * sd_event_unref() when the last reference is dropped. */
static void event_free(sd_event *e) {
        assert(e);

        if (e->epoll_fd >= 0)
                close_nointr_nofail(e->epoll_fd);

        if (e->signal_fd >= 0)
                close_nointr_nofail(e->signal_fd);

        if (e->realtime_fd >= 0)
                close_nointr_nofail(e->realtime_fd);

        if (e->monotonic_fd >= 0)
                close_nointr_nofail(e->monotonic_fd);

        /* prioq_free()/hashmap_free() only free the containers; the
         * sources themselves hold references on the loop, hence none
         * can still exist when we get here. */
        prioq_free(e->pending);
        prioq_free(e->prepare);
        prioq_free(e->monotonic_earliest);
        prioq_free(e->monotonic_latest);
        prioq_free(e->realtime_earliest);
        prioq_free(e->realtime_latest);
        prioq_free(e->quit);

        free(e->signal_sources);

        hashmap_free(e->child_sources);
        free(e);
}
332
/* Allocate a new event loop object with a fresh epoll instance.
 * Returns 0 and stores the loop (refcount 1) in *ret on success,
 * -ENOMEM or a negative errno from epoll_create1() on failure. */
int sd_event_new(sd_event** ret) {
        sd_event *e;
        int r;

        assert_return(ret, -EINVAL);

        e = new0(sd_event, 1);
        if (!e)
                return -ENOMEM;

        e->n_ref = 1;
        /* All fds start out unset ... */
        e->signal_fd = e->realtime_fd = e->monotonic_fd = e->epoll_fd = -1;
        /* ... and (usec_t) -1 marks the timerfds as unarmed. */
        e->realtime_next = e->monotonic_next = (usec_t) -1;
        e->original_pid = getpid();

        assert_se(sigemptyset(&e->sigset) == 0);

        e->pending = prioq_new(pending_prioq_compare);
        if (!e->pending) {
                r = -ENOMEM;
                goto fail;
        }

        e->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
        if (e->epoll_fd < 0) {
                r = -errno;
                goto fail;
        }

        *ret = e;
        return 0;

fail:
        event_free(e);
        return r;
}
369
370 sd_event* sd_event_ref(sd_event *e) {
371 assert_return(e, NULL);
372
373 assert(e->n_ref >= 1);
374 e->n_ref++;
375
376 return e;
377 }
378
379 sd_event* sd_event_unref(sd_event *e) {
380 assert_return(e, NULL);
381
382 assert(e->n_ref >= 1);
383 e->n_ref--;
384
385 if (e->n_ref <= 0)
386 event_free(e);
387
388 return NULL;
389 }
390
/* Returns true if the calling process is not the one that created the
 * loop, i.e. the loop object crossed a fork(). */
static bool event_pid_changed(sd_event *e) {
        assert(e);

        /* We don't support people creating an event loop and keeping
         * it around over a fork(). Let's complain. */

        return e->original_pid != getpid();
}
399
/* Remove the source's fd from the loop's epoll set, if it is currently
 * registered. Returns 0 on success (or if nothing was registered),
 * a negative errno from epoll_ctl() otherwise. */
static int source_io_unregister(sd_event_source *s) {
        int r;

        assert(s);
        assert(s->type == SOURCE_IO);

        if (!s->io.registered)
                return 0;

        r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, s->io.fd, NULL);
        if (r < 0)
                return -errno;

        s->io.registered = false;
        return 0;
}
416
/* Add or update the source's fd in the loop's epoll set with the given
 * event mask. 'enabled' must not be SD_EVENT_OFF; SD_EVENT_ONESHOT
 * maps to EPOLLONESHOT. Returns 0 on success, a negative errno from
 * epoll_ctl() on failure. */
static int source_io_register(
                sd_event_source *s,
                int enabled,
                uint32_t events) {

        struct epoll_event ev = {};
        int r;

        assert(s);
        assert(s->type == SOURCE_IO);
        assert(enabled != SD_EVENT_OFF);

        ev.events = events;
        ev.data.ptr = s;

        if (enabled == SD_EVENT_ONESHOT)
                ev.events |= EPOLLONESHOT;

        /* MOD if the fd is already in the set, ADD otherwise */
        if (s->io.registered)
                r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_MOD, s->io.fd, &ev);
        else
                r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_ADD, s->io.fd, &ev);

        if (r < 0)
                return -errno;

        s->io.registered = true;

        return 0;
}
447
/* Tear down an event source: detach its per-type registrations from
 * the loop (epoll, prioqs, sigset, child hashmap), remove it from the
 * generic pending/prepare queues, drop its loop reference and free it.
 * Safe to call on partially constructed sources (s->event checked). */
static void source_free(sd_event_source *s) {
        assert(s);

        if (s->event) {
                switch (s->type) {

                case SOURCE_IO:
                        if (s->io.fd >= 0)
                                source_io_unregister(s);

                        break;

                case SOURCE_MONOTONIC:
                        prioq_remove(s->event->monotonic_earliest, s, &s->time.earliest_index);
                        prioq_remove(s->event->monotonic_latest, s, &s->time.latest_index);
                        break;

                case SOURCE_REALTIME:
                        prioq_remove(s->event->realtime_earliest, s, &s->time.earliest_index);
                        prioq_remove(s->event->realtime_latest, s, &s->time.latest_index);
                        break;

                case SOURCE_SIGNAL:
                        if (s->signal.sig > 0) {
                                /* Keep SIGCHLD blocked if child sources still need it */
                                if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0)
                                        assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);

                                if (s->event->signal_sources)
                                        s->event->signal_sources[s->signal.sig] = NULL;
                        }

                        break;

                case SOURCE_CHILD:
                        if (s->child.pid > 0) {
                                if (s->enabled != SD_EVENT_OFF) {
                                        assert(s->event->n_enabled_child_sources > 0);
                                        s->event->n_enabled_child_sources--;
                                }

                                /* Keep SIGCHLD blocked if a signal source watches it */
                                if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD])
                                        assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);

                                hashmap_remove(s->event->child_sources, INT_TO_PTR(s->child.pid));
                        }

                        break;

                case SOURCE_QUIT:
                        prioq_remove(s->event->quit, s, &s->quit.prioq_index);
                        break;
                }

                if (s->pending)
                        prioq_remove(s->event->pending, s, &s->pending_index);

                if (s->prepare)
                        prioq_remove(s->event->prepare, s, &s->prepare_index);

                sd_event_unref(s->event);
        }

        free(s);
}
512
/* Mark a source as pending (queued for dispatch) or not, keeping the
 * event->pending prioq in sync. No-op if the flag already matches.
 * Returns 0 on success, a negative errno if queueing failed (in which
 * case the flag is rolled back). */
static int source_set_pending(sd_event_source *s, bool b) {
        int r;

        assert(s);
        assert(s->type != SOURCE_QUIT);

        if (s->pending == b)
                return 0;

        s->pending = b;

        if (b) {
                /* Remember when this became pending, for FIFO ordering
                 * among equal-priority sources */
                s->pending_iteration = s->event->iteration;

                r = prioq_put(s->event->pending, s, &s->pending_index);
                if (r < 0) {
                        s->pending = false;
                        return r;
                }
        } else
                assert_se(prioq_remove(s->event->pending, s, &s->pending_index));

        return 0;
}
537
/* Allocate a new event source of the given type, taking a reference on
 * the loop. Returns NULL on OOM. The caller fills in the per-type
 * union and registers the source with the appropriate queues. */
static sd_event_source *source_new(sd_event *e, EventSourceType type) {
        sd_event_source *s;

        assert(e);

        s = new0(sd_event_source, 1);
        if (!s)
                return NULL;

        s->n_ref = 1;
        s->event = sd_event_ref(e);
        s->type = type;
        s->pending_index = s->prepare_index = PRIOQ_IDX_NULL;

        return s;
}
554
555 int sd_event_add_io(
556 sd_event *e,
557 int fd,
558 uint32_t events,
559 sd_io_handler_t callback,
560 void *userdata,
561 sd_event_source **ret) {
562
563 sd_event_source *s;
564 int r;
565
566 assert_return(e, -EINVAL);
567 assert_return(fd >= 0, -EINVAL);
568 assert_return(!(events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP)), -EINVAL);
569 assert_return(callback, -EINVAL);
570 assert_return(ret, -EINVAL);
571 assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
572 assert_return(!event_pid_changed(e), -ECHILD);
573
574 s = source_new(e, SOURCE_IO);
575 if (!s)
576 return -ENOMEM;
577
578 s->io.fd = fd;
579 s->io.events = events;
580 s->io.callback = callback;
581 s->userdata = userdata;
582 s->enabled = SD_EVENT_ON;
583
584 r = source_io_register(s, s->enabled, events);
585 if (r < 0) {
586 source_free(s);
587 return -errno;
588 }
589
590 *ret = s;
591 return 0;
592 }
593
/* Lazily create the per-clock timerfd and hook it into the epoll set.
 * 'type' (SOURCE_MONOTONIC/SOURCE_REALTIME) is smuggled through
 * ev.data.ptr so the dispatcher can tell the timer fds apart. Also
 * derives the boot-ID-based wakeup perturbation. Returns 0 on success
 * or if the fd already exists, a negative errno otherwise. */
static int event_setup_timer_fd(
                sd_event *e,
                EventSourceType type,
                int *timer_fd,
                clockid_t id) {

        struct epoll_event ev = {};
        int r, fd;
        sd_id128_t bootid;

        assert(e);
        assert(timer_fd);

        if (_likely_(*timer_fd >= 0))
                return 0;

        fd = timerfd_create(id, TFD_NONBLOCK|TFD_CLOEXEC);
        if (fd < 0)
                return -errno;

        ev.events = EPOLLIN;
        ev.data.ptr = INT_TO_PTR(type);

        r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
        if (r < 0) {
                close_nointr_nofail(fd);
                return -errno;
        }

        /* When we sleep for longer, we try to realign the wakeup to
           the same time within each second, so that events all across
           the system can be coalesced into a single CPU
           wakeup. However, let's take some system-specific randomness
           for this value, so that in a network of systems with synced
           clocks timer events are distributed a bit. Here, we
           calculate a perturbation usec offset from the boot ID. */

        if (sd_id128_get_boot(&bootid) >= 0)
                e->perturb = (bootid.qwords[0] ^ bootid.qwords[1]) % USEC_PER_SEC;

        *timer_fd = fd;
        return 0;
}
637
/* Common implementation for sd_event_add_monotonic()/_realtime():
 * lazily allocates the earliest/latest prioqs and the clock's timerfd,
 * then creates a one-shot timer source firing at 'usec' with the given
 * 'accuracy' (0 selects DEFAULT_ACCURACY_USEC). Returns 0 and stores
 * the source in *ret, or a negative errno-style code. */
static int event_add_time_internal(
                sd_event *e,
                EventSourceType type,
                int *timer_fd,
                clockid_t id,
                Prioq **earliest,
                Prioq **latest,
                uint64_t usec,
                uint64_t accuracy,
                sd_time_handler_t callback,
                void *userdata,
                sd_event_source **ret) {

        sd_event_source *s;
        int r;

        assert_return(e, -EINVAL);
        assert_return(callback, -EINVAL);
        assert_return(ret, -EINVAL);
        assert_return(usec != (uint64_t) -1, -EINVAL);
        assert_return(accuracy != (uint64_t) -1, -EINVAL);
        assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(e), -ECHILD);

        assert(timer_fd);
        assert(earliest);
        assert(latest);

        if (!*earliest) {
                *earliest = prioq_new(earliest_time_prioq_compare);
                if (!*earliest)
                        return -ENOMEM;
        }

        if (!*latest) {
                *latest = prioq_new(latest_time_prioq_compare);
                if (!*latest)
                        return -ENOMEM;
        }

        if (*timer_fd < 0) {
                r = event_setup_timer_fd(e, type, timer_fd, id);
                if (r < 0)
                        return r;
        }

        s = source_new(e, type);
        if (!s)
                return -ENOMEM;

        s->time.next = usec;
        s->time.accuracy = accuracy == 0 ? DEFAULT_ACCURACY_USEC : accuracy;
        s->time.callback = callback;
        s->time.earliest_index = s->time.latest_index = PRIOQ_IDX_NULL;
        s->userdata = userdata;
        s->enabled = SD_EVENT_ONESHOT;

        r = prioq_put(*earliest, s, &s->time.earliest_index);
        if (r < 0)
                goto fail;

        r = prioq_put(*latest, s, &s->time.latest_index);
        if (r < 0)
                goto fail;

        *ret = s;
        return 0;

fail:
        /* source_free() removes whatever was already queued */
        source_free(s);
        return r;
}
710
711 int sd_event_add_monotonic(sd_event *e, uint64_t usec, uint64_t accuracy, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
712 return event_add_time_internal(e, SOURCE_MONOTONIC, &e->monotonic_fd, CLOCK_MONOTONIC, &e->monotonic_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
713 }
714
715 int sd_event_add_realtime(sd_event *e, uint64_t usec, uint64_t accuracy, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
716 return event_add_time_internal(e, SOURCE_REALTIME, &e->realtime_fd, CLOCK_REALTIME, &e->realtime_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
717 }
718
/* (Re)create the loop's signalfd from the current e->sigset. On first
 * use the fd is also added to the epoll set, tagged with SOURCE_SIGNAL
 * in ev.data.ptr. Returns 0 on success, a negative errno on failure
 * (on epoll failure the fresh fd is closed and reset to -1). */
static int event_update_signal_fd(sd_event *e) {
        struct epoll_event ev = {};
        bool add_to_epoll;
        int r;

        assert(e);

        add_to_epoll = e->signal_fd < 0;

        /* Passing the old fd updates its mask in place; -1 creates one */
        r = signalfd(e->signal_fd, &e->sigset, SFD_NONBLOCK|SFD_CLOEXEC);
        if (r < 0)
                return -errno;

        e->signal_fd = r;

        if (!add_to_epoll)
                return 0;

        ev.events = EPOLLIN;
        ev.data.ptr = INT_TO_PTR(SOURCE_SIGNAL);

        r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, e->signal_fd, &ev);
        if (r < 0) {
                close_nointr_nofail(e->signal_fd);
                e->signal_fd = -1;

                return -errno;
        }

        return 0;
}
750
/* Add a source dispatched when 'sig' arrives via the loop's signalfd.
 * Only one source per signal is allowed (-EBUSY otherwise). The caller
 * is expected to have the signal blocked. Starts enabled (SD_EVENT_ON).
 * Returns 0 and stores the source in *ret, or a negative errno. */
int sd_event_add_signal(
                sd_event *e,
                int sig,
                sd_signal_handler_t callback,
                void *userdata,
                sd_event_source **ret) {

        sd_event_source *s;
        int r;

        assert_return(e, -EINVAL);
        assert_return(sig > 0, -EINVAL);
        assert_return(sig < _NSIG, -EINVAL);
        assert_return(callback, -EINVAL);
        assert_return(ret, -EINVAL);
        assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(e), -ECHILD);

        /* Lazily allocate the signal-number-indexed source table */
        if (!e->signal_sources) {
                e->signal_sources = new0(sd_event_source*, _NSIG);
                if (!e->signal_sources)
                        return -ENOMEM;
        } else if (e->signal_sources[sig])
                return -EBUSY;

        s = source_new(e, SOURCE_SIGNAL);
        if (!s)
                return -ENOMEM;

        s->signal.sig = sig;
        s->signal.callback = callback;
        s->userdata = userdata;
        s->enabled = SD_EVENT_ON;

        e->signal_sources[sig] = s;
        assert_se(sigaddset(&e->sigset, sig) == 0);

        /* SIGCHLD may already be routed through the signalfd for child
         * sources; only rebuild the fd when the mask actually changed */
        if (sig != SIGCHLD || e->n_enabled_child_sources == 0) {
                r = event_update_signal_fd(e);
                if (r < 0) {
                        source_free(s);
                        return r;
                }
        }

        *ret = s;
        return 0;
}
799
800 int sd_event_add_child(
801 sd_event *e,
802 pid_t pid,
803 int options,
804 sd_child_handler_t callback,
805 void *userdata,
806 sd_event_source **ret) {
807
808 sd_event_source *s;
809 int r;
810
811 assert_return(e, -EINVAL);
812 assert_return(pid > 1, -EINVAL);
813 assert_return(!(options & ~(WEXITED|WSTOPPED|WCONTINUED)), -EINVAL);
814 assert_return(options != 0, -EINVAL);
815 assert_return(callback, -EINVAL);
816 assert_return(ret, -EINVAL);
817 assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
818 assert_return(!event_pid_changed(e), -ECHILD);
819
820 r = hashmap_ensure_allocated(&e->child_sources, trivial_hash_func, trivial_compare_func);
821 if (r < 0)
822 return r;
823
824 if (hashmap_contains(e->child_sources, INT_TO_PTR(pid)))
825 return -EBUSY;
826
827 s = source_new(e, SOURCE_CHILD);
828 if (!s)
829 return -ENOMEM;
830
831 s->child.pid = pid;
832 s->child.options = options;
833 s->child.callback = callback;
834 s->userdata = userdata;
835 s->enabled = SD_EVENT_ONESHOT;
836
837 r = hashmap_put(e->child_sources, INT_TO_PTR(pid), s);
838 if (r < 0) {
839 source_free(s);
840 return r;
841 }
842
843 e->n_enabled_child_sources ++;
844
845 assert_se(sigaddset(&e->sigset, SIGCHLD) == 0);
846
847 if (!e->signal_sources || !e->signal_sources[SIGCHLD]) {
848 r = event_update_signal_fd(e);
849 if (r < 0) {
850 source_free(s);
851 return -errno;
852 }
853 }
854
855 e->need_process_child = true;
856
857 *ret = s;
858 return 0;
859 }
860
/* Add a defer source: it is immediately marked pending and dispatched
 * on the next loop iteration, before waiting for I/O. Starts as
 * SD_EVENT_ONESHOT. Returns 0 and stores the source in *ret, or a
 * negative errno-style code. */
int sd_event_add_defer(
                sd_event *e,
                sd_defer_handler_t callback,
                void *userdata,
                sd_event_source **ret) {

        sd_event_source *s;
        int r;

        assert_return(e, -EINVAL);
        assert_return(callback, -EINVAL);
        assert_return(ret, -EINVAL);
        assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(e), -ECHILD);

        s = source_new(e, SOURCE_DEFER);
        if (!s)
                return -ENOMEM;

        s->defer.callback = callback;
        s->userdata = userdata;
        s->enabled = SD_EVENT_ONESHOT;

        r = source_set_pending(s, true);
        if (r < 0) {
                source_free(s);
                return r;
        }

        *ret = s;
        return 0;
}
893
/* Add a quit handler, run when the loop shuts down, ordered by
 * priority via the dedicated quit prioq. Starts as SD_EVENT_ONESHOT.
 * Returns 0 and stores the source in *ret, or a negative errno-style
 * code. */
int sd_event_add_quit(
                sd_event *e,
                sd_quit_handler_t callback,
                void *userdata,
                sd_event_source **ret) {

        sd_event_source *s;
        int r;

        assert_return(e, -EINVAL);
        assert_return(callback, -EINVAL);
        assert_return(ret, -EINVAL);
        assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(e), -ECHILD);

        /* The quit prioq is allocated lazily on first use */
        if (!e->quit) {
                e->quit = prioq_new(quit_prioq_compare);
                if (!e->quit)
                        return -ENOMEM;
        }

        s = source_new(e, SOURCE_QUIT);
        if (!s)
                return -ENOMEM;

        s->quit.callback = callback;
        s->userdata = userdata;
        s->quit.prioq_index = PRIOQ_IDX_NULL;
        s->enabled = SD_EVENT_ONESHOT;

        r = prioq_put(s->event->quit, s, &s->quit.prioq_index);
        if (r < 0) {
                source_free(s);
                return r;
        }

        *ret = s;
        return 0;
}
933
934 sd_event_source* sd_event_source_ref(sd_event_source *s) {
935 assert_return(s, NULL);
936
937 assert(s->n_ref >= 1);
938 s->n_ref++;
939
940 return s;
941 }
942
943 sd_event_source* sd_event_source_unref(sd_event_source *s) {
944 assert_return(s, NULL);
945
946 assert(s->n_ref >= 1);
947 s->n_ref--;
948
949 if (s->n_ref <= 0)
950 source_free(s);
951
952 return NULL;
953 }
954
955 sd_event *sd_event_get(sd_event_source *s) {
956 assert_return(s, NULL);
957
958 return s->event;
959 }
960
/* Returns 1 if the source is queued for dispatch, 0 if not, negative
 * errno-style code on invalid use. Not defined for quit sources. */
int sd_event_source_get_pending(sd_event_source *s) {
        assert_return(s, -EINVAL);
        assert_return(s->type != SOURCE_QUIT, -EDOM);
        assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        return s->pending;
}
969
/* Returns the watched file descriptor of an I/O source, or a negative
 * errno-style code on invalid use. */
int sd_event_source_get_io_fd(sd_event_source *s) {
        assert_return(s, -EINVAL);
        assert_return(s->type == SOURCE_IO, -EDOM);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        return s->io.fd;
}
977
/* Stores the epoll event mask the I/O source was registered with in
 * *events. Returns 0, or a negative errno-style code on invalid use. */
int sd_event_source_get_io_events(sd_event_source *s, uint32_t* events) {
        assert_return(s, -EINVAL);
        assert_return(events, -EINVAL);
        assert_return(s->type == SOURCE_IO, -EDOM);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        *events = s->io.events;
        return 0;
}
987
988 int sd_event_source_set_io_events(sd_event_source *s, uint32_t events) {
989 int r;
990
991 assert_return(s, -EINVAL);
992 assert_return(s->type == SOURCE_IO, -EDOM);
993 assert_return(!(events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP)), -EINVAL);
994 assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
995 assert_return(!event_pid_changed(s->event), -ECHILD);
996
997 if (s->io.events == events)
998 return 0;
999
1000 if (s->enabled != SD_EVENT_OFF) {
1001 r = source_io_register(s, s->io.events, events);
1002 if (r < 0)
1003 return r;
1004 }
1005
1006 s->io.events = events;
1007
1008 return 0;
1009 }
1010
/* Stores the epoll events that actually triggered (valid only while
 * the source is pending) in *revents. Returns 0, -ENODATA when not
 * pending, or another negative errno-style code on invalid use. */
int sd_event_source_get_io_revents(sd_event_source *s, uint32_t* revents) {
        assert_return(s, -EINVAL);
        assert_return(revents, -EINVAL);
        assert_return(s->type == SOURCE_IO, -EDOM);
        assert_return(s->pending, -ENODATA);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        *revents = s->io.revents;
        return 0;
}
1021
/* Returns the signal number a signal source watches, or a negative
 * errno-style code on invalid use. */
int sd_event_source_get_signal(sd_event_source *s) {
        assert_return(s, -EINVAL);
        assert_return(s->type == SOURCE_SIGNAL, -EDOM);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        return s->signal.sig;
}
1029
1030 int sd_event_source_get_priority(sd_event_source *s, int *priority) {
1031 assert_return(s, -EINVAL);
1032 assert_return(!event_pid_changed(s->event), -ECHILD);
1033
1034 return s->priority;
1035 }
1036
/* Change the source's dispatch priority and reshuffle every priority
 * queue whose order depends on it. Returns 0 or a negative errno-style
 * code on invalid use. */
int sd_event_source_set_priority(sd_event_source *s, int priority) {
        assert_return(s, -EINVAL);
        assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        if (s->priority == priority)
                return 0;

        s->priority = priority;

        if (s->pending)
                prioq_reshuffle(s->event->pending, s, &s->pending_index);

        if (s->prepare)
                prioq_reshuffle(s->event->prepare, s, &s->prepare_index);

        if (s->type == SOURCE_QUIT)
                prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);

        return 0;
}
1058
/* Stores the source's enable state (SD_EVENT_OFF/ON/ONESHOT) in *m.
 * Returns 0 or a negative errno-style code on invalid use. */
int sd_event_source_get_enabled(sd_event_source *s, int *m) {
        assert_return(s, -EINVAL);
        assert_return(m, -EINVAL);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        *m = s->enabled;
        return 0;
}
1067
1068 int sd_event_source_set_enabled(sd_event_source *s, int m) {
1069 int r;
1070
1071 assert_return(s, -EINVAL);
1072 assert_return(m == SD_EVENT_OFF || m == SD_EVENT_ON || m == SD_EVENT_ONESHOT, -EINVAL);
1073 assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1074 assert_return(!event_pid_changed(s->event), -ECHILD);
1075
1076 if (s->enabled == m)
1077 return 0;
1078
1079 if (m == SD_EVENT_OFF) {
1080
1081 switch (s->type) {
1082
1083 case SOURCE_IO:
1084 r = source_io_unregister(s);
1085 if (r < 0)
1086 return r;
1087
1088 s->enabled = m;
1089 break;
1090
1091 case SOURCE_MONOTONIC:
1092 s->enabled = m;
1093 prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1094 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1095 break;
1096
1097 case SOURCE_REALTIME:
1098 s->enabled = m;
1099 prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1100 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1101 break;
1102
1103 case SOURCE_SIGNAL:
1104 s->enabled = m;
1105 if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0) {
1106 assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
1107 event_update_signal_fd(s->event);
1108 }
1109
1110 break;
1111
1112 case SOURCE_CHILD:
1113 s->enabled = m;
1114
1115 assert(s->event->n_enabled_child_sources > 0);
1116 s->event->n_enabled_child_sources--;
1117
1118 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1119 assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
1120 event_update_signal_fd(s->event);
1121 }
1122
1123 break;
1124
1125 case SOURCE_QUIT:
1126 s->enabled = m;
1127 prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
1128 break;
1129
1130 case SOURCE_DEFER:
1131 s->enabled = m;
1132 break;
1133 }
1134
1135 } else {
1136 switch (s->type) {
1137
1138 case SOURCE_IO:
1139 r = source_io_register(s, m, s->io.events);
1140 if (r < 0)
1141 return r;
1142
1143 s->enabled = m;
1144 break;
1145
1146 case SOURCE_MONOTONIC:
1147 s->enabled = m;
1148 prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1149 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1150 break;
1151
1152 case SOURCE_REALTIME:
1153 s->enabled = m;
1154 prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1155 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1156 break;
1157
1158 case SOURCE_SIGNAL:
1159 s->enabled = m;
1160
1161 if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0) {
1162 assert_se(sigaddset(&s->event->sigset, s->signal.sig) == 0);
1163 event_update_signal_fd(s->event);
1164 }
1165 break;
1166
1167 case SOURCE_CHILD:
1168 s->enabled = m;
1169
1170 if (s->enabled == SD_EVENT_OFF) {
1171 s->event->n_enabled_child_sources++;
1172
1173 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1174 assert_se(sigaddset(&s->event->sigset, SIGCHLD) == 0);
1175 event_update_signal_fd(s->event);
1176 }
1177 }
1178 break;
1179
1180 case SOURCE_QUIT:
1181 s->enabled = m;
1182 prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
1183 break;
1184
1185 case SOURCE_DEFER:
1186 s->enabled = m;
1187 break;
1188 }
1189 }
1190
1191 if (s->pending)
1192 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1193
1194 if (s->prepare)
1195 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
1196
1197 return 0;
1198 }
1199
/* Stores the timer source's next elapse time (in usec, on its clock)
 * in *usec. Returns 0 or a negative errno-style code on invalid use. */
int sd_event_source_get_time(sd_event_source *s, uint64_t *usec) {
        assert_return(s, -EINVAL);
        assert_return(usec, -EINVAL);
        assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        *usec = s->time.next;
        return 0;
}
1209
/* Change a timer source's next elapse time and reshuffle the affected
 * clock's earliest/latest queues. Returns 0 or a negative errno-style
 * code on invalid use. */
int sd_event_source_set_time(sd_event_source *s, uint64_t usec) {
        assert_return(s, -EINVAL);
        assert_return(usec != (uint64_t) -1, -EINVAL);
        assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
        assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        if (s->time.next == usec)
                return 0;

        s->time.next = usec;

        if (s->type == SOURCE_REALTIME) {
                prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
                prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
        } else {
                prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
                prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
        }

        return 0;
}
1232
/* Stores the timer source's dispatch accuracy (permissible delay in
 * usec) in *usec. Returns 0 or a negative errno-style code. */
int sd_event_source_get_time_accuracy(sd_event_source *s, uint64_t *usec) {
        assert_return(s, -EINVAL);
        assert_return(usec, -EINVAL);
        assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        *usec = s->time.accuracy;
        return 0;
}
1242
/* Change a timer source's dispatch accuracy (0 selects the default)
 * and reshuffle the affected clock's latest queue, whose order depends
 * on next+accuracy. Returns 0 or a negative errno-style code. */
int sd_event_source_set_time_accuracy(sd_event_source *s, uint64_t usec) {
        assert_return(s, -EINVAL);
        assert_return(usec != (uint64_t) -1, -EINVAL);
        assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
        assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        if (usec == 0)
                usec = DEFAULT_ACCURACY_USEC;

        if (s->time.accuracy == usec)
                return 0;

        s->time.accuracy = usec;

        if (s->type == SOURCE_REALTIME)
                prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
        else
                prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);

        return 0;
}
1265
/* Stores the pid watched by a child source in *pid. Returns 0 or a
 * negative errno-style code on invalid use. */
int sd_event_source_get_child_pid(sd_event_source *s, pid_t *pid) {
        assert_return(s, -EINVAL);
        assert_return(pid, -EINVAL);
        assert_return(s->type == SOURCE_CHILD, -EDOM);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        *pid = s->child.pid;
        return 0;
}
1275
1276 int sd_event_source_set_prepare(sd_event_source *s, sd_prepare_handler_t callback) {
1277 int r;
1278
1279 assert_return(s, -EINVAL);
1280 assert_return(s->type != SOURCE_QUIT, -EDOM);
1281 assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1282 assert_return(!event_pid_changed(s->event), -ECHILD);
1283
1284 if (s->prepare == callback)
1285 return 0;
1286
1287 if (callback && s->prepare) {
1288 s->prepare = callback;
1289 return 0;
1290 }
1291
1292 r = prioq_ensure_allocated(&s->event->prepare, prepare_prioq_compare);
1293 if (r < 0)
1294 return r;
1295
1296 s->prepare = callback;
1297
1298 if (callback) {
1299 r = prioq_put(s->event->prepare, s, &s->prepare_index);
1300 if (r < 0)
1301 return r;
1302 } else
1303 prioq_remove(s->event->prepare, s, &s->prepare_index);
1304
1305 return 0;
1306 }
1307
/* Returns the userdata pointer the source was registered with, or
 * NULL if s is NULL. */
void* sd_event_source_get_userdata(sd_event_source *s) {
        assert_return(s, NULL);

        return s->userdata;
}
1313
1314 static usec_t sleep_between(sd_event *e, usec_t a, usec_t b) {
1315 usec_t c;
1316 assert(e);
1317 assert(a <= b);
1318
1319 if (a <= 0)
1320 return 0;
1321
1322 if (b <= a + 1)
1323 return a;
1324
1325 /*
1326 Find a good time to wake up again between times a and b. We
1327 have two goals here:
1328
1329 a) We want to wake up as seldom as possible, hence prefer
1330 later times over earlier times.
1331
1332 b) But if we have to wake up, then let's make sure to
1333 dispatch as much as possible on the entire system.
1334
1335 We implement this by waking up everywhere at the same time
1336 within any given second if we can, synchronised via the
1337 perturbation value determined from the boot ID. If we can't,
1338 then we try to find the same spot in every a 250ms
1339 step. Otherwise, we pick the last possible time to wake up.
1340 */
1341
1342 c = (b / USEC_PER_SEC) * USEC_PER_SEC + e->perturb;
1343 if (c >= b) {
1344 if (_unlikely_(c < USEC_PER_SEC))
1345 return b;
1346
1347 c -= USEC_PER_SEC;
1348 }
1349
1350 if (c >= a)
1351 return c;
1352
1353 c = (b / (USEC_PER_MSEC*250)) * (USEC_PER_MSEC*250) + (e->perturb % (USEC_PER_MSEC*250));
1354 if (c >= b) {
1355 if (_unlikely_(c < USEC_PER_MSEC*250))
1356 return b;
1357
1358 c -= USEC_PER_MSEC*250;
1359 }
1360
1361 if (c >= a)
1362 return c;
1363
1364 return b;
1365 }
1366
1367 static int event_arm_timer(
1368 sd_event *e,
1369 int timer_fd,
1370 Prioq *earliest,
1371 Prioq *latest,
1372 usec_t *next) {
1373
1374 struct itimerspec its = {};
1375 sd_event_source *a, *b;
1376 usec_t t;
1377 int r;
1378
1379 assert_se(e);
1380 assert_se(next);
1381
1382 a = prioq_peek(earliest);
1383 if (!a || a->enabled == SD_EVENT_OFF)
1384 return 0;
1385
1386 b = prioq_peek(latest);
1387 assert_se(b && b->enabled != SD_EVENT_OFF);
1388
1389 t = sleep_between(e, a->time.next, b->time.next + b->time.accuracy);
1390 if (*next == t)
1391 return 0;
1392
1393 assert_se(timer_fd >= 0);
1394
1395 if (t == 0) {
1396 /* We don' want to disarm here, just mean some time looooong ago. */
1397 its.it_value.tv_sec = 0;
1398 its.it_value.tv_nsec = 1;
1399 } else
1400 timespec_store(&its.it_value, t);
1401
1402 r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1403 if (r < 0)
1404 return r;
1405
1406 *next = t;
1407 return 0;
1408 }
1409
/* Records the epoll events that fired for an IO source and marks the
 * source pending so it gets dispatched during this iteration. */
static int process_io(sd_event *e, sd_event_source *s, uint32_t events) {
        assert(e);
        assert(s);
        assert(s->type == SOURCE_IO);

        s->io.revents = events;

        /*
          If this is a oneshot event source, then we added it to the
          epoll with EPOLLONESHOT, hence we know it's not registered
          anymore. We can save a syscall here...
        */

        if (s->enabled == SD_EVENT_ONESHOT)
                s->io.registered = false;

        return source_set_pending(s, true);
}
1428
1429 static int flush_timer(sd_event *e, int fd, uint32_t events) {
1430 uint64_t x;
1431 ssize_t ss;
1432
1433 assert(e);
1434 assert(fd >= 0);
1435 assert_return(events == EPOLLIN, -EIO);
1436
1437 ss = read(fd, &x, sizeof(x));
1438 if (ss < 0) {
1439 if (errno == EAGAIN || errno == EINTR)
1440 return 0;
1441
1442 return -errno;
1443 }
1444
1445 if (ss != sizeof(x))
1446 return -EIO;
1447
1448 return 0;
1449 }
1450
/* Marks every timer source whose deadline has passed (relative to
 * current time 'n') as pending. 'earliest' is ordered so the next-due
 * source is at the head, letting us stop at the first not-yet-due,
 * disabled, or already-pending source. */
static int process_timer(
                sd_event *e,
                usec_t n,
                Prioq *earliest,
                Prioq *latest) {

        sd_event_source *s;
        int r;

        assert(e);

        for (;;) {
                s = prioq_peek(earliest);
                if (!s ||
                    s->time.next > n ||
                    s->enabled == SD_EVENT_OFF ||
                    s->pending)
                        break;

                r = source_set_pending(s, true);
                if (r < 0)
                        return r;

                /* Reshuffle so the now-pending source moves in both
                 * queues (presumably the comparators sort pending
                 * sources away from the head) and the next candidate
                 * surfaces — TODO confirm against the comparators. */
                prioq_reshuffle(earliest, s, &s->time.earliest_index);
                prioq_reshuffle(latest, s, &s->time.latest_index);
        }

        return 0;
}
1480
/* Polls every registered child source with waitid(WNOHANG) and marks
 * those whose child reported a state change as pending. */
static int process_child(sd_event *e) {
        sd_event_source *s;
        Iterator i;
        int r;

        assert(e);

        e->need_process_child = false;

        /*
          So, this is ugly. We iteratively invoke waitid() with P_PID
          + WNOHANG for each PID we wait for, instead of using
          P_ALL. This is because we only want to get child
          information of very specific child processes, and not all
          of them. We might not have processed the SIGCHLD even of a
          previous invocation and we don't want to maintain a
          unbounded *per-child* event queue, hence we really don't
          want anything flushed out of the kernel's queue that we
          don't care about. Since this is O(n) this means that if you
          have a lot of processes you probably want to handle SIGCHLD
          yourself.
        */

        HASHMAP_FOREACH(s, e->child_sources, i) {
                assert(s->type == SOURCE_CHILD);

                /* Skip sources that are already queued or disabled. */
                if (s->pending)
                        continue;

                if (s->enabled == SD_EVENT_OFF)
                        continue;

                /* Clear first: waitid() leaves siginfo untouched when
                 * there's nothing to report, so si_pid == 0 below
                 * reliably means "no state change". */
                zero(s->child.siginfo);
                r = waitid(P_PID, s->child.pid, &s->child.siginfo, WNOHANG|s->child.options);
                if (r < 0)
                        return -errno;

                if (s->child.siginfo.si_pid != 0) {
                        r = source_set_pending(s, true);
                        if (r < 0)
                                return r;
                }
        }

        return 0;
}
1527
1528 static int process_signal(sd_event *e, uint32_t events) {
1529 struct signalfd_siginfo si;
1530 bool read_one = false;
1531 ssize_t ss;
1532 int r;
1533
1534 assert(e);
1535 assert_return(events == EPOLLIN, -EIO);
1536
1537 for (;;) {
1538 sd_event_source *s;
1539
1540 ss = read(e->signal_fd, &si, sizeof(si));
1541 if (ss < 0) {
1542 if (errno == EAGAIN || errno == EINTR)
1543 return read_one;
1544
1545 return -errno;
1546 }
1547
1548 if (ss != sizeof(si))
1549 return -EIO;
1550
1551 read_one = true;
1552
1553 if (si.ssi_signo == SIGCHLD) {
1554 r = process_child(e);
1555 if (r < 0)
1556 return r;
1557 if (r > 0 || !e->signal_sources[si.ssi_signo])
1558 continue;
1559 } else {
1560 s = e->signal_sources[si.ssi_signo];
1561 if (!s)
1562 return -EIO;
1563 }
1564
1565 s->signal.siginfo = si;
1566 r = source_set_pending(s, true);
1567 if (r < 0)
1568 return r;
1569 }
1570
1571
1572 return 0;
1573 }
1574
/* Invokes the user callback of a pending (or quit) source, returning
 * whatever the callback returns. */
static int source_dispatch(sd_event_source *s) {
        int r;

        assert(s);
        assert(s->pending || s->type == SOURCE_QUIT);

        /* Defer and quit sources stay logically "pending" between
         * iterations; everything else is cleared before dispatch so
         * the callback may re-arm it. */
        if (s->type != SOURCE_DEFER && s->type != SOURCE_QUIT) {
                r = source_set_pending(s, false);
                if (r < 0)
                        return r;
        }

        /* Oneshot sources are disabled before the callback runs, so
         * the callback may re-enable them if it wants another shot. */
        if (s->enabled == SD_EVENT_ONESHOT) {
                r = sd_event_source_set_enabled(s, SD_EVENT_OFF);
                if (r < 0)
                        return r;
        }

        switch (s->type) {

        case SOURCE_IO:
                r = s->io.callback(s, s->io.fd, s->io.revents, s->userdata);
                break;

        case SOURCE_MONOTONIC:
                r = s->time.callback(s, s->time.next, s->userdata);
                break;

        case SOURCE_REALTIME:
                r = s->time.callback(s, s->time.next, s->userdata);
                break;

        case SOURCE_SIGNAL:
                r = s->signal.callback(s, &s->signal.siginfo, s->userdata);
                break;

        case SOURCE_CHILD:
                r = s->child.callback(s, &s->child.siginfo, s->userdata);
                break;

        case SOURCE_DEFER:
                r = s->defer.callback(s, s->userdata);
                break;

        case SOURCE_QUIT:
                r = s->quit.callback(s, s->userdata);
                break;
        }

        return r;
}
1626
1627 static int event_prepare(sd_event *e) {
1628 int r;
1629
1630 assert(e);
1631
1632 for (;;) {
1633 sd_event_source *s;
1634
1635 s = prioq_peek(e->prepare);
1636 if (!s || s->prepare_iteration == e->iteration || s->enabled == SD_EVENT_OFF)
1637 break;
1638
1639 s->prepare_iteration = e->iteration;
1640 r = prioq_reshuffle(e->prepare, s, &s->prepare_index);
1641 if (r < 0)
1642 return r;
1643
1644 assert(s->prepare);
1645 r = s->prepare(s, s->userdata);
1646 if (r < 0)
1647 return r;
1648
1649 }
1650
1651 return 0;
1652 }
1653
/* Dispatches the highest-priority enabled quit source. If none
 * exists the loop transitions directly to FINISHED. */
static int dispatch_quit(sd_event *e) {
        sd_event_source *p;
        int r;

        assert(e);

        p = prioq_peek(e->quit);
        if (!p || p->enabled == SD_EVENT_OFF) {
                e->state = SD_EVENT_FINISHED;
                return 0;
        }

        /* Hold a ref across the callback so the event object can't be
         * freed under us while in QUITTING state. */
        sd_event_ref(e);
        e->iteration++;
        e->state = SD_EVENT_QUITTING;

        r = source_dispatch(p);

        e->state = SD_EVENT_PASSIVE;
        sd_event_unref(e);

        return r;
}
1677
1678 static sd_event_source* event_next_pending(sd_event *e) {
1679 sd_event_source *p;
1680
1681 assert(e);
1682
1683 p = prioq_peek(e->pending);
1684 if (!p)
1685 return NULL;
1686
1687 if (p->enabled == SD_EVENT_OFF)
1688 return NULL;
1689
1690 return p;
1691 }
1692
1693 int sd_event_run(sd_event *e, uint64_t timeout) {
1694 struct epoll_event ev_queue[EPOLL_QUEUE_MAX];
1695 sd_event_source *p;
1696 int r, i, m;
1697
1698 assert_return(e, -EINVAL);
1699 assert_return(!event_pid_changed(e), -ECHILD);
1700 assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
1701 assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);
1702
1703 if (e->quit_requested)
1704 return dispatch_quit(e);
1705
1706 sd_event_ref(e);
1707 e->iteration++;
1708 e->state = SD_EVENT_RUNNING;
1709
1710 r = event_prepare(e);
1711 if (r < 0)
1712 goto finish;
1713
1714 if (event_next_pending(e) || e->need_process_child)
1715 timeout = 0;
1716
1717 if (timeout > 0) {
1718 r = event_arm_timer(e, e->monotonic_fd, e->monotonic_earliest, e->monotonic_latest, &e->monotonic_next);
1719 if (r < 0)
1720 goto finish;
1721
1722 r = event_arm_timer(e, e->realtime_fd, e->realtime_earliest, e->realtime_latest, &e->realtime_next);
1723 if (r < 0)
1724 goto finish;
1725 }
1726
1727 m = epoll_wait(e->epoll_fd, ev_queue, EPOLL_QUEUE_MAX,
1728 timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC));
1729 if (m < 0) {
1730 r = m;
1731 goto finish;
1732 }
1733
1734 dual_timestamp_get(&e->timestamp);
1735
1736 for (i = 0; i < m; i++) {
1737
1738 if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_MONOTONIC))
1739 r = flush_timer(e, e->monotonic_fd, ev_queue[i].events);
1740 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_REALTIME))
1741 r = flush_timer(e, e->realtime_fd, ev_queue[i].events);
1742 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_SIGNAL))
1743 r = process_signal(e, ev_queue[i].events);
1744 else
1745 r = process_io(e, ev_queue[i].data.ptr, ev_queue[i].events);
1746
1747 if (r < 0)
1748 goto finish;
1749 }
1750
1751 r = process_timer(e, e->timestamp.monotonic, e->monotonic_earliest, e->monotonic_latest);
1752 if (r < 0)
1753 goto finish;
1754
1755 r = process_timer(e, e->timestamp.realtime, e->realtime_earliest, e->realtime_latest);
1756 if (r < 0)
1757 goto finish;
1758
1759 if (e->need_process_child) {
1760 r = process_child(e);
1761 if (r < 0)
1762 goto finish;
1763 }
1764
1765 p = event_next_pending(e);
1766 if (!p) {
1767 r = 0;
1768 goto finish;
1769 }
1770
1771 r = source_dispatch(p);
1772
1773 finish:
1774 e->state = SD_EVENT_PASSIVE;
1775 sd_event_unref(e);
1776
1777 return r;
1778 }
1779
1780 int sd_event_loop(sd_event *e) {
1781 int r;
1782
1783 assert_return(e, -EINVAL);
1784 assert_return(!event_pid_changed(e), -ECHILD);
1785 assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);
1786
1787 sd_event_ref(e);
1788
1789 while (e->state != SD_EVENT_FINISHED) {
1790 r = sd_event_run(e, (uint64_t) -1);
1791 if (r < 0)
1792 goto finish;
1793 }
1794
1795 r = 0;
1796
1797 finish:
1798 sd_event_unref(e);
1799 return r;
1800 }
1801
/* Returns the loop's current state (PASSIVE/RUNNING/QUITTING/FINISHED). */
int sd_event_get_state(sd_event *e) {
        assert_return(e, -EINVAL);
        assert_return(!event_pid_changed(e), -ECHILD);

        return e->state;
}
1808
/* Returns whether sd_event_request_quit() has been called. */
int sd_event_get_quit(sd_event *e) {
        assert_return(e, -EINVAL);
        assert_return(!event_pid_changed(e), -ECHILD);

        return e->quit_requested;
}
1815
/* Asks the loop to quit: the next sd_event_run() will dispatch the
 * quit sources instead of doing a normal iteration. */
int sd_event_request_quit(sd_event *e) {
        assert_return(e, -EINVAL);
        assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(e), -ECHILD);

        e->quit_requested = true;
        return 0;
}
1824
/* Returns the CLOCK_REALTIME timestamp of the current/last loop
 * iteration (taken right after epoll_wait()), in µs. Fails with
 * -ENODATA before the first iteration. */
int sd_event_get_now_realtime(sd_event *e, uint64_t *usec) {
        assert_return(e, -EINVAL);
        assert_return(usec, -EINVAL);
        assert_return(dual_timestamp_is_set(&e->timestamp), -ENODATA);
        assert_return(!event_pid_changed(e), -ECHILD);

        *usec = e->timestamp.realtime;
        return 0;
}
1834
/* Returns the CLOCK_MONOTONIC timestamp of the current/last loop
 * iteration (taken right after epoll_wait()), in µs. Fails with
 * -ENODATA before the first iteration. */
int sd_event_get_now_monotonic(sd_event *e, uint64_t *usec) {
        assert_return(e, -EINVAL);
        assert_return(usec, -EINVAL);
        assert_return(dual_timestamp_is_set(&e->timestamp), -ENODATA);
        assert_return(!event_pid_changed(e), -ECHILD);

        *usec = e->timestamp.monotonic;
        return 0;
}