]> git.ipfire.org Git - thirdparty/systemd.git/blame - src/journal/journald.c
journal: when increasing window, make sure to use the increased window
[thirdparty/systemd.git] / src / journal / journald.c
CommitLineData
87d2c1ff
LP
1/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3/***
4 This file is part of systemd.
5
6 Copyright 2011 Lennart Poettering
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU General Public License as published by
10 the Free Software Foundation; either version 2 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20***/
21
22#include <sys/epoll.h>
23#include <sys/socket.h>
24#include <errno.h>
25#include <sys/signalfd.h>
26#include <unistd.h>
27#include <fcntl.h>
f4b47811
LP
28#include <sys/acl.h>
29#include <acl/libacl.h>
7f3e6257
LP
30#include <stddef.h>
31#include <sys/ioctl.h>
32#include <linux/sockios.h>
6e409ce1 33#include <sys/statvfs.h>
87d2c1ff
LP
34
35#include "hashmap.h"
cec736d2 36#include "journal-file.h"
87d2c1ff
LP
37#include "sd-daemon.h"
38#include "socket-util.h"
f4b47811 39#include "acl-util.h"
69e5d42d 40#include "cgroup-util.h"
fe652127 41#include "list.h"
6e409ce1 42#include "journal-rate-limit.h"
cf244689 43#include "sd-journal.h"
85d83bf4 44#include "sd-login.h"
cf244689 45#include "journal-internal.h"
87d2c1ff 46
/* Upper bound on simultaneously open per-user journal files; when it
 * is reached an already open one is closed first. */
#define USER_JOURNALS_MAX 1024

/* Upper bound on simultaneously connected stdout streams; further
 * connections are refused. */
#define STDOUT_STREAMS_MAX 4096

/* Defaults for the rate limiter (presumably: at most BURST messages
 * per INTERVAL -- confirm against journal-rate-limit.c). */
#define DEFAULT_RATE_LIMIT_INTERVAL (10*USEC_PER_SEC)
#define DEFAULT_RATE_LIMIT_BURST 200

/* How long a computed "available space" value stays cached. */
#define RECHECK_AVAILABLE_SPACE_USEC (30*USEC_PER_SEC)

/* Minimum delay between two attempts to flush the runtime journal
 * to /var. */
#define RECHECK_VAR_AVAILABLE_USEC (30*USEC_PER_SEC)

/* NOTE(review): not referenced in the visible part of this file;
 * presumably a timeout for forwarding to syslog -- confirm. */
#define SYSLOG_TIMEOUT_USEC (5*USEC_PER_SEC)

typedef struct StdoutStream StdoutStream;
cab8ac60 60
87d2c1ff 61typedef struct Server {
87d2c1ff
LP
62 int epoll_fd;
63 int signal_fd;
7f3e6257
LP
64 int syslog_fd;
65 int native_fd;
fe652127 66 int stdout_fd;
87d2c1ff 67
f4b47811 68 JournalFile *runtime_journal;
87d2c1ff
LP
69 JournalFile *system_journal;
70 Hashmap *user_journals;
c2373f84
LP
71
72 uint64_t seqnum;
7f3e6257
LP
73
74 char *buffer;
75 size_t buffer_size;
bc85bfee 76
6e409ce1
LP
77 JournalRateLimit *rate_limit;
78
babfc091
LP
79 JournalMetrics runtime_metrics;
80 JournalMetrics system_metrics;
81
807e17f0 82 bool compress;
fe652127 83
9cfb57c9
LP
84 uint64_t cached_available_space;
85 usec_t cached_available_space_timestamp;
86
54a7b863
LP
87 uint64_t var_available_timestamp;
88
fe652127
LP
89 LIST_HEAD(StdoutStream, stdout_streams);
90 unsigned n_stdout_streams;
87d2c1ff
LP
91} Server;
92
fe652127
LP
/* Parser state of a stdout stream connection. The first four states
 * each consume one header line, in this order, before log lines are
 * accepted. */
