]> git.ipfire.org Git - thirdparty/systemd.git/blame - src/journal/journald-stream.c
alloc-util: simplify GREEDY_REALLOC() logic by relying on malloc_usable_size()
[thirdparty/systemd.git] / src / journal / journald-stream.c
CommitLineData
db9ecf05 1/* SPDX-License-Identifier: LGPL-2.1-or-later */
a45b9fca 2
4871690d 3#include <stddef.h>
07630cea 4#include <unistd.h>
a45b9fca 5
349cc4a5 6#if HAVE_SELINUX
a45b9fca
LP
7#include <selinux/selinux.h>
8#endif
9
13790add 10#include "sd-daemon.h"
4f5dd394
LP
11#include "sd-event.h"
12
b5efdb8a 13#include "alloc-util.h"
a0956174 14#include "dirent-util.h"
686d13b9 15#include "env-file.h"
4ff9bc2e 16#include "errno-util.h"
4f5dd394 17#include "escape.h"
3ffd4af2 18#include "fd-util.h"
13790add 19#include "fileio.h"
a30e35f8 20#include "fs-util.h"
afc5dbf3 21#include "io-util.h"
4f5dd394 22#include "journald-console.h"
22e3a02b 23#include "journald-context.h"
4f5dd394 24#include "journald-kmsg.h"
d025f1e4 25#include "journald-server.h"
3ffd4af2 26#include "journald-stream.h"
a45b9fca 27#include "journald-syslog.h"
40b71e89 28#include "journald-wall.h"
4f5dd394 29#include "mkdir.h"
6bedfcbb 30#include "parse-util.h"
22e3a02b 31#include "process-util.h"
4f5dd394
LP
32#include "selinux-util.h"
33#include "socket-util.h"
15a5e950 34#include "stdio-util.h"
07630cea 35#include "string-util.h"
7ccbd1ae 36#include "syslog-util.h"
e4de7287 37#include "tmpfile-util.h"
7a1f1aaa 38#include "unit-name.h"
a45b9fca
LP
39
40#define STDOUT_STREAMS_MAX 4096
41
80e97206
YS
42/* During the "setup" protocol phase of the stream logic let's define a different maximum line length than
43 * during the actual operational phase. We want to allow users to specify very short line lengths after all,
44 * but the unit name we embed in the setup protocol might be longer than that. Hence, during the setup phase
45 * let's enforce a line length matching the maximum unit name length (255) */
46#define STDOUT_STREAM_SETUP_PROTOCOL_LINE_MAX (UNIT_NAME_MAX-1U)
47
a45b9fca
LP
/* States of the per-connection setup protocol. stdout_stream_line() consumes exactly one header line in each
 * of the first seven states, in the order listed, and then remains in STDOUT_STREAM_RUNNING, where every
 * further line is treated as a log message. */
typedef enum StdoutStreamState {
        STDOUT_STREAM_IDENTIFIER = 0,     /* next line: syslog identifier (may be empty) */
        STDOUT_STREAM_UNIT_ID,            /* next line: unit name */
        STDOUT_STREAM_PRIORITY,           /* next line: numeric priority, including facility */
        STDOUT_STREAM_LEVEL_PREFIX,       /* next line: boolean */
        STDOUT_STREAM_FORWARD_TO_SYSLOG,  /* next line: boolean */
        STDOUT_STREAM_FORWARD_TO_KMSG,    /* next line: boolean */
        STDOUT_STREAM_FORWARD_TO_CONSOLE, /* next line: boolean */
        STDOUT_STREAM_RUNNING,            /* setup done, lines are log payload */
} StdoutStreamState;
58
ec20fe5f
LP
59/* The different types of log record terminators: a real \n was read, a NUL character was read, the maximum line length
60 * was reached, or the end of the stream was reached */
61
/* Reason why a chunk of stream data was cut off and handed over as one log record. Values in the range
 * 0 … _LINE_BREAK_MAX-1 are valid and are used as array indexes; _LINE_BREAK_INVALID is negative. */
