]> git.ipfire.org Git - thirdparty/systemd.git/blob - src/journal/journald-stream.c
Merge pull request #1668 from ssahani/net1
[thirdparty/systemd.git] / src / journal / journald-stream.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4 This file is part of systemd.
5
6 Copyright 2011 Lennart Poettering
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <stddef.h>
23 #include <unistd.h>
24
25 #ifdef HAVE_SELINUX
26 #include <selinux/selinux.h>
27 #endif
28
29 #include "sd-daemon.h"
30 #include "sd-event.h"
31
32 #include "escape.h"
33 #include "fd-util.h"
34 #include "fileio.h"
35 #include "journald-console.h"
36 #include "journald-kmsg.h"
37 #include "journald-server.h"
38 #include "journald-stream.h"
39 #include "journald-syslog.h"
40 #include "journald-wall.h"
41 #include "mkdir.h"
42 #include "selinux-util.h"
43 #include "socket-util.h"
44 #include "string-util.h"
45
46 #define STDOUT_STREAMS_MAX 4096
47
48 typedef enum StdoutStreamState {
49 STDOUT_STREAM_IDENTIFIER,
50 STDOUT_STREAM_UNIT_ID,
51 STDOUT_STREAM_PRIORITY,
52 STDOUT_STREAM_LEVEL_PREFIX,
53 STDOUT_STREAM_FORWARD_TO_SYSLOG,
54 STDOUT_STREAM_FORWARD_TO_KMSG,
55 STDOUT_STREAM_FORWARD_TO_CONSOLE,
56 STDOUT_STREAM_RUNNING
57 } StdoutStreamState;
58
59 struct StdoutStream {
60 Server *server;
61 StdoutStreamState state;
62
63 int fd;
64
65 struct ucred ucred;
66 char *label;
67 char *identifier;
68 char *unit_id;
69 int priority;
70 bool level_prefix:1;
71 bool forward_to_syslog:1;
72 bool forward_to_kmsg:1;
73 bool forward_to_console:1;
74
75 bool fdstore:1;
76
77 char buffer[LINE_MAX+1];
78 size_t length;
79
80 sd_event_source *event_source;
81
82 char *state_file;
83
84 LIST_FIELDS(StdoutStream, stdout_stream);
85 };
86
87 void stdout_stream_free(StdoutStream *s) {
88 if (!s)
89 return;
90
91 if (s->server) {
92 assert(s->server->n_stdout_streams > 0);
93 s->server->n_stdout_streams --;
94 LIST_REMOVE(stdout_stream, s->server->stdout_streams, s);
95 }
96
97 if (s->event_source) {
98 sd_event_source_set_enabled(s->event_source, SD_EVENT_OFF);
99 s->event_source = sd_event_source_unref(s->event_source);
100 }
101
102 safe_close(s->fd);
103 free(s->label);
104 free(s->identifier);
105 free(s->unit_id);
106 free(s->state_file);
107
108 free(s);
109 }
110
111 DEFINE_TRIVIAL_CLEANUP_FUNC(StdoutStream*, stdout_stream_free);
112
113 static void stdout_stream_destroy(StdoutStream *s) {
114 if (!s)
115 return;
116
117 if (s->state_file)
118 unlink(s->state_file);
119
120 stdout_stream_free(s);
121 }
122
123 static int stdout_stream_save(StdoutStream *s) {
124 _cleanup_free_ char *temp_path = NULL;
125 _cleanup_fclose_ FILE *f = NULL;
126 int r;
127
128 assert(s);
129
130 if (s->state != STDOUT_STREAM_RUNNING)
131 return 0;
132
133 if (!s->state_file) {
134 struct stat st;
135
136 r = fstat(s->fd, &st);
137 if (r < 0)
138 return log_warning_errno(errno, "Failed to stat connected stream: %m");
139
140 /* We use device and inode numbers as identifier for the stream */
141 if (asprintf(&s->state_file, "/run/systemd/journal/streams/%lu:%lu", (unsigned long) st.st_dev, (unsigned long) st.st_ino) < 0)
142 return log_oom();
143 }
144
145 mkdir_p("/run/systemd/journal/streams", 0755);
146
147 r = fopen_temporary(s->state_file, &f, &temp_path);
148 if (r < 0)
149 goto fail;
150
151 fprintf(f,
152 "# This is private data. Do not parse\n"
153 "PRIORITY=%i\n"
154 "LEVEL_PREFIX=%i\n"
155 "FORWARD_TO_SYSLOG=%i\n"
156 "FORWARD_TO_KMSG=%i\n"
157 "FORWARD_TO_CONSOLE=%i\n",
158 s->priority,
159 s->level_prefix,
160 s->forward_to_syslog,
161 s->forward_to_kmsg,
162 s->forward_to_console);
163
164 if (!isempty(s->identifier)) {
165 _cleanup_free_ char *escaped;
166
167 escaped = cescape(s->identifier);
168 if (!escaped) {
169 r = -ENOMEM;
170 goto fail;
171 }
172
173 fprintf(f, "IDENTIFIER=%s\n", escaped);
174 }
175
176 if (!isempty(s->unit_id)) {
177 _cleanup_free_ char *escaped;
178
179 escaped = cescape(s->unit_id);
180 if (!escaped) {
181 r = -ENOMEM;
182 goto fail;
183 }
184
185 fprintf(f, "UNIT=%s\n", escaped);
186 }
187
188 r = fflush_and_check(f);
189 if (r < 0)
190 goto fail;
191
192 if (rename(temp_path, s->state_file) < 0) {
193 r = -errno;
194 goto fail;
195 }
196
197 /* Store the connection fd in PID 1, so that we get it passed
198 * in again on next start */
199 if (!s->fdstore) {
200 sd_pid_notify_with_fds(0, false, "FDSTORE=1", &s->fd, 1);
201 s->fdstore = true;
202 }
203
204 return 0;
205
206 fail:
207 (void) unlink(s->state_file);
208
209 if (temp_path)
210 (void) unlink(temp_path);
211
212 return log_error_errno(r, "Failed to save stream data %s: %m", s->state_file);
213 }
214
215 static int stdout_stream_log(StdoutStream *s, const char *p) {
216 struct iovec iovec[N_IOVEC_META_FIELDS + 5];
217 int priority;
218 char syslog_priority[] = "PRIORITY=\0";
219 char syslog_facility[sizeof("SYSLOG_FACILITY=")-1 + DECIMAL_STR_MAX(int) + 1];
220 _cleanup_free_ char *message = NULL, *syslog_identifier = NULL;
221 unsigned n = 0;
222 size_t label_len;
223
224 assert(s);
225 assert(p);
226
227 if (isempty(p))
228 return 0;
229
230 priority = s->priority;
231
232 if (s->level_prefix)
233 syslog_parse_priority(&p, &priority, false);
234
235 if (s->forward_to_syslog || s->server->forward_to_syslog)
236 server_forward_syslog(s->server, syslog_fixup_facility(priority), s->identifier, p, &s->ucred, NULL);
237
238 if (s->forward_to_kmsg || s->server->forward_to_kmsg)
239 server_forward_kmsg(s->server, priority, s->identifier, p, &s->ucred);
240
241 if (s->forward_to_console || s->server->forward_to_console)
242 server_forward_console(s->server, priority, s->identifier, p, &s->ucred);
243
244 if (s->server->forward_to_wall)
245 server_forward_wall(s->server, priority, s->identifier, p, &s->ucred);
246
247 IOVEC_SET_STRING(iovec[n++], "_TRANSPORT=stdout");
248
249 syslog_priority[strlen("PRIORITY=")] = '0' + LOG_PRI(priority);
250 IOVEC_SET_STRING(iovec[n++], syslog_priority);
251
252 if (priority & LOG_FACMASK) {
253 xsprintf(syslog_facility, "SYSLOG_FACILITY=%i", LOG_FAC(priority));
254 IOVEC_SET_STRING(iovec[n++], syslog_facility);
255 }
256
257 if (s->identifier) {
258 syslog_identifier = strappend("SYSLOG_IDENTIFIER=", s->identifier);
259 if (syslog_identifier)
260 IOVEC_SET_STRING(iovec[n++], syslog_identifier);
261 }
262
263 message = strappend("MESSAGE=", p);
264 if (message)
265 IOVEC_SET_STRING(iovec[n++], message);
266
267 label_len = s->label ? strlen(s->label) : 0;
268 server_dispatch_message(s->server, iovec, n, ELEMENTSOF(iovec), &s->ucred, NULL, s->label, label_len, s->unit_id, priority, 0);
269 return 0;
270 }
271
272 static int stdout_stream_line(StdoutStream *s, char *p) {
273 int r;
274
275 assert(s);
276 assert(p);
277
278 p = strstrip(p);
279
280 switch (s->state) {
281
282 case STDOUT_STREAM_IDENTIFIER:
283 if (isempty(p))
284 s->identifier = NULL;
285 else {
286 s->identifier = strdup(p);
287 if (!s->identifier)
288 return log_oom();
289 }
290
291 s->state = STDOUT_STREAM_UNIT_ID;
292 return 0;
293
294 case STDOUT_STREAM_UNIT_ID:
295 if (s->ucred.uid == 0) {
296 if (isempty(p))
297 s->unit_id = NULL;
298 else {
299 s->unit_id = strdup(p);
300 if (!s->unit_id)
301 return log_oom();
302 }
303 }
304
305 s->state = STDOUT_STREAM_PRIORITY;
306 return 0;
307
308 case STDOUT_STREAM_PRIORITY:
309 r = safe_atoi(p, &s->priority);
310 if (r < 0 || s->priority < 0 || s->priority > 999) {
311 log_warning("Failed to parse log priority line.");
312 return -EINVAL;
313 }
314
315 s->state = STDOUT_STREAM_LEVEL_PREFIX;
316 return 0;
317
318 case STDOUT_STREAM_LEVEL_PREFIX:
319 r = parse_boolean(p);
320 if (r < 0) {
321 log_warning("Failed to parse level prefix line.");
322 return -EINVAL;
323 }
324
325 s->level_prefix = !!r;
326 s->state = STDOUT_STREAM_FORWARD_TO_SYSLOG;
327 return 0;
328
329 case STDOUT_STREAM_FORWARD_TO_SYSLOG:
330 r = parse_boolean(p);
331 if (r < 0) {
332 log_warning("Failed to parse forward to syslog line.");
333 return -EINVAL;
334 }
335
336 s->forward_to_syslog = !!r;
337 s->state = STDOUT_STREAM_FORWARD_TO_KMSG;
338 return 0;
339
340 case STDOUT_STREAM_FORWARD_TO_KMSG:
341 r = parse_boolean(p);
342 if (r < 0) {
343 log_warning("Failed to parse copy to kmsg line.");
344 return -EINVAL;
345 }
346
347 s->forward_to_kmsg = !!r;
348 s->state = STDOUT_STREAM_FORWARD_TO_CONSOLE;
349 return 0;
350
351 case STDOUT_STREAM_FORWARD_TO_CONSOLE:
352 r = parse_boolean(p);
353 if (r < 0) {
354 log_warning("Failed to parse copy to console line.");
355 return -EINVAL;
356 }
357
358 s->forward_to_console = !!r;
359 s->state = STDOUT_STREAM_RUNNING;
360
361 /* Try to save the stream, so that journald can be restarted and we can recover */
362 (void) stdout_stream_save(s);
363 return 0;
364
365 case STDOUT_STREAM_RUNNING:
366 return stdout_stream_log(s, p);
367 }
368
369 assert_not_reached("Unknown stream state");
370 }
371
372 static int stdout_stream_scan(StdoutStream *s, bool force_flush) {
373 char *p;
374 size_t remaining;
375 int r;
376
377 assert(s);
378
379 p = s->buffer;
380 remaining = s->length;
381 for (;;) {
382 char *end;
383 size_t skip;
384
385 end = memchr(p, '\n', remaining);
386 if (end)
387 skip = end - p + 1;
388 else if (remaining >= sizeof(s->buffer) - 1) {
389 end = p + sizeof(s->buffer) - 1;
390 skip = remaining;
391 } else
392 break;
393
394 *end = 0;
395
396 r = stdout_stream_line(s, p);
397 if (r < 0)
398 return r;
399
400 remaining -= skip;
401 p += skip;
402 }
403
404 if (force_flush && remaining > 0) {
405 p[remaining] = 0;
406 r = stdout_stream_line(s, p);
407 if (r < 0)
408 return r;
409
410 p += remaining;
411 remaining = 0;
412 }
413
414 if (p > s->buffer) {
415 memmove(s->buffer, p, remaining);
416 s->length = remaining;
417 }
418
419 return 0;
420 }
421
422 static int stdout_stream_process(sd_event_source *es, int fd, uint32_t revents, void *userdata) {
423 StdoutStream *s = userdata;
424 ssize_t l;
425 int r;
426
427 assert(s);
428
429 if ((revents|EPOLLIN|EPOLLHUP) != (EPOLLIN|EPOLLHUP)) {
430 log_error("Got invalid event from epoll for stdout stream: %"PRIx32, revents);
431 goto terminate;
432 }
433
434 l = read(s->fd, s->buffer+s->length, sizeof(s->buffer)-1-s->length);
435 if (l < 0) {
436
437 if (errno == EAGAIN)
438 return 0;
439
440 log_warning_errno(errno, "Failed to read from stream: %m");
441 goto terminate;
442 }
443
444 if (l == 0) {
445 stdout_stream_scan(s, true);
446 goto terminate;
447 }
448
449 s->length += l;
450 r = stdout_stream_scan(s, false);
451 if (r < 0)
452 goto terminate;
453
454 return 1;
455
456 terminate:
457 stdout_stream_destroy(s);
458 return 0;
459 }
460
461 static int stdout_stream_install(Server *s, int fd, StdoutStream **ret) {
462 _cleanup_(stdout_stream_freep) StdoutStream *stream = NULL;
463 int r;
464
465 assert(s);
466 assert(fd >= 0);
467
468 stream = new0(StdoutStream, 1);
469 if (!stream)
470 return log_oom();
471
472 stream->fd = -1;
473 stream->priority = LOG_INFO;
474
475 r = getpeercred(fd, &stream->ucred);
476 if (r < 0)
477 return log_error_errno(r, "Failed to determine peer credentials: %m");
478
479 if (mac_selinux_use()) {
480 r = getpeersec(fd, &stream->label);
481 if (r < 0 && r != -EOPNOTSUPP)
482 (void) log_warning_errno(r, "Failed to determine peer security context: %m");
483 }
484
485 (void) shutdown(fd, SHUT_WR);
486
487 r = sd_event_add_io(s->event, &stream->event_source, fd, EPOLLIN, stdout_stream_process, stream);
488 if (r < 0)
489 return log_error_errno(r, "Failed to add stream to event loop: %m");
490
491 r = sd_event_source_set_priority(stream->event_source, SD_EVENT_PRIORITY_NORMAL+5);
492 if (r < 0)
493 return log_error_errno(r, "Failed to adjust stdout event source priority: %m");
494
495 stream->fd = fd;
496
497 stream->server = s;
498 LIST_PREPEND(stdout_stream, s->stdout_streams, stream);
499 s->n_stdout_streams ++;
500
501 if (ret)
502 *ret = stream;
503
504 stream = NULL;
505
506 return 0;
507 }
508
509 static int stdout_stream_new(sd_event_source *es, int listen_fd, uint32_t revents, void *userdata) {
510 _cleanup_close_ int fd = -1;
511 Server *s = userdata;
512 int r;
513
514 assert(s);
515
516 if (revents != EPOLLIN) {
517 log_error("Got invalid event from epoll for stdout server fd: %"PRIx32, revents);
518 return -EIO;
519 }
520
521 fd = accept4(s->stdout_fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
522 if (fd < 0) {
523 if (errno == EAGAIN)
524 return 0;
525
526 log_error_errno(errno, "Failed to accept stdout connection: %m");
527 return -errno;
528 }
529
530 if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) {
531 log_warning("Too many stdout streams, refusing connection.");
532 return 0;
533 }
534
535 r = stdout_stream_install(s, fd, NULL);
536 if (r < 0)
537 return r;
538
539 fd = -1;
540 return 0;
541 }
542
543 static int stdout_stream_load(StdoutStream *stream, const char *fname) {
544 _cleanup_free_ char
545 *priority = NULL,
546 *level_prefix = NULL,
547 *forward_to_syslog = NULL,
548 *forward_to_kmsg = NULL,
549 *forward_to_console = NULL;
550 int r;
551
552 assert(stream);
553 assert(fname);
554
555 if (!stream->state_file) {
556 stream->state_file = strappend("/run/systemd/journal/streams/", fname);
557 if (!stream->state_file)
558 return log_oom();
559 }
560
561 r = parse_env_file(stream->state_file, NEWLINE,
562 "PRIORITY", &priority,
563 "LEVEL_PREFIX", &level_prefix,
564 "FORWARD_TO_SYSLOG", &forward_to_syslog,
565 "FORWARD_TO_KMSG", &forward_to_kmsg,
566 "FORWARD_TO_CONSOLE", &forward_to_console,
567 "IDENTIFIER", &stream->identifier,
568 "UNIT", &stream->unit_id,
569 NULL);
570 if (r < 0)
571 return log_error_errno(r, "Failed to read: %s", stream->state_file);
572
573 if (priority) {
574 int p;
575
576 p = log_level_from_string(priority);
577 if (p >= 0)
578 stream->priority = p;
579 }
580
581 if (level_prefix) {
582 r = parse_boolean(level_prefix);
583 if (r >= 0)
584 stream->level_prefix = r;
585 }
586
587 if (forward_to_syslog) {
588 r = parse_boolean(forward_to_syslog);
589 if (r >= 0)
590 stream->forward_to_syslog = r;
591 }
592
593 if (forward_to_kmsg) {
594 r = parse_boolean(forward_to_kmsg);
595 if (r >= 0)
596 stream->forward_to_kmsg = r;
597 }
598
599 if (forward_to_console) {
600 r = parse_boolean(forward_to_console);
601 if (r >= 0)
602 stream->forward_to_console = r;
603 }
604
605 return 0;
606 }
607
608 static int stdout_stream_restore(Server *s, const char *fname, int fd) {
609 StdoutStream *stream;
610 int r;
611
612 assert(s);
613 assert(fname);
614 assert(fd >= 0);
615
616 if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) {
617 log_warning("Too many stdout streams, refusing restoring of stream.");
618 return -ENOBUFS;
619 }
620
621 r = stdout_stream_install(s, fd, &stream);
622 if (r < 0)
623 return r;
624
625 stream->state = STDOUT_STREAM_RUNNING;
626 stream->fdstore = true;
627
628 /* Ignore all parsing errors */
629 (void) stdout_stream_load(stream, fname);
630
631 return 0;
632 }
633
634 int server_restore_streams(Server *s, FDSet *fds) {
635 _cleanup_closedir_ DIR *d = NULL;
636 struct dirent *de;
637 int r;
638
639 d = opendir("/run/systemd/journal/streams");
640 if (!d) {
641 if (errno == ENOENT)
642 return 0;
643
644 return log_warning_errno(errno, "Failed to enumerate /run/systemd/journal/streams: %m");
645 }
646
647 FOREACH_DIRENT(de, d, goto fail) {
648 unsigned long st_dev, st_ino;
649 bool found = false;
650 Iterator i;
651 int fd;
652
653 if (sscanf(de->d_name, "%lu:%lu", &st_dev, &st_ino) != 2)
654 continue;
655
656 FDSET_FOREACH(fd, fds, i) {
657 struct stat st;
658
659 if (fstat(fd, &st) < 0)
660 return log_error_errno(errno, "Failed to stat %s: %m", de->d_name);
661
662 if (S_ISSOCK(st.st_mode) && st.st_dev == st_dev && st.st_ino == st_ino) {
663 found = true;
664 break;
665 }
666 }
667
668 if (!found) {
669 /* No file descriptor? Then let's delete the state file */
670 log_debug("Cannot restore stream file %s", de->d_name);
671 unlinkat(dirfd(d), de->d_name, 0);
672 continue;
673 }
674
675 fdset_remove(fds, fd);
676
677 r = stdout_stream_restore(s, de->d_name, fd);
678 if (r < 0)
679 safe_close(fd);
680 }
681
682 return 0;
683
684 fail:
685 return log_error_errno(errno, "Failed to read streams directory: %m");
686 }
687
688 int server_open_stdout_socket(Server *s) {
689 int r;
690
691 assert(s);
692
693 if (s->stdout_fd < 0) {
694 union sockaddr_union sa = {
695 .un.sun_family = AF_UNIX,
696 .un.sun_path = "/run/systemd/journal/stdout",
697 };
698
699 s->stdout_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
700 if (s->stdout_fd < 0)
701 return log_error_errno(errno, "socket() failed: %m");
702
703 unlink(sa.un.sun_path);
704
705 r = bind(s->stdout_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
706 if (r < 0)
707 return log_error_errno(errno, "bind(%s) failed: %m", sa.un.sun_path);
708
709 (void) chmod(sa.un.sun_path, 0666);
710
711 if (listen(s->stdout_fd, SOMAXCONN) < 0)
712 return log_error_errno(errno, "listen(%s) failed: %m", sa.un.sun_path);
713 } else
714 fd_nonblock(s->stdout_fd, 1);
715
716 r = sd_event_add_io(s->event, &s->stdout_event_source, s->stdout_fd, EPOLLIN, stdout_stream_new, s);
717 if (r < 0)
718 return log_error_errno(r, "Failed to add stdout server fd to event source: %m");
719
720 r = sd_event_source_set_priority(s->stdout_event_source, SD_EVENT_PRIORITY_NORMAL+10);
721 if (r < 0)
722 return log_error_errno(r, "Failed to adjust priority of stdout server event source: %m");
723
724 return 0;
725 }