]> git.ipfire.org Git - thirdparty/systemd.git/blob - src/journal/journald.c
journal: move max_use into metrics structure
[thirdparty/systemd.git] / src / journal / journald.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4 This file is part of systemd.
5
6 Copyright 2011 Lennart Poettering
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU General Public License as published by
10 the Free Software Foundation; either version 2 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/epoll.h>
23 #include <sys/socket.h>
24 #include <errno.h>
25 #include <sys/signalfd.h>
26 #include <unistd.h>
27 #include <fcntl.h>
28 #include <sys/acl.h>
29 #include <acl/libacl.h>
30 #include <stddef.h>
31 #include <sys/ioctl.h>
32 #include <linux/sockios.h>
33 #include <sys/statvfs.h>
34
35 #include "hashmap.h"
36 #include "journal-file.h"
37 #include "sd-daemon.h"
38 #include "socket-util.h"
39 #include "acl-util.h"
40 #include "cgroup-util.h"
41 #include "list.h"
42 #include "journal-rate-limit.h"
43 #include "sd-journal.h"
44 #include "journal-internal.h"
45
/* Per-user journal files kept open at once; find_journal() closes existing
 * ones before opening another once this many are open. */
#define USER_JOURNALS_MAX 1024
/* Concurrent stdout stream connections; stdout_stream_new() refuses more. */
#define STDOUT_STREAMS_MAX 4096

/* Rate limiter defaults. NOTE(review): not referenced in the visible part of
 * this file; presumably consumed by server setup code further down. */
#define DEFAULT_RATE_LIMIT_INTERVAL (10*USEC_PER_SEC)
#define DEFAULT_RATE_LIMIT_BURST 200

/* How long available_space() serves its cached result before recomputing. */
#define RECHECK_AVAILABLE_SPACE_USEC (30*USEC_PER_SEC)

/* Minimum interval between server_flush_to_var() attempts to open /var and
 * move the runtime journal there. */
#define RECHECK_VAR_AVAILABLE_USEC (30*USEC_PER_SEC)

/* NOTE(review): not referenced in the visible part of this file. */
#define SYSLOG_TIMEOUT_USEC (5*USEC_PER_SEC)

typedef struct StdoutStream StdoutStream;
59
60 typedef struct Server {
61 int epoll_fd;
62 int signal_fd;
63 int syslog_fd;
64 int native_fd;
65 int stdout_fd;
66
67 JournalFile *runtime_journal;
68 JournalFile *system_journal;
69 Hashmap *user_journals;
70
71 uint64_t seqnum;
72
73 char *buffer;
74 size_t buffer_size;
75
76 JournalRateLimit *rate_limit;
77
78 JournalMetrics metrics;
79 bool compress;
80
81 uint64_t cached_available_space;
82 usec_t cached_available_space_timestamp;
83
84 uint64_t var_available_timestamp;
85
86 LIST_HEAD(StdoutStream, stdout_streams);
87 unsigned n_stdout_streams;
88 } Server;
89
/* Protocol stages of a stdout stream, advanced by stdout_stream_line():
 * the client first sends a tag line, then a priority line (0-7), then a
 * priority-prefix flag line (0/1), then a tee-to-console flag line (0/1).
 * Every line after that is logged. */
typedef enum StdoutStreamState {
        STDOUT_STREAM_TAG,
        STDOUT_STREAM_PRIORITY,
        STDOUT_STREAM_PRIORITY_PREFIX,
        STDOUT_STREAM_TEE_CONSOLE,
        STDOUT_STREAM_RUNNING
} StdoutStreamState;
97
/* One client connection accepted on the stdout socket (see stdout_stream_new()). */
struct StdoutStream {
        /* Owning server; stays NULL until the stream has been registered in
         * stdout_stream_new(), which stdout_stream_free() relies on. */
        Server *server;
        StdoutStreamState state;

        /* Connected socket; closed by stdout_stream_free(). */
        int fd;

        /* Peer credentials read via SO_PEERCRED at accept time. */
        struct ucred ucred;

        /* Values from the stream header: optional tag prefixed to every
         * message, default priority, and the two protocol flags. */
        char *tag;
        int priority;
        bool priority_prefix:1;
        bool tee_console:1;

        /* Not yet processed input. stdout_stream_process() never reads into
         * the final byte. */
        char buffer[LINE_MAX+1];
        size_t length;