typedef enum LineBreak {
        LINE_BREAK_NEWLINE = 0, /* a regular "\n" was seen */
        LINE_BREAK_NUL,         /* a NUL byte was seen */
        LINE_BREAK_LINE_MAX,    /* the maximum line length was hit */
        LINE_BREAK_EOF,         /* the stream ended */
        LINE_BREAK_PID_CHANGE,  /* the PID of the sending process changed */
        _LINE_BREAK_MAX,
        _LINE_BREAK_INVALID = -EINVAL,
} LineBreak;
71
a45b9fca
LP
72struct StdoutStream {
73 Server *server;
74 StdoutStreamState state;
75
76 int fd;
77
78 struct ucred ucred;
2de56f70 79 char *label;
a45b9fca
LP
80 char *identifier;
81 char *unit_id;
82 int priority;
83 bool level_prefix:1;
84 bool forward_to_syslog:1;
85 bool forward_to_kmsg:1;
86 bool forward_to_console:1;
87
13790add 88 bool fdstore:1;
e22aa3d3 89 bool in_notify_queue:1;
13790add 90
ec20fe5f 91 char *buffer;
a45b9fca
LP
92 size_t length;
93
f9a810be
LP
94 sd_event_source *event_source;
95
13790add
LP
96 char *state_file;
97
22e3a02b
LP
98 ClientContext *context;
99
a45b9fca 100 LIST_FIELDS(StdoutStream, stdout_stream);
e22aa3d3 101 LIST_FIELDS(StdoutStream, stdout_stream_notify_queue);
ec20fe5f 102
fbd0b64f 103 char id_field[STRLEN("_STREAM_ID=") + SD_ID128_STRING_MAX];
a45b9fca
LP
104};
105
75db809a 106StdoutStream* stdout_stream_free(StdoutStream *s) {
13790add 107 if (!s)
75db809a 108 return NULL;
13790add
LP
109
110 if (s->server) {
22e3a02b
LP
111
112 if (s->context)
113 client_context_release(s->server, s->context);
114
13790add 115 assert(s->server->n_stdout_streams > 0);
313cefa1 116 s->server->n_stdout_streams--;
13790add 117 LIST_REMOVE(stdout_stream, s->server->stdout_streams, s);
e22aa3d3
LP
118
119 if (s->in_notify_queue)
120 LIST_REMOVE(stdout_stream_notify_queue, s->server->stdout_streams_notify_queue, s);
65c398c0
LP
121
122 (void) server_start_or_stop_idle_timer(s->server); /* Maybe we are idle now? */
13790add
LP
123 }
124
125 if (s->event_source) {
126 sd_event_source_set_enabled(s->event_source, SD_EVENT_OFF);
127 s->event_source = sd_event_source_unref(s->event_source);
128 }
129
130 safe_close(s->fd);
2de56f70 131 free(s->label);
13790add
LP
132 free(s->identifier);
133 free(s->unit_id);
134 free(s->state_file);
ec20fe5f 135 free(s->buffer);
13790add 136
75db809a 137 return mfree(s);
13790add
LP
138}
139
140DEFINE_TRIVIAL_CLEANUP_FUNC(StdoutStream*, stdout_stream_free);
141
9541f5ff 142void stdout_stream_destroy(StdoutStream *s) {
13790add
LP
143 if (!s)
144 return;
145
146 if (s->state_file)
e22aa3d3 147 (void) unlink(s->state_file);
13790add
LP
148
149 stdout_stream_free(s);
150}
151
152static int stdout_stream_save(StdoutStream *s) {
a30e35f8 153 _cleanup_(unlink_and_freep) char *temp_path = NULL;
13790add
LP
154 _cleanup_fclose_ FILE *f = NULL;
155 int r;
156
157 assert(s);
158
159 if (s->state != STDOUT_STREAM_RUNNING)
160 return 0;
161
162 if (!s->state_file) {
163 struct stat st;
164
165 r = fstat(s->fd, &st);
166 if (r < 0)
167 return log_warning_errno(errno, "Failed to stat connected stream: %m");
168
169 /* We use device and inode numbers as identifier for the stream */
b1852c48
LP
170 r = asprintf(&s->state_file, "%s/streams/%lu:%lu", s->server->runtime_directory, (unsigned long) st.st_dev, (unsigned long) st.st_ino);
171 if (r < 0)
13790add
LP
172 return log_oom();
173 }
174
b1852c48 175 (void) mkdir_parents(s->state_file, 0755);
13790add
LP
176
177 r = fopen_temporary(s->state_file, &f, &temp_path);
178 if (r < 0)
dacd6cee 179 goto fail;
13790add
LP
180
181 fprintf(f,
182 "# This is private data. Do not parse\n"
183 "PRIORITY=%i\n"
184 "LEVEL_PREFIX=%i\n"
185 "FORWARD_TO_SYSLOG=%i\n"
186 "FORWARD_TO_KMSG=%i\n"
ec20fe5f
LP
187 "FORWARD_TO_CONSOLE=%i\n"
188 "STREAM_ID=%s\n",
13790add
LP
189 s->priority,
190 s->level_prefix,
191 s->forward_to_syslog,
192 s->forward_to_kmsg,
ec20fe5f 193 s->forward_to_console,
fbd0b64f 194 s->id_field + STRLEN("_STREAM_ID="));
13790add
LP
195
196 if (!isempty(s->identifier)) {
c2b2df60 197 _cleanup_free_ char *escaped = NULL;
13790add
LP
198
199 escaped = cescape(s->identifier);
200 if (!escaped) {
201 r = -ENOMEM;
dacd6cee 202 goto fail;
13790add
LP
203 }
204
205 fprintf(f, "IDENTIFIER=%s\n", escaped);
206 }
207
208 if (!isempty(s->unit_id)) {
c2b2df60 209 _cleanup_free_ char *escaped = NULL;
13790add
LP
210
211 escaped = cescape(s->unit_id);
212 if (!escaped) {
213 r = -ENOMEM;
dacd6cee 214 goto fail;
13790add
LP
215 }
216
217 fprintf(f, "UNIT=%s\n", escaped);
218 }
219
220 r = fflush_and_check(f);
221 if (r < 0)
dacd6cee 222 goto fail;
13790add
LP
223
224 if (rename(temp_path, s->state_file) < 0) {
225 r = -errno;
dacd6cee 226 goto fail;
13790add
LP
227 }
228
a30e35f8
LP
229 temp_path = mfree(temp_path);
230
e22aa3d3
LP
231 if (!s->fdstore && !s->in_notify_queue) {
232 LIST_PREPEND(stdout_stream_notify_queue, s->server->stdout_streams_notify_queue, s);
233 s->in_notify_queue = true;
234
235 if (s->server->notify_event_source) {
236 r = sd_event_source_set_enabled(s->server->notify_event_source, SD_EVENT_ON);
237 if (r < 0)
238 log_warning_errno(r, "Failed to enable notify event source: %m");
239 }
13790add
LP
240 }
241
dacd6cee 242 return 0;
13790add 243
dacd6cee
LP
244fail:
245 (void) unlink(s->state_file);
dacd6cee 246 return log_error_errno(r, "Failed to save stream data %s: %m", s->state_file);
13790add
LP
247}
248
549b7379
LP
249static int stdout_stream_log(
250 StdoutStream *s,
251 const char *p,
252 LineBreak line_break) {
253
d3070fbd 254 struct iovec *iovec;
a45b9fca 255 int priority;
e3bfb7be 256 char syslog_priority[] = "PRIORITY=\0";
fbd0b64f 257 char syslog_facility[STRLEN("SYSLOG_FACILITY=") + DECIMAL_STR_MAX(int) + 1];
e3bfb7be 258 _cleanup_free_ char *message = NULL, *syslog_identifier = NULL;
d3070fbd 259 size_t n = 0, m;
22e3a02b 260 int r;
a45b9fca
LP
261
262 assert(s);
263 assert(p);
264
549b7379
LP
265 assert(line_break >= 0);
266 assert(line_break < _LINE_BREAK_MAX);
267
d3070fbd
LP
268 if (s->context)
269 (void) client_context_maybe_refresh(s->server, s->context, NULL, NULL, 0, NULL, USEC_INFINITY);
270 else if (pid_is_valid(s->ucred.pid)) {
271 r = client_context_acquire(s->server, s->ucred.pid, &s->ucred, s->label, strlen_ptr(s->label), s->unit_id, &s->context);
272 if (r < 0)
273 log_warning_errno(r, "Failed to acquire client context, ignoring: %m");
274 }
275
a45b9fca
LP
276 priority = s->priority;
277
278 if (s->level_prefix)
e3bfb7be 279 syslog_parse_priority(&p, &priority, false);
a45b9fca 280
d3070fbd
LP
281 if (!client_context_test_priority(s->context, priority))
282 return 0;
283
6f526243
EV
284 if (isempty(p))
285 return 0;
286
a45b9fca
LP
287 if (s->forward_to_syslog || s->server->forward_to_syslog)
288 server_forward_syslog(s->server, syslog_fixup_facility(priority), s->identifier, p, &s->ucred, NULL);
289
290 if (s->forward_to_kmsg || s->server->forward_to_kmsg)
291 server_forward_kmsg(s->server, priority, s->identifier, p, &s->ucred);
292
293 if (s->forward_to_console || s->server->forward_to_console)
294 server_forward_console(s->server, priority, s->identifier, p, &s->ucred);
295
40b71e89
ST
296 if (s->server->forward_to_wall)
297 server_forward_wall(s->server, priority, s->identifier, p, &s->ucred);
298
d3070fbd
LP
299 m = N_IOVEC_META_FIELDS + 7 + client_context_extra_fields_n_iovec(s->context);
300 iovec = newa(struct iovec, m);
301
e6a7ec4b
LP
302 iovec[n++] = IOVEC_MAKE_STRING("_TRANSPORT=stdout");
303 iovec[n++] = IOVEC_MAKE_STRING(s->id_field);
ec20fe5f 304
fbd0b64f 305 syslog_priority[STRLEN("PRIORITY=")] = '0' + LOG_PRI(priority);
e6a7ec4b 306 iovec[n++] = IOVEC_MAKE_STRING(syslog_priority);
a45b9fca 307
e3bfb7be 308 if (priority & LOG_FACMASK) {
5ffa8c81 309 xsprintf(syslog_facility, "SYSLOG_FACILITY=%i", LOG_FAC(priority));
e6a7ec4b 310 iovec[n++] = IOVEC_MAKE_STRING(syslog_facility);
e3bfb7be 311 }
a45b9fca
LP
312
313 if (s->identifier) {
b910cc72 314 syslog_identifier = strjoin("SYSLOG_IDENTIFIER=", s->identifier);
a45b9fca 315 if (syslog_identifier)
e6a7ec4b 316 iovec[n++] = IOVEC_MAKE_STRING(syslog_identifier);
a45b9fca
LP
317 }
318
549b7379
LP
319 static const char * const line_break_field_table[_LINE_BREAK_MAX] = {
320 [LINE_BREAK_NEWLINE] = NULL, /* Do not add field if traditional newline */
321 [LINE_BREAK_NUL] = "_LINE_BREAK=nul",
322 [LINE_BREAK_LINE_MAX] = "_LINE_BREAK=line-max",
323 [LINE_BREAK_EOF] = "_LINE_BREAK=eof",
45ba1ea5 324 [LINE_BREAK_PID_CHANGE] = "_LINE_BREAK=pid-change",
549b7379 325 };
ec20fe5f 326
549b7379 327 const char *c = line_break_field_table[line_break];
ec20fe5f 328
549b7379
LP
329 /* If this log message was generated due to an uncommon line break then mention this in the log
330 * entry */
331 if (c)
e6a7ec4b 332 iovec[n++] = IOVEC_MAKE_STRING(c);
ec20fe5f 333
b910cc72 334 message = strjoin("MESSAGE=", p);
a45b9fca 335 if (message)
e6a7ec4b 336 iovec[n++] = IOVEC_MAKE_STRING(message);
a45b9fca 337
d3070fbd 338 server_dispatch_message(s->server, iovec, n, m, s->context, NULL, priority, 0);
a45b9fca
LP
339 return 0;
340}
341
d977ef25
LP
/* Parses a combined facility+priority value from its decimal string form. Unlike log_level_from_string()
 * this accepts a facility part too.
 *
 * Returns the parsed value (0…999) on success, the error returned by safe_atoi() if the string cannot be
 * parsed, or -ERANGE if the value is outside of the accepted range. */
