]> git.ipfire.org Git - thirdparty/systemd.git/blob - src/journal/journald-stream.c
util-lib: split out all temporary file related calls into tmpfiles-util.c
[thirdparty/systemd.git] / src / journal / journald-stream.c
1 /* SPDX-License-Identifier: LGPL-2.1+ */
2
3 #include <stddef.h>
4 #include <unistd.h>
5
6 #if HAVE_SELINUX
7 #include <selinux/selinux.h>
8 #endif
9
10 #include "sd-daemon.h"
11 #include "sd-event.h"
12
13 #include "alloc-util.h"
14 #include "dirent-util.h"
15 #include "escape.h"
16 #include "fd-util.h"
17 #include "fileio.h"
18 #include "io-util.h"
19 #include "journald-console.h"
20 #include "journald-context.h"
21 #include "journald-kmsg.h"
22 #include "journald-server.h"
23 #include "journald-stream.h"
24 #include "journald-syslog.h"
25 #include "journald-wall.h"
26 #include "mkdir.h"
27 #include "parse-util.h"
28 #include "process-util.h"
29 #include "selinux-util.h"
30 #include "socket-util.h"
31 #include "stdio-util.h"
32 #include "string-util.h"
33 #include "syslog-util.h"
34 #include "tmpfile-util.h"
35 #include "unit-name.h"
36
37 #define STDOUT_STREAMS_MAX 4096
38
39 typedef enum StdoutStreamState {
40 STDOUT_STREAM_IDENTIFIER,
41 STDOUT_STREAM_UNIT_ID,
42 STDOUT_STREAM_PRIORITY,
43 STDOUT_STREAM_LEVEL_PREFIX,
44 STDOUT_STREAM_FORWARD_TO_SYSLOG,
45 STDOUT_STREAM_FORWARD_TO_KMSG,
46 STDOUT_STREAM_FORWARD_TO_CONSOLE,
47 STDOUT_STREAM_RUNNING
48 } StdoutStreamState;
49
50 /* The different types of log record terminators: a real \n was read, a NUL character was read, the maximum line length
51 * was reached, or the end of the stream was reached */
52
53 typedef enum LineBreak {
54 LINE_BREAK_NEWLINE,
55 LINE_BREAK_NUL,
56 LINE_BREAK_LINE_MAX,
57 LINE_BREAK_EOF,
58 } LineBreak;
59
60 struct StdoutStream {
61 Server *server;
62 StdoutStreamState state;
63
64 int fd;
65
66 struct ucred ucred;
67 char *label;
68 char *identifier;
69 char *unit_id;
70 int priority;
71 bool level_prefix:1;
72 bool forward_to_syslog:1;
73 bool forward_to_kmsg:1;
74 bool forward_to_console:1;
75
76 bool fdstore:1;
77 bool in_notify_queue:1;
78
79 char *buffer;
80 size_t length;
81 size_t allocated;
82
83 sd_event_source *event_source;
84
85 char *state_file;
86
87 ClientContext *context;
88
89 LIST_FIELDS(StdoutStream, stdout_stream);
90 LIST_FIELDS(StdoutStream, stdout_stream_notify_queue);
91
92 char id_field[STRLEN("_STREAM_ID=") + SD_ID128_STRING_MAX];
93 };
94
95 void stdout_stream_free(StdoutStream *s) {
96 if (!s)
97 return;
98
99 if (s->server) {
100
101 if (s->context)
102 client_context_release(s->server, s->context);
103
104 assert(s->server->n_stdout_streams > 0);
105 s->server->n_stdout_streams--;
106 LIST_REMOVE(stdout_stream, s->server->stdout_streams, s);
107
108 if (s->in_notify_queue)
109 LIST_REMOVE(stdout_stream_notify_queue, s->server->stdout_streams_notify_queue, s);
110 }
111
112 if (s->event_source) {
113 sd_event_source_set_enabled(s->event_source, SD_EVENT_OFF);
114 s->event_source = sd_event_source_unref(s->event_source);
115 }
116
117 safe_close(s->fd);
118 free(s->label);
119 free(s->identifier);
120 free(s->unit_id);
121 free(s->state_file);
122 free(s->buffer);
123
124 free(s);
125 }
126
127 DEFINE_TRIVIAL_CLEANUP_FUNC(StdoutStream*, stdout_stream_free);
128
129 void stdout_stream_destroy(StdoutStream *s) {
130 if (!s)
131 return;
132
133 if (s->state_file)
134 (void) unlink(s->state_file);
135
136 stdout_stream_free(s);
137 }
138
139 static int stdout_stream_save(StdoutStream *s) {
140 _cleanup_free_ char *temp_path = NULL;
141 _cleanup_fclose_ FILE *f = NULL;
142 int r;
143
144 assert(s);
145
146 if (s->state != STDOUT_STREAM_RUNNING)
147 return 0;
148
149 if (!s->state_file) {
150 struct stat st;
151
152 r = fstat(s->fd, &st);
153 if (r < 0)
154 return log_warning_errno(errno, "Failed to stat connected stream: %m");
155
156 /* We use device and inode numbers as identifier for the stream */
157 if (asprintf(&s->state_file, "/run/systemd/journal/streams/%lu:%lu", (unsigned long) st.st_dev, (unsigned long) st.st_ino) < 0)
158 return log_oom();
159 }
160
161 mkdir_p("/run/systemd/journal/streams", 0755);
162
163 r = fopen_temporary(s->state_file, &f, &temp_path);
164 if (r < 0)
165 goto fail;
166
167 fprintf(f,
168 "# This is private data. Do not parse\n"
169 "PRIORITY=%i\n"
170 "LEVEL_PREFIX=%i\n"
171 "FORWARD_TO_SYSLOG=%i\n"
172 "FORWARD_TO_KMSG=%i\n"
173 "FORWARD_TO_CONSOLE=%i\n"
174 "STREAM_ID=%s\n",
175 s->priority,
176 s->level_prefix,
177 s->forward_to_syslog,
178 s->forward_to_kmsg,
179 s->forward_to_console,
180 s->id_field + STRLEN("_STREAM_ID="));
181
182 if (!isempty(s->identifier)) {
183 _cleanup_free_ char *escaped;
184
185 escaped = cescape(s->identifier);
186 if (!escaped) {
187 r = -ENOMEM;
188 goto fail;
189 }
190
191 fprintf(f, "IDENTIFIER=%s\n", escaped);
192 }
193
194 if (!isempty(s->unit_id)) {
195 _cleanup_free_ char *escaped;
196
197 escaped = cescape(s->unit_id);
198 if (!escaped) {
199 r = -ENOMEM;
200 goto fail;
201 }
202
203 fprintf(f, "UNIT=%s\n", escaped);
204 }
205
206 r = fflush_and_check(f);
207 if (r < 0)
208 goto fail;
209
210 if (rename(temp_path, s->state_file) < 0) {
211 r = -errno;
212 goto fail;
213 }
214
215 if (!s->fdstore && !s->in_notify_queue) {
216 LIST_PREPEND(stdout_stream_notify_queue, s->server->stdout_streams_notify_queue, s);
217 s->in_notify_queue = true;
218
219 if (s->server->notify_event_source) {
220 r = sd_event_source_set_enabled(s->server->notify_event_source, SD_EVENT_ON);
221 if (r < 0)
222 log_warning_errno(r, "Failed to enable notify event source: %m");
223 }
224 }
225
226 return 0;
227
228 fail:
229 (void) unlink(s->state_file);
230
231 if (temp_path)
232 (void) unlink(temp_path);
233
234 return log_error_errno(r, "Failed to save stream data %s: %m", s->state_file);
235 }
236
237 static int stdout_stream_log(StdoutStream *s, const char *p, LineBreak line_break) {
238 struct iovec *iovec;
239 int priority;
240 char syslog_priority[] = "PRIORITY=\0";
241 char syslog_facility[STRLEN("SYSLOG_FACILITY=") + DECIMAL_STR_MAX(int) + 1];
242 _cleanup_free_ char *message = NULL, *syslog_identifier = NULL;
243 size_t n = 0, m;
244 int r;
245
246 assert(s);
247 assert(p);
248
249 if (s->context)
250 (void) client_context_maybe_refresh(s->server, s->context, NULL, NULL, 0, NULL, USEC_INFINITY);
251 else if (pid_is_valid(s->ucred.pid)) {
252 r = client_context_acquire(s->server, s->ucred.pid, &s->ucred, s->label, strlen_ptr(s->label), s->unit_id, &s->context);
253 if (r < 0)
254 log_warning_errno(r, "Failed to acquire client context, ignoring: %m");
255 }
256
257 priority = s->priority;
258
259 if (s->level_prefix)
260 syslog_parse_priority(&p, &priority, false);
261
262 if (!client_context_test_priority(s->context, priority))
263 return 0;
264
265 if (isempty(p))
266 return 0;
267
268 if (s->forward_to_syslog || s->server->forward_to_syslog)
269 server_forward_syslog(s->server, syslog_fixup_facility(priority), s->identifier, p, &s->ucred, NULL);
270
271 if (s->forward_to_kmsg || s->server->forward_to_kmsg)
272 server_forward_kmsg(s->server, priority, s->identifier, p, &s->ucred);
273
274 if (s->forward_to_console || s->server->forward_to_console)
275 server_forward_console(s->server, priority, s->identifier, p, &s->ucred);
276
277 if (s->server->forward_to_wall)
278 server_forward_wall(s->server, priority, s->identifier, p, &s->ucred);
279
280 m = N_IOVEC_META_FIELDS + 7 + client_context_extra_fields_n_iovec(s->context);
281 iovec = newa(struct iovec, m);
282
283 iovec[n++] = IOVEC_MAKE_STRING("_TRANSPORT=stdout");
284 iovec[n++] = IOVEC_MAKE_STRING(s->id_field);
285
286 syslog_priority[STRLEN("PRIORITY=")] = '0' + LOG_PRI(priority);
287 iovec[n++] = IOVEC_MAKE_STRING(syslog_priority);
288
289 if (priority & LOG_FACMASK) {
290 xsprintf(syslog_facility, "SYSLOG_FACILITY=%i", LOG_FAC(priority));
291 iovec[n++] = IOVEC_MAKE_STRING(syslog_facility);
292 }
293
294 if (s->identifier) {
295 syslog_identifier = strappend("SYSLOG_IDENTIFIER=", s->identifier);
296 if (syslog_identifier)
297 iovec[n++] = IOVEC_MAKE_STRING(syslog_identifier);
298 }
299
300 if (line_break != LINE_BREAK_NEWLINE) {
301 const char *c;
302
303 /* If this log message was generated due to an uncommon line break then mention this in the log
304 * entry */
305
306 c = line_break == LINE_BREAK_NUL ? "_LINE_BREAK=nul" :
307 line_break == LINE_BREAK_LINE_MAX ? "_LINE_BREAK=line-max" :
308 "_LINE_BREAK=eof";
309 iovec[n++] = IOVEC_MAKE_STRING(c);
310 }
311
312 message = strappend("MESSAGE=", p);
313 if (message)
314 iovec[n++] = IOVEC_MAKE_STRING(message);
315
316 server_dispatch_message(s->server, iovec, n, m, s->context, NULL, priority, 0);
317 return 0;
318 }
319
320 static int stdout_stream_line(StdoutStream *s, char *p, LineBreak line_break) {
321 int r;
322 char *orig;
323
324 assert(s);
325 assert(p);
326
327 orig = p;
328 p = strstrip(p);
329
330 /* line breaks by NUL, line max length or EOF are not permissible during the negotiation part of the protocol */
331 if (line_break != LINE_BREAK_NEWLINE && s->state != STDOUT_STREAM_RUNNING) {
332 log_warning("Control protocol line not properly terminated.");
333 return -EINVAL;
334 }
335
336 switch (s->state) {
337
338 case STDOUT_STREAM_IDENTIFIER:
339 if (!isempty(p)) {
340 s->identifier = strdup(p);
341 if (!s->identifier)
342 return log_oom();
343 }
344
345 s->state = STDOUT_STREAM_UNIT_ID;
346 return 0;
347
348 case STDOUT_STREAM_UNIT_ID:
349 if (s->ucred.uid == 0 &&
350 unit_name_is_valid(p, UNIT_NAME_PLAIN|UNIT_NAME_INSTANCE)) {
351
352 s->unit_id = strdup(p);
353 if (!s->unit_id)
354 return log_oom();
355 }
356
357 s->state = STDOUT_STREAM_PRIORITY;
358 return 0;
359
360 case STDOUT_STREAM_PRIORITY:
361 r = safe_atoi(p, &s->priority);
362 if (r < 0 || s->priority < 0 || s->priority > 999) {
363 log_warning("Failed to parse log priority line.");
364 return -EINVAL;
365 }
366
367 s->state = STDOUT_STREAM_LEVEL_PREFIX;
368 return 0;
369
370 case STDOUT_STREAM_LEVEL_PREFIX:
371 r = parse_boolean(p);
372 if (r < 0) {
373 log_warning("Failed to parse level prefix line.");
374 return -EINVAL;
375 }
376
377 s->level_prefix = r;
378 s->state = STDOUT_STREAM_FORWARD_TO_SYSLOG;
379 return 0;
380
381 case STDOUT_STREAM_FORWARD_TO_SYSLOG:
382 r = parse_boolean(p);
383 if (r < 0) {
384 log_warning("Failed to parse forward to syslog line.");
385 return -EINVAL;
386 }
387
388 s->forward_to_syslog = r;
389 s->state = STDOUT_STREAM_FORWARD_TO_KMSG;
390 return 0;
391
392 case STDOUT_STREAM_FORWARD_TO_KMSG:
393 r = parse_boolean(p);
394 if (r < 0) {
395 log_warning("Failed to parse copy to kmsg line.");
396 return -EINVAL;
397 }
398
399 s->forward_to_kmsg = r;
400 s->state = STDOUT_STREAM_FORWARD_TO_CONSOLE;
401 return 0;
402
403 case STDOUT_STREAM_FORWARD_TO_CONSOLE:
404 r = parse_boolean(p);
405 if (r < 0) {
406 log_warning("Failed to parse copy to console line.");
407 return -EINVAL;
408 }
409
410 s->forward_to_console = r;
411 s->state = STDOUT_STREAM_RUNNING;
412
413 /* Try to save the stream, so that journald can be restarted and we can recover */
414 (void) stdout_stream_save(s);
415 return 0;
416
417 case STDOUT_STREAM_RUNNING:
418 return stdout_stream_log(s, orig, line_break);
419 }
420
421 assert_not_reached("Unknown stream state");
422 }
423
424 static int stdout_stream_scan(StdoutStream *s, bool force_flush) {
425 char *p;
426 size_t remaining;
427 int r;
428
429 assert(s);
430
431 p = s->buffer;
432 remaining = s->length;
433
434 /* XXX: This function does nothing if (s->length == 0) */
435
436 for (;;) {
437 LineBreak line_break;
438 size_t skip;
439 char *end1, *end2;
440
441 end1 = memchr(p, '\n', remaining);
442 end2 = memchr(p, 0, end1 ? (size_t) (end1 - p) : remaining);
443
444 if (end2) {
445 /* We found a NUL terminator */
446 skip = end2 - p + 1;
447 line_break = LINE_BREAK_NUL;
448 } else if (end1) {
449 /* We found a \n terminator */
450 *end1 = 0;
451 skip = end1 - p + 1;
452 line_break = LINE_BREAK_NEWLINE;
453 } else if (remaining >= s->server->line_max) {
454 /* Force a line break after the maximum line length */
455 *(p + s->server->line_max) = 0;
456 skip = remaining;
457 line_break = LINE_BREAK_LINE_MAX;
458 } else
459 break;
460
461 r = stdout_stream_line(s, p, line_break);
462 if (r < 0)
463 return r;
464
465 remaining -= skip;
466 p += skip;
467 }
468
469 if (force_flush && remaining > 0) {
470 p[remaining] = 0;
471 r = stdout_stream_line(s, p, LINE_BREAK_EOF);
472 if (r < 0)
473 return r;
474
475 p += remaining;
476 remaining = 0;
477 }
478
479 if (p > s->buffer) {
480 memmove(s->buffer, p, remaining);
481 s->length = remaining;
482 }
483
484 return 0;
485 }
486
487 static int stdout_stream_process(sd_event_source *es, int fd, uint32_t revents, void *userdata) {
488 StdoutStream *s = userdata;
489 size_t limit;
490 ssize_t l;
491 int r;
492
493 assert(s);
494
495 if ((revents|EPOLLIN|EPOLLHUP) != (EPOLLIN|EPOLLHUP)) {
496 log_error("Got invalid event from epoll for stdout stream: %"PRIx32, revents);
497 goto terminate;
498 }
499
500 /* If the buffer is full already (discounting the extra NUL we need), add room for another 1K */
501 if (s->length + 1 >= s->allocated) {
502 if (!GREEDY_REALLOC(s->buffer, s->allocated, s->length + 1 + 1024)) {
503 log_oom();
504 goto terminate;
505 }
506 }
507
508 /* Try to make use of the allocated buffer in full, but never read more than the configured line size. Also,
509 * always leave room for a terminating NUL we might need to add. */
510 limit = MIN(s->allocated - 1, s->server->line_max);
511
512 l = read(s->fd, s->buffer + s->length, limit - s->length);
513 if (l < 0) {
514 if (errno == EAGAIN)
515 return 0;
516
517 log_warning_errno(errno, "Failed to read from stream: %m");
518 goto terminate;
519 }
520
521 if (l == 0) {
522 stdout_stream_scan(s, true);
523 goto terminate;
524 }
525
526 s->length += l;
527 r = stdout_stream_scan(s, false);
528 if (r < 0)
529 goto terminate;
530
531 return 1;
532
533 terminate:
534 stdout_stream_destroy(s);
535 return 0;
536 }
537
538 int stdout_stream_install(Server *s, int fd, StdoutStream **ret) {
539 _cleanup_(stdout_stream_freep) StdoutStream *stream = NULL;
540 sd_id128_t id;
541 int r;
542
543 assert(s);
544 assert(fd >= 0);
545
546 r = sd_id128_randomize(&id);
547 if (r < 0)
548 return log_error_errno(r, "Failed to generate stream ID: %m");
549
550 stream = new0(StdoutStream, 1);
551 if (!stream)
552 return log_oom();
553
554 stream->fd = -1;
555 stream->priority = LOG_INFO;
556
557 xsprintf(stream->id_field, "_STREAM_ID=" SD_ID128_FORMAT_STR, SD_ID128_FORMAT_VAL(id));
558
559 r = getpeercred(fd, &stream->ucred);
560 if (r < 0)
561 return log_error_errno(r, "Failed to determine peer credentials: %m");
562
563 if (mac_selinux_use()) {
564 r = getpeersec(fd, &stream->label);
565 if (r < 0 && r != -EOPNOTSUPP)
566 (void) log_warning_errno(r, "Failed to determine peer security context: %m");
567 }
568
569 (void) shutdown(fd, SHUT_WR);
570
571 r = sd_event_add_io(s->event, &stream->event_source, fd, EPOLLIN, stdout_stream_process, stream);
572 if (r < 0)
573 return log_error_errno(r, "Failed to add stream to event loop: %m");
574
575 r = sd_event_source_set_priority(stream->event_source, SD_EVENT_PRIORITY_NORMAL+5);
576 if (r < 0)
577 return log_error_errno(r, "Failed to adjust stdout event source priority: %m");
578
579 stream->fd = fd;
580
581 stream->server = s;
582 LIST_PREPEND(stdout_stream, s->stdout_streams, stream);
583 s->n_stdout_streams++;
584
585 if (ret)
586 *ret = stream;
587
588 stream = NULL;
589
590 return 0;
591 }
592
593 static int stdout_stream_new(sd_event_source *es, int listen_fd, uint32_t revents, void *userdata) {
594 _cleanup_close_ int fd = -1;
595 Server *s = userdata;
596 int r;
597
598 assert(s);
599
600 if (revents != EPOLLIN)
601 return log_error_errno(SYNTHETIC_ERRNO(EIO),
602 "Got invalid event from epoll for stdout server fd: %" PRIx32,
603 revents);
604
605 fd = accept4(s->stdout_fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
606 if (fd < 0) {
607 if (errno == EAGAIN)
608 return 0;
609
610 return log_error_errno(errno, "Failed to accept stdout connection: %m");
611 }
612
613 if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) {
614 struct ucred u;
615
616 r = getpeercred(fd, &u);
617
618 /* By closing fd here we make sure that the client won't wait too long for journald to
619 * gather all the data it adds to the error message to find out that the connection has
620 * just been refused.
621 */
622 fd = safe_close(fd);
623
624 server_driver_message(s, r < 0 ? 0 : u.pid, NULL, LOG_MESSAGE("Too many stdout streams, refusing connection."), NULL);
625 return 0;
626 }
627
628 r = stdout_stream_install(s, fd, NULL);
629 if (r < 0)
630 return r;
631
632 fd = -1;
633 return 0;
634 }
635
636 static int stdout_stream_load(StdoutStream *stream, const char *fname) {
637 _cleanup_free_ char
638 *priority = NULL,
639 *level_prefix = NULL,
640 *forward_to_syslog = NULL,
641 *forward_to_kmsg = NULL,
642 *forward_to_console = NULL,
643 *stream_id = NULL;
644 int r;
645
646 assert(stream);
647 assert(fname);
648
649 if (!stream->state_file) {
650 stream->state_file = strappend("/run/systemd/journal/streams/", fname);
651 if (!stream->state_file)
652 return log_oom();
653 }
654
655 r = parse_env_file(NULL, stream->state_file,
656 "PRIORITY", &priority,
657 "LEVEL_PREFIX", &level_prefix,
658 "FORWARD_TO_SYSLOG", &forward_to_syslog,
659 "FORWARD_TO_KMSG", &forward_to_kmsg,
660 "FORWARD_TO_CONSOLE", &forward_to_console,
661 "IDENTIFIER", &stream->identifier,
662 "UNIT", &stream->unit_id,
663 "STREAM_ID", &stream_id);
664 if (r < 0)
665 return log_error_errno(r, "Failed to read: %s", stream->state_file);
666
667 if (priority) {
668 int p;
669
670 p = log_level_from_string(priority);
671 if (p >= 0)
672 stream->priority = p;
673 }
674
675 if (level_prefix) {
676 r = parse_boolean(level_prefix);
677 if (r >= 0)
678 stream->level_prefix = r;
679 }
680
681 if (forward_to_syslog) {
682 r = parse_boolean(forward_to_syslog);
683 if (r >= 0)
684 stream->forward_to_syslog = r;
685 }
686
687 if (forward_to_kmsg) {
688 r = parse_boolean(forward_to_kmsg);
689 if (r >= 0)
690 stream->forward_to_kmsg = r;
691 }
692
693 if (forward_to_console) {
694 r = parse_boolean(forward_to_console);
695 if (r >= 0)
696 stream->forward_to_console = r;
697 }
698
699 if (stream_id) {
700 sd_id128_t id;
701
702 r = sd_id128_from_string(stream_id, &id);
703 if (r >= 0)
704 xsprintf(stream->id_field, "_STREAM_ID=" SD_ID128_FORMAT_STR, SD_ID128_FORMAT_VAL(id));
705 }
706
707 return 0;
708 }
709
710 static int stdout_stream_restore(Server *s, const char *fname, int fd) {
711 StdoutStream *stream;
712 int r;
713
714 assert(s);
715 assert(fname);
716 assert(fd >= 0);
717
718 if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) {
719 log_warning("Too many stdout streams, refusing restoring of stream.");
720 return -ENOBUFS;
721 }
722
723 r = stdout_stream_install(s, fd, &stream);
724 if (r < 0)
725 return r;
726
727 stream->state = STDOUT_STREAM_RUNNING;
728 stream->fdstore = true;
729
730 /* Ignore all parsing errors */
731 (void) stdout_stream_load(stream, fname);
732
733 return 0;
734 }
735
736 int server_restore_streams(Server *s, FDSet *fds) {
737 _cleanup_closedir_ DIR *d = NULL;
738 struct dirent *de;
739 int r;
740
741 d = opendir("/run/systemd/journal/streams");
742 if (!d) {
743 if (errno == ENOENT)
744 return 0;
745
746 return log_warning_errno(errno, "Failed to enumerate /run/systemd/journal/streams: %m");
747 }
748
749 FOREACH_DIRENT(de, d, goto fail) {
750 unsigned long st_dev, st_ino;
751 bool found = false;
752 Iterator i;
753 int fd;
754
755 if (sscanf(de->d_name, "%lu:%lu", &st_dev, &st_ino) != 2)
756 continue;
757
758 FDSET_FOREACH(fd, fds, i) {
759 struct stat st;
760
761 if (fstat(fd, &st) < 0)
762 return log_error_errno(errno, "Failed to stat %s: %m", de->d_name);
763
764 if (S_ISSOCK(st.st_mode) && st.st_dev == st_dev && st.st_ino == st_ino) {
765 found = true;
766 break;
767 }
768 }
769
770 if (!found) {
771 /* No file descriptor? Then let's delete the state file */
772 log_debug("Cannot restore stream file %s", de->d_name);
773 if (unlinkat(dirfd(d), de->d_name, 0) < 0)
774 log_warning_errno(errno, "Failed to remove /run/systemd/journal/streams/%s: %m",
775 de->d_name);
776 continue;
777 }
778
779 fdset_remove(fds, fd);
780
781 r = stdout_stream_restore(s, de->d_name, fd);
782 if (r < 0)
783 safe_close(fd);
784 }
785
786 return 0;
787
788 fail:
789 return log_error_errno(errno, "Failed to read streams directory: %m");
790 }
791
792 int server_open_stdout_socket(Server *s) {
793 static const union sockaddr_union sa = {
794 .un.sun_family = AF_UNIX,
795 .un.sun_path = "/run/systemd/journal/stdout",
796 };
797 int r;
798
799 assert(s);
800
801 if (s->stdout_fd < 0) {
802 s->stdout_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
803 if (s->stdout_fd < 0)
804 return log_error_errno(errno, "socket() failed: %m");
805
806 (void) sockaddr_un_unlink(&sa.un);
807
808 r = bind(s->stdout_fd, &sa.sa, SOCKADDR_UN_LEN(sa.un));
809 if (r < 0)
810 return log_error_errno(errno, "bind(%s) failed: %m", sa.un.sun_path);
811
812 (void) chmod(sa.un.sun_path, 0666);
813
814 if (listen(s->stdout_fd, SOMAXCONN) < 0)
815 return log_error_errno(errno, "listen(%s) failed: %m", sa.un.sun_path);
816 } else
817 (void) fd_nonblock(s->stdout_fd, true);
818
819 r = sd_event_add_io(s->event, &s->stdout_event_source, s->stdout_fd, EPOLLIN, stdout_stream_new, s);
820 if (r < 0)
821 return log_error_errno(r, "Failed to add stdout server fd to event source: %m");
822
823 r = sd_event_source_set_priority(s->stdout_event_source, SD_EVENT_PRIORITY_NORMAL+5);
824 if (r < 0)
825 return log_error_errno(r, "Failed to adjust priority of stdout server event source: %m");
826
827 return 0;
828 }
829
830 void stdout_stream_send_notify(StdoutStream *s) {
831 struct iovec iovec = {
832 .iov_base = (char*) "FDSTORE=1",
833 .iov_len = STRLEN("FDSTORE=1"),
834 };
835 struct msghdr msghdr = {
836 .msg_iov = &iovec,
837 .msg_iovlen = 1,
838 };
839 struct cmsghdr *cmsg;
840 ssize_t l;
841
842 assert(s);
843 assert(!s->fdstore);
844 assert(s->in_notify_queue);
845 assert(s->server);
846 assert(s->server->notify_fd >= 0);
847
848 /* Store the connection fd in PID 1, so that we get it passed
849 * in again on next start */
850
851 msghdr.msg_controllen = CMSG_SPACE(sizeof(int));
852 msghdr.msg_control = alloca0(msghdr.msg_controllen);
853
854 cmsg = CMSG_FIRSTHDR(&msghdr);
855 cmsg->cmsg_level = SOL_SOCKET;
856 cmsg->cmsg_type = SCM_RIGHTS;
857 cmsg->cmsg_len = CMSG_LEN(sizeof(int));
858
859 memcpy(CMSG_DATA(cmsg), &s->fd, sizeof(int));
860
861 l = sendmsg(s->server->notify_fd, &msghdr, MSG_DONTWAIT|MSG_NOSIGNAL);
862 if (l < 0) {
863 if (errno == EAGAIN)
864 return;
865
866 log_error_errno(errno, "Failed to send stream file descriptor to service manager: %m");
867 } else {
868 log_debug("Successfully sent stream file descriptor to service manager.");
869 s->fdstore = 1;
870 }
871
872 LIST_REMOVE(stdout_stream_notify_queue, s->server->stdout_streams_notify_queue, s);
873 s->in_notify_queue = false;
874
875 }