]> git.ipfire.org Git - thirdparty/systemd.git/blob - src/journal/journald-stream.c
util-lib: split out printf() helpers to stdio-util.h
[thirdparty/systemd.git] / src / journal / journald-stream.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4 This file is part of systemd.
5
6 Copyright 2011 Lennart Poettering
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <stddef.h>
23 #include <unistd.h>
24
25 #ifdef HAVE_SELINUX
26 #include <selinux/selinux.h>
27 #endif
28
29 #include "sd-daemon.h"
30 #include "sd-event.h"
31
32 #include "dirent-util.h"
33 #include "escape.h"
34 #include "fd-util.h"
35 #include "fileio.h"
36 #include "io-util.h"
37 #include "journald-console.h"
38 #include "journald-kmsg.h"
39 #include "journald-server.h"
40 #include "journald-stream.h"
41 #include "journald-syslog.h"
42 #include "journald-wall.h"
43 #include "mkdir.h"
44 #include "parse-util.h"
45 #include "selinux-util.h"
46 #include "socket-util.h"
47 #include "stdio-util.h"
48 #include "string-util.h"
49 #include "syslog-util.h"
50
51 #define STDOUT_STREAMS_MAX 4096
52
53 typedef enum StdoutStreamState {
54 STDOUT_STREAM_IDENTIFIER,
55 STDOUT_STREAM_UNIT_ID,
56 STDOUT_STREAM_PRIORITY,
57 STDOUT_STREAM_LEVEL_PREFIX,
58 STDOUT_STREAM_FORWARD_TO_SYSLOG,
59 STDOUT_STREAM_FORWARD_TO_KMSG,
60 STDOUT_STREAM_FORWARD_TO_CONSOLE,
61 STDOUT_STREAM_RUNNING
62 } StdoutStreamState;
63
64 struct StdoutStream {
65 Server *server;
66 StdoutStreamState state;
67
68 int fd;
69
70 struct ucred ucred;
71 char *label;
72 char *identifier;
73 char *unit_id;
74 int priority;
75 bool level_prefix:1;
76 bool forward_to_syslog:1;
77 bool forward_to_kmsg:1;
78 bool forward_to_console:1;
79
80 bool fdstore:1;
81
82 char buffer[LINE_MAX+1];
83 size_t length;
84
85 sd_event_source *event_source;
86
87 char *state_file;
88
89 LIST_FIELDS(StdoutStream, stdout_stream);
90 };
91
92 void stdout_stream_free(StdoutStream *s) {
93 if (!s)
94 return;
95
96 if (s->server) {
97 assert(s->server->n_stdout_streams > 0);
98 s->server->n_stdout_streams --;
99 LIST_REMOVE(stdout_stream, s->server->stdout_streams, s);
100 }
101
102 if (s->event_source) {
103 sd_event_source_set_enabled(s->event_source, SD_EVENT_OFF);
104 s->event_source = sd_event_source_unref(s->event_source);
105 }
106
107 safe_close(s->fd);
108 free(s->label);
109 free(s->identifier);
110 free(s->unit_id);
111 free(s->state_file);
112
113 free(s);
114 }
115
116 DEFINE_TRIVIAL_CLEANUP_FUNC(StdoutStream*, stdout_stream_free);
117
118 static void stdout_stream_destroy(StdoutStream *s) {
119 if (!s)
120 return;
121
122 if (s->state_file)
123 unlink(s->state_file);
124
125 stdout_stream_free(s);
126 }
127
128 static int stdout_stream_save(StdoutStream *s) {
129 _cleanup_free_ char *temp_path = NULL;
130 _cleanup_fclose_ FILE *f = NULL;
131 int r;
132
133 assert(s);
134
135 if (s->state != STDOUT_STREAM_RUNNING)
136 return 0;
137
138 if (!s->state_file) {
139 struct stat st;
140
141 r = fstat(s->fd, &st);
142 if (r < 0)
143 return log_warning_errno(errno, "Failed to stat connected stream: %m");
144
145 /* We use device and inode numbers as identifier for the stream */
146 if (asprintf(&s->state_file, "/run/systemd/journal/streams/%lu:%lu", (unsigned long) st.st_dev, (unsigned long) st.st_ino) < 0)
147 return log_oom();
148 }
149
150 mkdir_p("/run/systemd/journal/streams", 0755);
151
152 r = fopen_temporary(s->state_file, &f, &temp_path);
153 if (r < 0)
154 goto fail;
155
156 fprintf(f,
157 "# This is private data. Do not parse\n"
158 "PRIORITY=%i\n"
159 "LEVEL_PREFIX=%i\n"
160 "FORWARD_TO_SYSLOG=%i\n"
161 "FORWARD_TO_KMSG=%i\n"
162 "FORWARD_TO_CONSOLE=%i\n",
163 s->priority,
164 s->level_prefix,
165 s->forward_to_syslog,
166 s->forward_to_kmsg,
167 s->forward_to_console);
168
169 if (!isempty(s->identifier)) {
170 _cleanup_free_ char *escaped;
171
172 escaped = cescape(s->identifier);
173 if (!escaped) {
174 r = -ENOMEM;
175 goto fail;
176 }
177
178 fprintf(f, "IDENTIFIER=%s\n", escaped);
179 }
180
181 if (!isempty(s->unit_id)) {
182 _cleanup_free_ char *escaped;
183
184 escaped = cescape(s->unit_id);
185 if (!escaped) {
186 r = -ENOMEM;
187 goto fail;
188 }
189
190 fprintf(f, "UNIT=%s\n", escaped);
191 }
192
193 r = fflush_and_check(f);
194 if (r < 0)
195 goto fail;
196
197 if (rename(temp_path, s->state_file) < 0) {
198 r = -errno;
199 goto fail;
200 }
201
202 /* Store the connection fd in PID 1, so that we get it passed
203 * in again on next start */
204 if (!s->fdstore) {
205 sd_pid_notify_with_fds(0, false, "FDSTORE=1", &s->fd, 1);
206 s->fdstore = true;
207 }
208
209 return 0;
210
211 fail:
212 (void) unlink(s->state_file);
213
214 if (temp_path)
215 (void) unlink(temp_path);
216
217 return log_error_errno(r, "Failed to save stream data %s: %m", s->state_file);
218 }
219
220 static int stdout_stream_log(StdoutStream *s, const char *p) {
221 struct iovec iovec[N_IOVEC_META_FIELDS + 5];
222 int priority;
223 char syslog_priority[] = "PRIORITY=\0";
224 char syslog_facility[sizeof("SYSLOG_FACILITY=")-1 + DECIMAL_STR_MAX(int) + 1];
225 _cleanup_free_ char *message = NULL, *syslog_identifier = NULL;
226 unsigned n = 0;
227 size_t label_len;
228
229 assert(s);
230 assert(p);
231
232 if (isempty(p))
233 return 0;
234
235 priority = s->priority;
236
237 if (s->level_prefix)
238 syslog_parse_priority(&p, &priority, false);
239
240 if (s->forward_to_syslog || s->server->forward_to_syslog)
241 server_forward_syslog(s->server, syslog_fixup_facility(priority), s->identifier, p, &s->ucred, NULL);
242
243 if (s->forward_to_kmsg || s->server->forward_to_kmsg)
244 server_forward_kmsg(s->server, priority, s->identifier, p, &s->ucred);
245
246 if (s->forward_to_console || s->server->forward_to_console)
247 server_forward_console(s->server, priority, s->identifier, p, &s->ucred);
248
249 if (s->server->forward_to_wall)
250 server_forward_wall(s->server, priority, s->identifier, p, &s->ucred);
251
252 IOVEC_SET_STRING(iovec[n++], "_TRANSPORT=stdout");
253
254 syslog_priority[strlen("PRIORITY=")] = '0' + LOG_PRI(priority);
255 IOVEC_SET_STRING(iovec[n++], syslog_priority);
256
257 if (priority & LOG_FACMASK) {
258 xsprintf(syslog_facility, "SYSLOG_FACILITY=%i", LOG_FAC(priority));
259 IOVEC_SET_STRING(iovec[n++], syslog_facility);
260 }
261
262 if (s->identifier) {
263 syslog_identifier = strappend("SYSLOG_IDENTIFIER=", s->identifier);
264 if (syslog_identifier)
265 IOVEC_SET_STRING(iovec[n++], syslog_identifier);
266 }
267
268 message = strappend("MESSAGE=", p);
269 if (message)
270 IOVEC_SET_STRING(iovec[n++], message);
271
272 label_len = s->label ? strlen(s->label) : 0;
273 server_dispatch_message(s->server, iovec, n, ELEMENTSOF(iovec), &s->ucred, NULL, s->label, label_len, s->unit_id, priority, 0);
274 return 0;
275 }
276
277 static int stdout_stream_line(StdoutStream *s, char *p) {
278 int r;
279
280 assert(s);
281 assert(p);
282
283 p = strstrip(p);
284
285 switch (s->state) {
286
287 case STDOUT_STREAM_IDENTIFIER:
288 if (isempty(p))
289 s->identifier = NULL;
290 else {
291 s->identifier = strdup(p);
292 if (!s->identifier)
293 return log_oom();
294 }
295
296 s->state = STDOUT_STREAM_UNIT_ID;
297 return 0;
298
299 case STDOUT_STREAM_UNIT_ID:
300 if (s->ucred.uid == 0) {
301 if (isempty(p))
302 s->unit_id = NULL;
303 else {
304 s->unit_id = strdup(p);
305 if (!s->unit_id)
306 return log_oom();
307 }
308 }
309
310 s->state = STDOUT_STREAM_PRIORITY;
311 return 0;
312
313 case STDOUT_STREAM_PRIORITY:
314 r = safe_atoi(p, &s->priority);
315 if (r < 0 || s->priority < 0 || s->priority > 999) {
316 log_warning("Failed to parse log priority line.");
317 return -EINVAL;
318 }
319
320 s->state = STDOUT_STREAM_LEVEL_PREFIX;
321 return 0;
322
323 case STDOUT_STREAM_LEVEL_PREFIX:
324 r = parse_boolean(p);
325 if (r < 0) {
326 log_warning("Failed to parse level prefix line.");
327 return -EINVAL;
328 }
329
330 s->level_prefix = !!r;
331 s->state = STDOUT_STREAM_FORWARD_TO_SYSLOG;
332 return 0;
333
334 case STDOUT_STREAM_FORWARD_TO_SYSLOG:
335 r = parse_boolean(p);
336 if (r < 0) {
337 log_warning("Failed to parse forward to syslog line.");
338 return -EINVAL;
339 }
340
341 s->forward_to_syslog = !!r;
342 s->state = STDOUT_STREAM_FORWARD_TO_KMSG;
343 return 0;
344
345 case STDOUT_STREAM_FORWARD_TO_KMSG:
346 r = parse_boolean(p);
347 if (r < 0) {
348 log_warning("Failed to parse copy to kmsg line.");
349 return -EINVAL;
350 }
351
352 s->forward_to_kmsg = !!r;
353 s->state = STDOUT_STREAM_FORWARD_TO_CONSOLE;
354 return 0;
355
356 case STDOUT_STREAM_FORWARD_TO_CONSOLE:
357 r = parse_boolean(p);
358 if (r < 0) {
359 log_warning("Failed to parse copy to console line.");
360 return -EINVAL;
361 }
362
363 s->forward_to_console = !!r;
364 s->state = STDOUT_STREAM_RUNNING;
365
366 /* Try to save the stream, so that journald can be restarted and we can recover */
367 (void) stdout_stream_save(s);
368 return 0;
369
370 case STDOUT_STREAM_RUNNING:
371 return stdout_stream_log(s, p);
372 }
373
374 assert_not_reached("Unknown stream state");
375 }
376
377 static int stdout_stream_scan(StdoutStream *s, bool force_flush) {
378 char *p;
379 size_t remaining;
380 int r;
381
382 assert(s);
383
384 p = s->buffer;
385 remaining = s->length;
386 for (;;) {
387 char *end;
388 size_t skip;
389
390 end = memchr(p, '\n', remaining);
391 if (end)
392 skip = end - p + 1;
393 else if (remaining >= sizeof(s->buffer) - 1) {
394 end = p + sizeof(s->buffer) - 1;
395 skip = remaining;
396 } else
397 break;
398
399 *end = 0;
400
401 r = stdout_stream_line(s, p);
402 if (r < 0)
403 return r;
404
405 remaining -= skip;
406 p += skip;
407 }
408
409 if (force_flush && remaining > 0) {
410 p[remaining] = 0;
411 r = stdout_stream_line(s, p);
412 if (r < 0)
413 return r;
414
415 p += remaining;
416 remaining = 0;
417 }
418
419 if (p > s->buffer) {
420 memmove(s->buffer, p, remaining);
421 s->length = remaining;
422 }
423
424 return 0;
425 }
426
427 static int stdout_stream_process(sd_event_source *es, int fd, uint32_t revents, void *userdata) {
428 StdoutStream *s = userdata;
429 ssize_t l;
430 int r;
431
432 assert(s);
433
434 if ((revents|EPOLLIN|EPOLLHUP) != (EPOLLIN|EPOLLHUP)) {
435 log_error("Got invalid event from epoll for stdout stream: %"PRIx32, revents);
436 goto terminate;
437 }
438
439 l = read(s->fd, s->buffer+s->length, sizeof(s->buffer)-1-s->length);
440 if (l < 0) {
441
442 if (errno == EAGAIN)
443 return 0;
444
445 log_warning_errno(errno, "Failed to read from stream: %m");
446 goto terminate;
447 }
448
449 if (l == 0) {
450 stdout_stream_scan(s, true);
451 goto terminate;
452 }
453
454 s->length += l;
455 r = stdout_stream_scan(s, false);
456 if (r < 0)
457 goto terminate;
458
459 return 1;
460
461 terminate:
462 stdout_stream_destroy(s);
463 return 0;
464 }
465
466 static int stdout_stream_install(Server *s, int fd, StdoutStream **ret) {
467 _cleanup_(stdout_stream_freep) StdoutStream *stream = NULL;
468 int r;
469
470 assert(s);
471 assert(fd >= 0);
472
473 stream = new0(StdoutStream, 1);
474 if (!stream)
475 return log_oom();
476
477 stream->fd = -1;
478 stream->priority = LOG_INFO;
479
480 r = getpeercred(fd, &stream->ucred);
481 if (r < 0)
482 return log_error_errno(r, "Failed to determine peer credentials: %m");
483
484 if (mac_selinux_use()) {
485 r = getpeersec(fd, &stream->label);
486 if (r < 0 && r != -EOPNOTSUPP)
487 (void) log_warning_errno(r, "Failed to determine peer security context: %m");
488 }
489
490 (void) shutdown(fd, SHUT_WR);
491
492 r = sd_event_add_io(s->event, &stream->event_source, fd, EPOLLIN, stdout_stream_process, stream);
493 if (r < 0)
494 return log_error_errno(r, "Failed to add stream to event loop: %m");
495
496 r = sd_event_source_set_priority(stream->event_source, SD_EVENT_PRIORITY_NORMAL+5);
497 if (r < 0)
498 return log_error_errno(r, "Failed to adjust stdout event source priority: %m");
499
500 stream->fd = fd;
501
502 stream->server = s;
503 LIST_PREPEND(stdout_stream, s->stdout_streams, stream);
504 s->n_stdout_streams ++;
505
506 if (ret)
507 *ret = stream;
508
509 stream = NULL;
510
511 return 0;
512 }
513
514 static int stdout_stream_new(sd_event_source *es, int listen_fd, uint32_t revents, void *userdata) {
515 _cleanup_close_ int fd = -1;
516 Server *s = userdata;
517 int r;
518
519 assert(s);
520
521 if (revents != EPOLLIN) {
522 log_error("Got invalid event from epoll for stdout server fd: %"PRIx32, revents);
523 return -EIO;
524 }
525
526 fd = accept4(s->stdout_fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
527 if (fd < 0) {
528 if (errno == EAGAIN)
529 return 0;
530
531 log_error_errno(errno, "Failed to accept stdout connection: %m");
532 return -errno;
533 }
534
535 if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) {
536 log_warning("Too many stdout streams, refusing connection.");
537 return 0;
538 }
539
540 r = stdout_stream_install(s, fd, NULL);
541 if (r < 0)
542 return r;
543
544 fd = -1;
545 return 0;
546 }
547
548 static int stdout_stream_load(StdoutStream *stream, const char *fname) {
549 _cleanup_free_ char
550 *priority = NULL,
551 *level_prefix = NULL,
552 *forward_to_syslog = NULL,
553 *forward_to_kmsg = NULL,
554 *forward_to_console = NULL;
555 int r;
556
557 assert(stream);
558 assert(fname);
559
560 if (!stream->state_file) {
561 stream->state_file = strappend("/run/systemd/journal/streams/", fname);
562 if (!stream->state_file)
563 return log_oom();
564 }
565
566 r = parse_env_file(stream->state_file, NEWLINE,
567 "PRIORITY", &priority,
568 "LEVEL_PREFIX", &level_prefix,
569 "FORWARD_TO_SYSLOG", &forward_to_syslog,
570 "FORWARD_TO_KMSG", &forward_to_kmsg,
571 "FORWARD_TO_CONSOLE", &forward_to_console,
572 "IDENTIFIER", &stream->identifier,
573 "UNIT", &stream->unit_id,
574 NULL);
575 if (r < 0)
576 return log_error_errno(r, "Failed to read: %s", stream->state_file);
577
578 if (priority) {
579 int p;
580
581 p = log_level_from_string(priority);
582 if (p >= 0)
583 stream->priority = p;
584 }
585
586 if (level_prefix) {
587 r = parse_boolean(level_prefix);
588 if (r >= 0)
589 stream->level_prefix = r;
590 }
591
592 if (forward_to_syslog) {
593 r = parse_boolean(forward_to_syslog);
594 if (r >= 0)
595 stream->forward_to_syslog = r;
596 }
597
598 if (forward_to_kmsg) {
599 r = parse_boolean(forward_to_kmsg);
600 if (r >= 0)
601 stream->forward_to_kmsg = r;
602 }
603
604 if (forward_to_console) {
605 r = parse_boolean(forward_to_console);
606 if (r >= 0)
607 stream->forward_to_console = r;
608 }
609
610 return 0;
611 }
612
613 static int stdout_stream_restore(Server *s, const char *fname, int fd) {
614 StdoutStream *stream;
615 int r;
616
617 assert(s);
618 assert(fname);
619 assert(fd >= 0);
620
621 if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) {
622 log_warning("Too many stdout streams, refusing restoring of stream.");
623 return -ENOBUFS;
624 }
625
626 r = stdout_stream_install(s, fd, &stream);
627 if (r < 0)
628 return r;
629
630 stream->state = STDOUT_STREAM_RUNNING;
631 stream->fdstore = true;
632
633 /* Ignore all parsing errors */
634 (void) stdout_stream_load(stream, fname);
635
636 return 0;
637 }
638
639 int server_restore_streams(Server *s, FDSet *fds) {
640 _cleanup_closedir_ DIR *d = NULL;
641 struct dirent *de;
642 int r;
643
644 d = opendir("/run/systemd/journal/streams");
645 if (!d) {
646 if (errno == ENOENT)
647 return 0;
648
649 return log_warning_errno(errno, "Failed to enumerate /run/systemd/journal/streams: %m");
650 }
651
652 FOREACH_DIRENT(de, d, goto fail) {
653 unsigned long st_dev, st_ino;
654 bool found = false;
655 Iterator i;
656 int fd;
657
658 if (sscanf(de->d_name, "%lu:%lu", &st_dev, &st_ino) != 2)
659 continue;
660
661 FDSET_FOREACH(fd, fds, i) {
662 struct stat st;
663
664 if (fstat(fd, &st) < 0)
665 return log_error_errno(errno, "Failed to stat %s: %m", de->d_name);
666
667 if (S_ISSOCK(st.st_mode) && st.st_dev == st_dev && st.st_ino == st_ino) {
668 found = true;
669 break;
670 }
671 }
672
673 if (!found) {
674 /* No file descriptor? Then let's delete the state file */
675 log_debug("Cannot restore stream file %s", de->d_name);
676 unlinkat(dirfd(d), de->d_name, 0);
677 continue;
678 }
679
680 fdset_remove(fds, fd);
681
682 r = stdout_stream_restore(s, de->d_name, fd);
683 if (r < 0)
684 safe_close(fd);
685 }
686
687 return 0;
688
689 fail:
690 return log_error_errno(errno, "Failed to read streams directory: %m");
691 }
692
693 int server_open_stdout_socket(Server *s) {
694 int r;
695
696 assert(s);
697
698 if (s->stdout_fd < 0) {
699 union sockaddr_union sa = {
700 .un.sun_family = AF_UNIX,
701 .un.sun_path = "/run/systemd/journal/stdout",
702 };
703
704 s->stdout_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
705 if (s->stdout_fd < 0)
706 return log_error_errno(errno, "socket() failed: %m");
707
708 unlink(sa.un.sun_path);
709
710 r = bind(s->stdout_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
711 if (r < 0)
712 return log_error_errno(errno, "bind(%s) failed: %m", sa.un.sun_path);
713
714 (void) chmod(sa.un.sun_path, 0666);
715
716 if (listen(s->stdout_fd, SOMAXCONN) < 0)
717 return log_error_errno(errno, "listen(%s) failed: %m", sa.un.sun_path);
718 } else
719 fd_nonblock(s->stdout_fd, 1);
720
721 r = sd_event_add_io(s->event, &s->stdout_event_source, s->stdout_fd, EPOLLIN, stdout_stream_new, s);
722 if (r < 0)
723 return log_error_errno(r, "Failed to add stdout server fd to event source: %m");
724
725 r = sd_event_source_set_priority(s->stdout_event_source, SD_EVENT_PRIORITY_NORMAL+10);
726 if (r < 0)
727 return log_error_errno(r, "Failed to adjust priority of stdout server event source: %m");
728
729 return 0;
730 }