static int syslog_parse_priority_and_facility(const char *s) {
        int value = 0, r;

        r = safe_atoi(s, &value);
        if (r < 0)
                return r;

        return value >= 0 && value <= 999 ? value : -ERANGE;
}
357
ec20fe5f 358static int stdout_stream_line(StdoutStream *s, char *p, LineBreak line_break) {
cfa1b98e 359 char *orig;
5fe7fb0b 360 int r;
a45b9fca
LP
361
362 assert(s);
363 assert(p);
364
cfa1b98e 365 orig = p;
a45b9fca
LP
366 p = strstrip(p);
367
ec20fe5f 368 /* line breaks by NUL, line max length or EOF are not permissible during the negotiation part of the protocol */
5fe7fb0b
LP
369 if (line_break != LINE_BREAK_NEWLINE && s->state != STDOUT_STREAM_RUNNING)
370 return log_warning_errno(SYNTHETIC_ERRNO(EINVAL),
371 "Control protocol line not properly terminated.");
ec20fe5f 372
a45b9fca
LP
373 switch (s->state) {
374
375 case STDOUT_STREAM_IDENTIFIER:
7a1f1aaa 376 if (!isempty(p)) {
a45b9fca
LP
377 s->identifier = strdup(p);
378 if (!s->identifier)
379 return log_oom();
380 }
381
382 s->state = STDOUT_STREAM_UNIT_ID;
383 return 0;
384
385 case STDOUT_STREAM_UNIT_ID:
7a1f1aaa
LP
386 if (s->ucred.uid == 0 &&
387 unit_name_is_valid(p, UNIT_NAME_PLAIN|UNIT_NAME_INSTANCE)) {
388
389 s->unit_id = strdup(p);
390 if (!s->unit_id)
391 return log_oom();
a45b9fca
LP
392 }
393
394 s->state = STDOUT_STREAM_PRIORITY;
395 return 0;
396
d977ef25
LP
397 case STDOUT_STREAM_PRIORITY: {
398 int priority;
a45b9fca 399
d977ef25
LP
400 priority = syslog_parse_priority_and_facility(p);
401 if (priority < 0)
402 return log_warning_errno(priority, "Failed to parse log priority line: %m");
403
404 s->priority = priority;
a45b9fca
LP
405 s->state = STDOUT_STREAM_LEVEL_PREFIX;
406 return 0;
d977ef25 407 }
a45b9fca
LP
408
409 case STDOUT_STREAM_LEVEL_PREFIX:
410 r = parse_boolean(p);
d267ac6e
LP
411 if (r < 0)
412 return log_warning_errno(r, "Failed to parse level prefix line: %m");
a45b9fca 413
5d904a6a 414 s->level_prefix = r;
a45b9fca
LP
415 s->state = STDOUT_STREAM_FORWARD_TO_SYSLOG;
416 return 0;
417
418 case STDOUT_STREAM_FORWARD_TO_SYSLOG:
419 r = parse_boolean(p);
d267ac6e
LP
420 if (r < 0)
421 return log_warning_errno(r, "Failed to parse forward to syslog line: %m");
a45b9fca 422
5d904a6a 423 s->forward_to_syslog = r;
a45b9fca
LP
424 s->state = STDOUT_STREAM_FORWARD_TO_KMSG;
425 return 0;
426
427 case STDOUT_STREAM_FORWARD_TO_KMSG:
428 r = parse_boolean(p);
d267ac6e
LP
429 if (r < 0)
430 return log_warning_errno(r, "Failed to parse copy to kmsg line: %m");
a45b9fca 431
5d904a6a 432 s->forward_to_kmsg = r;
a45b9fca
LP
433 s->state = STDOUT_STREAM_FORWARD_TO_CONSOLE;
434 return 0;
435
436 case STDOUT_STREAM_FORWARD_TO_CONSOLE:
437 r = parse_boolean(p);
d267ac6e
LP
438 if (r < 0)
439 return log_warning_errno(r, "Failed to parse copy to console line.");
a45b9fca 440
5d904a6a 441 s->forward_to_console = r;
a45b9fca 442 s->state = STDOUT_STREAM_RUNNING;
13790add
LP
443
444 /* Try to save the stream, so that journald can be restarted and we can recover */
445 (void) stdout_stream_save(s);
a45b9fca
LP
446 return 0;
447
448 case STDOUT_STREAM_RUNNING:
ec20fe5f 449 return stdout_stream_log(s, orig, line_break);
a45b9fca
LP
450 }
451
452 assert_not_reached("Unknown stream state");
453}
454
45ba1ea5
LP
455static int stdout_stream_found(
456 StdoutStream *s,
457 char *p,
458 size_t l,
459 LineBreak line_break) {
460
461 char saved;
a45b9fca
LP
462 int r;
463
464 assert(s);
45ba1ea5
LP
465 assert(p);
466
467 /* Let's NUL terminate the specified buffer for this call, and revert back afterwards */
468 saved = p[l];
469 p[l] = 0;
470 r = stdout_stream_line(s, p, line_break);
471 p[l] = saved;
a45b9fca 472
45ba1ea5
LP
473 return r;
474}
475
80e97206
YS
476static size_t stdout_stream_line_max(StdoutStream *s) {
477 assert(s);
478
479 /* During the "setup" phase of our protocol, let's ensure we use a line length where a full unit name
480 * can fit in */
481 if (s->state != STDOUT_STREAM_RUNNING)
482 return STDOUT_STREAM_SETUP_PROTOCOL_LINE_MAX;
483
484 /* After the protocol's "setup" phase is complete, let's use whatever the user configured */
485 return s->server->line_max;
486}
487
45ba1ea5
LP
488static int stdout_stream_scan(
489 StdoutStream *s,
490 char *p,
491 size_t remaining,
492 LineBreak force_flush,
493 size_t *ret_consumed) {
95cbb83c 494
80e97206 495 size_t consumed = 0, line_max;
45ba1ea5
LP
496 int r;
497
498 assert(s);
499 assert(p);
95cbb83c 500
80e97206
YS
501 line_max = stdout_stream_line_max(s);
502
a45b9fca 503 for (;;) {
ec20fe5f 504 LineBreak line_break;
45ba1ea5 505 size_t skip, found;
ec20fe5f 506 char *end1, *end2;
80e97206 507 size_t tmp_remaining = MIN(remaining, line_max);
ec20fe5f 508
80e97206
YS
509 end1 = memchr(p, '\n', tmp_remaining);
510 end2 = memchr(p, 0, end1 ? (size_t) (end1 - p) : tmp_remaining);
ec20fe5f
LP
511
512 if (end2) {
513 /* We found a NUL terminator */
45ba1ea5
LP
514 found = end2 - p;
515 skip = found + 1;
ec20fe5f
LP
516 line_break = LINE_BREAK_NUL;
517 } else if (end1) {
518 /* We found a \n terminator */
45ba1ea5
LP
519 found = end1 - p;
520 skip = found + 1;
ec20fe5f 521 line_break = LINE_BREAK_NEWLINE;
80e97206 522 } else if (remaining >= line_max) {
ec20fe5f 523 /* Force a line break after the maximum line length */
80e97206 524 found = skip = line_max;
ec20fe5f 525 line_break = LINE_BREAK_LINE_MAX;
a45b9fca
LP
526 } else
527 break;
528
45ba1ea5 529 r = stdout_stream_found(s, p, found, line_break);
a45b9fca
LP
530 if (r < 0)
531 return r;
532
a45b9fca 533 p += skip;
45ba1ea5
LP
534 consumed += skip;
535 remaining -= skip;
a45b9fca
LP
536 }
537
45ba1ea5
LP
538 if (force_flush >= 0 && remaining > 0) {
539 r = stdout_stream_found(s, p, remaining, force_flush);
a45b9fca
LP
540 if (r < 0)
541 return r;
542
45ba1ea5 543 consumed += remaining;
a45b9fca
LP
544 }
545
45ba1ea5
LP
546 if (ret_consumed)
547 *ret_consumed = consumed;
a45b9fca
LP
548
549 return 0;
550}
551
f9a810be 552static int stdout_stream_process(sd_event_source *es, int fd, uint32_t revents, void *userdata) {
fb29cdbe 553 CMSG_BUFFER_TYPE(CMSG_SPACE(sizeof(struct ucred))) control;
319a4f4b 554 size_t limit, consumed, allocated;
f9a810be 555 StdoutStream *s = userdata;
371d72e0 556 struct ucred *ucred;
09d0b46a 557 struct iovec iovec;
a45b9fca 558 ssize_t l;
45ba1ea5 559 char *p;
a45b9fca
LP
560 int r;
561
09d0b46a
LB
562 struct msghdr msghdr = {
563 .msg_iov = &iovec,
564 .msg_iovlen = 1,
fb29cdbe
LP
565 .msg_control = &control,
566 .msg_controllen = sizeof(control),
09d0b46a
LB
567 };
568
a45b9fca
LP
569 assert(s);
570
f9a810be
LP
571 if ((revents|EPOLLIN|EPOLLHUP) != (EPOLLIN|EPOLLHUP)) {
572 log_error("Got invalid event from epoll for stdout stream: %"PRIx32, revents);
91bf3b3e 573 goto terminate;
f9a810be
LP
574 }
575
034e9719 576 /* If the buffer is almost full, add room for another 1K */
319a4f4b
LP
577 allocated = MALLOC_ELEMENTSOF(s->buffer);
578 if (s->length + 512 >= allocated) {
579 if (!GREEDY_REALLOC(s->buffer, s->length + 1 + 1024)) {
ec20fe5f
LP
580 log_oom();
581 goto terminate;
582 }
319a4f4b
LP
583
584 allocated = MALLOC_ELEMENTSOF(s->buffer);
ec20fe5f 585 }
a45b9fca 586
ec20fe5f
LP
587 /* Try to make use of the allocated buffer in full, but never read more than the configured line size. Also,
588 * always leave room for a terminating NUL we might need to add. */
319a4f4b 589 limit = MIN(allocated - 1, MAX(s->server->line_max, STDOUT_STREAM_SETUP_PROTOCOL_LINE_MAX));
45ba1ea5 590 assert(s->length <= limit);
09d0b46a
LB
591 iovec = IOVEC_MAKE(s->buffer + s->length, limit - s->length);
592
593 l = recvmsg(s->fd, &msghdr, MSG_DONTWAIT|MSG_CMSG_CLOEXEC);
ec20fe5f 594 if (l < 0) {
09d0b46a 595 if (IN_SET(errno, EINTR, EAGAIN))
a45b9fca
LP
596 return 0;
597
56f64d95 598 log_warning_errno(errno, "Failed to read from stream: %m");
91bf3b3e 599 goto terminate;
a45b9fca 600 }
09d0b46a 601 cmsg_close_all(&msghdr);
a45b9fca
LP
602
603 if (l == 0) {
45ba1ea5 604 (void) stdout_stream_scan(s, s->buffer, s->length, /* force_flush = */ LINE_BREAK_EOF, NULL);
91bf3b3e 605 goto terminate;
a45b9fca
LP
606 }
607
45ba1ea5
LP
608 /* Invalidate the context if the PID of the sender changed. This happens when a forked process
609 * inherits stdout/stderr from a parent. In this case getpeercred() returns the ucred of the parent,
610 * which can be invalid if the parent has exited in the meantime. */
371d72e0 611 ucred = CMSG_FIND_DATA(&msghdr, SOL_SOCKET, SCM_CREDENTIALS, struct ucred);
09d0b46a 612 if (ucred && ucred->pid != s->ucred.pid) {
45ba1ea5 613 /* Force out any previously half-written lines from a different process, before we switch to
371d72e0 614 * the new ucred structure for everything we just added */
45ba1ea5 615 r = stdout_stream_scan(s, s->buffer, s->length, /* force_flush = */ LINE_BREAK_PID_CHANGE, NULL);
09d0b46a
LB
616 if (r < 0)
617 goto terminate;
618
020b4a02 619 s->context = client_context_release(s->server, s->context);
45ba1ea5
LP
620
621 p = s->buffer + s->length;
622 } else {
623 p = s->buffer;
624 l += s->length;
09d0b46a
LB
625 }
626
45ba1ea5
LP
627 /* Always copy in the new credentials */
628 if (ucred)
629 s->ucred = *ucred;
630
631 r = stdout_stream_scan(s, p, l, _LINE_BREAK_INVALID, &consumed);
a45b9fca 632 if (r < 0)
91bf3b3e 633 goto terminate;
a45b9fca 634
45ba1ea5
LP
635 /* Move what wasn't consumed to the front of the buffer */
636 assert(consumed <= (size_t) l);
637 s->length = l - consumed;
638 memmove(s->buffer, p + consumed, s->length);
639
a45b9fca
LP
640 return 1;
641
91bf3b3e 642terminate:
13790add 643 stdout_stream_destroy(s);
f9a810be 644 return 0;
a45b9fca
LP
645}
646
9541f5ff 647int stdout_stream_install(Server *s, int fd, StdoutStream **ret) {
13790add 648 _cleanup_(stdout_stream_freep) StdoutStream *stream = NULL;
ec20fe5f 649 sd_id128_t id;
13790add
LP
650 int r;
651
a45b9fca 652 assert(s);
13790add 653 assert(fd >= 0);
a45b9fca 654
ec20fe5f
LP
655 r = sd_id128_randomize(&id);
656 if (r < 0)
657 return log_error_errno(r, "Failed to generate stream ID: %m");
658
d98580e4 659 stream = new(StdoutStream, 1);
13790add
LP
660 if (!stream)
661 return log_oom();
a45b9fca 662
d98580e4
LP
663 *stream = (StdoutStream) {
664 .fd = -1,
665 .priority = LOG_INFO,
666 };
a45b9fca 667
ec20fe5f
LP
668 xsprintf(stream->id_field, "_STREAM_ID=" SD_ID128_FORMAT_STR, SD_ID128_FORMAT_VAL(id));
669
13790add
LP
670 r = getpeercred(fd, &stream->ucred);
671 if (r < 0)
672 return log_error_errno(r, "Failed to determine peer credentials: %m");
a45b9fca 673
09d0b46a
LB
674 r = setsockopt_int(fd, SOL_SOCKET, SO_PASSCRED, true);
675 if (r < 0)
676 return log_error_errno(r, "SO_PASSCRED failed: %m");
677
6d395665 678 if (mac_selinux_use()) {
2de56f70
ZJS
679 r = getpeersec(fd, &stream->label);
680 if (r < 0 && r != -EOPNOTSUPP)
681 (void) log_warning_errno(r, "Failed to determine peer security context: %m");
13790add 682 }
a45b9fca 683
13790add
LP
684 (void) shutdown(fd, SHUT_WR);
685
686 r = sd_event_add_io(s->event, &stream->event_source, fd, EPOLLIN, stdout_stream_process, stream);
687 if (r < 0)
688 return log_error_errno(r, "Failed to add stream to event loop: %m");
689
690 r = sd_event_source_set_priority(stream->event_source, SD_EVENT_PRIORITY_NORMAL+5);
691 if (r < 0)
692 return log_error_errno(r, "Failed to adjust stdout event source priority: %m");
693
694 stream->fd = fd;
695
696 stream->server = s;
697 LIST_PREPEND(stdout_stream, s->stdout_streams, stream);
313cefa1 698 s->n_stdout_streams++;
13790add 699
65c398c0
LP
700 (void) server_start_or_stop_idle_timer(s); /* Maybe no longer idle? */
701
13790add
LP
702 if (ret)
703 *ret = stream;
704
7e7ef3bf 705 TAKE_PTR(stream);
13790add 706 return 0;
a45b9fca
LP
707}
708
f9a810be 709static int stdout_stream_new(sd_event_source *es, int listen_fd, uint32_t revents, void *userdata) {
13790add 710 _cleanup_close_ int fd = -1;
f9a810be 711 Server *s = userdata;
13790add 712 int r;
a45b9fca
LP
713
714 assert(s);
715
baaa35ad
ZJS
716 if (revents != EPOLLIN)
717 return log_error_errno(SYNTHETIC_ERRNO(EIO),
718 "Got invalid event from epoll for stdout server fd: %" PRIx32,
719 revents);
f9a810be 720
a45b9fca
LP
721 fd = accept4(s->stdout_fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
722 if (fd < 0) {
4ff9bc2e 723 if (ERRNO_IS_ACCEPT_AGAIN(errno))
a45b9fca
LP
724 return 0;
725
e1427b13 726 return log_error_errno(errno, "Failed to accept stdout connection: %m");
a45b9fca
LP
727 }
728
729 if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) {
c8758e72
EV
730 struct ucred u;
731
732 r = getpeercred(fd, &u);
733
734 /* By closing fd here we make sure that the client won't wait too long for journald to
735 * gather all the data it adds to the error message to find out that the connection has
736 * just been refused.
737 */
738 fd = safe_close(fd);
739
740 server_driver_message(s, r < 0 ? 0 : u.pid, NULL, LOG_MESSAGE("Too many stdout streams, refusing connection."), NULL);
a45b9fca
LP
741 return 0;
742 }
743
13790add
LP
744 r = stdout_stream_install(s, fd, NULL);
745 if (r < 0)
746 return r;
747
7e7ef3bf 748 TAKE_FD(fd);
13790add
LP
749 return 0;
750}
751
752static int stdout_stream_load(StdoutStream *stream, const char *fname) {
753 _cleanup_free_ char
754 *priority = NULL,
755 *level_prefix = NULL,
756 *forward_to_syslog = NULL,
757 *forward_to_kmsg = NULL,
ec20fe5f
LP
758 *forward_to_console = NULL,
759 *stream_id = NULL;
13790add
LP
760 int r;
761
762 assert(stream);
763 assert(fname);
764
765 if (!stream->state_file) {
b1852c48 766 stream->state_file = path_join(stream->server->runtime_directory, "streams", fname);
13790add
LP
767 if (!stream->state_file)
768 return log_oom();
a45b9fca
LP
769 }
770
aa8fbc74 771 r = parse_env_file(NULL, stream->state_file,
13790add
LP
772 "PRIORITY", &priority,
773 "LEVEL_PREFIX", &level_prefix,
774 "FORWARD_TO_SYSLOG", &forward_to_syslog,
775 "FORWARD_TO_KMSG", &forward_to_kmsg,
776 "FORWARD_TO_CONSOLE", &forward_to_console,
777 "IDENTIFIER", &stream->identifier,
778 "UNIT", &stream->unit_id,
13df9c39 779 "STREAM_ID", &stream_id);
13790add
LP
780 if (r < 0)
781 return log_error_errno(r, "Failed to read: %s", stream->state_file);
a45b9fca 782
13790add
LP
783 if (priority) {
784 int p;
785
d977ef25 786 p = syslog_parse_priority_and_facility(priority);
13790add
LP
787 if (p >= 0)
788 stream->priority = p;
a45b9fca
LP
789 }
790
13790add
LP
791 if (level_prefix) {
792 r = parse_boolean(level_prefix);
793 if (r >= 0)
794 stream->level_prefix = r;
d682b3a7 795 }
a45b9fca 796
13790add
LP
797 if (forward_to_syslog) {
798 r = parse_boolean(forward_to_syslog);
799 if (r >= 0)
800 stream->forward_to_syslog = r;
a45b9fca
LP
801 }
802
13790add
LP
803 if (forward_to_kmsg) {
804 r = parse_boolean(forward_to_kmsg);
805 if (r >= 0)
806 stream->forward_to_kmsg = r;
f9a810be
LP
807 }
808
13790add
LP
809 if (forward_to_console) {
810 r = parse_boolean(forward_to_console);
811 if (r >= 0)
812 stream->forward_to_console = r;
a45b9fca
LP
813 }
814
ec20fe5f
LP
815 if (stream_id) {
816 sd_id128_t id;
817
818 r = sd_id128_from_string(stream_id, &id);
819 if (r >= 0)
820 xsprintf(stream->id_field, "_STREAM_ID=" SD_ID128_FORMAT_STR, SD_ID128_FORMAT_VAL(id));
821 }
822
13790add
LP
823 return 0;
824}
825
826static int stdout_stream_restore(Server *s, const char *fname, int fd) {
827 StdoutStream *stream;
828 int r;
829
830 assert(s);
831 assert(fname);
832 assert(fd >= 0);
833
834 if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) {
835 log_warning("Too many stdout streams, refusing restoring of stream.");
836 return -ENOBUFS;
837 }
838
839 r = stdout_stream_install(s, fd, &stream);
840 if (r < 0)
841 return r;
842
843 stream->state = STDOUT_STREAM_RUNNING;
844 stream->fdstore = true;
845
846 /* Ignore all parsing errors */
847 (void) stdout_stream_load(stream, fname);
a45b9fca
LP
848
849 return 0;
13790add
LP
850}
851
15d91bff 852int server_restore_streams(Server *s, FDSet *fds) {
13790add
LP
853 _cleanup_closedir_ DIR *d = NULL;
854 struct dirent *de;
b1852c48 855 const char *path;
13790add
LP
856 int r;
857
b1852c48
LP
858 path = strjoina(s->runtime_directory, "/streams");
859 d = opendir(path);
13790add
LP
860 if (!d) {
861 if (errno == ENOENT)
862 return 0;
863
b1852c48 864 return log_warning_errno(errno, "Failed to enumerate %s: %m", path);
13790add
LP
865 }
866
867 FOREACH_DIRENT(de, d, goto fail) {
868 unsigned long st_dev, st_ino;
869 bool found = false;
13790add
LP
870 int fd;
871
872 if (sscanf(de->d_name, "%lu:%lu", &st_dev, &st_ino) != 2)
873 continue;
874
90e74a66 875 FDSET_FOREACH(fd, fds) {
13790add
LP
876 struct stat st;
877
878 if (fstat(fd, &st) < 0)
879 return log_error_errno(errno, "Failed to stat %s: %m", de->d_name);
880
881 if (S_ISSOCK(st.st_mode) && st.st_dev == st_dev && st.st_ino == st_ino) {
882 found = true;
883 break;
884 }
885 }
886
887 if (!found) {
888 /* No file descriptor? Then let's delete the state file */
889 log_debug("Cannot restore stream file %s", de->d_name);
cb51ee7a 890 if (unlinkat(dirfd(d), de->d_name, 0) < 0)
74deaff1 891 log_warning_errno(errno, "Failed to remove %s/%s: %m", path, de->d_name);
13790add
LP
892 continue;
893 }
894
895 fdset_remove(fds, fd);
896
897 r = stdout_stream_restore(s, de->d_name, fd);
898 if (r < 0)
899 safe_close(fd);
900 }
a45b9fca 901
7b77ed8c 902 return 0;
13790add
LP
903
904fail:
905 return log_error_errno(errno, "Failed to read streams directory: %m");
a45b9fca
LP
906}
907
b1852c48 908int server_open_stdout_socket(Server *s, const char *stdout_socket) {
a45b9fca 909 int r;
a45b9fca
LP
910
911 assert(s);
b1852c48 912 assert(stdout_socket);
a45b9fca
LP
913
914 if (s->stdout_fd < 0) {
f36a9d59
ZJS
915 union sockaddr_union sa;
916 socklen_t sa_len;
b1852c48
LP
917
918 r = sockaddr_un_set_path(&sa.un, stdout_socket);
919 if (r < 0)
920 return log_error_errno(r, "Unable to use namespace path %s for AF_UNIX socket: %m", stdout_socket);
f36a9d59 921 sa_len = r;
b1852c48 922
a45b9fca 923 s->stdout_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
4a62c710
MS
924 if (s->stdout_fd < 0)
925 return log_error_errno(errno, "socket() failed: %m");
a45b9fca 926
155b6876 927 (void) sockaddr_un_unlink(&sa.un);
a45b9fca 928
f36a9d59 929 r = bind(s->stdout_fd, &sa.sa, sa_len);
4a62c710
MS
930 if (r < 0)
931 return log_error_errno(errno, "bind(%s) failed: %m", sa.un.sun_path);
a45b9fca 932
4a61c3e5 933 (void) chmod(sa.un.sun_path, 0666);
a45b9fca 934
4a62c710
MS
935 if (listen(s->stdout_fd, SOMAXCONN) < 0)
936 return log_error_errno(errno, "listen(%s) failed: %m", sa.un.sun_path);
a45b9fca 937 } else
48440643 938 (void) fd_nonblock(s->stdout_fd, true);
a45b9fca 939
151b9b96 940 r = sd_event_add_io(s->event, &s->stdout_event_source, s->stdout_fd, EPOLLIN, stdout_stream_new, s);
23bbb0de
MS
941 if (r < 0)
942 return log_error_errno(r, "Failed to add stdout server fd to event source: %m");
f9a810be 943
48cef295 944 r = sd_event_source_set_priority(s->stdout_event_source, SD_EVENT_PRIORITY_NORMAL+5);
23bbb0de
MS
945 if (r < 0)
946 return log_error_errno(r, "Failed to adjust priority of stdout server event source: %m");
a45b9fca
LP
947
948 return 0;
949}
e22aa3d3
LP
950
951void stdout_stream_send_notify(StdoutStream *s) {
952 struct iovec iovec = {
953 .iov_base = (char*) "FDSTORE=1",
fbd0b64f 954 .iov_len = STRLEN("FDSTORE=1"),
e22aa3d3
LP
955 };
956 struct msghdr msghdr = {
957 .msg_iov = &iovec,
958 .msg_iovlen = 1,
959 };
960 struct cmsghdr *cmsg;
961 ssize_t l;
962
963 assert(s);
964 assert(!s->fdstore);
965 assert(s->in_notify_queue);
966 assert(s->server);
967 assert(s->server->notify_fd >= 0);
968
969 /* Store the connection fd in PID 1, so that we get it passed
970 * in again on next start */
971
972 msghdr.msg_controllen = CMSG_SPACE(sizeof(int));
973 msghdr.msg_control = alloca0(msghdr.msg_controllen);
974
975 cmsg = CMSG_FIRSTHDR(&msghdr);
976 cmsg->cmsg_level = SOL_SOCKET;
977 cmsg->cmsg_type = SCM_RIGHTS;
978 cmsg->cmsg_len = CMSG_LEN(sizeof(int));
979
980 memcpy(CMSG_DATA(cmsg), &s->fd, sizeof(int));
981
982 l = sendmsg(s->server->notify_fd, &msghdr, MSG_DONTWAIT|MSG_NOSIGNAL);
983 if (l < 0) {
984 if (errno == EAGAIN)
985 return;
986
987 log_error_errno(errno, "Failed to send stream file descriptor to service manager: %m");
988 } else {
989 log_debug("Successfully sent stream file descriptor to service manager.");
990 s->fdstore = 1;
991 }
992
993 LIST_REMOVE(stdout_stream_notify_queue, s->server->stdout_streams_notify_queue, s);
994 s->in_notify_queue = false;
995
996}