]> git.ipfire.org Git - thirdparty/systemd.git/blob - src/journal/journald-stream.c
Merge pull request #140 from teg/man-udev-timeout
[thirdparty/systemd.git] / src / journal / journald-stream.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4 This file is part of systemd.
5
6 Copyright 2011 Lennart Poettering
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <unistd.h>
23 #include <stddef.h>
24
25 #ifdef HAVE_SELINUX
26 #include <selinux/selinux.h>
27 #endif
28
29 #include "sd-event.h"
30 #include "sd-daemon.h"
31 #include "socket-util.h"
32 #include "selinux-util.h"
33 #include "mkdir.h"
34 #include "fileio.h"
35 #include "journald-server.h"
36 #include "journald-stream.h"
37 #include "journald-syslog.h"
38 #include "journald-kmsg.h"
39 #include "journald-console.h"
40 #include "journald-wall.h"
41
/* Upper bound on simultaneously tracked stdout streams; once the server's
 * stream counter reaches this value, new and restored streams are refused. */
#define STDOUT_STREAMS_MAX 4096
43
/* Parser state of a stdout stream. A freshly connected client sends one
 * header line per state, in exactly this order; once the last header line
 * has been consumed the stream is RUNNING and every further line is a log
 * message. */
typedef enum StdoutStreamState {
        STDOUT_STREAM_IDENTIFIER,          /* expecting the syslog identifier line */
        STDOUT_STREAM_UNIT_ID,             /* expecting the unit id line */
        STDOUT_STREAM_PRIORITY,            /* expecting the default priority line */
        STDOUT_STREAM_LEVEL_PREFIX,        /* expecting the "parse level prefix" boolean */
        STDOUT_STREAM_FORWARD_TO_SYSLOG,   /* expecting the "forward to syslog" boolean */
        STDOUT_STREAM_FORWARD_TO_KMSG,     /* expecting the "forward to kmsg" boolean */
        STDOUT_STREAM_FORWARD_TO_CONSOLE,  /* expecting the "forward to console" boolean */
        STDOUT_STREAM_RUNNING              /* header done, lines are log payload */
} StdoutStreamState;
54
55 struct StdoutStream {
56 Server *server;
57 StdoutStreamState state;
58
59 int fd;
60
61 struct ucred ucred;
62 char *label;
63 char *identifier;
64 char *unit_id;
65 int priority;
66 bool level_prefix:1;
67 bool forward_to_syslog:1;
68 bool forward_to_kmsg:1;
69 bool forward_to_console:1;
70
71 bool fdstore:1;
72
73 char buffer[LINE_MAX+1];
74 size_t length;
75
76 sd_event_source *event_source;
77
78 char *state_file;
79
80 LIST_FIELDS(StdoutStream, stdout_stream);
81 };
82
83 void stdout_stream_free(StdoutStream *s) {
84 if (!s)
85 return;
86
87 if (s->server) {
88 assert(s->server->n_stdout_streams > 0);
89 s->server->n_stdout_streams --;
90 LIST_REMOVE(stdout_stream, s->server->stdout_streams, s);
91 }
92
93 if (s->event_source) {
94 sd_event_source_set_enabled(s->event_source, SD_EVENT_OFF);
95 s->event_source = sd_event_source_unref(s->event_source);
96 }
97
98 safe_close(s->fd);
99 free(s->label);
100 free(s->identifier);
101 free(s->unit_id);
102 free(s->state_file);
103
104 free(s);
105 }
106
107 DEFINE_TRIVIAL_CLEANUP_FUNC(StdoutStream*, stdout_stream_free);
108
109 static void stdout_stream_destroy(StdoutStream *s) {
110 if (!s)
111 return;
112
113 if (s->state_file)
114 unlink(s->state_file);
115
116 stdout_stream_free(s);
117 }
118
119 static int stdout_stream_save(StdoutStream *s) {
120 _cleanup_free_ char *temp_path = NULL;
121 _cleanup_fclose_ FILE *f = NULL;
122 int r;
123
124 assert(s);
125
126 if (s->state != STDOUT_STREAM_RUNNING)
127 return 0;
128
129 if (!s->state_file) {
130 struct stat st;
131
132 r = fstat(s->fd, &st);
133 if (r < 0)
134 return log_warning_errno(errno, "Failed to stat connected stream: %m");
135
136 /* We use device and inode numbers as identifier for the stream */
137 if (asprintf(&s->state_file, "/run/systemd/journal/streams/%lu:%lu", (unsigned long) st.st_dev, (unsigned long) st.st_ino) < 0)
138 return log_oom();
139 }
140
141 mkdir_p("/run/systemd/journal/streams", 0755);
142
143 r = fopen_temporary(s->state_file, &f, &temp_path);
144 if (r < 0)
145 goto finish;
146
147 fprintf(f,
148 "# This is private data. Do not parse\n"
149 "PRIORITY=%i\n"
150 "LEVEL_PREFIX=%i\n"
151 "FORWARD_TO_SYSLOG=%i\n"
152 "FORWARD_TO_KMSG=%i\n"
153 "FORWARD_TO_CONSOLE=%i\n",
154 s->priority,
155 s->level_prefix,
156 s->forward_to_syslog,
157 s->forward_to_kmsg,
158 s->forward_to_console);
159
160 if (!isempty(s->identifier)) {
161 _cleanup_free_ char *escaped;
162
163 escaped = cescape(s->identifier);
164 if (!escaped) {
165 r = -ENOMEM;
166 goto finish;
167 }
168
169 fprintf(f, "IDENTIFIER=%s\n", escaped);
170 }
171
172 if (!isempty(s->unit_id)) {
173 _cleanup_free_ char *escaped;
174
175 escaped = cescape(s->unit_id);
176 if (!escaped) {
177 r = -ENOMEM;
178 goto finish;
179 }
180
181 fprintf(f, "UNIT=%s\n", escaped);
182 }
183
184 r = fflush_and_check(f);
185 if (r < 0)
186 goto finish;
187
188 if (rename(temp_path, s->state_file) < 0) {
189 r = -errno;
190 goto finish;
191 }
192
193 free(temp_path);
194 temp_path = NULL;
195
196 /* Store the connection fd in PID 1, so that we get it passed
197 * in again on next start */
198 if (!s->fdstore) {
199 sd_pid_notify_with_fds(0, false, "FDSTORE=1", &s->fd, 1);
200 s->fdstore = true;
201 }
202
203 finish:
204 if (temp_path)
205 unlink(temp_path);
206
207 if (r < 0)
208 log_error_errno(r, "Failed to save stream data %s: %m", s->state_file);
209
210 return r;
211 }
212
213 static int stdout_stream_log(StdoutStream *s, const char *p) {
214 struct iovec iovec[N_IOVEC_META_FIELDS + 5];
215 int priority;
216 char syslog_priority[] = "PRIORITY=\0";
217 char syslog_facility[sizeof("SYSLOG_FACILITY=")-1 + DECIMAL_STR_MAX(int) + 1];
218 _cleanup_free_ char *message = NULL, *syslog_identifier = NULL;
219 unsigned n = 0;
220 size_t label_len;
221
222 assert(s);
223 assert(p);
224
225 if (isempty(p))
226 return 0;
227
228 priority = s->priority;
229
230 if (s->level_prefix)
231 syslog_parse_priority(&p, &priority, false);
232
233 if (s->forward_to_syslog || s->server->forward_to_syslog)
234 server_forward_syslog(s->server, syslog_fixup_facility(priority), s->identifier, p, &s->ucred, NULL);
235
236 if (s->forward_to_kmsg || s->server->forward_to_kmsg)
237 server_forward_kmsg(s->server, priority, s->identifier, p, &s->ucred);
238
239 if (s->forward_to_console || s->server->forward_to_console)
240 server_forward_console(s->server, priority, s->identifier, p, &s->ucred);
241
242 if (s->server->forward_to_wall)
243 server_forward_wall(s->server, priority, s->identifier, p, &s->ucred);
244
245 IOVEC_SET_STRING(iovec[n++], "_TRANSPORT=stdout");
246
247 syslog_priority[strlen("PRIORITY=")] = '0' + LOG_PRI(priority);
248 IOVEC_SET_STRING(iovec[n++], syslog_priority);
249
250 if (priority & LOG_FACMASK) {
251 xsprintf(syslog_facility, "SYSLOG_FACILITY=%i", LOG_FAC(priority));
252 IOVEC_SET_STRING(iovec[n++], syslog_facility);
253 }
254
255 if (s->identifier) {
256 syslog_identifier = strappend("SYSLOG_IDENTIFIER=", s->identifier);
257 if (syslog_identifier)
258 IOVEC_SET_STRING(iovec[n++], syslog_identifier);
259 }
260
261 message = strappend("MESSAGE=", p);
262 if (message)
263 IOVEC_SET_STRING(iovec[n++], message);
264
265 label_len = s->label ? strlen(s->label) : 0;
266 server_dispatch_message(s->server, iovec, n, ELEMENTSOF(iovec), &s->ucred, NULL, s->label, label_len, s->unit_id, priority, 0);
267 return 0;
268 }
269
270 static int stdout_stream_line(StdoutStream *s, char *p) {
271 int r;
272
273 assert(s);
274 assert(p);
275
276 p = strstrip(p);
277
278 switch (s->state) {
279
280 case STDOUT_STREAM_IDENTIFIER:
281 if (isempty(p))
282 s->identifier = NULL;
283 else {
284 s->identifier = strdup(p);
285 if (!s->identifier)
286 return log_oom();
287 }
288
289 s->state = STDOUT_STREAM_UNIT_ID;
290 return 0;
291
292 case STDOUT_STREAM_UNIT_ID:
293 if (s->ucred.uid == 0) {
294 if (isempty(p))
295 s->unit_id = NULL;
296 else {
297 s->unit_id = strdup(p);
298 if (!s->unit_id)
299 return log_oom();
300 }
301 }
302
303 s->state = STDOUT_STREAM_PRIORITY;
304 return 0;
305
306 case STDOUT_STREAM_PRIORITY:
307 r = safe_atoi(p, &s->priority);
308 if (r < 0 || s->priority < 0 || s->priority > 999) {
309 log_warning("Failed to parse log priority line.");
310 return -EINVAL;
311 }
312
313 s->state = STDOUT_STREAM_LEVEL_PREFIX;
314 return 0;
315
316 case STDOUT_STREAM_LEVEL_PREFIX:
317 r = parse_boolean(p);
318 if (r < 0) {
319 log_warning("Failed to parse level prefix line.");
320 return -EINVAL;
321 }
322
323 s->level_prefix = !!r;
324 s->state = STDOUT_STREAM_FORWARD_TO_SYSLOG;
325 return 0;
326
327 case STDOUT_STREAM_FORWARD_TO_SYSLOG:
328 r = parse_boolean(p);
329 if (r < 0) {
330 log_warning("Failed to parse forward to syslog line.");
331 return -EINVAL;
332 }
333
334 s->forward_to_syslog = !!r;
335 s->state = STDOUT_STREAM_FORWARD_TO_KMSG;
336 return 0;
337
338 case STDOUT_STREAM_FORWARD_TO_KMSG:
339 r = parse_boolean(p);
340 if (r < 0) {
341 log_warning("Failed to parse copy to kmsg line.");
342 return -EINVAL;
343 }
344
345 s->forward_to_kmsg = !!r;
346 s->state = STDOUT_STREAM_FORWARD_TO_CONSOLE;
347 return 0;
348
349 case STDOUT_STREAM_FORWARD_TO_CONSOLE:
350 r = parse_boolean(p);
351 if (r < 0) {
352 log_warning("Failed to parse copy to console line.");
353 return -EINVAL;
354 }
355
356 s->forward_to_console = !!r;
357 s->state = STDOUT_STREAM_RUNNING;
358
359 /* Try to save the stream, so that journald can be restarted and we can recover */
360 (void) stdout_stream_save(s);
361 return 0;
362
363 case STDOUT_STREAM_RUNNING:
364 return stdout_stream_log(s, p);
365 }
366
367 assert_not_reached("Unknown stream state");
368 }
369
370 static int stdout_stream_scan(StdoutStream *s, bool force_flush) {
371 char *p;
372 size_t remaining;
373 int r;
374
375 assert(s);
376
377 p = s->buffer;
378 remaining = s->length;
379 for (;;) {
380 char *end;
381 size_t skip;
382
383 end = memchr(p, '\n', remaining);
384 if (end)
385 skip = end - p + 1;
386 else if (remaining >= sizeof(s->buffer) - 1) {
387 end = p + sizeof(s->buffer) - 1;
388 skip = remaining;
389 } else
390 break;
391
392 *end = 0;
393
394 r = stdout_stream_line(s, p);
395 if (r < 0)
396 return r;
397
398 remaining -= skip;
399 p += skip;
400 }
401
402 if (force_flush && remaining > 0) {
403 p[remaining] = 0;
404 r = stdout_stream_line(s, p);
405 if (r < 0)
406 return r;
407
408 p += remaining;
409 remaining = 0;
410 }
411
412 if (p > s->buffer) {
413 memmove(s->buffer, p, remaining);
414 s->length = remaining;
415 }
416
417 return 0;
418 }
419
420 static int stdout_stream_process(sd_event_source *es, int fd, uint32_t revents, void *userdata) {
421 StdoutStream *s = userdata;
422 ssize_t l;
423 int r;
424
425 assert(s);
426
427 if ((revents|EPOLLIN|EPOLLHUP) != (EPOLLIN|EPOLLHUP)) {
428 log_error("Got invalid event from epoll for stdout stream: %"PRIx32, revents);
429 goto terminate;
430 }
431
432 l = read(s->fd, s->buffer+s->length, sizeof(s->buffer)-1-s->length);
433 if (l < 0) {
434
435 if (errno == EAGAIN)
436 return 0;
437
438 log_warning_errno(errno, "Failed to read from stream: %m");
439 goto terminate;
440 }
441
442 if (l == 0) {
443 stdout_stream_scan(s, true);
444 goto terminate;
445 }
446
447 s->length += l;
448 r = stdout_stream_scan(s, false);
449 if (r < 0)
450 goto terminate;
451
452 return 1;
453
454 terminate:
455 stdout_stream_destroy(s);
456 return 0;
457 }
458
459 static int stdout_stream_install(Server *s, int fd, StdoutStream **ret) {
460 _cleanup_(stdout_stream_freep) StdoutStream *stream = NULL;
461 int r;
462
463 assert(s);
464 assert(fd >= 0);
465
466 stream = new0(StdoutStream, 1);
467 if (!stream)
468 return log_oom();
469
470 stream->fd = -1;
471 stream->priority = LOG_INFO;
472
473 r = getpeercred(fd, &stream->ucred);
474 if (r < 0)
475 return log_error_errno(r, "Failed to determine peer credentials: %m");
476
477 if (mac_selinux_use()) {
478 r = getpeersec(fd, &stream->label);
479 if (r < 0 && r != -EOPNOTSUPP)
480 (void) log_warning_errno(r, "Failed to determine peer security context: %m");
481 }
482
483 (void) shutdown(fd, SHUT_WR);
484
485 r = sd_event_add_io(s->event, &stream->event_source, fd, EPOLLIN, stdout_stream_process, stream);
486 if (r < 0)
487 return log_error_errno(r, "Failed to add stream to event loop: %m");
488
489 r = sd_event_source_set_priority(stream->event_source, SD_EVENT_PRIORITY_NORMAL+5);
490 if (r < 0)
491 return log_error_errno(r, "Failed to adjust stdout event source priority: %m");
492
493 stream->fd = fd;
494
495 stream->server = s;
496 LIST_PREPEND(stdout_stream, s->stdout_streams, stream);
497 s->n_stdout_streams ++;
498
499 if (ret)
500 *ret = stream;
501
502 stream = NULL;
503
504 return 0;
505 }
506
507 static int stdout_stream_new(sd_event_source *es, int listen_fd, uint32_t revents, void *userdata) {
508 _cleanup_close_ int fd = -1;
509 Server *s = userdata;
510 int r;
511
512 assert(s);
513
514 if (revents != EPOLLIN) {
515 log_error("Got invalid event from epoll for stdout server fd: %"PRIx32, revents);
516 return -EIO;
517 }
518
519 fd = accept4(s->stdout_fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
520 if (fd < 0) {
521 if (errno == EAGAIN)
522 return 0;
523
524 log_error_errno(errno, "Failed to accept stdout connection: %m");
525 return -errno;
526 }
527
528 if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) {
529 log_warning("Too many stdout streams, refusing connection.");
530 return 0;
531 }
532
533 r = stdout_stream_install(s, fd, NULL);
534 if (r < 0)
535 return r;
536
537 fd = -1;
538 return 0;
539 }
540
541 static int stdout_stream_load(StdoutStream *stream, const char *fname) {
542 _cleanup_free_ char
543 *priority = NULL,
544 *level_prefix = NULL,
545 *forward_to_syslog = NULL,
546 *forward_to_kmsg = NULL,
547 *forward_to_console = NULL;
548 int r;
549
550 assert(stream);
551 assert(fname);
552
553 if (!stream->state_file) {
554 stream->state_file = strappend("/run/systemd/journal/streams/", fname);
555 if (!stream->state_file)
556 return log_oom();
557 }
558
559 r = parse_env_file(stream->state_file, NEWLINE,
560 "PRIORITY", &priority,
561 "LEVEL_PREFIX", &level_prefix,
562 "FORWARD_TO_SYSLOG", &forward_to_syslog,
563 "FORWARD_TO_KMSG", &forward_to_kmsg,
564 "FORWARD_TO_CONSOLE", &forward_to_console,
565 "IDENTIFIER", &stream->identifier,
566 "UNIT", &stream->unit_id,
567 NULL);
568 if (r < 0)
569 return log_error_errno(r, "Failed to read: %s", stream->state_file);
570
571 if (priority) {
572 int p;
573
574 p = log_level_from_string(priority);
575 if (p >= 0)
576 stream->priority = p;
577 }
578
579 if (level_prefix) {
580 r = parse_boolean(level_prefix);
581 if (r >= 0)
582 stream->level_prefix = r;
583 }
584
585 if (forward_to_syslog) {
586 r = parse_boolean(forward_to_syslog);
587 if (r >= 0)
588 stream->forward_to_syslog = r;
589 }
590
591 if (forward_to_kmsg) {
592 r = parse_boolean(forward_to_kmsg);
593 if (r >= 0)
594 stream->forward_to_kmsg = r;
595 }
596
597 if (forward_to_console) {
598 r = parse_boolean(forward_to_console);
599 if (r >= 0)
600 stream->forward_to_console = r;
601 }
602
603 return 0;
604 }
605
606 static int stdout_stream_restore(Server *s, const char *fname, int fd) {
607 StdoutStream *stream;
608 int r;
609
610 assert(s);
611 assert(fname);
612 assert(fd >= 0);
613
614 if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) {
615 log_warning("Too many stdout streams, refusing restoring of stream.");
616 return -ENOBUFS;
617 }
618
619 r = stdout_stream_install(s, fd, &stream);
620 if (r < 0)
621 return r;
622
623 stream->state = STDOUT_STREAM_RUNNING;
624 stream->fdstore = true;
625
626 /* Ignore all parsing errors */
627 (void) stdout_stream_load(stream, fname);
628
629 return 0;
630 }
631
632 static int server_restore_streams(Server *s, FDSet *fds) {
633 _cleanup_closedir_ DIR *d = NULL;
634 struct dirent *de;
635 int r;
636
637 d = opendir("/run/systemd/journal/streams");
638 if (!d) {
639 if (errno == ENOENT)
640 return 0;
641
642 return log_warning_errno(errno, "Failed to enumerate /run/systemd/journal/streams: %m");
643 }
644
645 FOREACH_DIRENT(de, d, goto fail) {
646 unsigned long st_dev, st_ino;
647 bool found = false;
648 Iterator i;
649 int fd;
650
651 if (sscanf(de->d_name, "%lu:%lu", &st_dev, &st_ino) != 2)
652 continue;
653
654 FDSET_FOREACH(fd, fds, i) {
655 struct stat st;
656
657 if (fstat(fd, &st) < 0)
658 return log_error_errno(errno, "Failed to stat %s: %m", de->d_name);
659
660 if (S_ISSOCK(st.st_mode) && st.st_dev == st_dev && st.st_ino == st_ino) {
661 found = true;
662 break;
663 }
664 }
665
666 if (!found) {
667 /* No file descriptor? Then let's delete the state file */
668 log_debug("Cannot restore stream file %s", de->d_name);
669 unlinkat(dirfd(d), de->d_name, 0);
670 continue;
671 }
672
673 fdset_remove(fds, fd);
674
675 r = stdout_stream_restore(s, de->d_name, fd);
676 if (r < 0)
677 safe_close(fd);
678 }
679
680 return 0;
681
682 fail:
683 return log_error_errno(errno, "Failed to read streams directory: %m");
684 }
685
686 int server_open_stdout_socket(Server *s, FDSet *fds) {
687 int r;
688
689 assert(s);
690
691 if (s->stdout_fd < 0) {
692 union sockaddr_union sa = {
693 .un.sun_family = AF_UNIX,
694 .un.sun_path = "/run/systemd/journal/stdout",
695 };
696
697 s->stdout_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
698 if (s->stdout_fd < 0)
699 return log_error_errno(errno, "socket() failed: %m");
700
701 unlink(sa.un.sun_path);
702
703 r = bind(s->stdout_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
704 if (r < 0)
705 return log_error_errno(errno, "bind(%s) failed: %m", sa.un.sun_path);
706
707 (void) chmod(sa.un.sun_path, 0666);
708
709 if (listen(s->stdout_fd, SOMAXCONN) < 0)
710 return log_error_errno(errno, "listen(%s) failed: %m", sa.un.sun_path);
711 } else
712 fd_nonblock(s->stdout_fd, 1);
713
714 r = sd_event_add_io(s->event, &s->stdout_event_source, s->stdout_fd, EPOLLIN, stdout_stream_new, s);
715 if (r < 0)
716 return log_error_errno(r, "Failed to add stdout server fd to event source: %m");
717
718 r = sd_event_source_set_priority(s->stdout_event_source, SD_EVENT_PRIORITY_NORMAL+10);
719 if (r < 0)
720 return log_error_errno(r, "Failed to adjust priority of stdout server event source: %m");
721
722 /* Try to restore streams, but don't bother if this fails */
723 (void) server_restore_streams(s, fds);
724
725 return 0;
726 }