        LIST_FIELDS(StdoutStream, stdout_stream);
};
116
/* Forward declaration: dispatch_message_real() calls this before its definition. */
static int server_flush_to_var(Server *s);
118
119 static uint64_t available_space(Server *s) {
120 char ids[33];
121 sd_id128_t machine;
122 char *p;
123 const char *f;
124 struct statvfs ss;
125 uint64_t sum = 0, avail = 0, ss_avail = 0;
126 int r;
127 DIR *d;
128 usec_t ts = now(CLOCK_MONOTONIC);
129
130 if (s->cached_available_space_timestamp + RECHECK_AVAILABLE_SPACE_USEC > ts)
131 return s->cached_available_space;
132
133 r = sd_id128_get_machine(&machine);
134 if (r < 0)
135 return 0;
136
137 if (s->system_journal)
138 f = "/var/log/journal/";
139 else
140 f = "/run/log/journal/";
141
142 p = strappend(f, sd_id128_to_string(machine, ids));
143 if (!p)
144 return 0;
145
146 d = opendir(p);
147 free(p);
148
149 if (!d)
150 return 0;
151
152 if (fstatvfs(dirfd(d), &ss) < 0)
153 goto finish;
154
155 for (;;) {
156 struct stat st;
157 struct dirent buf, *de;
158 int k;
159
160 k = readdir_r(d, &buf, &de);
161 if (k != 0) {
162 r = -k;
163 goto finish;
164 }
165
166 if (!de)
167 break;
168
169 if (!dirent_is_file_with_suffix(de, ".journal"))
170 continue;
171
172 if (fstatat(dirfd(d), de->d_name, &st, AT_SYMLINK_NOFOLLOW) < 0)
173 continue;
174
175 sum += (uint64_t) st.st_blocks * (uint64_t) st.st_blksize;
176 }
177
178 avail = sum >= s->metrics.max_use ? 0 : s->metrics.max_use - sum;
179
180 ss_avail = ss.f_bsize * ss.f_bavail;
181
182 ss_avail = ss_avail < s->metrics.keep_free ? 0 : ss_avail - s->metrics.keep_free;
183
184 if (ss_avail < avail)
185 avail = ss_avail;
186
187 s->cached_available_space = avail;
188 s->cached_available_space_timestamp = ts;
189
190 finish:
191 closedir(d);
192
193 return avail;
194 }
195
196 static void fix_perms(JournalFile *f, uid_t uid) {
197 acl_t acl;
198 acl_entry_t entry;
199 acl_permset_t permset;
200 int r;
201
202 assert(f);
203
204 r = fchmod_and_fchown(f->fd, 0640, 0, 0);
205 if (r < 0)
206 log_warning("Failed to fix access mode/rights on %s, ignoring: %s", f->path, strerror(-r));
207
208 if (uid <= 0)
209 return;
210
211 acl = acl_get_fd(f->fd);
212 if (!acl) {
213 log_warning("Failed to read ACL on %s, ignoring: %m", f->path);
214 return;
215 }
216
217 r = acl_find_uid(acl, uid, &entry);
218 if (r <= 0) {
219
220 if (acl_create_entry(&acl, &entry) < 0 ||
221 acl_set_tag_type(entry, ACL_USER) < 0 ||
222 acl_set_qualifier(entry, &uid) < 0) {
223 log_warning("Failed to patch ACL on %s, ignoring: %m", f->path);
224 goto finish;
225 }
226 }
227
228 if (acl_get_permset(entry, &permset) < 0 ||
229 acl_add_perm(permset, ACL_READ) < 0 ||
230 acl_calc_mask(&acl) < 0) {
231 log_warning("Failed to patch ACL on %s, ignoring: %m", f->path);
232 goto finish;
233 }
234
235 if (acl_set_fd(f->fd, acl) < 0)
236 log_warning("Failed to set ACL on %s, ignoring: %m", f->path);
237
238 finish:
239 acl_free(acl);
240 }
241
242 static JournalFile* find_journal(Server *s, uid_t uid) {
243 char *p;
244 int r;
245 JournalFile *f;
246 char ids[33];
247 sd_id128_t machine;
248
249 assert(s);
250
251 /* We split up user logs only on /var, not on /run. If the
252 * runtime file is open, we write to it exclusively, in order
253 * to guarantee proper order as soon as we flush /run to
254 * /var and close the runtime file. */
255
256 if (s->runtime_journal)
257 return s->runtime_journal;
258
259 if (uid <= 0)
260 return s->system_journal;
261
262 r = sd_id128_get_machine(&machine);
263 if (r < 0)
264 return s->system_journal;
265
266 f = hashmap_get(s->user_journals, UINT32_TO_PTR(uid));
267 if (f)
268 return f;
269
270 if (asprintf(&p, "/var/log/journal/%s/user-%lu.journal", sd_id128_to_string(machine, ids), (unsigned long) uid) < 0)
271 return s->system_journal;
272
273 while (hashmap_size(s->user_journals) >= USER_JOURNALS_MAX) {
274 /* Too many open? Then let's close one */
275 f = hashmap_steal_first(s->user_journals);
276 assert(f);
277 journal_file_close(f);
278 }
279
280 r = journal_file_open(p, O_RDWR|O_CREAT, 0640, s->system_journal, &f);
281 free(p);
282
283 if (r < 0)
284 return s->system_journal;
285
286 fix_perms(f, uid);
287 f->metrics = s->metrics;
288 f->compress = s->compress;
289
290 r = hashmap_put(s->user_journals, UINT32_TO_PTR(uid), f);
291 if (r < 0) {
292 journal_file_close(f);
293 return s->system_journal;
294 }
295
296 return f;
297 }
298
299 static void server_vacuum(Server *s) {
300 Iterator i;
301 void *k;
302 char *p;
303 char ids[33];
304 sd_id128_t machine;
305 int r;
306 JournalFile *f;
307
308 log_info("Rotating...");
309
310 if (s->runtime_journal) {
311 r = journal_file_rotate(&s->runtime_journal);
312 if (r < 0)
313 log_error("Failed to rotate %s: %s", s->runtime_journal->path, strerror(-r));
314 }
315
316 if (s->system_journal) {
317 r = journal_file_rotate(&s->system_journal);
318 if (r < 0)
319 log_error("Failed to rotate %s: %s", s->system_journal->path, strerror(-r));
320 }
321
322 HASHMAP_FOREACH_KEY(f, k, s->user_journals, i) {
323 r = journal_file_rotate(&f);
324 if (r < 0)
325 log_error("Failed to rotate %s: %s", f->path, strerror(-r));
326 else
327 hashmap_replace(s->user_journals, k, f);
328 }
329
330 log_info("Vacuuming...");
331
332 r = sd_id128_get_machine(&machine);
333 if (r < 0) {
334 log_error("Failed to get machine ID: %s", strerror(-r));
335 return;
336 }
337
338 if (asprintf(&p, "/var/log/journal/%s", sd_id128_to_string(machine, ids)) < 0) {
339 log_error("Out of memory.");
340 return;
341 }
342
343 r = journal_directory_vacuum(p, s->metrics.max_use, s->metrics.keep_free);
344 if (r < 0 && r != -ENOENT)
345 log_error("Failed to vacuum %s: %s", p, strerror(-r));
346 free(p);
347
348 if (asprintf(&p, "/run/log/journal/%s", ids) < 0) {
349 log_error("Out of memory.");
350 return;
351 }
352
353 r = journal_directory_vacuum(p, s->metrics.max_use, s->metrics.keep_free);
354 if (r < 0 && r != -ENOENT)
355 log_error("Failed to vacuum %s: %s", p, strerror(-r));
356 free(p);
357
358 s->cached_available_space_timestamp = 0;
359 }
360
361 static char *shortened_cgroup_path(pid_t pid) {
362 int r;
363 char *process_path, *init_path, *path;
364
365 assert(pid > 0);
366
367 r = cg_get_by_pid(SYSTEMD_CGROUP_CONTROLLER, pid, &process_path);
368 if (r < 0)
369 return NULL;
370
371 r = cg_get_by_pid(SYSTEMD_CGROUP_CONTROLLER, 1, &init_path);
372 if (r < 0) {
373 free(process_path);
374 return NULL;
375 }
376
377 if (streq(init_path, "/"))
378 init_path[0] = 0;
379
380 if (startswith(process_path, init_path)) {
381 char *p;
382
383 p = strdup(process_path + strlen(init_path));
384 if (!p) {
385 free(process_path);
386 free(init_path);
387 return NULL;
388 }
389 path = p;
390 } else {
391 path = process_path;
392 process_path = NULL;
393 }
394
395 free(process_path);
396 free(init_path);
397
398 return path;
399 }
400
401 static void dispatch_message_real(Server *s,
402 struct iovec *iovec, unsigned n, unsigned m,
403 struct ucred *ucred,
404 struct timeval *tv) {
405
406 char *pid = NULL, *uid = NULL, *gid = NULL,
407 *source_time = NULL, *boot_id = NULL, *machine_id = NULL,
408 *comm = NULL, *cmdline = NULL, *hostname = NULL,
409 *audit_session = NULL, *audit_loginuid = NULL,
410 *exe = NULL, *cgroup = NULL;
411
412 char idbuf[33];
413 sd_id128_t id;
414 int r;
415 char *t;
416 uid_t loginuid = 0, realuid = 0;
417 JournalFile *f;
418 bool vacuumed = false;
419
420 assert(s);
421 assert(iovec);
422 assert(n > 0);
423 assert(n + 13 <= m);
424
425 if (ucred) {
426 uint32_t session;
427 char *path;
428
429 realuid = ucred->uid;
430
431 if (asprintf(&pid, "_PID=%lu", (unsigned long) ucred->pid) >= 0)
432 IOVEC_SET_STRING(iovec[n++], pid);
433
434 if (asprintf(&uid, "_UID=%lu", (unsigned long) ucred->uid) >= 0)
435 IOVEC_SET_STRING(iovec[n++], uid);
436
437 if (asprintf(&gid, "_GID=%lu", (unsigned long) ucred->gid) >= 0)
438 IOVEC_SET_STRING(iovec[n++], gid);
439
440 r = get_process_comm(ucred->pid, &t);
441 if (r >= 0) {
442 comm = strappend("_COMM=", t);
443 if (comm)
444 IOVEC_SET_STRING(iovec[n++], comm);
445 free(t);
446 }
447
448 r = get_process_exe(ucred->pid, &t);
449 if (r >= 0) {
450 exe = strappend("_EXE=", t);
451 if (comm)
452 IOVEC_SET_STRING(iovec[n++], exe);
453 free(t);
454 }
455
456 r = get_process_cmdline(ucred->pid, LINE_MAX, false, &t);
457 if (r >= 0) {
458 cmdline = strappend("_CMDLINE=", t);
459 if (cmdline)
460 IOVEC_SET_STRING(iovec[n++], cmdline);
461 free(t);
462 }
463
464 r = audit_session_from_pid(ucred->pid, &session);
465 if (r >= 0)
466 if (asprintf(&audit_session, "_AUDIT_SESSION=%lu", (unsigned long) session) >= 0)
467 IOVEC_SET_STRING(iovec[n++], audit_session);
468
469 r = audit_loginuid_from_pid(ucred->pid, &loginuid);
470 if (r >= 0)
471 if (asprintf(&audit_loginuid, "_AUDIT_LOGINUID=%lu", (unsigned long) loginuid) >= 0)
472 IOVEC_SET_STRING(iovec[n++], audit_loginuid);
473
474 path = shortened_cgroup_path(ucred->pid);
475 if (path) {
476 cgroup = strappend("_SYSTEMD_CGROUP=", path);
477 if (cgroup)
478 IOVEC_SET_STRING(iovec[n++], cgroup);
479
480 free(path);
481 }
482 }
483
484 if (tv) {
485 if (asprintf(&source_time, "_SOURCE_REALTIME_TIMESTAMP=%llu",
486 (unsigned long long) timeval_load(tv)) >= 0)
487 IOVEC_SET_STRING(iovec[n++], source_time);
488 }
489
490 /* Note that strictly speaking storing the boot id here is
491 * redundant since the entry includes this in-line
492 * anyway. However, we need this indexed, too. */
493 r = sd_id128_get_boot(&id);
494 if (r >= 0)
495 if (asprintf(&boot_id, "_BOOT_ID=%s", sd_id128_to_string(id, idbuf)) >= 0)
496 IOVEC_SET_STRING(iovec[n++], boot_id);
497
498 r = sd_id128_get_machine(&id);
499 if (r >= 0)
500 if (asprintf(&machine_id, "_MACHINE_ID=%s", sd_id128_to_string(id, idbuf)) >= 0)
501 IOVEC_SET_STRING(iovec[n++], machine_id);
502
503 t = gethostname_malloc();
504 if (t) {
505 hostname = strappend("_HOSTNAME=", t);
506 if (hostname)
507 IOVEC_SET_STRING(iovec[n++], hostname);
508 free(t);
509 }
510
511 assert(n <= m);
512
513 server_flush_to_var(s);
514
515 retry:
516 f = find_journal(s, realuid == 0 ? 0 : loginuid);
517 if (!f)
518 log_warning("Dropping message, as we can't find a place to store the data.");
519 else {
520 r = journal_file_append_entry(f, NULL, iovec, n, &s->seqnum, NULL, NULL);
521
522 if (r == -E2BIG && !vacuumed) {
523 log_info("Allocation limit reached.");
524
525 server_vacuum(s);
526 vacuumed = true;
527
528 log_info("Retrying write.");
529 goto retry;
530 }
531
532 if (r < 0)
533 log_error("Failed to write entry, ignoring: %s", strerror(-r));
534 }
535
536 free(pid);
537 free(uid);
538 free(gid);
539 free(comm);
540 free(exe);
541 free(cmdline);
542 free(source_time);
543 free(boot_id);
544 free(machine_id);
545 free(hostname);
546 free(audit_session);
547 free(audit_loginuid);
548 free(cgroup);
549 }
550
551 static void dispatch_message(Server *s,
552 struct iovec *iovec, unsigned n, unsigned m,
553 struct ucred *ucred,
554 struct timeval *tv,
555 int priority) {
556 int rl;
557 char *path = NULL, *c;
558
559 assert(s);
560 assert(iovec || n == 0);
561
562 if (n == 0)
563 return;
564
565 if (!ucred)
566 goto finish;
567
568 path = shortened_cgroup_path(ucred->pid);
569 if (!path)
570 goto finish;
571
572 /* example: /user/lennart/3/foobar
573 * /system/dbus.service/foobar
574 *
575 * So let's cut of everything past the third /, since that is
576 * wher user directories start */
577
578 c = strchr(path, '/');
579 if (c) {
580 c = strchr(c+1, '/');
581 if (c) {
582 c = strchr(c+1, '/');
583 if (c)
584 *c = 0;
585 }
586 }
587
588 rl = journal_rate_limit_test(s->rate_limit, path, priority, available_space(s));
589
590 if (rl == 0) {
591 free(path);
592 return;
593 }
594
595 if (rl > 1) {
596 int j = 0;
597 char suppress_message[LINE_MAX];
598 struct iovec suppress_iovec[15];
599
600 /* Write a suppression message if we suppressed something */
601
602 snprintf(suppress_message, sizeof(suppress_message), "MESSAGE=Suppressed %u messages from %s", rl - 1, path);
603 char_array_0(suppress_message);
604
605 IOVEC_SET_STRING(suppress_iovec[j++], "PRIORITY=5");
606 IOVEC_SET_STRING(suppress_iovec[j++], suppress_message);
607
608 dispatch_message_real(s, suppress_iovec, j, ELEMENTSOF(suppress_iovec), NULL, NULL);
609 }
610
611 free(path);
612
613 finish:
614 dispatch_message_real(s, iovec, n, m, ucred, tv);
615 }
616
617 static void process_syslog_message(Server *s, const char *buf, struct ucred *ucred, struct timeval *tv) {
618 char *message = NULL, *syslog_priority = NULL, *syslog_facility = NULL;
619 struct iovec iovec[16];
620 unsigned n = 0;
621 int priority = LOG_USER | LOG_INFO;
622
623 assert(s);
624 assert(buf);
625
626 parse_syslog_priority((char**) &buf, &priority);
627 skip_syslog_date((char**) &buf);
628
629 if (asprintf(&syslog_priority, "PRIORITY=%i", priority & LOG_PRIMASK) >= 0)
630 IOVEC_SET_STRING(iovec[n++], syslog_priority);
631
632 if (asprintf(&syslog_facility, "SYSLOG_FACILITY=%i", LOG_FAC(priority)) >= 0)
633 IOVEC_SET_STRING(iovec[n++], syslog_facility);
634
635 message = strappend("MESSAGE=", buf);
636 if (message)
637 IOVEC_SET_STRING(iovec[n++], message);
638
639 dispatch_message(s, iovec, n, ELEMENTSOF(iovec), ucred, tv, priority & LOG_PRIMASK);
640
641 free(message);
642 free(syslog_facility);
643 free(syslog_priority);
644 }
645
/* Decides whether the client-supplied field name p[0..l) may be stored.
 * Accepted names:
 * - are 1 to 64 characters long;
 * - consist only of 'A'-'Z', '0'-'9' and '_';
 * - do not start with a digit;
 * - do not start with '_', a prefix reserved for the trusted fields journald
 *   adds itself.
 * This loosely follows the POSIX recommendations for environment variable
 * names, with a few extra restrictions:
 * http://pubs.opengroup.org/onlinepubs/000095399/basedefs/xbd_chap08.html */