typedef enum StdoutStreamState {
        STDOUT_STREAM_TAG,              /* expecting the tag line */
        STDOUT_STREAM_PRIORITY,         /* expecting the default priority, '0'..'7' */
        STDOUT_STREAM_PRIORITY_PREFIX,  /* expecting '0' or '1' */
        STDOUT_STREAM_TEE_CONSOLE,      /* expecting '0' or '1' */
        STDOUT_STREAM_RUNNING           /* header complete, every line is a log line */
} StdoutStreamState;
100
101struct StdoutStream {
102 Server *server;
103 StdoutStreamState state;
104
105 int fd;
106
107 struct ucred ucred;
108
109 char *tag;
110 int priority;
111 bool priority_prefix:1;
112 bool tee_console:1;
113
114 char buffer[LINE_MAX+1];
115 size_t length;
116
117 LIST_FIELDS(StdoutStream, stdout_stream);
118};
119
cf244689
LP
120static int server_flush_to_var(Server *s);
121
6e409ce1 122static uint64_t available_space(Server *s) {
babfc091 123 char ids[33], *p;
6e409ce1 124 const char *f;
babfc091 125 sd_id128_t machine;
6e409ce1
LP
126 struct statvfs ss;
127 uint64_t sum = 0, avail = 0, ss_avail = 0;
128 int r;
129 DIR *d;
babfc091
LP
130 usec_t ts;
131 JournalMetrics *m;
132
133 ts = now(CLOCK_MONOTONIC);
9cfb57c9
LP
134
135 if (s->cached_available_space_timestamp + RECHECK_AVAILABLE_SPACE_USEC > ts)
136 return s->cached_available_space;
6e409ce1
LP
137
138 r = sd_id128_get_machine(&machine);
139 if (r < 0)
140 return 0;
141
babfc091 142 if (s->system_journal) {
6e409ce1 143 f = "/var/log/journal/";
babfc091
LP
144 m = &s->system_metrics;
145 } else {
6e409ce1 146 f = "/run/log/journal/";
babfc091
LP
147 m = &s->runtime_metrics;
148 }
149
150 assert(m);
6e409ce1
LP
151
152 p = strappend(f, sd_id128_to_string(machine, ids));
153 if (!p)
154 return 0;
155
156 d = opendir(p);
157 free(p);
158
159 if (!d)
160 return 0;
161
162 if (fstatvfs(dirfd(d), &ss) < 0)
163 goto finish;
164
165 for (;;) {
166 struct stat st;
167 struct dirent buf, *de;
168 int k;
169
170 k = readdir_r(d, &buf, &de);
171 if (k != 0) {
172 r = -k;
173 goto finish;
174 }
175
176 if (!de)
177 break;
178
179 if (!dirent_is_file_with_suffix(de, ".journal"))
180 continue;
181
182 if (fstatat(dirfd(d), de->d_name, &st, AT_SYMLINK_NOFOLLOW) < 0)
183 continue;
184
185 sum += (uint64_t) st.st_blocks * (uint64_t) st.st_blksize;
186 }
187
babfc091 188 avail = sum >= m->max_use ? 0 : m->max_use - sum;
6e409ce1
LP
189
190 ss_avail = ss.f_bsize * ss.f_bavail;
191
babfc091 192 ss_avail = ss_avail < m->keep_free ? 0 : ss_avail - m->keep_free;
6e409ce1
LP
193
194 if (ss_avail < avail)
195 avail = ss_avail;
196
9cfb57c9
LP
197 s->cached_available_space = avail;
198 s->cached_available_space_timestamp = ts;
199
6e409ce1
LP
200finish:
201 closedir(d);
202
203 return avail;
204}
205
f4b47811
LP
206static void fix_perms(JournalFile *f, uid_t uid) {
207 acl_t acl;
208 acl_entry_t entry;
209 acl_permset_t permset;
210 int r;
211
212 assert(f);
213
214 r = fchmod_and_fchown(f->fd, 0640, 0, 0);
215 if (r < 0)
216 log_warning("Failed to fix access mode/rights on %s, ignoring: %s", f->path, strerror(-r));
217
218 if (uid <= 0)
219 return;
220
221 acl = acl_get_fd(f->fd);
222 if (!acl) {
223 log_warning("Failed to read ACL on %s, ignoring: %m", f->path);
224 return;
225 }
226
227 r = acl_find_uid(acl, uid, &entry);
228 if (r <= 0) {
229
230 if (acl_create_entry(&acl, &entry) < 0 ||
231 acl_set_tag_type(entry, ACL_USER) < 0 ||
232 acl_set_qualifier(entry, &uid) < 0) {
233 log_warning("Failed to patch ACL on %s, ignoring: %m", f->path);
234 goto finish;
235 }
236 }
237
238 if (acl_get_permset(entry, &permset) < 0 ||
239 acl_add_perm(permset, ACL_READ) < 0 ||
240 acl_calc_mask(&acl) < 0) {
241 log_warning("Failed to patch ACL on %s, ignoring: %m", f->path);
242 goto finish;
243 }
244
245 if (acl_set_fd(f->fd, acl) < 0)
246 log_warning("Failed to set ACL on %s, ignoring: %m", f->path);
247
248finish:
249 acl_free(acl);
250}
251
252static JournalFile* find_journal(Server *s, uid_t uid) {
253 char *p;
254 int r;
255 JournalFile *f;
3fbf9cbb
LP
256 char ids[33];
257 sd_id128_t machine;
f4b47811
LP
258
259 assert(s);
260
cf244689
LP
261 /* We split up user logs only on /var, not on /run. If the
262 * runtime file is open, we write to it exclusively, in order
263 * to guarantee proper order as soon as we flush /run to
264 * /var and close the runtime file. */
265
266 if (s->runtime_journal)
f4b47811
LP
267 return s->runtime_journal;
268
269 if (uid <= 0)
270 return s->system_journal;
271
3fbf9cbb
LP
272 r = sd_id128_get_machine(&machine);
273 if (r < 0)
274 return s->system_journal;
275
f4b47811
LP
276 f = hashmap_get(s->user_journals, UINT32_TO_PTR(uid));
277 if (f)
278 return f;
279
3fbf9cbb 280 if (asprintf(&p, "/var/log/journal/%s/user-%lu.journal", sd_id128_to_string(machine, ids), (unsigned long) uid) < 0)
f4b47811
LP
281 return s->system_journal;
282
cab8ac60
LP
283 while (hashmap_size(s->user_journals) >= USER_JOURNALS_MAX) {
284 /* Too many open? Then let's close one */
285 f = hashmap_steal_first(s->user_journals);
286 assert(f);
287 journal_file_close(f);
288 }
289
c2373f84 290 r = journal_file_open(p, O_RDWR|O_CREAT, 0640, s->system_journal, &f);
f4b47811
LP
291 free(p);
292
293 if (r < 0)
294 return s->system_journal;
295
296 fix_perms(f, uid);
babfc091 297 f->metrics = s->system_metrics;
807e17f0 298 f->compress = s->compress;
f4b47811
LP
299
300 r = hashmap_put(s->user_journals, UINT32_TO_PTR(uid), f);
301 if (r < 0) {
302 journal_file_close(f);
303 return s->system_journal;
304 }
305
306 return f;
307}
308
bc85bfee
LP
309static void server_vacuum(Server *s) {
310 Iterator i;
311 void *k;
312 char *p;
313 char ids[33];
314 sd_id128_t machine;
315 int r;
316 JournalFile *f;
317
318 log_info("Rotating...");
319
320 if (s->runtime_journal) {
321 r = journal_file_rotate(&s->runtime_journal);
322 if (r < 0)
323 log_error("Failed to rotate %s: %s", s->runtime_journal->path, strerror(-r));
324 }
325
326 if (s->system_journal) {
327 r = journal_file_rotate(&s->system_journal);
328 if (r < 0)
329 log_error("Failed to rotate %s: %s", s->system_journal->path, strerror(-r));
330 }
331
332 HASHMAP_FOREACH_KEY(f, k, s->user_journals, i) {
333 r = journal_file_rotate(&f);
334 if (r < 0)
335 log_error("Failed to rotate %s: %s", f->path, strerror(-r));
336 else
337 hashmap_replace(s->user_journals, k, f);
338 }
339
340 log_info("Vacuuming...");
341
342 r = sd_id128_get_machine(&machine);
343 if (r < 0) {
344 log_error("Failed to get machine ID: %s", strerror(-r));
345 return;
346 }
347
babfc091 348 sd_id128_to_string(machine, ids);
bc85bfee 349
babfc091
LP
350 if (s->system_journal) {
351 if (asprintf(&p, "/var/log/journal/%s", ids) < 0) {
352 log_error("Out of memory.");
353 return;
354 }
bc85bfee 355
babfc091
LP
356 r = journal_directory_vacuum(p, s->system_metrics.max_use, s->system_metrics.keep_free);
357 if (r < 0 && r != -ENOENT)
358 log_error("Failed to vacuum %s: %s", p, strerror(-r));
359 free(p);
bc85bfee
LP
360 }
361
babfc091
LP
362
363 if (s->runtime_journal) {
364 if (asprintf(&p, "/run/log/journal/%s", ids) < 0) {
365 log_error("Out of memory.");
366 return;
367 }
368
369 r = journal_directory_vacuum(p, s->runtime_metrics.max_use, s->runtime_metrics.keep_free);
370 if (r < 0 && r != -ENOENT)
371 log_error("Failed to vacuum %s: %s", p, strerror(-r));
372 free(p);
373 }
9cfb57c9
LP
374
375 s->cached_available_space_timestamp = 0;
bc85bfee
LP
376}
377
6e409ce1
LP
378static char *shortened_cgroup_path(pid_t pid) {
379 int r;
380 char *process_path, *init_path, *path;
381
382 assert(pid > 0);
383
384 r = cg_get_by_pid(SYSTEMD_CGROUP_CONTROLLER, pid, &process_path);
385 if (r < 0)
386 return NULL;
387
388 r = cg_get_by_pid(SYSTEMD_CGROUP_CONTROLLER, 1, &init_path);
389 if (r < 0) {
390 free(process_path);
391 return NULL;
392 }
393
bad75c27
LP
394 if (endswith(init_path, "/system"))
395 init_path[strlen(init_path) - 7] = 0;
396 else if (streq(init_path, "/"))
6e409ce1
LP
397 init_path[0] = 0;
398
783d2675
LP
399 if (startswith(process_path, init_path)) {
400 char *p;
401
402 p = strdup(process_path + strlen(init_path));
403 if (!p) {
404 free(process_path);
405 free(init_path);
406 return NULL;
407 }
408 path = p;
409 } else {
6e409ce1 410 path = process_path;
783d2675
LP
411 process_path = NULL;
412 }
6e409ce1 413
783d2675 414 free(process_path);
6e409ce1
LP
415 free(init_path);
416
417 return path;
418}
419
420static void dispatch_message_real(Server *s,
421 struct iovec *iovec, unsigned n, unsigned m,
422 struct ucred *ucred,
423 struct timeval *tv) {
424
7f3e6257 425 char *pid = NULL, *uid = NULL, *gid = NULL,
87d2c1ff
LP
426 *source_time = NULL, *boot_id = NULL, *machine_id = NULL,
427 *comm = NULL, *cmdline = NULL, *hostname = NULL,
428 *audit_session = NULL, *audit_loginuid = NULL,
85d83bf4
LP
429 *exe = NULL, *cgroup = NULL, *session = NULL,
430 *owner_uid = NULL, *service = NULL;
7f3e6257 431
87d2c1ff
LP
432 char idbuf[33];
433 sd_id128_t id;
434 int r;
435 char *t;
de190aef 436 uid_t loginuid = 0, realuid = 0;
f4b47811 437 JournalFile *f;
bc85bfee 438 bool vacuumed = false;
87d2c1ff 439
7f3e6257 440 assert(s);
6e409ce1
LP
441 assert(iovec);
442 assert(n > 0);
85d83bf4 443 assert(n + 16 <= m);
87d2c1ff
LP
444
445 if (ucred) {
85d83bf4
LP
446 uint32_t audit;
447 uid_t owner;
87d2c1ff 448
de190aef
LP
449 realuid = ucred->uid;
450
451 if (asprintf(&pid, "_PID=%lu", (unsigned long) ucred->pid) >= 0)
87d2c1ff
LP
452 IOVEC_SET_STRING(iovec[n++], pid);
453
de190aef 454 if (asprintf(&uid, "_UID=%lu", (unsigned long) ucred->uid) >= 0)
87d2c1ff
LP
455 IOVEC_SET_STRING(iovec[n++], uid);
456
de190aef 457 if (asprintf(&gid, "_GID=%lu", (unsigned long) ucred->gid) >= 0)
87d2c1ff
LP
458 IOVEC_SET_STRING(iovec[n++], gid);
459
460 r = get_process_comm(ucred->pid, &t);
461 if (r >= 0) {
de190aef 462 comm = strappend("_COMM=", t);
85d83bf4
LP
463 free(t);
464
87d2c1ff
LP
465 if (comm)
466 IOVEC_SET_STRING(iovec[n++], comm);
87d2c1ff
LP
467 }
468
469 r = get_process_exe(ucred->pid, &t);
470 if (r >= 0) {
de190aef 471 exe = strappend("_EXE=", t);
85d83bf4
LP
472 free(t);
473
87d2c1ff
LP
474 if (comm)
475 IOVEC_SET_STRING(iovec[n++], exe);
87d2c1ff
LP
476 }
477
478 r = get_process_cmdline(ucred->pid, LINE_MAX, false, &t);
479 if (r >= 0) {
de190aef 480 cmdline = strappend("_CMDLINE=", t);
85d83bf4
LP
481 free(t);
482
87d2c1ff
LP
483 if (cmdline)
484 IOVEC_SET_STRING(iovec[n++], cmdline);
87d2c1ff
LP
485 }
486
85d83bf4 487 r = audit_session_from_pid(ucred->pid, &audit);
87d2c1ff 488 if (r >= 0)
85d83bf4 489 if (asprintf(&audit_session, "_AUDIT_SESSION=%lu", (unsigned long) audit) >= 0)
87d2c1ff
LP
490 IOVEC_SET_STRING(iovec[n++], audit_session);
491
492 r = audit_loginuid_from_pid(ucred->pid, &loginuid);
493 if (r >= 0)
de190aef 494 if (asprintf(&audit_loginuid, "_AUDIT_LOGINUID=%lu", (unsigned long) loginuid) >= 0)
87d2c1ff 495 IOVEC_SET_STRING(iovec[n++], audit_loginuid);
69e5d42d 496
85d83bf4
LP
497 t = shortened_cgroup_path(ucred->pid);
498 if (t) {
499 cgroup = strappend("_SYSTEMD_CGROUP=", t);
500 free(t);
501
69e5d42d
LP
502 if (cgroup)
503 IOVEC_SET_STRING(iovec[n++], cgroup);
85d83bf4
LP
504 }
505
506 if (sd_pid_get_session(ucred->pid, &t) >= 0) {
507 session = strappend("_SYSTEMD_SESSION=", t);
508 free(t);
509
510 if (session)
511 IOVEC_SET_STRING(iovec[n++], session);
512 }
6e409ce1 513
85d83bf4
LP
514 if (sd_pid_get_service(ucred->pid, &t) >= 0) {
515 service = strappend("_SYSTEMD_SERVICE=", t);
516 free(t);
517
518 if (service)
519 IOVEC_SET_STRING(iovec[n++], service);
69e5d42d 520 }
85d83bf4
LP
521
522 if (sd_pid_get_owner_uid(ucred->uid, &owner) >= 0)
523 if (asprintf(&owner_uid, "_SYSTEMD_OWNER_UID=%lu", (unsigned long) owner) >= 0)
524 IOVEC_SET_STRING(iovec[n++], owner_uid);
87d2c1ff
LP
525 }
526
527 if (tv) {
de190aef 528 if (asprintf(&source_time, "_SOURCE_REALTIME_TIMESTAMP=%llu",
87d2c1ff
LP
529 (unsigned long long) timeval_load(tv)) >= 0)
530 IOVEC_SET_STRING(iovec[n++], source_time);
531 }
532
ed49ef3f
LP
533 /* Note that strictly speaking storing the boot id here is
534 * redundant since the entry includes this in-line
535 * anyway. However, we need this indexed, too. */
87d2c1ff
LP
536 r = sd_id128_get_boot(&id);
537 if (r >= 0)
de190aef 538 if (asprintf(&boot_id, "_BOOT_ID=%s", sd_id128_to_string(id, idbuf)) >= 0)
87d2c1ff
LP
539 IOVEC_SET_STRING(iovec[n++], boot_id);
540
541 r = sd_id128_get_machine(&id);
542 if (r >= 0)
de190aef 543 if (asprintf(&machine_id, "_MACHINE_ID=%s", sd_id128_to_string(id, idbuf)) >= 0)
87d2c1ff
LP
544 IOVEC_SET_STRING(iovec[n++], machine_id);
545
546 t = gethostname_malloc();
547 if (t) {
de190aef 548 hostname = strappend("_HOSTNAME=", t);
85d83bf4 549 free(t);
87d2c1ff
LP
550 if (hostname)
551 IOVEC_SET_STRING(iovec[n++], hostname);
87d2c1ff
LP
552 }
553
7f3e6257
LP
554 assert(n <= m);
555
cf244689
LP
556 server_flush_to_var(s);
557
bc85bfee 558retry:
de190aef 559 f = find_journal(s, realuid == 0 ? 0 : loginuid);
f4b47811
LP
560 if (!f)
561 log_warning("Dropping message, as we can't find a place to store the data.");
562 else {
c2373f84 563 r = journal_file_append_entry(f, NULL, iovec, n, &s->seqnum, NULL, NULL);
87d2c1ff 564
bc85bfee
LP
565 if (r == -E2BIG && !vacuumed) {
566 log_info("Allocation limit reached.");
567
568 server_vacuum(s);
569 vacuumed = true;
570
571 log_info("Retrying write.");
572 goto retry;
573 }
574
f4b47811
LP
575 if (r < 0)
576 log_error("Failed to write entry, ignoring: %s", strerror(-r));
577 }
87d2c1ff 578
87d2c1ff
LP
579 free(pid);
580 free(uid);
581 free(gid);
582 free(comm);
69e5d42d 583 free(exe);
87d2c1ff
LP
584 free(cmdline);
585 free(source_time);
586 free(boot_id);
587 free(machine_id);
588 free(hostname);
589 free(audit_session);
590 free(audit_loginuid);
7f3e6257 591 free(cgroup);
85d83bf4
LP
592 free(session);
593 free(owner_uid);
594 free(service);
7f3e6257
LP
595}
596
6e409ce1
LP
597static void dispatch_message(Server *s,
598 struct iovec *iovec, unsigned n, unsigned m,
599 struct ucred *ucred,
600 struct timeval *tv,
601 int priority) {
602 int rl;
783d2675 603 char *path = NULL, *c;
6e409ce1
LP
604
605 assert(s);
606 assert(iovec || n == 0);
607
608 if (n == 0)
609 return;
610
611 if (!ucred)
612 goto finish;
613
614 path = shortened_cgroup_path(ucred->pid);
615 if (!path)
616 goto finish;
617
618 /* example: /user/lennart/3/foobar
619 * /system/dbus.service/foobar
620 *
621 * So let's cut of everything past the third /, since that is
622 * wher user directories start */
623
624 c = strchr(path, '/');
625 if (c) {
626 c = strchr(c+1, '/');
627 if (c) {
628 c = strchr(c+1, '/');
629 if (c)
630 *c = 0;
631 }
632 }
633
634 rl = journal_rate_limit_test(s->rate_limit, path, priority, available_space(s));
635
636 if (rl == 0) {
637 free(path);
638 return;
639 }
640
641 if (rl > 1) {
642 int j = 0;
643 char suppress_message[LINE_MAX];
85d83bf4 644 struct iovec suppress_iovec[18];
6e409ce1
LP
645
646 /* Write a suppression message if we suppressed something */
647
648 snprintf(suppress_message, sizeof(suppress_message), "MESSAGE=Suppressed %u messages from %s", rl - 1, path);
649 char_array_0(suppress_message);
650
651 IOVEC_SET_STRING(suppress_iovec[j++], "PRIORITY=5");
652 IOVEC_SET_STRING(suppress_iovec[j++], suppress_message);
653
654 dispatch_message_real(s, suppress_iovec, j, ELEMENTSOF(suppress_iovec), NULL, NULL);
655 }
656
657 free(path);
658
659finish:
660 dispatch_message_real(s, iovec, n, m, ucred, tv);
661}
662
7f3e6257
LP
663static void process_syslog_message(Server *s, const char *buf, struct ucred *ucred, struct timeval *tv) {
664 char *message = NULL, *syslog_priority = NULL, *syslog_facility = NULL;
85d83bf4 665 struct iovec iovec[19];
7f3e6257
LP
666 unsigned n = 0;
667 int priority = LOG_USER | LOG_INFO;
668
669 assert(s);
670 assert(buf);
671
672 parse_syslog_priority((char**) &buf, &priority);
673 skip_syslog_date((char**) &buf);
674
675 if (asprintf(&syslog_priority, "PRIORITY=%i", priority & LOG_PRIMASK) >= 0)
676 IOVEC_SET_STRING(iovec[n++], syslog_priority);
677
678 if (asprintf(&syslog_facility, "SYSLOG_FACILITY=%i", LOG_FAC(priority)) >= 0)
679 IOVEC_SET_STRING(iovec[n++], syslog_facility);
680
681 message = strappend("MESSAGE=", buf);
682 if (message)
683 IOVEC_SET_STRING(iovec[n++], message);
684
6e409ce1 685 dispatch_message(s, iovec, n, ELEMENTSOF(iovec), ucred, tv, priority & LOG_PRIMASK);
7f3e6257
LP
686
687 free(message);
87d2c1ff
LP
688 free(syslog_facility);
689 free(syslog_priority);
7f3e6257
LP
690}
691
6ad1d1c3
LP
/* Checks whether the l bytes at p form a field name clients are
 * allowed to set.
 *
 * This roughly follows the POSIX syntax recommendations for
 * environment variable names, with a few additional restrictions:
 *
 * http://pubs.opengroup.org/onlinepubs/000095399/basedefs/xbd_chap08.html
 *
 * A name is accepted if it is 1..64 bytes long, consists only of
 * A-Z, 0-9 and '_', and starts with neither a digit nor an
 * underscore (names starting with an underscore are protected). */
