]> git.ipfire.org Git - thirdparty/systemd.git/blob - src/journal/journald-stream.c
Merge pull request #1548 from evverx/journalctl-catalog-ops-fixes
[thirdparty/systemd.git] / src / journal / journald-stream.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4 This file is part of systemd.
5
6 Copyright 2011 Lennart Poettering
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <unistd.h>
23 #include <stddef.h>
24
25 #ifdef HAVE_SELINUX
26 #include <selinux/selinux.h>
27 #endif
28
29 #include "sd-event.h"
30 #include "sd-daemon.h"
31 #include "socket-util.h"
32 #include "selinux-util.h"
33 #include "mkdir.h"
34 #include "fileio.h"
35 #include "journald-server.h"
36 #include "journald-stream.h"
37 #include "journald-syslog.h"
38 #include "journald-kmsg.h"
39 #include "journald-console.h"
40 #include "journald-wall.h"
41
42 #define STDOUT_STREAMS_MAX 4096
43
44 typedef enum StdoutStreamState {
45 STDOUT_STREAM_IDENTIFIER,
46 STDOUT_STREAM_UNIT_ID,
47 STDOUT_STREAM_PRIORITY,
48 STDOUT_STREAM_LEVEL_PREFIX,
49 STDOUT_STREAM_FORWARD_TO_SYSLOG,
50 STDOUT_STREAM_FORWARD_TO_KMSG,
51 STDOUT_STREAM_FORWARD_TO_CONSOLE,
52 STDOUT_STREAM_RUNNING
53 } StdoutStreamState;
54
55 struct StdoutStream {
56 Server *server;
57 StdoutStreamState state;
58
59 int fd;
60
61 struct ucred ucred;
62 char *label;
63 char *identifier;
64 char *unit_id;
65 int priority;
66 bool level_prefix:1;
67 bool forward_to_syslog:1;
68 bool forward_to_kmsg:1;
69 bool forward_to_console:1;
70
71 bool fdstore:1;
72
73 char buffer[LINE_MAX+1];
74 size_t length;
75
76 sd_event_source *event_source;
77
78 char *state_file;
79
80 LIST_FIELDS(StdoutStream, stdout_stream);
81 };
82
83 void stdout_stream_free(StdoutStream *s) {
84 if (!s)
85 return;
86
87 if (s->server) {
88 assert(s->server->n_stdout_streams > 0);
89 s->server->n_stdout_streams --;
90 LIST_REMOVE(stdout_stream, s->server->stdout_streams, s);
91 }
92
93 if (s->event_source) {
94 sd_event_source_set_enabled(s->event_source, SD_EVENT_OFF);
95 s->event_source = sd_event_source_unref(s->event_source);
96 }
97
98 safe_close(s->fd);
99 free(s->label);
100 free(s->identifier);
101 free(s->unit_id);
102 free(s->state_file);
103
104 free(s);
105 }
106
107 DEFINE_TRIVIAL_CLEANUP_FUNC(StdoutStream*, stdout_stream_free);
108
109 static void stdout_stream_destroy(StdoutStream *s) {
110 if (!s)
111 return;
112
113 if (s->state_file)
114 unlink(s->state_file);
115
116 stdout_stream_free(s);
117 }
118
119 static int stdout_stream_save(StdoutStream *s) {
120 _cleanup_free_ char *temp_path = NULL;
121 _cleanup_fclose_ FILE *f = NULL;
122 int r;
123
124 assert(s);
125
126 if (s->state != STDOUT_STREAM_RUNNING)
127 return 0;
128
129 if (!s->state_file) {
130 struct stat st;
131
132 r = fstat(s->fd, &st);
133 if (r < 0)
134 return log_warning_errno(errno, "Failed to stat connected stream: %m");
135
136 /* We use device and inode numbers as identifier for the stream */
137 if (asprintf(&s->state_file, "/run/systemd/journal/streams/%lu:%lu", (unsigned long) st.st_dev, (unsigned long) st.st_ino) < 0)
138 return log_oom();
139 }
140
141 mkdir_p("/run/systemd/journal/streams", 0755);
142
143 r = fopen_temporary(s->state_file, &f, &temp_path);
144 if (r < 0)
145 goto fail;
146
147 fprintf(f,
148 "# This is private data. Do not parse\n"
149 "PRIORITY=%i\n"
150 "LEVEL_PREFIX=%i\n"
151 "FORWARD_TO_SYSLOG=%i\n"
152 "FORWARD_TO_KMSG=%i\n"
153 "FORWARD_TO_CONSOLE=%i\n",
154 s->priority,
155 s->level_prefix,
156 s->forward_to_syslog,
157 s->forward_to_kmsg,
158 s->forward_to_console);
159
160 if (!isempty(s->identifier)) {
161 _cleanup_free_ char *escaped;
162
163 escaped = cescape(s->identifier);
164 if (!escaped) {
165 r = -ENOMEM;
166 goto fail;
167 }
168
169 fprintf(f, "IDENTIFIER=%s\n", escaped);
170 }
171
172 if (!isempty(s->unit_id)) {
173 _cleanup_free_ char *escaped;
174
175 escaped = cescape(s->unit_id);
176 if (!escaped) {
177 r = -ENOMEM;
178 goto fail;
179 }
180
181 fprintf(f, "UNIT=%s\n", escaped);
182 }
183
184 r = fflush_and_check(f);
185 if (r < 0)
186 goto fail;
187
188 if (rename(temp_path, s->state_file) < 0) {
189 r = -errno;
190 goto fail;
191 }
192
193 /* Store the connection fd in PID 1, so that we get it passed
194 * in again on next start */
195 if (!s->fdstore) {
196 sd_pid_notify_with_fds(0, false, "FDSTORE=1", &s->fd, 1);
197 s->fdstore = true;
198 }
199
200 return 0;
201
202 fail:
203 (void) unlink(s->state_file);
204
205 if (temp_path)
206 (void) unlink(temp_path);
207
208 return log_error_errno(r, "Failed to save stream data %s: %m", s->state_file);
209 }
210
211 static int stdout_stream_log(StdoutStream *s, const char *p) {
212 struct iovec iovec[N_IOVEC_META_FIELDS + 5];
213 int priority;
214 char syslog_priority[] = "PRIORITY=\0";
215 char syslog_facility[sizeof("SYSLOG_FACILITY=")-1 + DECIMAL_STR_MAX(int) + 1];
216 _cleanup_free_ char *message = NULL, *syslog_identifier = NULL;
217 unsigned n = 0;
218 size_t label_len;
219
220 assert(s);
221 assert(p);
222
223 if (isempty(p))
224 return 0;
225
226 priority = s->priority;
227
228 if (s->level_prefix)
229 syslog_parse_priority(&p, &priority, false);
230
231 if (s->forward_to_syslog || s->server->forward_to_syslog)
232 server_forward_syslog(s->server, syslog_fixup_facility(priority), s->identifier, p, &s->ucred, NULL);
233
234 if (s->forward_to_kmsg || s->server->forward_to_kmsg)
235 server_forward_kmsg(s->server, priority, s->identifier, p, &s->ucred);
236
237 if (s->forward_to_console || s->server->forward_to_console)
238 server_forward_console(s->server, priority, s->identifier, p, &s->ucred);
239
240 if (s->server->forward_to_wall)
241 server_forward_wall(s->server, priority, s->identifier, p, &s->ucred);
242
243 IOVEC_SET_STRING(iovec[n++], "_TRANSPORT=stdout");
244
245 syslog_priority[strlen("PRIORITY=")] = '0' + LOG_PRI(priority);
246 IOVEC_SET_STRING(iovec[n++], syslog_priority);
247
248 if (priority & LOG_FACMASK) {
249 xsprintf(syslog_facility, "SYSLOG_FACILITY=%i", LOG_FAC(priority));
250 IOVEC_SET_STRING(iovec[n++], syslog_facility);
251 }
252
253 if (s->identifier) {
254 syslog_identifier = strappend("SYSLOG_IDENTIFIER=", s->identifier);
255 if (syslog_identifier)
256 IOVEC_SET_STRING(iovec[n++], syslog_identifier);
257 }
258
259 message = strappend("MESSAGE=", p);
260 if (message)
261 IOVEC_SET_STRING(iovec[n++], message);
262
263 label_len = s->label ? strlen(s->label) : 0;
264 server_dispatch_message(s->server, iovec, n, ELEMENTSOF(iovec), &s->ucred, NULL, s->label, label_len, s->unit_id, priority, 0);
265 return 0;
266 }
267
268 static int stdout_stream_line(StdoutStream *s, char *p) {
269 int r;
270
271 assert(s);
272 assert(p);
273
274 p = strstrip(p);
275
276 switch (s->state) {
277
278 case STDOUT_STREAM_IDENTIFIER:
279 if (isempty(p))
280 s->identifier = NULL;
281 else {
282 s->identifier = strdup(p);
283 if (!s->identifier)
284 return log_oom();
285 }
286
287 s->state = STDOUT_STREAM_UNIT_ID;
288 return 0;
289
290 case STDOUT_STREAM_UNIT_ID:
291 if (s->ucred.uid == 0) {
292 if (isempty(p))
293 s->unit_id = NULL;
294 else {
295 s->unit_id = strdup(p);
296 if (!s->unit_id)
297 return log_oom();
298 }
299 }
300
301 s->state = STDOUT_STREAM_PRIORITY;
302 return 0;
303
304 case STDOUT_STREAM_PRIORITY:
305 r = safe_atoi(p, &s->priority);
306 if (r < 0 || s->priority < 0 || s->priority > 999) {
307 log_warning("Failed to parse log priority line.");
308 return -EINVAL;
309 }
310
311 s->state = STDOUT_STREAM_LEVEL_PREFIX;
312 return 0;
313
314 case STDOUT_STREAM_LEVEL_PREFIX:
315 r = parse_boolean(p);
316 if (r < 0) {
317 log_warning("Failed to parse level prefix line.");
318 return -EINVAL;
319 }
320
321 s->level_prefix = !!r;
322 s->state = STDOUT_STREAM_FORWARD_TO_SYSLOG;
323 return 0;
324
325 case STDOUT_STREAM_FORWARD_TO_SYSLOG:
326 r = parse_boolean(p);
327 if (r < 0) {
328 log_warning("Failed to parse forward to syslog line.");
329 return -EINVAL;
330 }
331
332 s->forward_to_syslog = !!r;
333 s->state = STDOUT_STREAM_FORWARD_TO_KMSG;
334 return 0;
335
336 case STDOUT_STREAM_FORWARD_TO_KMSG:
337 r = parse_boolean(p);
338 if (r < 0) {
339 log_warning("Failed to parse copy to kmsg line.");
340 return -EINVAL;
341 }
342
343 s->forward_to_kmsg = !!r;
344 s->state = STDOUT_STREAM_FORWARD_TO_CONSOLE;
345 return 0;
346
347 case STDOUT_STREAM_FORWARD_TO_CONSOLE:
348 r = parse_boolean(p);
349 if (r < 0) {
350 log_warning("Failed to parse copy to console line.");
351 return -EINVAL;
352 }
353
354 s->forward_to_console = !!r;
355 s->state = STDOUT_STREAM_RUNNING;
356
357 /* Try to save the stream, so that journald can be restarted and we can recover */
358 (void) stdout_stream_save(s);
359 return 0;
360
361 case STDOUT_STREAM_RUNNING:
362 return stdout_stream_log(s, p);
363 }
364
365 assert_not_reached("Unknown stream state");
366 }
367
368 static int stdout_stream_scan(StdoutStream *s, bool force_flush) {
369 char *p;
370 size_t remaining;
371 int r;
372
373 assert(s);
374
375 p = s->buffer;
376 remaining = s->length;
377 for (;;) {
378 char *end;
379 size_t skip;
380
381 end = memchr(p, '\n', remaining);
382 if (end)
383 skip = end - p + 1;
384 else if (remaining >= sizeof(s->buffer) - 1) {
385 end = p + sizeof(s->buffer) - 1;
386 skip = remaining;
387 } else
388 break;
389
390 *end = 0;
391
392 r = stdout_stream_line(s, p);
393 if (r < 0)
394 return r;
395
396 remaining -= skip;
397 p += skip;
398 }
399
400 if (force_flush && remaining > 0) {
401 p[remaining] = 0;
402 r = stdout_stream_line(s, p);
403 if (r < 0)
404 return r;
405
406 p += remaining;
407 remaining = 0;
408 }
409
410 if (p > s->buffer) {
411 memmove(s->buffer, p, remaining);
412 s->length = remaining;
413 }
414
415 return 0;
416 }
417
418 static int stdout_stream_process(sd_event_source *es, int fd, uint32_t revents, void *userdata) {
419 StdoutStream *s = userdata;
420 ssize_t l;
421 int r;
422
423 assert(s);
424
425 if ((revents|EPOLLIN|EPOLLHUP) != (EPOLLIN|EPOLLHUP)) {
426 log_error("Got invalid event from epoll for stdout stream: %"PRIx32, revents);
427 goto terminate;
428 }
429
430 l = read(s->fd, s->buffer+s->length, sizeof(s->buffer)-1-s->length);
431 if (l < 0) {
432
433 if (errno == EAGAIN)
434 return 0;
435
436 log_warning_errno(errno, "Failed to read from stream: %m");
437 goto terminate;
438 }
439
440 if (l == 0) {
441 stdout_stream_scan(s, true);
442 goto terminate;
443 }
444
445 s->length += l;
446 r = stdout_stream_scan(s, false);
447 if (r < 0)
448 goto terminate;
449
450 return 1;
451
452 terminate:
453 stdout_stream_destroy(s);
454 return 0;
455 }
456
457 static int stdout_stream_install(Server *s, int fd, StdoutStream **ret) {
458 _cleanup_(stdout_stream_freep) StdoutStream *stream = NULL;
459 int r;
460
461 assert(s);
462 assert(fd >= 0);
463
464 stream = new0(StdoutStream, 1);
465 if (!stream)
466 return log_oom();
467
468 stream->fd = -1;
469 stream->priority = LOG_INFO;
470
471 r = getpeercred(fd, &stream->ucred);
472 if (r < 0)
473 return log_error_errno(r, "Failed to determine peer credentials: %m");
474
475 if (mac_selinux_use()) {
476 r = getpeersec(fd, &stream->label);
477 if (r < 0 && r != -EOPNOTSUPP)
478 (void) log_warning_errno(r, "Failed to determine peer security context: %m");
479 }
480
481 (void) shutdown(fd, SHUT_WR);
482
483 r = sd_event_add_io(s->event, &stream->event_source, fd, EPOLLIN, stdout_stream_process, stream);
484 if (r < 0)
485 return log_error_errno(r, "Failed to add stream to event loop: %m");
486
487 r = sd_event_source_set_priority(stream->event_source, SD_EVENT_PRIORITY_NORMAL+5);
488 if (r < 0)
489 return log_error_errno(r, "Failed to adjust stdout event source priority: %m");
490
491 stream->fd = fd;
492
493 stream->server = s;
494 LIST_PREPEND(stdout_stream, s->stdout_streams, stream);
495 s->n_stdout_streams ++;
496
497 if (ret)
498 *ret = stream;
499
500 stream = NULL;
501
502 return 0;
503 }
504
505 static int stdout_stream_new(sd_event_source *es, int listen_fd, uint32_t revents, void *userdata) {
506 _cleanup_close_ int fd = -1;
507 Server *s = userdata;
508 int r;
509
510 assert(s);
511
512 if (revents != EPOLLIN) {
513 log_error("Got invalid event from epoll for stdout server fd: %"PRIx32, revents);
514 return -EIO;
515 }
516
517 fd = accept4(s->stdout_fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
518 if (fd < 0) {
519 if (errno == EAGAIN)
520 return 0;
521
522 log_error_errno(errno, "Failed to accept stdout connection: %m");
523 return -errno;
524 }
525
526 if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) {
527 log_warning("Too many stdout streams, refusing connection.");
528 return 0;
529 }
530
531 r = stdout_stream_install(s, fd, NULL);
532 if (r < 0)
533 return r;
534
535 fd = -1;
536 return 0;
537 }
538
539 static int stdout_stream_load(StdoutStream *stream, const char *fname) {
540 _cleanup_free_ char
541 *priority = NULL,
542 *level_prefix = NULL,
543 *forward_to_syslog = NULL,
544 *forward_to_kmsg = NULL,
545 *forward_to_console = NULL;
546 int r;
547
548 assert(stream);
549 assert(fname);
550
551 if (!stream->state_file) {
552 stream->state_file = strappend("/run/systemd/journal/streams/", fname);
553 if (!stream->state_file)
554 return log_oom();
555 }
556
557 r = parse_env_file(stream->state_file, NEWLINE,
558 "PRIORITY", &priority,
559 "LEVEL_PREFIX", &level_prefix,
560 "FORWARD_TO_SYSLOG", &forward_to_syslog,
561 "FORWARD_TO_KMSG", &forward_to_kmsg,
562 "FORWARD_TO_CONSOLE", &forward_to_console,
563 "IDENTIFIER", &stream->identifier,
564 "UNIT", &stream->unit_id,
565 NULL);
566 if (r < 0)
567 return log_error_errno(r, "Failed to read: %s", stream->state_file);
568
569 if (priority) {
570 int p;
571
572 p = log_level_from_string(priority);
573 if (p >= 0)
574 stream->priority = p;
575 }
576
577 if (level_prefix) {
578 r = parse_boolean(level_prefix);
579 if (r >= 0)
580 stream->level_prefix = r;
581 }
582
583 if (forward_to_syslog) {
584 r = parse_boolean(forward_to_syslog);
585 if (r >= 0)
586 stream->forward_to_syslog = r;
587 }
588
589 if (forward_to_kmsg) {
590 r = parse_boolean(forward_to_kmsg);
591 if (r >= 0)
592 stream->forward_to_kmsg = r;
593 }
594
595 if (forward_to_console) {
596 r = parse_boolean(forward_to_console);
597 if (r >= 0)
598 stream->forward_to_console = r;
599 }
600
601 return 0;
602 }
603
604 static int stdout_stream_restore(Server *s, const char *fname, int fd) {
605 StdoutStream *stream;
606 int r;
607
608 assert(s);
609 assert(fname);
610 assert(fd >= 0);
611
612 if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) {
613 log_warning("Too many stdout streams, refusing restoring of stream.");
614 return -ENOBUFS;
615 }
616
617 r = stdout_stream_install(s, fd, &stream);
618 if (r < 0)
619 return r;
620
621 stream->state = STDOUT_STREAM_RUNNING;
622 stream->fdstore = true;
623
624 /* Ignore all parsing errors */
625 (void) stdout_stream_load(stream, fname);
626
627 return 0;
628 }
629
630 int server_restore_streams(Server *s, FDSet *fds) {
631 _cleanup_closedir_ DIR *d = NULL;
632 struct dirent *de;
633 int r;
634
635 d = opendir("/run/systemd/journal/streams");
636 if (!d) {
637 if (errno == ENOENT)
638 return 0;
639
640 return log_warning_errno(errno, "Failed to enumerate /run/systemd/journal/streams: %m");
641 }
642
643 FOREACH_DIRENT(de, d, goto fail) {
644 unsigned long st_dev, st_ino;
645 bool found = false;
646 Iterator i;
647 int fd;
648
649 if (sscanf(de->d_name, "%lu:%lu", &st_dev, &st_ino) != 2)
650 continue;
651
652 FDSET_FOREACH(fd, fds, i) {
653 struct stat st;
654
655 if (fstat(fd, &st) < 0)
656 return log_error_errno(errno, "Failed to stat %s: %m", de->d_name);
657
658 if (S_ISSOCK(st.st_mode) && st.st_dev == st_dev && st.st_ino == st_ino) {
659 found = true;
660 break;
661 }
662 }
663
664 if (!found) {
665 /* No file descriptor? Then let's delete the state file */
666 log_debug("Cannot restore stream file %s", de->d_name);
667 unlinkat(dirfd(d), de->d_name, 0);
668 continue;
669 }
670
671 fdset_remove(fds, fd);
672
673 r = stdout_stream_restore(s, de->d_name, fd);
674 if (r < 0)
675 safe_close(fd);
676 }
677
678 return 0;
679
680 fail:
681 return log_error_errno(errno, "Failed to read streams directory: %m");
682 }
683
684 int server_open_stdout_socket(Server *s) {
685 int r;
686
687 assert(s);
688
689 if (s->stdout_fd < 0) {
690 union sockaddr_union sa = {
691 .un.sun_family = AF_UNIX,
692 .un.sun_path = "/run/systemd/journal/stdout",
693 };
694
695 s->stdout_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
696 if (s->stdout_fd < 0)
697 return log_error_errno(errno, "socket() failed: %m");
698
699 unlink(sa.un.sun_path);
700
701 r = bind(s->stdout_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
702 if (r < 0)
703 return log_error_errno(errno, "bind(%s) failed: %m", sa.un.sun_path);
704
705 (void) chmod(sa.un.sun_path, 0666);
706
707 if (listen(s->stdout_fd, SOMAXCONN) < 0)
708 return log_error_errno(errno, "listen(%s) failed: %m", sa.un.sun_path);
709 } else
710 fd_nonblock(s->stdout_fd, 1);
711
712 r = sd_event_add_io(s->event, &s->stdout_event_source, s->stdout_fd, EPOLLIN, stdout_stream_new, s);
713 if (r < 0)
714 return log_error_errno(r, "Failed to add stdout server fd to event source: %m");
715
716 r = sd_event_source_set_priority(s->stdout_event_source, SD_EVENT_PRIORITY_NORMAL+10);
717 if (r < 0)
718 return log_error_errno(r, "Failed to adjust priority of stdout server event source: %m");
719
720 return 0;
721 }