]> git.ipfire.org Git - thirdparty/systemd.git/blob - src/journal/journald-stream.c
util-lib: split out allocation calls into alloc-util.[ch]
[thirdparty/systemd.git] / src / journal / journald-stream.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4 This file is part of systemd.
5
6 Copyright 2011 Lennart Poettering
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <stddef.h>
23 #include <unistd.h>
24
25 #ifdef HAVE_SELINUX
26 #include <selinux/selinux.h>
27 #endif
28
29 #include "sd-daemon.h"
30 #include "sd-event.h"
31
32 #include "alloc-util.h"
33 #include "dirent-util.h"
34 #include "escape.h"
35 #include "fd-util.h"
36 #include "fileio.h"
37 #include "io-util.h"
38 #include "journald-console.h"
39 #include "journald-kmsg.h"
40 #include "journald-server.h"
41 #include "journald-stream.h"
42 #include "journald-syslog.h"
43 #include "journald-wall.h"
44 #include "mkdir.h"
45 #include "parse-util.h"
46 #include "selinux-util.h"
47 #include "socket-util.h"
48 #include "stdio-util.h"
49 #include "string-util.h"
50 #include "syslog-util.h"
51
/* Upper bound on the number of stdout streams tracked per server; new
 * connections and restored streams are refused once it is reached. */
#define STDOUT_STREAMS_MAX 4096
53
/* Parsing state of a stdout stream. A new connection sends one header line
 * per state, in the order listed here, before entering
 * STDOUT_STREAM_RUNNING, where each line is treated as a log message. */