static bool valid_user_field(const char *p, size_t l) {
        size_t i;

        if (l == 0 || l > 64)
                return false;

        if (p[0] == '_' || (p[0] >= '0' && p[0] <= '9'))
                return false;

        for (i = 0; i < l; i++) {
                char ch = p[i];

                if (ch >= 'A' && ch <= 'Z')
                        continue;
                if (ch >= '0' && ch <= '9')
                        continue;
                if (ch == '_')
                        continue;

                return false;
        }

        return true;
}
726
7f3e6257
LP
727static void process_native_message(Server *s, const void *buffer, size_t buffer_size, struct ucred *ucred, struct timeval *tv) {
728 struct iovec *iovec = NULL;
729 unsigned n = 0, m = 0, j;
730 const char *p;
731 size_t remaining;
6e409ce1 732 int priority = LOG_INFO;
7f3e6257
LP
733
734 assert(s);
735 assert(buffer || n == 0);
736
737 p = buffer;
738 remaining = buffer_size;
739
740 while (remaining > 0) {
741 const char *e, *q;
742
743 e = memchr(p, '\n', remaining);
744
745 if (!e) {
746 /* Trailing noise, let's ignore it, and flush what we collected */
747 log_debug("Received message with trailing noise, ignoring.");
748 break;
749 }
750
751 if (e == p) {
752 /* Entry separator */
6e409ce1 753 dispatch_message(s, iovec, n, m, ucred, tv, priority);
7f3e6257 754 n = 0;
6e409ce1 755 priority = LOG_INFO;
7f3e6257
LP
756
757 p++;
758 remaining--;
759 continue;
760 }
761
6ad1d1c3
LP
762 if (*p == '.' || *p == '#') {
763 /* Ignore control commands for now, and
764 * comments too. */
7f3e6257
LP
765 remaining -= (e - p) + 1;
766 p = e + 1;
767 continue;
768 }
769
770 /* A property follows */
771
85d83bf4 772 if (n+16 >= m) {
7f3e6257
LP
773 struct iovec *c;
774 unsigned u;
775
85d83bf4 776 u = MAX((n+16U) * 2U, 4U);
7f3e6257
LP
777 c = realloc(iovec, u * sizeof(struct iovec));
778 if (!c) {
779 log_error("Out of memory");
780 break;
781 }
782
783 iovec = c;
784 m = u;
785 }
786
787 q = memchr(p, '=', e - p);
788 if (q) {
6ad1d1c3 789 if (valid_user_field(p, q - p)) {
2b0ba69b
LP
790 /* If the field name starts with an
791 * underscore, skip the variable,
792 * since that indidates a trusted
793 * field */
794 iovec[n].iov_base = (char*) p;
795 iovec[n].iov_len = e - p;
796 n++;
6e409ce1
LP
797
798 /* We need to determine the priority
799 * of this entry for the rate limiting
800 * logic */
801 if (e - p == 10 &&
802 memcmp(p, "PRIORITY=", 10) == 0 &&
803 p[10] >= '0' &&
804 p[10] <= '9')
805 priority = p[10] - '0';
2b0ba69b 806 }
7f3e6257
LP
807
808 remaining -= (e - p) + 1;
809 p = e + 1;
810 continue;
811 } else {
812 uint64_t l;
813 char *k;
814
815 if (remaining < e - p + 1 + sizeof(uint64_t) + 1) {
816 log_debug("Failed to parse message, ignoring.");
817 break;
818 }
819
820 memcpy(&l, e + 1, sizeof(uint64_t));
821 l = le64toh(l);
822
823 if (remaining < e - p + 1 + sizeof(uint64_t) + l + 1 ||
824 e[1+sizeof(uint64_t)+l] != '\n') {
825 log_debug("Failed to parse message, ignoring.");
826 break;
827 }
828
829 k = malloc((e - p) + 1 + l);
830 if (!k) {
831 log_error("Out of memory");
832 break;
833 }
834
835 memcpy(k, p, e - p);
836 k[e - p] = '=';
837 memcpy(k + (e - p) + 1, e + 1 + sizeof(uint64_t), l);
838
6ad1d1c3 839 if (valid_user_field(p, e - p)) {
2b0ba69b
LP
840 iovec[n].iov_base = k;
841 iovec[n].iov_len = (e - p) + 1 + l;
842 n++;
843 } else
844 free(k);
7f3e6257
LP
845
846 remaining -= (e - p) + 1 + sizeof(uint64_t) + l + 1;
847 p = e + 1 + sizeof(uint64_t) + l + 1;
848 }
849 }
850
6e409ce1 851 dispatch_message(s, iovec, n, m, ucred, tv, priority);
7f3e6257
LP
852
853 for (j = 0; j < n; j++)
854 if (iovec[j].iov_base < buffer ||
855 (const uint8_t*) iovec[j].iov_base >= (const uint8_t*) buffer + buffer_size)
856 free(iovec[j].iov_base);
87d2c1ff
LP
857}
858
fe652127 859static int stdout_stream_log(StdoutStream *s, const char *p, size_t l) {
85d83bf4 860 struct iovec iovec[18];
fe652127
LP
861 char *message = NULL, *syslog_priority = NULL;
862 unsigned n = 0;
863 size_t tag_len;
864 int priority;
865
87d2c1ff 866 assert(s);
fe652127
LP
867 assert(p);
868
869 priority = s->priority;
870
871 if (s->priority_prefix &&
872 l > 3 &&
873 p[0] == '<' &&
874 p[1] >= '0' && p[1] <= '7' &&
875 p[2] == '>') {
876
877 priority = p[1] - '0';
878 p += 3;
879 l -= 3;
880 }
881
882 if (l <= 0)
883 return 0;
884
885 if (asprintf(&syslog_priority, "PRIORITY=%i", priority) >= 0)
886 IOVEC_SET_STRING(iovec[n++], syslog_priority);
887
888 tag_len = s->tag ? strlen(s->tag) + 2: 0;
889 message = malloc(8 + tag_len + l);
890 if (message) {
891 memcpy(message, "MESSAGE=", 8);
892
893 if (s->tag) {
894 memcpy(message+8, s->tag, tag_len-2);
895 memcpy(message+8+tag_len-2, ": ", 2);
896 }
87d2c1ff 897
fe652127
LP
898 memcpy(message+8+tag_len, p, l);
899 iovec[n].iov_base = message;
900 iovec[n].iov_len = 8+tag_len+l;
901 n++;
87d2c1ff
LP
902 }
903
6e409ce1 904 dispatch_message(s->server, iovec, n, ELEMENTSOF(iovec), &s->ucred, NULL, priority);
fe652127
LP
905
906 if (s->tee_console) {
907 int console;
908
909 console = open_terminal("/dev/console", O_WRONLY|O_NOCTTY|O_CLOEXEC);
910 if (console >= 0) {
911 n = 0;
912 if (s->tag) {
913 IOVEC_SET_STRING(iovec[n++], s->tag);
914 IOVEC_SET_STRING(iovec[n++], ": ");
915 }
916
917 iovec[n].iov_base = (void*) p;
918 iovec[n].iov_len = l;
919 n++;
920
921 IOVEC_SET_STRING(iovec[n++], (char*) "\n");
922
923 writev(console, iovec, n);
924 }
925 }
926
927 free(message);
928 free(syslog_priority);
929
930 return 0;
931}
932
933static int stdout_stream_line(StdoutStream *s, const char *p, size_t l) {
934 assert(s);
935 assert(p);
936
937 while (l > 0 && strchr(WHITESPACE, *p)) {
938 l--;
939 p++;
940 }
941
942 while (l > 0 && strchr(WHITESPACE, *(p+l-1)))
943 l--;
944
945 switch (s->state) {
946
947 case STDOUT_STREAM_TAG:
948
949 if (l > 0) {
950 s->tag = strndup(p, l);
951 if (!s->tag) {
952 log_error("Out of memory");
953 return -EINVAL;
954 }
955 }
956
957 s->state = STDOUT_STREAM_PRIORITY;
958 return 0;
959
960 case STDOUT_STREAM_PRIORITY:
961 if (l != 1 || *p < '0' || *p > '7') {
962 log_warning("Failed to parse log priority line.");
963 return -EINVAL;
964 }
965
966 s->priority = *p - '0';
967 s->state = STDOUT_STREAM_PRIORITY_PREFIX;
968 return 0;
969
970 case STDOUT_STREAM_PRIORITY_PREFIX:
971 if (l != 1 || *p < '0' || *p > '1') {
972 log_warning("Failed to parse priority prefix line.");
973 return -EINVAL;
974 }
975
976 s->priority_prefix = *p - '0';
977 s->state = STDOUT_STREAM_TEE_CONSOLE;
978 return 0;
979
980 case STDOUT_STREAM_TEE_CONSOLE:
981 if (l != 1 || *p < '0' || *p > '1') {
982 log_warning("Failed to parse tee to console line.");
983 return -EINVAL;
984 }
985
986 s->tee_console = *p - '0';
987 s->state = STDOUT_STREAM_RUNNING;
988 return 0;
989
990 case STDOUT_STREAM_RUNNING:
991 return stdout_stream_log(s, p, l);
992 }
993
994 assert_not_reached("Unknown stream state");
995}
996
997static int stdout_stream_scan(StdoutStream *s, bool force_flush) {
998 char *p;
999 size_t remaining;
1000 int r;
1001
1002 assert(s);
1003
1004 p = s->buffer;
1005 remaining = s->length;
1006 for (;;) {
1007 char *end;
1008 size_t skip;
1009
1010 end = memchr(p, '\n', remaining);
1011 if (!end) {
1012 if (remaining >= LINE_MAX) {
1013 end = p + LINE_MAX;
1014 skip = LINE_MAX;
1015 } else
1016 break;
1017 } else
1018 skip = end - p + 1;
1019
1020 r = stdout_stream_line(s, p, end - p);
1021 if (r < 0)
1022 return r;
1023
1024 remaining -= skip;
1025 p += skip;
1026 }
1027
1028 if (force_flush && remaining > 0) {
1029 r = stdout_stream_line(s, p, remaining);
1030 if (r < 0)
1031 return r;
1032
1033 p += remaining;
1034 remaining = 0;
1035 }
1036
1037 if (p > s->buffer) {
1038 memmove(s->buffer, p, remaining);
1039 s->length = remaining;
1040 }
1041
1042 return 0;
1043}
1044
1045static int stdout_stream_process(StdoutStream *s) {
1046 ssize_t l;
1047 int r;
1048
1049 assert(s);
1050
1051 l = read(s->fd, s->buffer+s->length, sizeof(s->buffer)-1-s->length);
1052 if (l < 0) {
1053
1054 if (errno == EAGAIN)
1055 return 0;
1056
1057 log_warning("Failed to read from stream: %m");
1058 return -errno;
1059 }
1060
1061 if (l == 0) {
1062 r = stdout_stream_scan(s, true);
1063 if (r < 0)
1064 return r;
1065
1066 return 0;
1067 }
1068
1069 s->length += l;
1070 r = stdout_stream_scan(s, false);
1071 if (r < 0)
1072 return r;
1073
1074 return 1;
1075
1076}
1077
1078static void stdout_stream_free(StdoutStream *s) {
1079 assert(s);
1080
1081 if (s->server) {
1082 assert(s->server->n_stdout_streams > 0);
1083 s->server->n_stdout_streams --;
1084 LIST_REMOVE(StdoutStream, stdout_stream, s->server->stdout_streams, s);
1085 }
1086
1087 if (s->fd >= 0) {
1088 if (s->server)
1089 epoll_ctl(s->server->epoll_fd, EPOLL_CTL_DEL, s->fd, NULL);
1090
1091 close_nointr_nofail(s->fd);
1092 }
1093
1094 free(s->tag);
1095 free(s);
1096}
1097
1098static int stdout_stream_new(Server *s) {
1099 StdoutStream *stream;
1100 int fd, r;
1101 socklen_t len;
1102 struct epoll_event ev;
1103
1104 assert(s);
1105
1106 fd = accept4(s->stdout_fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
1107 if (fd < 0) {
1108 if (errno == EAGAIN)
1109 return 0;
1110
1111 log_error("Failed to accept stdout connection: %m");
1112 return -errno;
1113 }
1114
1115 if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) {
1116 log_warning("Too many stdout streams, refusing connection.");
1117 close_nointr_nofail(fd);
1118 return 0;
1119 }
1120
1121 stream = new0(StdoutStream, 1);
1122 if (!stream) {
1123 log_error("Out of memory.");
1124 close_nointr_nofail(fd);
1125 return -ENOMEM;
1126 }
1127
1128 stream->fd = fd;
1129
1130 len = sizeof(stream->ucred);
1131 if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &stream->ucred, &len) < 0) {
1132 log_error("Failed to determine peer credentials: %m");
1133 r = -errno;
1134 goto fail;
1135 }
1136
1137 if (shutdown(fd, SHUT_WR) < 0) {
1138 log_error("Failed to shutdown writing side of socket: %m");
1139 r = -errno;
1140 goto fail;
1141 }
1142
1143 zero(ev);
1144 ev.data.ptr = stream;
1145 ev.events = EPOLLIN;
1146 if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) {
1147 log_error("Failed to add stream to event loop: %m");
1148 r = -errno;
1149 goto fail;
1150 }
1151
1152 stream->server = s;
1153 LIST_PREPEND(StdoutStream, stdout_stream, s->stdout_streams, stream);
1154 s->n_stdout_streams ++;
1155
1156 return 0;
1157
1158fail:
1159 stdout_stream_free(stream);
1160 return r;
1161}
1162
cf244689
LP
1163static int system_journal_open(Server *s) {
1164 int r;
1165 char *fn;
1166 sd_id128_t machine;
1167 char ids[33];
1168
1169 r = sd_id128_get_machine(&machine);
1170 if (r < 0)
1171 return r;
1172
1173 sd_id128_to_string(machine, ids);
1174
1175 if (!s->system_journal) {
1176
1177 /* First try to create the machine path, but not the prefix */
1178 fn = strappend("/var/log/journal/", ids);
1179 if (!fn)
1180 return -ENOMEM;
1181 (void) mkdir(fn, 0755);
1182 free(fn);
1183
1184 /* The create the system journal file */
1185 fn = join("/var/log/journal/", ids, "/system.journal", NULL);
1186 if (!fn)
1187 return -ENOMEM;
1188
1189 r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->system_journal);
1190 free(fn);
1191
1192 if (r >= 0) {
babfc091
LP
1193 journal_default_metrics(&s->system_metrics, s->system_journal->fd);
1194
1195 s->system_journal->metrics = s->system_metrics;
cf244689
LP
1196 s->system_journal->compress = s->compress;
1197
1198 fix_perms(s->system_journal, 0);
1199 } else if (r < 0) {
1200
adf7d506
LP
1201 if (r != -ENOENT && r != -EROFS)
1202 log_warning("Failed to open system journal: %s", strerror(-r));
1203
1204 r = 0;
cf244689
LP
1205 }
1206 }
1207
1208 if (!s->runtime_journal) {
1209
1210 fn = join("/run/log/journal/", ids, "/system.journal", NULL);
1211 if (!fn)
1212 return -ENOMEM;
1213
1214 if (s->system_journal) {
1215
1216 /* Try to open the runtime journal, but only
1217 * if it already exists, so that we can flush
1218 * it into the system journal */
1219
1220 r = journal_file_open(fn, O_RDWR, 0640, NULL, &s->runtime_journal);
1221 free(fn);
1222
1223 if (r < 0) {
adf7d506
LP
1224 if (r != -ENOENT)
1225 log_warning("Failed to open runtime journal: %s", strerror(-r));
cf244689 1226
adf7d506 1227 r = 0;
cf244689
LP
1228 }
1229
1230 } else {
1231
1232 /* OK, we really need the runtime journal, so create
1233 * it if necessary. */
1234
1235 (void) mkdir_parents(fn, 0755);
1236 r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->runtime_journal);
1237 free(fn);
1238
1239 if (r < 0) {
1240 log_error("Failed to open runtime journal: %s", strerror(-r));
1241 return r;
1242 }
1243 }
1244
1245 if (s->runtime_journal) {
babfc091
LP
1246 journal_default_metrics(&s->runtime_metrics, s->runtime_journal->fd);
1247
1248 s->runtime_journal->metrics = s->runtime_metrics;
cf244689
LP
1249 s->runtime_journal->compress = s->compress;
1250
1251 fix_perms(s->runtime_journal, 0);
1252 }
1253 }
1254
1255 return r;
1256}
1257
1258static int server_flush_to_var(Server *s) {
1259 char path[] = "/run/log/journal/xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx";
1260 Object *o = NULL;
1261 int r;
1262 sd_id128_t machine;
1263 sd_journal *j;
54a7b863 1264 usec_t ts;
cf244689
LP
1265
1266 assert(s);
1267
54a7b863
LP
1268 if (!s->runtime_journal)
1269 return 0;
1270
1271 ts = now(CLOCK_MONOTONIC);
1272 if (s->var_available_timestamp + RECHECK_VAR_AVAILABLE_USEC > ts)
1273 return 0;
1274
1275 s->var_available_timestamp = ts;
1276
cf244689
LP
1277 system_journal_open(s);
1278
54a7b863 1279 if (!s->system_journal)
cf244689
LP
1280 return 0;
1281
1282 r = sd_id128_get_machine(&machine);
1283 if (r < 0) {
1284 log_error("Failed to get machine id: %s", strerror(-r));
1285 return r;
1286 }
1287
1288 r = sd_journal_open(&j, SD_JOURNAL_RUNTIME_ONLY);
1289 if (r < 0) {
1290 log_error("Failed to read runtime journal: %s", strerror(-r));
1291 return r;
1292 }
1293
1294 SD_JOURNAL_FOREACH(j) {
1295 JournalFile *f;
1296
1297 f = j->current_file;
1298 assert(f && f->current_offset > 0);
1299
1300 r = journal_file_move_to_object(f, OBJECT_ENTRY, f->current_offset, &o);
1301 if (r < 0) {
1302 log_error("Can't read entry: %s", strerror(-r));
1303 goto finish;
1304 }
1305
1306 r = journal_file_copy_entry(f, s->system_journal, o, f->current_offset, NULL, NULL, NULL);
1307 if (r == -E2BIG) {
1308 log_info("Allocation limit reached.");
1309
1310 journal_file_post_change(s->system_journal);
1311 server_vacuum(s);
1312
1313 r = journal_file_copy_entry(f, s->system_journal, o, f->current_offset, NULL, NULL, NULL);
1314 }
1315
1316 if (r < 0) {
1317 log_error("Can't write entry: %s", strerror(-r));
1318 goto finish;
1319 }
1320 }
1321
1322finish:
1323 journal_file_post_change(s->system_journal);
1324
1325 journal_file_close(s->runtime_journal);
1326 s->runtime_journal = NULL;
1327
1328 if (r >= 0) {
1329 sd_id128_to_string(machine, path + 17);
1330 rm_rf(path, false, true, false);
1331 }
1332
1333 return r;
1334}
1335
8b18eb67
LP
1336static void forward_syslog(Server *s, const void *buffer, size_t length, struct ucred *ucred, struct timeval *tv) {
1337 struct msghdr msghdr;
1338 struct iovec iovec;
1339 struct cmsghdr *cmsg;
1340 union {
1341 struct cmsghdr cmsghdr;
1342 uint8_t buf[CMSG_SPACE(sizeof(struct ucred)) +
1343 CMSG_SPACE(sizeof(struct timeval))];
1344 } control;
1345 union sockaddr_union sa;
1346
1347 assert(s);
1348
1349 zero(msghdr);
1350
1351 zero(iovec);
1352 iovec.iov_base = (void*) buffer;
1353 iovec.iov_len = length;
1354 msghdr.msg_iov = &iovec;
1355 msghdr.msg_iovlen = 1;
1356
1357 zero(sa);
1358 sa.un.sun_family = AF_UNIX;
1359 strncpy(sa.un.sun_path, "/run/systemd/syslog", sizeof(sa.un.sun_path));
1360 msghdr.msg_name = &sa;
1361 msghdr.msg_namelen = offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path);
1362
1363 zero(control);
1364 msghdr.msg_control = &control;
1365 msghdr.msg_controllen = sizeof(control);
1366
1367 cmsg = CMSG_FIRSTHDR(&msghdr);
1368 cmsg->cmsg_level = SOL_SOCKET;
1369 cmsg->cmsg_type = SCM_CREDENTIALS;
1370 cmsg->cmsg_len = CMSG_LEN(sizeof(struct ucred));
1371 memcpy(CMSG_DATA(cmsg), ucred, sizeof(struct ucred));
1372 msghdr.msg_controllen = cmsg->cmsg_len;
1373
1374 /* Forward the syslog message we received via /dev/log to
1375 * /run/systemd/syslog. Unfortunately we currently can't set
1376 * the SO_TIMESTAMP auxiliary data, and hence we don't. */
1377
1378 if (sendmsg(s->syslog_fd, &msghdr, MSG_NOSIGNAL) >= 0)
1379 return;
1380
1381 if (errno == ESRCH) {
1382 struct ucred u;
1383
1384 /* Hmm, presumably the sender process vanished
1385 * by now, so let's fix it as good as we
1386 * can, and retry */
1387
1388 u = *ucred;
1389 u.pid = getpid();
1390 memcpy(CMSG_DATA(cmsg), &u, sizeof(struct ucred));
1391
1392 if (sendmsg(s->syslog_fd, &msghdr, MSG_NOSIGNAL) >= 0)
1393 return;
1394 }
1395
1396 log_debug("Failed to forward syslog message: %m");
1397}
1398
fe652127
LP
1399static int process_event(Server *s, struct epoll_event *ev) {
1400 assert(s);
1401
87d2c1ff
LP
1402 if (ev->data.fd == s->signal_fd) {
1403 struct signalfd_siginfo sfsi;
1404 ssize_t n;
1405
fe652127
LP
1406 if (ev->events != EPOLLIN) {
1407 log_info("Got invalid event from epoll.");
1408 return -EIO;
1409 }
1410
87d2c1ff
LP
1411 n = read(s->signal_fd, &sfsi, sizeof(sfsi));
1412 if (n != sizeof(sfsi)) {
1413
1414 if (n >= 0)
1415 return -EIO;
1416
1417 if (errno == EINTR || errno == EAGAIN)
1418 return 0;
1419
1420 return -errno;
1421 }
1422
cf244689
LP
1423 if (sfsi.ssi_signo == SIGUSR1) {
1424 server_flush_to_var(s);
1425 return 0;
1426 }
1427
87d2c1ff
LP
1428 log_debug("Received SIG%s", signal_to_string(sfsi.ssi_signo));
1429 return 0;
1430
fe652127
LP
1431 } else if (ev->data.fd == s->native_fd ||
1432 ev->data.fd == s->syslog_fd) {
1433
1434 if (ev->events != EPOLLIN) {
1435 log_info("Got invalid event from epoll.");
1436 return -EIO;
1437 }
cec736d2 1438
87d2c1ff 1439 for (;;) {
87d2c1ff
LP
1440 struct msghdr msghdr;
1441 struct iovec iovec;
1442 struct ucred *ucred = NULL;
1443 struct timeval *tv = NULL;
1444 struct cmsghdr *cmsg;
1445 union {
1446 struct cmsghdr cmsghdr;
1447 uint8_t buf[CMSG_SPACE(sizeof(struct ucred)) +
1448 CMSG_SPACE(sizeof(struct timeval))];
1449 } control;
1450 ssize_t n;
7f3e6257
LP
1451 int v;
1452
1453 if (ioctl(ev->data.fd, SIOCINQ, &v) < 0) {
1454 log_error("SIOCINQ failed: %m");
1455 return -errno;
1456 }
1457
1458 if (v <= 0)
1459 return 1;
1460
1461 if (s->buffer_size < (size_t) v) {
1462 void *b;
1463 size_t l;
1464
1465 l = MAX(LINE_MAX + (size_t) v, s->buffer_size * 2);
1466 b = realloc(s->buffer, l+1);
1467
1468 if (!b) {
1469 log_error("Couldn't increase buffer.");
1470 return -ENOMEM;
1471 }
1472
1473 s->buffer_size = l;
1474 s->buffer = b;
1475 }
87d2c1ff
LP
1476
1477 zero(iovec);
7f3e6257
LP
1478 iovec.iov_base = s->buffer;
1479 iovec.iov_len = s->buffer_size;
87d2c1ff
LP
1480
1481 zero(control);
1482 zero(msghdr);
1483 msghdr.msg_iov = &iovec;
1484 msghdr.msg_iovlen = 1;
1485 msghdr.msg_control = &control;
1486 msghdr.msg_controllen = sizeof(control);
1487
1488 n = recvmsg(ev->data.fd, &msghdr, MSG_DONTWAIT);
1489 if (n < 0) {
1490
1491 if (errno == EINTR || errno == EAGAIN)
1492 return 1;
1493
1494 log_error("recvmsg() failed: %m");
1495 return -errno;
1496 }
1497
1498 for (cmsg = CMSG_FIRSTHDR(&msghdr); cmsg; cmsg = CMSG_NXTHDR(&msghdr, cmsg)) {
1499
1500 if (cmsg->cmsg_level == SOL_SOCKET &&
1501 cmsg->cmsg_type == SCM_CREDENTIALS &&
1502 cmsg->cmsg_len == CMSG_LEN(sizeof(struct ucred)))
1503 ucred = (struct ucred*) CMSG_DATA(cmsg);
1504 else if (cmsg->cmsg_level == SOL_SOCKET &&
1505 cmsg->cmsg_type == SO_TIMESTAMP &&
1506 cmsg->cmsg_len == CMSG_LEN(sizeof(struct timeval)))
1507 tv = (struct timeval*) CMSG_DATA(cmsg);
1508 }
1509
7f3e6257
LP
1510 if (ev->data.fd == s->syslog_fd) {
1511 char *e;
1512
1513 e = memchr(s->buffer, '\n', n);
1514 if (e)
1515 *e = 0;
1516 else
1517 s->buffer[n] = 0;
87d2c1ff 1518
8b18eb67 1519 forward_syslog(s, s->buffer, n, ucred, tv);
7f3e6257
LP
1520 process_syslog_message(s, strstrip(s->buffer), ucred, tv);
1521 } else
1522 process_native_message(s, s->buffer, n, ucred, tv);
87d2c1ff 1523 }
cec736d2
LP
1524
1525 return 1;
fe652127
LP
1526
1527 } else if (ev->data.fd == s->stdout_fd) {
1528
1529 if (ev->events != EPOLLIN) {
1530 log_info("Got invalid event from epoll.");
1531 return -EIO;
1532 }
1533
1534 stdout_stream_new(s);
1535 return 1;
1536
1537 } else {
1538 StdoutStream *stream;
1539
1540 if ((ev->events|EPOLLIN|EPOLLHUP) != (EPOLLIN|EPOLLHUP)) {
1541 log_info("Got invalid event from epoll.");
1542 return -EIO;
1543 }
1544
1545 /* If it is none of the well-known fds, it must be an
1546 * stdout stream fd. Note that this is a bit ugly here
1547 * (since we rely that none of the well-known fds
1548 * could be interpreted as pointer), but nonetheless
1549 * safe, since the well-known fds would never get an
1550 * fd > 4096, i.e. beyond the first memory page */
1551
1552 stream = ev->data.ptr;
1553
1554 if (stdout_stream_process(stream) <= 0)
1555 stdout_stream_free(stream);
1556
1557 return 1;
87d2c1ff
LP
1558 }
1559
cec736d2
LP
1560 log_error("Unknown event.");
1561 return 0;
87d2c1ff
LP
1562}
1563
7f3e6257
LP
1564static int open_syslog_socket(Server *s) {
1565 union sockaddr_union sa;
1566 int one, r;
fe652127 1567 struct epoll_event ev;
8b18eb67 1568 struct timeval tv;
87d2c1ff
LP
1569
1570 assert(s);
1571
7f3e6257 1572 if (s->syslog_fd < 0) {
87d2c1ff 1573
7f3e6257
LP
1574 s->syslog_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0);
1575 if (s->syslog_fd < 0) {
1576 log_error("socket() failed: %m");
1577 return -errno;
1578 }
1579
1580 zero(sa);
1581 sa.un.sun_family = AF_UNIX;
8b18eb67 1582 strncpy(sa.un.sun_path, "/dev/log", sizeof(sa.un.sun_path));
7f3e6257
LP
1583
1584 unlink(sa.un.sun_path);
1585
1586 r = bind(s->syslog_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1587 if (r < 0) {
1588 log_error("bind() failed: %m");
1589 return -errno;
1590 }
1591
1592 chmod(sa.un.sun_path, 0666);
87d2c1ff
LP
1593 }
1594
7f3e6257
LP
1595 one = 1;
1596 r = setsockopt(s->syslog_fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
1597 if (r < 0) {
1598 log_error("SO_PASSCRED failed: %m");
1599 return -errno;
87d2c1ff
LP
1600 }
1601
7f3e6257
LP
1602 one = 1;
1603 r = setsockopt(s->syslog_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one));
1604 if (r < 0) {
1605 log_error("SO_TIMESTAMP failed: %m");
1606 return -errno;
87d2c1ff
LP
1607 }
1608
8b18eb67
LP
1609 /* Since we use the same socket for forwarding this to some
1610 * other syslog implementation, make sure we don't hang
1611 * forever */
1612 timeval_store(&tv, SYSLOG_TIMEOUT_USEC);
1613 if (setsockopt(s->syslog_fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) < 0) {
1614 log_error("SO_SNDTIMEO failed: %m");
1615 return -errno;
1616 }
1617
fe652127
LP
1618 zero(ev);
1619 ev.events = EPOLLIN;
1620 ev.data.fd = s->syslog_fd;
1621 if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->syslog_fd, &ev) < 0) {
1622 log_error("Failed to add syslog server fd to epoll object: %m");
1623 return -errno;
1624 }
1625
7f3e6257
LP
1626 return 0;
1627}
87d2c1ff 1628
7f3e6257
LP
1629static int open_native_socket(Server*s) {
1630 union sockaddr_union sa;
1631 int one, r;
fe652127 1632 struct epoll_event ev;
7f3e6257
LP
1633
1634 assert(s);
1635
1636 if (s->native_fd < 0) {
1637
1638 s->native_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0);
1639 if (s->native_fd < 0) {
87d2c1ff
LP
1640 log_error("socket() failed: %m");
1641 return -errno;
1642 }
1643
1644 zero(sa);
1645 sa.un.sun_family = AF_UNIX;
7f3e6257 1646 strncpy(sa.un.sun_path, "/run/systemd/journal", sizeof(sa.un.sun_path));
87d2c1ff
LP
1647
1648 unlink(sa.un.sun_path);
1649
7f3e6257 1650 r = bind(s->native_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
87d2c1ff
LP
1651 if (r < 0) {
1652 log_error("bind() failed: %m");
1653 return -errno;
1654 }
1655
1656 chmod(sa.un.sun_path, 0666);
1657 }
1658
1659 one = 1;
7f3e6257 1660 r = setsockopt(s->native_fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
87d2c1ff
LP
1661 if (r < 0) {
1662 log_error("SO_PASSCRED failed: %m");
1663 return -errno;
1664 }
1665
1666 one = 1;
7f3e6257 1667 r = setsockopt(s->native_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one));
87d2c1ff
LP
1668 if (r < 0) {
1669 log_error("SO_TIMESTAMP failed: %m");
1670 return -errno;
1671 }
1672
fe652127
LP
1673 zero(ev);
1674 ev.events = EPOLLIN;
1675 ev.data.fd = s->native_fd;
1676 if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->native_fd, &ev) < 0) {
1677 log_error("Failed to add native server fd to epoll object: %m");
1678 return -errno;
1679 }
1680
7f3e6257
LP
1681 return 0;
1682}
1683
fe652127
LP
1684static int open_stdout_socket(Server *s) {
1685 union sockaddr_union sa;
1686 int r;
7f3e6257 1687 struct epoll_event ev;
fe652127
LP
1688
1689 assert(s);
1690
1691 if (s->stdout_fd < 0) {
1692
1693 s->stdout_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0);
1694 if (s->stdout_fd < 0) {
1695 log_error("socket() failed: %m");
1696 return -errno;
1697 }
1698
1699 zero(sa);
1700 sa.un.sun_family = AF_UNIX;
1701 strncpy(sa.un.sun_path, "/run/systemd/stdout", sizeof(sa.un.sun_path));
1702
1703 unlink(sa.un.sun_path);
1704
1705 r = bind(s->stdout_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1706 if (r < 0) {
1707 log_error("bind() failed: %m");
1708 return -errno;
1709 }
1710
1711 chmod(sa.un.sun_path, 0666);
1712
1713 if (listen(s->stdout_fd, SOMAXCONN) < 0) {
1714 log_error("liste() failed: %m");
1715 return -errno;
1716 }
1717 }
1718
1719 zero(ev);
1720 ev.events = EPOLLIN;
1721 ev.data.fd = s->stdout_fd;
1722 if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->stdout_fd, &ev) < 0) {
1723 log_error("Failed to add stdout server fd to epoll object: %m");
1724 return -errno;
1725 }
1726
1727 return 0;
1728}
1729
1730static int open_signalfd(Server *s) {
7f3e6257 1731 sigset_t mask;
fe652127
LP
1732 struct epoll_event ev;
1733
1734 assert(s);
1735
1736 assert_se(sigemptyset(&mask) == 0);
cf244689 1737 sigset_add_many(&mask, SIGINT, SIGTERM, SIGUSR1, -1);
fe652127
LP
1738 assert_se(sigprocmask(SIG_SETMASK, &mask, NULL) == 0);
1739
1740 s->signal_fd = signalfd(-1, &mask, SFD_NONBLOCK|SFD_CLOEXEC);
1741 if (s->signal_fd < 0) {
1742 log_error("signalfd(): %m");
1743 return -errno;
1744 }
1745
1746 zero(ev);
1747 ev.events = EPOLLIN;
1748 ev.data.fd = s->signal_fd;
1749
1750 if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->signal_fd, &ev) < 0) {
1751 log_error("epoll_ctl(): %m");
1752 return -errno;
1753 }
1754
1755 return 0;
1756}
1757
1758static int server_init(Server *s) {
1759 int n, r, fd;
7f3e6257
LP
1760
1761 assert(s);
1762
1763 zero(*s);
fe652127 1764 s->syslog_fd = s->native_fd = s->stdout_fd = s->signal_fd = s->epoll_fd = -1;
807e17f0 1765 s->compress = true;
7f3e6257 1766
babfc091
LP
1767 memset(&s->system_metrics, 0xFF, sizeof(s->system_metrics));
1768 memset(&s->runtime_metrics, 0xFF, sizeof(s->runtime_metrics));
1769
fe652127
LP
1770 s->user_journals = hashmap_new(trivial_hash_func, trivial_compare_func);
1771 if (!s->user_journals) {
1772 log_error("Out of memory.");
1773 return -ENOMEM;
1774 }
1775
7f3e6257
LP
1776 s->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
1777 if (s->epoll_fd < 0) {
1778 log_error("Failed to create epoll object: %m");
1779 return -errno;
1780 }
1781
1782 n = sd_listen_fds(true);
1783 if (n < 0) {
1784 log_error("Failed to read listening file descriptors from environment: %s", strerror(-n));
1785 return n;
1786 }
1787
1788 for (fd = SD_LISTEN_FDS_START; fd < SD_LISTEN_FDS_START + n; fd++) {
1789
fe652127 1790 if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/run/systemd/native", 0) > 0) {
7f3e6257 1791
fe652127
LP
1792 if (s->native_fd >= 0) {
1793 log_error("Too many native sockets passed.");
7f3e6257
LP
1794 return -EINVAL;
1795 }
1796
fe652127 1797 s->native_fd = fd;
7f3e6257 1798
fe652127 1799 } else if (sd_is_socket_unix(fd, SOCK_STREAM, 1, "/run/systemd/stdout", 0) > 0) {
7f3e6257 1800
fe652127
LP
1801 if (s->stdout_fd >= 0) {
1802 log_error("Too many stdout sockets passed.");
7f3e6257
LP
1803 return -EINVAL;
1804 }
1805
fe652127
LP
1806 s->stdout_fd = fd;
1807
1808 } else if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/dev/log", 0) > 0) {
1809
1810 if (s->syslog_fd >= 0) {
1811 log_error("Too many /dev/log sockets passed.");
1812 return -EINVAL;
1813 }
1814
1815 s->syslog_fd = fd;
1816
7f3e6257
LP
1817 } else {
1818 log_error("Unknown socket passed.");
1819 return -EINVAL;
1820 }
1821 }
1822
1823 r = open_syslog_socket(s);
1824 if (r < 0)
1825 return r;
1826
7f3e6257
LP
1827 r = open_native_socket(s);
1828 if (r < 0)
1829 return r;
1830
fe652127
LP
1831 r = open_stdout_socket(s);
1832 if (r < 0)
1833 return r;
87d2c1ff 1834
ed49ef3f
LP
1835 r = system_journal_open(s);
1836 if (r < 0)
87d2c1ff 1837 return r;
87d2c1ff 1838
fe652127
LP
1839 r = open_signalfd(s);
1840 if (r < 0)
1841 return r;
87d2c1ff 1842
de97b26a 1843 s->rate_limit = journal_rate_limit_new(DEFAULT_RATE_LIMIT_INTERVAL, DEFAULT_RATE_LIMIT_BURST);
6e409ce1
LP
1844 if (!s->rate_limit)
1845 return -ENOMEM;
1846
87d2c1ff
LP
1847 return 0;
1848}
1849
1850static void server_done(Server *s) {
1851 JournalFile *f;
1852 assert(s);
1853
fe652127
LP
1854 while (s->stdout_streams)
1855 stdout_stream_free(s->stdout_streams);
1856
87d2c1ff
LP
1857 if (s->system_journal)
1858 journal_file_close(s->system_journal);
1859
f4b47811
LP
1860 if (s->runtime_journal)
1861 journal_file_close(s->runtime_journal);
1862
87d2c1ff
LP
1863 while ((f = hashmap_steal_first(s->user_journals)))
1864 journal_file_close(f);
1865
1866 hashmap_free(s->user_journals);
1867
1868 if (s->epoll_fd >= 0)
1869 close_nointr_nofail(s->epoll_fd);
1870
1871 if (s->signal_fd >= 0)
1872 close_nointr_nofail(s->signal_fd);
1873
1874 if (s->syslog_fd >= 0)
1875 close_nointr_nofail(s->syslog_fd);
7f3e6257
LP
1876
1877 if (s->native_fd >= 0)
1878 close_nointr_nofail(s->native_fd);
fe652127
LP
1879
1880 if (s->stdout_fd >= 0)
1881 close_nointr_nofail(s->stdout_fd);
6e409ce1
LP
1882
1883 if (s->rate_limit)
1884 journal_rate_limit_free(s->rate_limit);
783d2675
LP
1885
1886 free(s->buffer);
87d2c1ff
LP
1887}
1888
1889int main(int argc, char *argv[]) {
1890 Server server;
1891 int r;
1892
1893 /* if (getppid() != 1) { */
1894 /* log_error("This program should be invoked by init only."); */
1895 /* return EXIT_FAILURE; */
1896 /* } */
1897
1898 if (argc > 1) {
1899 log_error("This program does not take arguments.");
1900 return EXIT_FAILURE;
1901 }
1902
f4b47811 1903 log_set_target(LOG_TARGET_CONSOLE);
87d2c1ff
LP
1904 log_parse_environment();
1905 log_open();
1906
1907 umask(0022);
1908
1909 r = server_init(&server);
1910 if (r < 0)
1911 goto finish;
1912
1913 log_debug("systemd-journald running as pid %lu", (unsigned long) getpid());
1914
1915 sd_notify(false,
1916 "READY=1\n"
fe652127 1917 "STATUS=Processing requests...");
50f20cfd 1918
cf244689
LP
1919 server_vacuum(&server);
1920 server_flush_to_var(&server);
1921
87d2c1ff
LP
1922 for (;;) {
1923 struct epoll_event event;
1924
1925 r = epoll_wait(server.epoll_fd, &event, 1, -1);
1926 if (r < 0) {
1927
1928 if (errno == EINTR)
1929 continue;
1930
1931 log_error("epoll_wait() failed: %m");
1932 r = -errno;
1933 goto finish;
1934 } else if (r == 0)
1935 break;
1936
1937 r = process_event(&server, &event);
1938 if (r < 0)
1939 goto finish;
1940 else if (r == 0)
1941 break;
1942 }
1943
fe652127
LP
1944 log_debug("systemd-journald stopped as pid %lu", (unsigned long) getpid());
1945
87d2c1ff
LP
1946finish:
1947 sd_notify(false,
1948 "STATUS=Shutting down...");
1949
1950 server_done(&server);
1951
1952 return r < 0 ? EXIT_FAILURE : EXIT_SUCCESS;
1953}