static bool valid_user_field(const char *p, size_t l) {
        size_t idx;

        if (l == 0 || l > 64)
                return false;

        if (p[0] == '_' || (p[0] >= '0' && p[0] <= '9'))
                return false;

        for (idx = 0; idx < l; idx++) {
                char ch = p[idx];
                bool upper = ch >= 'A' && ch <= 'Z';
                bool digit = ch >= '0' && ch <= '9';

                if (!upper && !digit && ch != '_')
                        return false;
        }

        return true;
}
680
681 static void process_native_message(Server *s, const void *buffer, size_t buffer_size, struct ucred *ucred, struct timeval *tv) {
682 struct iovec *iovec = NULL;
683 unsigned n = 0, m = 0, j;
684 const char *p;
685 size_t remaining;
686 int priority = LOG_INFO;
687
688 assert(s);
689 assert(buffer || n == 0);
690
691 p = buffer;
692 remaining = buffer_size;
693
694 while (remaining > 0) {
695 const char *e, *q;
696
697 e = memchr(p, '\n', remaining);
698
699 if (!e) {
700 /* Trailing noise, let's ignore it, and flush what we collected */
701 log_debug("Received message with trailing noise, ignoring.");
702 break;
703 }
704
705 if (e == p) {
706 /* Entry separator */
707 dispatch_message(s, iovec, n, m, ucred, tv, priority);
708 n = 0;
709 priority = LOG_INFO;
710
711 p++;
712 remaining--;
713 continue;
714 }
715
716 if (*p == '.' || *p == '#') {
717 /* Ignore control commands for now, and
718 * comments too. */
719 remaining -= (e - p) + 1;
720 p = e + 1;
721 continue;
722 }
723
724 /* A property follows */
725
726 if (n+13 >= m) {
727 struct iovec *c;
728 unsigned u;
729
730 u = MAX((n+13U) * 2U, 4U);
731 c = realloc(iovec, u * sizeof(struct iovec));
732 if (!c) {
733 log_error("Out of memory");
734 break;
735 }
736
737 iovec = c;
738 m = u;
739 }
740
741 q = memchr(p, '=', e - p);
742 if (q) {
743 if (valid_user_field(p, q - p)) {
744 /* If the field name starts with an
745 * underscore, skip the variable,
746 * since that indidates a trusted
747 * field */
748 iovec[n].iov_base = (char*) p;
749 iovec[n].iov_len = e - p;
750 n++;
751
752 /* We need to determine the priority
753 * of this entry for the rate limiting
754 * logic */
755 if (e - p == 10 &&
756 memcmp(p, "PRIORITY=", 10) == 0 &&
757 p[10] >= '0' &&
758 p[10] <= '9')
759 priority = p[10] - '0';
760 }
761
762 remaining -= (e - p) + 1;
763 p = e + 1;
764 continue;
765 } else {
766 uint64_t l;
767 char *k;
768
769 if (remaining < e - p + 1 + sizeof(uint64_t) + 1) {
770 log_debug("Failed to parse message, ignoring.");
771 break;
772 }
773
774 memcpy(&l, e + 1, sizeof(uint64_t));
775 l = le64toh(l);
776
777 if (remaining < e - p + 1 + sizeof(uint64_t) + l + 1 ||
778 e[1+sizeof(uint64_t)+l] != '\n') {
779 log_debug("Failed to parse message, ignoring.");
780 break;
781 }
782
783 k = malloc((e - p) + 1 + l);
784 if (!k) {
785 log_error("Out of memory");
786 break;
787 }
788
789 memcpy(k, p, e - p);
790 k[e - p] = '=';
791 memcpy(k + (e - p) + 1, e + 1 + sizeof(uint64_t), l);
792
793 if (valid_user_field(p, e - p)) {
794 iovec[n].iov_base = k;
795 iovec[n].iov_len = (e - p) + 1 + l;
796 n++;
797 } else
798 free(k);
799
800 remaining -= (e - p) + 1 + sizeof(uint64_t) + l + 1;
801 p = e + 1 + sizeof(uint64_t) + l + 1;
802 }
803 }
804
805 dispatch_message(s, iovec, n, m, ucred, tv, priority);
806
807 for (j = 0; j < n; j++)
808 if (iovec[j].iov_base < buffer ||
809 (const uint8_t*) iovec[j].iov_base >= (const uint8_t*) buffer + buffer_size)
810 free(iovec[j].iov_base);
811 }
812
813 static int stdout_stream_log(StdoutStream *s, const char *p, size_t l) {
814 struct iovec iovec[15];
815 char *message = NULL, *syslog_priority = NULL;
816 unsigned n = 0;
817 size_t tag_len;
818 int priority;
819
820 assert(s);
821 assert(p);
822
823 priority = s->priority;
824
825 if (s->priority_prefix &&
826 l > 3 &&
827 p[0] == '<' &&
828 p[1] >= '0' && p[1] <= '7' &&
829 p[2] == '>') {
830
831 priority = p[1] - '0';
832 p += 3;
833 l -= 3;
834 }
835
836 if (l <= 0)
837 return 0;
838
839 if (asprintf(&syslog_priority, "PRIORITY=%i", priority) >= 0)
840 IOVEC_SET_STRING(iovec[n++], syslog_priority);
841
842 tag_len = s->tag ? strlen(s->tag) + 2: 0;
843 message = malloc(8 + tag_len + l);
844 if (message) {
845 memcpy(message, "MESSAGE=", 8);
846
847 if (s->tag) {
848 memcpy(message+8, s->tag, tag_len-2);
849 memcpy(message+8+tag_len-2, ": ", 2);
850 }
851
852 memcpy(message+8+tag_len, p, l);
853 iovec[n].iov_base = message;
854 iovec[n].iov_len = 8+tag_len+l;
855 n++;
856 }
857
858 dispatch_message(s->server, iovec, n, ELEMENTSOF(iovec), &s->ucred, NULL, priority);
859
860 if (s->tee_console) {
861 int console;
862
863 console = open_terminal("/dev/console", O_WRONLY|O_NOCTTY|O_CLOEXEC);
864 if (console >= 0) {
865 n = 0;
866 if (s->tag) {
867 IOVEC_SET_STRING(iovec[n++], s->tag);
868 IOVEC_SET_STRING(iovec[n++], ": ");
869 }
870
871 iovec[n].iov_base = (void*) p;
872 iovec[n].iov_len = l;
873 n++;
874
875 IOVEC_SET_STRING(iovec[n++], (char*) "\n");
876
877 writev(console, iovec, n);
878 }
879 }
880
881 free(message);
882 free(syslog_priority);
883
884 return 0;
885 }
886
887 static int stdout_stream_line(StdoutStream *s, const char *p, size_t l) {
888 assert(s);
889 assert(p);
890
891 while (l > 0 && strchr(WHITESPACE, *p)) {
892 l--;
893 p++;
894 }
895
896 while (l > 0 && strchr(WHITESPACE, *(p+l-1)))
897 l--;
898
899 switch (s->state) {
900
901 case STDOUT_STREAM_TAG:
902
903 if (l > 0) {
904 s->tag = strndup(p, l);
905 if (!s->tag) {
906 log_error("Out of memory");
907 return -EINVAL;
908 }
909 }
910
911 s->state = STDOUT_STREAM_PRIORITY;
912 return 0;
913
914 case STDOUT_STREAM_PRIORITY:
915 if (l != 1 || *p < '0' || *p > '7') {
916 log_warning("Failed to parse log priority line.");
917 return -EINVAL;
918 }
919
920 s->priority = *p - '0';
921 s->state = STDOUT_STREAM_PRIORITY_PREFIX;
922 return 0;
923
924 case STDOUT_STREAM_PRIORITY_PREFIX:
925 if (l != 1 || *p < '0' || *p > '1') {
926 log_warning("Failed to parse priority prefix line.");
927 return -EINVAL;
928 }
929
930 s->priority_prefix = *p - '0';
931 s->state = STDOUT_STREAM_TEE_CONSOLE;
932 return 0;
933
934 case STDOUT_STREAM_TEE_CONSOLE:
935 if (l != 1 || *p < '0' || *p > '1') {
936 log_warning("Failed to parse tee to console line.");
937 return -EINVAL;
938 }
939
940 s->tee_console = *p - '0';
941 s->state = STDOUT_STREAM_RUNNING;
942 return 0;
943
944 case STDOUT_STREAM_RUNNING:
945 return stdout_stream_log(s, p, l);
946 }
947
948 assert_not_reached("Unknown stream state");
949 }
950
951 static int stdout_stream_scan(StdoutStream *s, bool force_flush) {
952 char *p;
953 size_t remaining;
954 int r;
955
956 assert(s);
957
958 p = s->buffer;
959 remaining = s->length;
960 for (;;) {
961 char *end;
962 size_t skip;
963
964 end = memchr(p, '\n', remaining);
965 if (!end) {
966 if (remaining >= LINE_MAX) {
967 end = p + LINE_MAX;
968 skip = LINE_MAX;
969 } else
970 break;
971 } else
972 skip = end - p + 1;
973
974 r = stdout_stream_line(s, p, end - p);
975 if (r < 0)
976 return r;
977
978 remaining -= skip;
979 p += skip;
980 }
981
982 if (force_flush && remaining > 0) {
983 r = stdout_stream_line(s, p, remaining);
984 if (r < 0)
985 return r;
986
987 p += remaining;
988 remaining = 0;
989 }
990
991 if (p > s->buffer) {
992 memmove(s->buffer, p, remaining);
993 s->length = remaining;
994 }
995
996 return 0;
997 }
998
999 static int stdout_stream_process(StdoutStream *s) {
1000 ssize_t l;
1001 int r;
1002
1003 assert(s);
1004
1005 l = read(s->fd, s->buffer+s->length, sizeof(s->buffer)-1-s->length);
1006 if (l < 0) {
1007
1008 if (errno == EAGAIN)
1009 return 0;
1010
1011 log_warning("Failed to read from stream: %m");
1012 return -errno;
1013 }
1014
1015 if (l == 0) {
1016 r = stdout_stream_scan(s, true);
1017 if (r < 0)
1018 return r;
1019
1020 return 0;
1021 }
1022
1023 s->length += l;
1024 r = stdout_stream_scan(s, false);
1025 if (r < 0)
1026 return r;
1027
1028 return 1;
1029
1030 }
1031
1032 static void stdout_stream_free(StdoutStream *s) {
1033 assert(s);
1034
1035 if (s->server) {
1036 assert(s->server->n_stdout_streams > 0);
1037 s->server->n_stdout_streams --;
1038 LIST_REMOVE(StdoutStream, stdout_stream, s->server->stdout_streams, s);
1039 }
1040
1041 if (s->fd >= 0) {
1042 if (s->server)
1043 epoll_ctl(s->server->epoll_fd, EPOLL_CTL_DEL, s->fd, NULL);
1044
1045 close_nointr_nofail(s->fd);
1046 }
1047
1048 free(s->tag);
1049 free(s);
1050 }
1051
1052 static int stdout_stream_new(Server *s) {
1053 StdoutStream *stream;
1054 int fd, r;
1055 socklen_t len;
1056 struct epoll_event ev;
1057
1058 assert(s);
1059
1060 fd = accept4(s->stdout_fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
1061 if (fd < 0) {
1062 if (errno == EAGAIN)
1063 return 0;
1064
1065 log_error("Failed to accept stdout connection: %m");
1066 return -errno;
1067 }
1068
1069 if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) {
1070 log_warning("Too many stdout streams, refusing connection.");
1071 close_nointr_nofail(fd);
1072 return 0;
1073 }
1074
1075 stream = new0(StdoutStream, 1);
1076 if (!stream) {
1077 log_error("Out of memory.");
1078 close_nointr_nofail(fd);
1079 return -ENOMEM;
1080 }
1081
1082 stream->fd = fd;
1083
1084 len = sizeof(stream->ucred);
1085 if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &stream->ucred, &len) < 0) {
1086 log_error("Failed to determine peer credentials: %m");
1087 r = -errno;
1088 goto fail;
1089 }
1090
1091 if (shutdown(fd, SHUT_WR) < 0) {
1092 log_error("Failed to shutdown writing side of socket: %m");
1093 r = -errno;
1094 goto fail;
1095 }
1096
1097 zero(ev);
1098 ev.data.ptr = stream;
1099 ev.events = EPOLLIN;
1100 if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) {
1101 log_error("Failed to add stream to event loop: %m");
1102 r = -errno;
1103 goto fail;
1104 }
1105
1106 stream->server = s;
1107 LIST_PREPEND(StdoutStream, stdout_stream, s->stdout_streams, stream);
1108 s->n_stdout_streams ++;
1109
1110 return 0;
1111
1112 fail:
1113 stdout_stream_free(stream);
1114 return r;
1115 }
1116
1117 static int system_journal_open(Server *s) {
1118 int r;
1119 char *fn;
1120 sd_id128_t machine;
1121 char ids[33];
1122
1123 r = sd_id128_get_machine(&machine);
1124 if (r < 0)
1125 return r;
1126
1127 sd_id128_to_string(machine, ids);
1128
1129 if (!s->system_journal) {
1130
1131 /* First try to create the machine path, but not the prefix */
1132 fn = strappend("/var/log/journal/", ids);
1133 if (!fn)
1134 return -ENOMEM;
1135 (void) mkdir(fn, 0755);
1136 free(fn);
1137
1138 /* The create the system journal file */
1139 fn = join("/var/log/journal/", ids, "/system.journal", NULL);
1140 if (!fn)
1141 return -ENOMEM;
1142
1143 r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->system_journal);
1144 free(fn);
1145
1146 if (r >= 0) {
1147 s->system_journal->metrics = s->metrics;
1148 s->system_journal->compress = s->compress;
1149
1150 fix_perms(s->system_journal, 0);
1151 } else if (r < 0) {
1152
1153 if (r == -ENOENT)
1154 r = 0;
1155 else {
1156 log_error("Failed to open system journal: %s", strerror(-r));
1157 return r;
1158 }
1159 }
1160 }
1161
1162 if (!s->runtime_journal) {
1163
1164 fn = join("/run/log/journal/", ids, "/system.journal", NULL);
1165 if (!fn)
1166 return -ENOMEM;
1167
1168 if (s->system_journal) {
1169
1170 /* Try to open the runtime journal, but only
1171 * if it already exists, so that we can flush
1172 * it into the system journal */
1173
1174 r = journal_file_open(fn, O_RDWR, 0640, NULL, &s->runtime_journal);
1175 free(fn);
1176
1177 if (r < 0) {
1178
1179 if (r == -ENOENT)
1180 r = 0;
1181 else {
1182 log_error("Failed to open runtime journal: %s", strerror(-r));
1183 return r;
1184 }
1185 }
1186
1187 } else {
1188
1189 /* OK, we really need the runtime journal, so create
1190 * it if necessary. */
1191
1192 (void) mkdir_parents(fn, 0755);
1193 r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->runtime_journal);
1194 free(fn);
1195
1196 if (r < 0) {
1197 log_error("Failed to open runtime journal: %s", strerror(-r));
1198 return r;
1199 }
1200 }
1201
1202 if (s->runtime_journal) {
1203 s->runtime_journal->metrics = s->metrics;
1204 s->runtime_journal->compress = s->compress;
1205
1206 fix_perms(s->runtime_journal, 0);
1207 }
1208 }
1209
1210 return r;
1211 }
1212
1213 static int server_flush_to_var(Server *s) {
1214 char path[] = "/run/log/journal/xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx";
1215 Object *o = NULL;
1216 int r;
1217 sd_id128_t machine;
1218 sd_journal *j;
1219 usec_t ts;
1220
1221 assert(s);
1222
1223 if (!s->runtime_journal)
1224 return 0;
1225
1226 ts = now(CLOCK_MONOTONIC);
1227 if (s->var_available_timestamp + RECHECK_VAR_AVAILABLE_USEC > ts)
1228 return 0;
1229
1230 s->var_available_timestamp = ts;
1231
1232 system_journal_open(s);
1233
1234 if (!s->system_journal)
1235 return 0;
1236
1237 r = sd_id128_get_machine(&machine);
1238 if (r < 0) {
1239 log_error("Failed to get machine id: %s", strerror(-r));
1240 return r;
1241 }
1242
1243 r = sd_journal_open(&j, SD_JOURNAL_RUNTIME_ONLY);
1244 if (r < 0) {
1245 log_error("Failed to read runtime journal: %s", strerror(-r));
1246 return r;
1247 }
1248
1249 SD_JOURNAL_FOREACH(j) {
1250 JournalFile *f;
1251
1252 f = j->current_file;
1253 assert(f && f->current_offset > 0);
1254
1255 r = journal_file_move_to_object(f, OBJECT_ENTRY, f->current_offset, &o);
1256 if (r < 0) {
1257 log_error("Can't read entry: %s", strerror(-r));
1258 goto finish;
1259 }
1260
1261 r = journal_file_copy_entry(f, s->system_journal, o, f->current_offset, NULL, NULL, NULL);
1262 if (r == -E2BIG) {
1263 log_info("Allocation limit reached.");
1264
1265 journal_file_post_change(s->system_journal);
1266 server_vacuum(s);
1267
1268 r = journal_file_copy_entry(f, s->system_journal, o, f->current_offset, NULL, NULL, NULL);
1269 }
1270
1271 if (r < 0) {
1272 log_error("Can't write entry: %s", strerror(-r));
1273 goto finish;
1274 }
1275 }
1276
1277 finish:
1278 journal_file_post_change(s->system_journal);
1279
1280 journal_file_close(s->runtime_journal);
1281 s->runtime_journal = NULL;
1282
1283 if (r >= 0) {
1284 sd_id128_to_string(machine, path + 17);
1285 rm_rf(path, false, true, false);
1286 }
1287
1288 return r;
1289 }
1290
1291 static void forward_syslog(Server *s, const void *buffer, size_t length, struct ucred *ucred, struct timeval *tv) {
1292 struct msghdr msghdr;
1293 struct iovec iovec;
1294 struct cmsghdr *cmsg;
1295 union {
1296 struct cmsghdr cmsghdr;
1297 uint8_t buf[CMSG_SPACE(sizeof(struct ucred)) +
1298 CMSG_SPACE(sizeof(struct timeval))];
1299 } control;
1300 union sockaddr_union sa;
1301
1302 assert(s);
1303
1304 zero(msghdr);
1305
1306 zero(iovec);
1307 iovec.iov_base = (void*) buffer;
1308 iovec.iov_len = length;
1309 msghdr.msg_iov = &iovec;
1310 msghdr.msg_iovlen = 1;
1311
1312 zero(sa);
1313 sa.un.sun_family = AF_UNIX;
1314 strncpy(sa.un.sun_path, "/run/systemd/syslog", sizeof(sa.un.sun_path));
1315 msghdr.msg_name = &sa;
1316 msghdr.msg_namelen = offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path);
1317
1318 zero(control);
1319 msghdr.msg_control = &control;
1320 msghdr.msg_controllen = sizeof(control);
1321
1322 cmsg = CMSG_FIRSTHDR(&msghdr);
1323 cmsg->cmsg_level = SOL_SOCKET;
1324 cmsg->cmsg_type = SCM_CREDENTIALS;
1325 cmsg->cmsg_len = CMSG_LEN(sizeof(struct ucred));
1326 memcpy(CMSG_DATA(cmsg), ucred, sizeof(struct ucred));
1327 msghdr.msg_controllen = cmsg->cmsg_len;
1328
1329 /* Forward the syslog message we received via /dev/log to
1330 * /run/systemd/syslog. Unfortunately we currently can't set
1331 * the SO_TIMESTAMP auxiliary data, and hence we don't. */
1332
1333 if (sendmsg(s->syslog_fd, &msghdr, MSG_NOSIGNAL) >= 0)
1334 return;
1335
1336 if (errno == ESRCH) {
1337 struct ucred u;
1338
1339 /* Hmm, presumably the sender process vanished
1340 * by now, so let's fix it as good as we
1341 * can, and retry */
1342
1343 u = *ucred;
1344 u.pid = getpid();
1345 memcpy(CMSG_DATA(cmsg), &u, sizeof(struct ucred));
1346
1347 if (sendmsg(s->syslog_fd, &msghdr, MSG_NOSIGNAL) >= 0)
1348 return;
1349 }
1350
1351 log_debug("Failed to forward syslog message: %m");
1352 }
1353
1354 static int process_event(Server *s, struct epoll_event *ev) {
1355 assert(s);
1356
1357 if (ev->data.fd == s->signal_fd) {
1358 struct signalfd_siginfo sfsi;
1359 ssize_t n;
1360
1361 if (ev->events != EPOLLIN) {
1362 log_info("Got invalid event from epoll.");
1363 return -EIO;
1364 }
1365
1366 n = read(s->signal_fd, &sfsi, sizeof(sfsi));
1367 if (n != sizeof(sfsi)) {
1368
1369 if (n >= 0)
1370 return -EIO;
1371
1372 if (errno == EINTR || errno == EAGAIN)
1373 return 0;
1374
1375 return -errno;
1376 }
1377
1378 if (sfsi.ssi_signo == SIGUSR1) {
1379 server_flush_to_var(s);
1380 return 0;
1381 }
1382
1383 log_debug("Received SIG%s", signal_to_string(sfsi.ssi_signo));
1384 return 0;
1385
1386 } else if (ev->data.fd == s->native_fd ||
1387 ev->data.fd == s->syslog_fd) {
1388
1389 if (ev->events != EPOLLIN) {
1390 log_info("Got invalid event from epoll.");
1391 return -EIO;
1392 }
1393
1394 for (;;) {
1395 struct msghdr msghdr;
1396 struct iovec iovec;
1397 struct ucred *ucred = NULL;
1398 struct timeval *tv = NULL;
1399 struct cmsghdr *cmsg;
1400 union {
1401 struct cmsghdr cmsghdr;
1402 uint8_t buf[CMSG_SPACE(sizeof(struct ucred)) +
1403 CMSG_SPACE(sizeof(struct timeval))];
1404 } control;
1405 ssize_t n;
1406 int v;
1407
1408 if (ioctl(ev->data.fd, SIOCINQ, &v) < 0) {
1409 log_error("SIOCINQ failed: %m");
1410 return -errno;
1411 }
1412
1413 if (v <= 0)
1414 return 1;
1415
1416 if (s->buffer_size < (size_t) v) {
1417 void *b;
1418 size_t l;
1419
1420 l = MAX(LINE_MAX + (size_t) v, s->buffer_size * 2);
1421 b = realloc(s->buffer, l+1);
1422
1423 if (!b) {
1424 log_error("Couldn't increase buffer.");
1425 return -ENOMEM;
1426 }
1427
1428 s->buffer_size = l;
1429 s->buffer = b;
1430 }
1431
1432 zero(iovec);
1433 iovec.iov_base = s->buffer;
1434 iovec.iov_len = s->buffer_size;
1435
1436 zero(control);
1437 zero(msghdr);
1438 msghdr.msg_iov = &iovec;
1439 msghdr.msg_iovlen = 1;
1440 msghdr.msg_control = &control;
1441 msghdr.msg_controllen = sizeof(control);
1442
1443 n = recvmsg(ev->data.fd, &msghdr, MSG_DONTWAIT);
1444 if (n < 0) {
1445
1446 if (errno == EINTR || errno == EAGAIN)
1447 return 1;
1448
1449 log_error("recvmsg() failed: %m");
1450 return -errno;
1451 }
1452
1453 for (cmsg = CMSG_FIRSTHDR(&msghdr); cmsg; cmsg = CMSG_NXTHDR(&msghdr, cmsg)) {
1454
1455 if (cmsg->cmsg_level == SOL_SOCKET &&
1456 cmsg->cmsg_type == SCM_CREDENTIALS &&
1457 cmsg->cmsg_len == CMSG_LEN(sizeof(struct ucred)))
1458 ucred = (struct ucred*) CMSG_DATA(cmsg);
1459 else if (cmsg->cmsg_level == SOL_SOCKET &&
1460 cmsg->cmsg_type == SO_TIMESTAMP &&
1461 cmsg->cmsg_len == CMSG_LEN(sizeof(struct timeval)))
1462 tv = (struct timeval*) CMSG_DATA(cmsg);
1463 }
1464
1465 if (ev->data.fd == s->syslog_fd) {
1466 char *e;
1467
1468 e = memchr(s->buffer, '\n', n);
1469 if (e)
1470 *e = 0;
1471 else
1472 s->buffer[n] = 0;
1473
1474 forward_syslog(s, s->buffer, n, ucred, tv);
1475 process_syslog_message(s, strstrip(s->buffer), ucred, tv);
1476 } else
1477 process_native_message(s, s->buffer, n, ucred, tv);
1478 }
1479
1480 return 1;
1481
1482 } else if (ev->data.fd == s->stdout_fd) {
1483
1484 if (ev->events != EPOLLIN) {
1485 log_info("Got invalid event from epoll.");
1486 return -EIO;
1487 }
1488
1489 stdout_stream_new(s);
1490 return 1;
1491
1492 } else {
1493 StdoutStream *stream;
1494
1495 if ((ev->events|EPOLLIN|EPOLLHUP) != (EPOLLIN|EPOLLHUP)) {
1496 log_info("Got invalid event from epoll.");
1497 return -EIO;
1498 }
1499
1500 /* If it is none of the well-known fds, it must be an
1501 * stdout stream fd. Note that this is a bit ugly here
1502 * (since we rely that none of the well-known fds
1503 * could be interpreted as pointer), but nonetheless
1504 * safe, since the well-known fds would never get an
1505 * fd > 4096, i.e. beyond the first memory page */
1506
1507 stream = ev->data.ptr;
1508
1509 if (stdout_stream_process(stream) <= 0)
1510 stdout_stream_free(stream);
1511
1512 return 1;
1513 }
1514
1515 log_error("Unknown event.");
1516 return 0;
1517 }
1518
1519 static int open_syslog_socket(Server *s) {
1520 union sockaddr_union sa;
1521 int one, r;
1522 struct epoll_event ev;
1523 struct timeval tv;
1524
1525 assert(s);
1526
1527 if (s->syslog_fd < 0) {
1528
1529 s->syslog_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0);
1530 if (s->syslog_fd < 0) {
1531 log_error("socket() failed: %m");
1532 return -errno;
1533 }
1534
1535 zero(sa);
1536 sa.un.sun_family = AF_UNIX;
1537 strncpy(sa.un.sun_path, "/dev/log", sizeof(sa.un.sun_path));
1538
1539 unlink(sa.un.sun_path);
1540
1541 r = bind(s->syslog_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1542 if (r < 0) {
1543 log_error("bind() failed: %m");
1544 return -errno;
1545 }
1546
1547 chmod(sa.un.sun_path, 0666);
1548 }
1549
1550 one = 1;
1551 r = setsockopt(s->syslog_fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
1552 if (r < 0) {
1553 log_error("SO_PASSCRED failed: %m");
1554 return -errno;
1555 }
1556
1557 one = 1;
1558 r = setsockopt(s->syslog_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one));
1559 if (r < 0) {
1560 log_error("SO_TIMESTAMP failed: %m");
1561 return -errno;
1562 }
1563
1564 /* Since we use the same socket for forwarding this to some
1565 * other syslog implementation, make sure we don't hang
1566 * forever */
1567 timeval_store(&tv, SYSLOG_TIMEOUT_USEC);
1568 if (setsockopt(s->syslog_fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) < 0) {
1569 log_error("SO_SNDTIMEO failed: %m");
1570 return -errno;
1571 }
1572
1573 zero(ev);
1574 ev.events = EPOLLIN;
1575 ev.data.fd = s->syslog_fd;
1576 if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->syslog_fd, &ev) < 0) {
1577 log_error("Failed to add syslog server fd to epoll object: %m");
1578 return -errno;
1579 }
1580
1581 return 0;
1582 }
1583
1584 static int open_native_socket(Server*s) {
1585 union sockaddr_union sa;
1586 int one, r;
1587 struct epoll_event ev;
1588
1589 assert(s);
1590
1591 if (s->native_fd < 0) {
1592
1593 s->native_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0);
1594 if (s->native_fd < 0) {
1595 log_error("socket() failed: %m");
1596 return -errno;
1597 }
1598
1599 zero(sa);
1600 sa.un.sun_family = AF_UNIX;
1601 strncpy(sa.un.sun_path, "/run/systemd/journal", sizeof(sa.un.sun_path));
1602
1603 unlink(sa.un.sun_path);
1604
1605 r = bind(s->native_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1606 if (r < 0) {
1607 log_error("bind() failed: %m");
1608 return -errno;
1609 }
1610
1611 chmod(sa.un.sun_path, 0666);
1612 }
1613
1614 one = 1;
1615 r = setsockopt(s->native_fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
1616 if (r < 0) {
1617 log_error("SO_PASSCRED failed: %m");
1618 return -errno;
1619 }
1620
1621 one = 1;
1622 r = setsockopt(s->native_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one));
1623 if (r < 0) {
1624 log_error("SO_TIMESTAMP failed: %m");
1625 return -errno;
1626 }
1627
1628 zero(ev);
1629 ev.events = EPOLLIN;
1630 ev.data.fd = s->native_fd;
1631 if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->native_fd, &ev) < 0) {
1632 log_error("Failed to add native server fd to epoll object: %m");
1633 return -errno;
1634 }
1635
1636 return 0;
1637 }
1638
1639 static int open_stdout_socket(Server *s) {
1640 union sockaddr_union sa;
1641 int r;
1642 struct epoll_event ev;
1643
1644 assert(s);
1645
1646 if (s->stdout_fd < 0) {
1647
1648 s->stdout_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0);
1649 if (s->stdout_fd < 0) {
1650 log_error("socket() failed: %m");
1651 return -errno;
1652 }
1653
1654 zero(sa);
1655 sa.un.sun_family = AF_UNIX;
1656 strncpy(sa.un.sun_path, "/run/systemd/stdout", sizeof(sa.un.sun_path));
1657
1658 unlink(sa.un.sun_path);
1659
1660 r = bind(s->stdout_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1661 if (r < 0) {
1662 log_error("bind() failed: %m");
1663 return -errno;
1664 }
1665
1666 chmod(sa.un.sun_path, 0666);
1667
1668 if (listen(s->stdout_fd, SOMAXCONN) < 0) {
1669 log_error("liste() failed: %m");
1670 return -errno;
1671 }
1672 }
1673
1674 zero(ev);
1675 ev.events = EPOLLIN;
1676 ev.data.fd = s->stdout_fd;
1677 if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->stdout_fd, &ev) < 0) {
1678 log_error("Failed to add stdout server fd to epoll object: %m");
1679 return -errno;
1680 }
1681
1682 return 0;
1683 }
1684
1685 static int open_signalfd(Server *s) {
1686 sigset_t mask;
1687 struct epoll_event ev;
1688
1689 assert(s);
1690
1691 assert_se(sigemptyset(&mask) == 0);
1692 sigset_add_many(&mask, SIGINT, SIGTERM, SIGUSR1, -1);
1693 assert_se(sigprocmask(SIG_SETMASK, &mask, NULL) == 0);
1694
1695 s->signal_fd = signalfd(-1, &mask, SFD_NONBLOCK|SFD_CLOEXEC);
1696 if (s->signal_fd < 0) {
1697 log_error("signalfd(): %m");
1698 return -errno;
1699 }
1700
1701 zero(ev);
1702 ev.events = EPOLLIN;
1703 ev.data.fd = s->signal_fd;
1704
1705 if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->signal_fd, &ev) < 0) {
1706 log_error("epoll_ctl(): %m");
1707 return -errno;
1708 }
1709
1710 return 0;
1711 }
1712
1713 static int server_init(Server *s) {
1714 int n, r, fd;
1715
1716 assert(s);
1717
1718 zero(*s);
1719 s->syslog_fd = s->native_fd = s->stdout_fd = s->signal_fd = s->epoll_fd = -1;
1720 s->metrics.max_size = DEFAULT_MAX_SIZE;
1721 s->metrics.min_size = DEFAULT_MIN_SIZE;
1722 s->metrics.keep_free = DEFAULT_KEEP_FREE;
1723 s->metrics.max_use = DEFAULT_MAX_USE;
1724 s->compress = true;
1725
1726 s->user_journals = hashmap_new(trivial_hash_func, trivial_compare_func);
1727 if (!s->user_journals) {
1728 log_error("Out of memory.");
1729 return -ENOMEM;
1730 }
1731
1732 s->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
1733 if (s->epoll_fd < 0) {
1734 log_error("Failed to create epoll object: %m");
1735 return -errno;
1736 }
1737
1738 n = sd_listen_fds(true);
1739 if (n < 0) {
1740 log_error("Failed to read listening file descriptors from environment: %s", strerror(-n));
1741 return n;
1742 }
1743
1744 for (fd = SD_LISTEN_FDS_START; fd < SD_LISTEN_FDS_START + n; fd++) {
1745
1746 if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/run/systemd/native", 0) > 0) {
1747
1748 if (s->native_fd >= 0) {
1749 log_error("Too many native sockets passed.");
1750 return -EINVAL;
1751 }
1752
1753 s->native_fd = fd;
1754
1755 } else if (sd_is_socket_unix(fd, SOCK_STREAM, 1, "/run/systemd/stdout", 0) > 0) {
1756
1757 if (s->stdout_fd >= 0) {
1758 log_error("Too many stdout sockets passed.");
1759 return -EINVAL;
1760 }
1761
1762 s->stdout_fd = fd;
1763
1764 } else if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/dev/log", 0) > 0) {
1765
1766 if (s->syslog_fd >= 0) {
1767 log_error("Too many /dev/log sockets passed.");
1768 return -EINVAL;
1769 }
1770
1771 s->syslog_fd = fd;
1772
1773 } else {
1774 log_error("Unknown socket passed.");
1775 return -EINVAL;
1776 }
1777 }
1778
1779 r = open_syslog_socket(s);
1780 if (r < 0)
1781 return r;
1782
1783 r = open_native_socket(s);
1784 if (r < 0)
1785 return r;
1786
1787 r = open_stdout_socket(s);
1788 if (r < 0)
1789 return r;
1790
1791 r = system_journal_open(s);
1792 if (r < 0)
1793 return r;
1794
1795 r = open_signalfd(s);
1796 if (r < 0)
1797 return r;
1798
1799 s->rate_limit = journal_rate_limit_new(DEFAULT_RATE_LIMIT_INTERVAL, DEFAULT_RATE_LIMIT_BURST);
1800 if (!s->rate_limit)
1801 return -ENOMEM;
1802
1803 return 0;
1804 }
1805
1806 static void server_done(Server *s) {
1807 JournalFile *f;
1808 assert(s);
1809
1810 while (s->stdout_streams)
1811 stdout_stream_free(s->stdout_streams);
1812
1813 if (s->system_journal)
1814 journal_file_close(s->system_journal);
1815
1816 if (s->runtime_journal)
1817 journal_file_close(s->runtime_journal);
1818
1819 while ((f = hashmap_steal_first(s->user_journals)))
1820 journal_file_close(f);
1821
1822 hashmap_free(s->user_journals);
1823
1824 if (s->epoll_fd >= 0)
1825 close_nointr_nofail(s->epoll_fd);
1826
1827 if (s->signal_fd >= 0)
1828 close_nointr_nofail(s->signal_fd);
1829
1830 if (s->syslog_fd >= 0)
1831 close_nointr_nofail(s->syslog_fd);
1832
1833 if (s->native_fd >= 0)
1834 close_nointr_nofail(s->native_fd);
1835
1836 if (s->stdout_fd >= 0)
1837 close_nointr_nofail(s->stdout_fd);
1838
1839 if (s->rate_limit)
1840 journal_rate_limit_free(s->rate_limit);
1841
1842 free(s->buffer);
1843 }
1844
1845 int main(int argc, char *argv[]) {
1846 Server server;
1847 int r;
1848
1849 /* if (getppid() != 1) { */
1850 /* log_error("This program should be invoked by init only."); */
1851 /* return EXIT_FAILURE; */
1852 /* } */
1853
1854 if (argc > 1) {
1855 log_error("This program does not take arguments.");
1856 return EXIT_FAILURE;
1857 }
1858
1859 log_set_target(LOG_TARGET_CONSOLE);
1860 log_parse_environment();
1861 log_open();
1862
1863 umask(0022);
1864
1865 r = server_init(&server);
1866 if (r < 0)
1867 goto finish;
1868
1869 log_debug("systemd-journald running as pid %lu", (unsigned long) getpid());
1870
1871 sd_notify(false,
1872 "READY=1\n"
1873 "STATUS=Processing requests...");
1874
1875 server_vacuum(&server);
1876 server_flush_to_var(&server);
1877
1878 for (;;) {
1879 struct epoll_event event;
1880
1881 r = epoll_wait(server.epoll_fd, &event, 1, -1);
1882 if (r < 0) {
1883
1884 if (errno == EINTR)
1885 continue;
1886
1887 log_error("epoll_wait() failed: %m");
1888 r = -errno;
1889 goto finish;
1890 } else if (r == 0)
1891 break;
1892
1893 r = process_event(&server, &event);
1894 if (r < 0)
1895 goto finish;
1896 else if (r == 0)
1897 break;
1898 }
1899
1900 log_debug("systemd-journald stopped as pid %lu", (unsigned long) getpid());
1901
1902 finish:
1903 sd_notify(false,
1904 "STATUS=Shutting down...");
1905
1906 server_done(&server);
1907
1908 return r < 0 ? EXIT_FAILURE : EXIT_SUCCESS;
1909 }