typedef enum StdoutStreamState {
        STDOUT_STREAM_IDENTIFIER = 0,     /* initial state of a zero-allocated stream */
        STDOUT_STREAM_UNIT_ID,
        STDOUT_STREAM_PRIORITY,
        STDOUT_STREAM_LEVEL_PREFIX,
        STDOUT_STREAM_FORWARD_TO_SYSLOG,
        STDOUT_STREAM_FORWARD_TO_KMSG,
        STDOUT_STREAM_FORWARD_TO_CONSOLE,
        STDOUT_STREAM_RUNNING,
} StdoutStreamState;
64
65 struct StdoutStream {
66 Server *server;
67 StdoutStreamState state;
68
69 int fd;
70
71 struct ucred ucred;
72 char *label;
73 char *identifier;
74 char *unit_id;
75 int priority;
76 bool level_prefix:1;
77 bool forward_to_syslog:1;
78 bool forward_to_kmsg:1;
79 bool forward_to_console:1;
80
81 bool fdstore:1;
82
83 char buffer[LINE_MAX+1];
84 size_t length;
85
86 sd_event_source *event_source;
87
88 char *state_file;
89
90 LIST_FIELDS(StdoutStream, stdout_stream);
91 };
92
93 void stdout_stream_free(StdoutStream *s) {
94 if (!s)
95 return;
96
97 if (s->server) {
98 assert(s->server->n_stdout_streams > 0);
99 s->server->n_stdout_streams --;
100 LIST_REMOVE(stdout_stream, s->server->stdout_streams, s);
101 }
102
103 if (s->event_source) {
104 sd_event_source_set_enabled(s->event_source, SD_EVENT_OFF);
105 s->event_source = sd_event_source_unref(s->event_source);
106 }
107
108 safe_close(s->fd);
109 free(s->label);
110 free(s->identifier);
111 free(s->unit_id);
112 free(s->state_file);
113
114 free(s);
115 }
116
117 DEFINE_TRIVIAL_CLEANUP_FUNC(StdoutStream*, stdout_stream_free);
118
119 static void stdout_stream_destroy(StdoutStream *s) {
120 if (!s)
121 return;
122
123 if (s->state_file)
124 unlink(s->state_file);
125
126 stdout_stream_free(s);
127 }
128
129 static int stdout_stream_save(StdoutStream *s) {
130 _cleanup_free_ char *temp_path = NULL;
131 _cleanup_fclose_ FILE *f = NULL;
132 int r;
133
134 assert(s);
135
136 if (s->state != STDOUT_STREAM_RUNNING)
137 return 0;
138
139 if (!s->state_file) {
140 struct stat st;
141
142 r = fstat(s->fd, &st);
143 if (r < 0)
144 return log_warning_errno(errno, "Failed to stat connected stream: %m");
145
146 /* We use device and inode numbers as identifier for the stream */
147 if (asprintf(&s->state_file, "/run/systemd/journal/streams/%lu:%lu", (unsigned long) st.st_dev, (unsigned long) st.st_ino) < 0)
148 return log_oom();
149 }
150
151 mkdir_p("/run/systemd/journal/streams", 0755);
152
153 r = fopen_temporary(s->state_file, &f, &temp_path);
154 if (r < 0)
155 goto fail;
156
157 fprintf(f,
158 "# This is private data. Do not parse\n"
159 "PRIORITY=%i\n"
160 "LEVEL_PREFIX=%i\n"
161 "FORWARD_TO_SYSLOG=%i\n"
162 "FORWARD_TO_KMSG=%i\n"
163 "FORWARD_TO_CONSOLE=%i\n",
164 s->priority,
165 s->level_prefix,
166 s->forward_to_syslog,
167 s->forward_to_kmsg,
168 s->forward_to_console);
169
170 if (!isempty(s->identifier)) {
171 _cleanup_free_ char *escaped;
172
173 escaped = cescape(s->identifier);
174 if (!escaped) {
175 r = -ENOMEM;
176 goto fail;
177 }
178
179 fprintf(f, "IDENTIFIER=%s\n", escaped);
180 }
181
182 if (!isempty(s->unit_id)) {
183 _cleanup_free_ char *escaped;
184
185 escaped = cescape(s->unit_id);
186 if (!escaped) {
187 r = -ENOMEM;
188 goto fail;
189 }
190
191 fprintf(f, "UNIT=%s\n", escaped);
192 }
193
194 r = fflush_and_check(f);
195 if (r < 0)
196 goto fail;
197
198 if (rename(temp_path, s->state_file) < 0) {
199 r = -errno;
200 goto fail;
201 }
202
203 /* Store the connection fd in PID 1, so that we get it passed
204 * in again on next start */
205 if (!s->fdstore) {
206 sd_pid_notify_with_fds(0, false, "FDSTORE=1", &s->fd, 1);
207 s->fdstore = true;
208 }
209
210 return 0;
211
212 fail:
213 (void) unlink(s->state_file);
214
215 if (temp_path)
216 (void) unlink(temp_path);
217
218 return log_error_errno(r, "Failed to save stream data %s: %m", s->state_file);
219 }
220
221 static int stdout_stream_log(StdoutStream *s, const char *p) {
222 struct iovec iovec[N_IOVEC_META_FIELDS + 5];
223 int priority;
224 char syslog_priority[] = "PRIORITY=\0";
225 char syslog_facility[sizeof("SYSLOG_FACILITY=")-1 + DECIMAL_STR_MAX(int) + 1];
226 _cleanup_free_ char *message = NULL, *syslog_identifier = NULL;
227 unsigned n = 0;
228 size_t label_len;
229
230 assert(s);
231 assert(p);
232
233 if (isempty(p))
234 return 0;
235
236 priority = s->priority;
237
238 if (s->level_prefix)
239 syslog_parse_priority(&p, &priority, false);
240
241 if (s->forward_to_syslog || s->server->forward_to_syslog)
242 server_forward_syslog(s->server, syslog_fixup_facility(priority), s->identifier, p, &s->ucred, NULL);
243
244 if (s->forward_to_kmsg || s->server->forward_to_kmsg)
245 server_forward_kmsg(s->server, priority, s->identifier, p, &s->ucred);
246
247 if (s->forward_to_console || s->server->forward_to_console)
248 server_forward_console(s->server, priority, s->identifier, p, &s->ucred);
249
250 if (s->server->forward_to_wall)
251 server_forward_wall(s->server, priority, s->identifier, p, &s->ucred);
252
253 IOVEC_SET_STRING(iovec[n++], "_TRANSPORT=stdout");
254
255 syslog_priority[strlen("PRIORITY=")] = '0' + LOG_PRI(priority);
256 IOVEC_SET_STRING(iovec[n++], syslog_priority);
257
258 if (priority & LOG_FACMASK) {
259 xsprintf(syslog_facility, "SYSLOG_FACILITY=%i", LOG_FAC(priority));
260 IOVEC_SET_STRING(iovec[n++], syslog_facility);
261 }
262
263 if (s->identifier) {
264 syslog_identifier = strappend("SYSLOG_IDENTIFIER=", s->identifier);
265 if (syslog_identifier)
266 IOVEC_SET_STRING(iovec[n++], syslog_identifier);
267 }
268
269 message = strappend("MESSAGE=", p);
270 if (message)
271 IOVEC_SET_STRING(iovec[n++], message);
272
273 label_len = s->label ? strlen(s->label) : 0;
274 server_dispatch_message(s->server, iovec, n, ELEMENTSOF(iovec), &s->ucred, NULL, s->label, label_len, s->unit_id, priority, 0);
275 return 0;
276 }
277
278 static int stdout_stream_line(StdoutStream *s, char *p) {
279 int r;
280
281 assert(s);
282 assert(p);
283
284 p = strstrip(p);
285
286 switch (s->state) {
287
288 case STDOUT_STREAM_IDENTIFIER:
289 if (isempty(p))
290 s->identifier = NULL;
291 else {
292 s->identifier = strdup(p);
293 if (!s->identifier)
294 return log_oom();
295 }
296
297 s->state = STDOUT_STREAM_UNIT_ID;
298 return 0;
299
300 case STDOUT_STREAM_UNIT_ID:
301 if (s->ucred.uid == 0) {
302 if (isempty(p))
303 s->unit_id = NULL;
304 else {
305 s->unit_id = strdup(p);
306 if (!s->unit_id)
307 return log_oom();
308 }
309 }
310
311 s->state = STDOUT_STREAM_PRIORITY;
312 return 0;
313
314 case STDOUT_STREAM_PRIORITY:
315 r = safe_atoi(p, &s->priority);
316 if (r < 0 || s->priority < 0 || s->priority > 999) {
317 log_warning("Failed to parse log priority line.");
318 return -EINVAL;
319 }
320
321 s->state = STDOUT_STREAM_LEVEL_PREFIX;
322 return 0;
323
324 case STDOUT_STREAM_LEVEL_PREFIX:
325 r = parse_boolean(p);
326 if (r < 0) {
327 log_warning("Failed to parse level prefix line.");
328 return -EINVAL;
329 }
330
331 s->level_prefix = !!r;
332 s->state = STDOUT_STREAM_FORWARD_TO_SYSLOG;
333 return 0;
334
335 case STDOUT_STREAM_FORWARD_TO_SYSLOG:
336 r = parse_boolean(p);
337 if (r < 0) {
338 log_warning("Failed to parse forward to syslog line.");
339 return -EINVAL;
340 }
341
342 s->forward_to_syslog = !!r;
343 s->state = STDOUT_STREAM_FORWARD_TO_KMSG;
344 return 0;
345
346 case STDOUT_STREAM_FORWARD_TO_KMSG:
347 r = parse_boolean(p);
348 if (r < 0) {
349 log_warning("Failed to parse copy to kmsg line.");
350 return -EINVAL;
351 }
352
353 s->forward_to_kmsg = !!r;
354 s->state = STDOUT_STREAM_FORWARD_TO_CONSOLE;
355 return 0;
356
357 case STDOUT_STREAM_FORWARD_TO_CONSOLE:
358 r = parse_boolean(p);
359 if (r < 0) {
360 log_warning("Failed to parse copy to console line.");
361 return -EINVAL;
362 }
363
364 s->forward_to_console = !!r;
365 s->state = STDOUT_STREAM_RUNNING;
366
367 /* Try to save the stream, so that journald can be restarted and we can recover */
368 (void) stdout_stream_save(s);
369 return 0;
370
371 case STDOUT_STREAM_RUNNING:
372 return stdout_stream_log(s, p);
373 }
374
375 assert_not_reached("Unknown stream state");
376 }
377
378 static int stdout_stream_scan(StdoutStream *s, bool force_flush) {
379 char *p;
380 size_t remaining;
381 int r;
382
383 assert(s);
384
385 p = s->buffer;
386 remaining = s->length;
387 for (;;) {
388 char *end;
389 size_t skip;
390
391 end = memchr(p, '\n', remaining);
392 if (end)
393 skip = end - p + 1;
394 else if (remaining >= sizeof(s->buffer) - 1) {
395 end = p + sizeof(s->buffer) - 1;
396 skip = remaining;
397 } else
398 break;
399
400 *end = 0;
401
402 r = stdout_stream_line(s, p);
403 if (r < 0)
404 return r;
405
406 remaining -= skip;
407 p += skip;
408 }
409
410 if (force_flush && remaining > 0) {
411 p[remaining] = 0;
412 r = stdout_stream_line(s, p);
413 if (r < 0)
414 return r;
415
416 p += remaining;
417 remaining = 0;
418 }
419
420 if (p > s->buffer) {
421 memmove(s->buffer, p, remaining);
422 s->length = remaining;
423 }
424
425 return 0;
426 }
427
428 static int stdout_stream_process(sd_event_source *es, int fd, uint32_t revents, void *userdata) {
429 StdoutStream *s = userdata;
430 ssize_t l;
431 int r;
432
433 assert(s);
434
435 if ((revents|EPOLLIN|EPOLLHUP) != (EPOLLIN|EPOLLHUP)) {
436 log_error("Got invalid event from epoll for stdout stream: %"PRIx32, revents);
437 goto terminate;
438 }
439
440 l = read(s->fd, s->buffer+s->length, sizeof(s->buffer)-1-s->length);
441 if (l < 0) {
442
443 if (errno == EAGAIN)
444 return 0;
445
446 log_warning_errno(errno, "Failed to read from stream: %m");
447 goto terminate;
448 }
449
450 if (l == 0) {
451 stdout_stream_scan(s, true);
452 goto terminate;
453 }
454
455 s->length += l;
456 r = stdout_stream_scan(s, false);
457 if (r < 0)
458 goto terminate;
459
460 return 1;
461
462 terminate:
463 stdout_stream_destroy(s);
464 return 0;
465 }
466
467 static int stdout_stream_install(Server *s, int fd, StdoutStream **ret) {
468 _cleanup_(stdout_stream_freep) StdoutStream *stream = NULL;
469 int r;
470
471 assert(s);
472 assert(fd >= 0);
473
474 stream = new0(StdoutStream, 1);
475 if (!stream)
476 return log_oom();
477
478 stream->fd = -1;
479 stream->priority = LOG_INFO;
480
481 r = getpeercred(fd, &stream->ucred);
482 if (r < 0)
483 return log_error_errno(r, "Failed to determine peer credentials: %m");
484
485 if (mac_selinux_use()) {
486 r = getpeersec(fd, &stream->label);
487 if (r < 0 && r != -EOPNOTSUPP)
488 (void) log_warning_errno(r, "Failed to determine peer security context: %m");
489 }
490
491 (void) shutdown(fd, SHUT_WR);
492
493 r = sd_event_add_io(s->event, &stream->event_source, fd, EPOLLIN, stdout_stream_process, stream);
494 if (r < 0)
495 return log_error_errno(r, "Failed to add stream to event loop: %m");
496
497 r = sd_event_source_set_priority(stream->event_source, SD_EVENT_PRIORITY_NORMAL+5);
498 if (r < 0)
499 return log_error_errno(r, "Failed to adjust stdout event source priority: %m");
500
501 stream->fd = fd;
502
503 stream->server = s;
504 LIST_PREPEND(stdout_stream, s->stdout_streams, stream);
505 s->n_stdout_streams ++;
506
507 if (ret)
508 *ret = stream;
509
510 stream = NULL;
511
512 return 0;
513 }
514
515 static int stdout_stream_new(sd_event_source *es, int listen_fd, uint32_t revents, void *userdata) {
516 _cleanup_close_ int fd = -1;
517 Server *s = userdata;
518 int r;
519
520 assert(s);
521
522 if (revents != EPOLLIN) {
523 log_error("Got invalid event from epoll for stdout server fd: %"PRIx32, revents);
524 return -EIO;
525 }
526
527 fd = accept4(s->stdout_fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
528 if (fd < 0) {
529 if (errno == EAGAIN)
530 return 0;
531
532 log_error_errno(errno, "Failed to accept stdout connection: %m");
533 return -errno;
534 }
535
536 if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) {
537 log_warning("Too many stdout streams, refusing connection.");
538 return 0;
539 }
540
541 r = stdout_stream_install(s, fd, NULL);
542 if (r < 0)
543 return r;
544
545 fd = -1;
546 return 0;
547 }
548
549 static int stdout_stream_load(StdoutStream *stream, const char *fname) {
550 _cleanup_free_ char
551 *priority = NULL,
552 *level_prefix = NULL,
553 *forward_to_syslog = NULL,
554 *forward_to_kmsg = NULL,
555 *forward_to_console = NULL;
556 int r;
557
558 assert(stream);
559 assert(fname);
560
561 if (!stream->state_file) {
562 stream->state_file = strappend("/run/systemd/journal/streams/", fname);
563 if (!stream->state_file)
564 return log_oom();
565 }
566
567 r = parse_env_file(stream->state_file, NEWLINE,
568 "PRIORITY", &priority,
569 "LEVEL_PREFIX", &level_prefix,
570 "FORWARD_TO_SYSLOG", &forward_to_syslog,
571 "FORWARD_TO_KMSG", &forward_to_kmsg,
572 "FORWARD_TO_CONSOLE", &forward_to_console,
573 "IDENTIFIER", &stream->identifier,
574 "UNIT", &stream->unit_id,
575 NULL);
576 if (r < 0)
577 return log_error_errno(r, "Failed to read: %s", stream->state_file);
578
579 if (priority) {
580 int p;
581
582 p = log_level_from_string(priority);
583 if (p >= 0)
584 stream->priority = p;
585 }
586
587 if (level_prefix) {
588 r = parse_boolean(level_prefix);
589 if (r >= 0)
590 stream->level_prefix = r;
591 }
592
593 if (forward_to_syslog) {
594 r = parse_boolean(forward_to_syslog);
595 if (r >= 0)
596 stream->forward_to_syslog = r;
597 }
598
599 if (forward_to_kmsg) {
600 r = parse_boolean(forward_to_kmsg);
601 if (r >= 0)
602 stream->forward_to_kmsg = r;
603 }
604
605 if (forward_to_console) {
606 r = parse_boolean(forward_to_console);
607 if (r >= 0)
608 stream->forward_to_console = r;
609 }
610
611 return 0;
612 }
613
614 static int stdout_stream_restore(Server *s, const char *fname, int fd) {
615 StdoutStream *stream;
616 int r;
617
618 assert(s);
619 assert(fname);
620 assert(fd >= 0);
621
622 if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) {
623 log_warning("Too many stdout streams, refusing restoring of stream.");
624 return -ENOBUFS;
625 }
626
627 r = stdout_stream_install(s, fd, &stream);
628 if (r < 0)
629 return r;
630
631 stream->state = STDOUT_STREAM_RUNNING;
632 stream->fdstore = true;
633
634 /* Ignore all parsing errors */
635 (void) stdout_stream_load(stream, fname);
636
637 return 0;
638 }
639
640 int server_restore_streams(Server *s, FDSet *fds) {
641 _cleanup_closedir_ DIR *d = NULL;
642 struct dirent *de;
643 int r;
644
645 d = opendir("/run/systemd/journal/streams");
646 if (!d) {
647 if (errno == ENOENT)
648 return 0;
649
650 return log_warning_errno(errno, "Failed to enumerate /run/systemd/journal/streams: %m");
651 }
652
653 FOREACH_DIRENT(de, d, goto fail) {
654 unsigned long st_dev, st_ino;
655 bool found = false;
656 Iterator i;
657 int fd;
658
659 if (sscanf(de->d_name, "%lu:%lu", &st_dev, &st_ino) != 2)
660 continue;
661
662 FDSET_FOREACH(fd, fds, i) {
663 struct stat st;
664
665 if (fstat(fd, &st) < 0)
666 return log_error_errno(errno, "Failed to stat %s: %m", de->d_name);
667
668 if (S_ISSOCK(st.st_mode) && st.st_dev == st_dev && st.st_ino == st_ino) {
669 found = true;
670 break;
671 }
672 }
673
674 if (!found) {
675 /* No file descriptor? Then let's delete the state file */
676 log_debug("Cannot restore stream file %s", de->d_name);
677 unlinkat(dirfd(d), de->d_name, 0);
678 continue;
679 }
680
681 fdset_remove(fds, fd);
682
683 r = stdout_stream_restore(s, de->d_name, fd);
684 if (r < 0)
685 safe_close(fd);
686 }
687
688 return 0;
689
690 fail:
691 return log_error_errno(errno, "Failed to read streams directory: %m");
692 }
693
694 int server_open_stdout_socket(Server *s) {
695 int r;
696
697 assert(s);
698
699 if (s->stdout_fd < 0) {
700 union sockaddr_union sa = {
701 .un.sun_family = AF_UNIX,
702 .un.sun_path = "/run/systemd/journal/stdout",
703 };
704
705 s->stdout_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
706 if (s->stdout_fd < 0)
707 return log_error_errno(errno, "socket() failed: %m");
708
709 unlink(sa.un.sun_path);
710
711 r = bind(s->stdout_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
712 if (r < 0)
713 return log_error_errno(errno, "bind(%s) failed: %m", sa.un.sun_path);
714
715 (void) chmod(sa.un.sun_path, 0666);
716
717 if (listen(s->stdout_fd, SOMAXCONN) < 0)
718 return log_error_errno(errno, "listen(%s) failed: %m", sa.un.sun_path);
719 } else
720 fd_nonblock(s->stdout_fd, 1);
721
722 r = sd_event_add_io(s->event, &s->stdout_event_source, s->stdout_fd, EPOLLIN, stdout_stream_new, s);
723 if (r < 0)
724 return log_error_errno(r, "Failed to add stdout server fd to event source: %m");
725
726 r = sd_event_source_set_priority(s->stdout_event_source, SD_EVENT_PRIORITY_NORMAL+10);
727 if (r < 0)
728 return log_error_errno(r, "Failed to adjust priority of stdout server event source: %m");
729
730 return 0;
731 }