]> git.ipfire.org Git - thirdparty/systemd.git/blob - src/journal/journald-stream.c
journald: add support for wall forwarding
[thirdparty/systemd.git] / src / journal / journald-stream.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4 This file is part of systemd.
5
6 Copyright 2011 Lennart Poettering
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <fcntl.h>
23 #include <unistd.h>
24 #include <stddef.h>
25
26 #ifdef HAVE_SELINUX
27 #include <selinux/selinux.h>
28 #endif
29
30 #include "sd-event.h"
31 #include "socket-util.h"
32 #include "selinux-util.h"
33 #include "journald-server.h"
34 #include "journald-stream.h"
35 #include "journald-syslog.h"
36 #include "journald-kmsg.h"
37 #include "journald-console.h"
38 #include "journald-wall.h"
39
40 #define STDOUT_STREAMS_MAX 4096
41
42 typedef enum StdoutStreamState {
43 STDOUT_STREAM_IDENTIFIER,
44 STDOUT_STREAM_UNIT_ID,
45 STDOUT_STREAM_PRIORITY,
46 STDOUT_STREAM_LEVEL_PREFIX,
47 STDOUT_STREAM_FORWARD_TO_SYSLOG,
48 STDOUT_STREAM_FORWARD_TO_KMSG,
49 STDOUT_STREAM_FORWARD_TO_CONSOLE,
50 STDOUT_STREAM_RUNNING
51 } StdoutStreamState;
52
53 struct StdoutStream {
54 Server *server;
55 StdoutStreamState state;
56
57 int fd;
58
59 struct ucred ucred;
60 #ifdef HAVE_SELINUX
61 security_context_t security_context;
62 #endif
63
64 char *identifier;
65 char *unit_id;
66 int priority;
67 bool level_prefix:1;
68 bool forward_to_syslog:1;
69 bool forward_to_kmsg:1;
70 bool forward_to_console:1;
71
72 char buffer[LINE_MAX+1];
73 size_t length;
74
75 sd_event_source *event_source;
76
77 LIST_FIELDS(StdoutStream, stdout_stream);
78 };
79
80 static int stdout_stream_log(StdoutStream *s, const char *p) {
81 struct iovec iovec[N_IOVEC_META_FIELDS + 5];
82 int priority;
83 char syslog_priority[] = "PRIORITY=\0";
84 char syslog_facility[sizeof("SYSLOG_FACILITY=") + DECIMAL_STR_MAX(priority)];
85 _cleanup_free_ char *message = NULL, *syslog_identifier = NULL;
86 unsigned n = 0;
87 char *label = NULL;
88 size_t label_len = 0;
89
90 assert(s);
91 assert(p);
92
93 if (isempty(p))
94 return 0;
95
96 priority = s->priority;
97
98 if (s->level_prefix)
99 syslog_parse_priority(&p, &priority, false);
100
101 if (s->forward_to_syslog || s->server->forward_to_syslog)
102 server_forward_syslog(s->server, syslog_fixup_facility(priority), s->identifier, p, &s->ucred, NULL);
103
104 if (s->forward_to_kmsg || s->server->forward_to_kmsg)
105 server_forward_kmsg(s->server, priority, s->identifier, p, &s->ucred);
106
107 if (s->forward_to_console || s->server->forward_to_console)
108 server_forward_console(s->server, priority, s->identifier, p, &s->ucred);
109
110 if (s->server->forward_to_wall)
111 server_forward_wall(s->server, priority, s->identifier, p, &s->ucred);
112
113 IOVEC_SET_STRING(iovec[n++], "_TRANSPORT=stdout");
114
115 syslog_priority[strlen("PRIORITY=")] = '0' + LOG_PRI(priority);
116 IOVEC_SET_STRING(iovec[n++], syslog_priority);
117
118 if (priority & LOG_FACMASK) {
119 snprintf(syslog_facility, sizeof(syslog_facility), "SYSLOG_FACILITY=%i", LOG_FAC(priority));
120 IOVEC_SET_STRING(iovec[n++], syslog_facility);
121 }
122
123 if (s->identifier) {
124 syslog_identifier = strappend("SYSLOG_IDENTIFIER=", s->identifier);
125 if (syslog_identifier)
126 IOVEC_SET_STRING(iovec[n++], syslog_identifier);
127 }
128
129 message = strappend("MESSAGE=", p);
130 if (message)
131 IOVEC_SET_STRING(iovec[n++], message);
132
133 #ifdef HAVE_SELINUX
134 if (s->security_context) {
135 label = (char*) s->security_context;
136 label_len = strlen((char*) s->security_context);
137 }
138 #endif
139
140 server_dispatch_message(s->server, iovec, n, ELEMENTSOF(iovec), &s->ucred, NULL, label, label_len, s->unit_id, priority, 0);
141 return 0;
142 }
143
144 static int stdout_stream_line(StdoutStream *s, char *p) {
145 int r;
146
147 assert(s);
148 assert(p);
149
150 p = strstrip(p);
151
152 switch (s->state) {
153
154 case STDOUT_STREAM_IDENTIFIER:
155 if (isempty(p))
156 s->identifier = NULL;
157 else {
158 s->identifier = strdup(p);
159 if (!s->identifier)
160 return log_oom();
161 }
162
163 s->state = STDOUT_STREAM_UNIT_ID;
164 return 0;
165
166 case STDOUT_STREAM_UNIT_ID:
167 if (s->ucred.uid == 0) {
168 if (isempty(p))
169 s->unit_id = NULL;
170 else {
171 s->unit_id = strdup(p);
172 if (!s->unit_id)
173 return log_oom();
174 }
175 }
176
177 s->state = STDOUT_STREAM_PRIORITY;
178 return 0;
179
180 case STDOUT_STREAM_PRIORITY:
181 r = safe_atoi(p, &s->priority);
182 if (r < 0 || s->priority < 0 || s->priority > 999) {
183 log_warning("Failed to parse log priority line.");
184 return -EINVAL;
185 }
186
187 s->state = STDOUT_STREAM_LEVEL_PREFIX;
188 return 0;
189
190 case STDOUT_STREAM_LEVEL_PREFIX:
191 r = parse_boolean(p);
192 if (r < 0) {
193 log_warning("Failed to parse level prefix line.");
194 return -EINVAL;
195 }
196
197 s->level_prefix = !!r;
198 s->state = STDOUT_STREAM_FORWARD_TO_SYSLOG;
199 return 0;
200
201 case STDOUT_STREAM_FORWARD_TO_SYSLOG:
202 r = parse_boolean(p);
203 if (r < 0) {
204 log_warning("Failed to parse forward to syslog line.");
205 return -EINVAL;
206 }
207
208 s->forward_to_syslog = !!r;
209 s->state = STDOUT_STREAM_FORWARD_TO_KMSG;
210 return 0;
211
212 case STDOUT_STREAM_FORWARD_TO_KMSG:
213 r = parse_boolean(p);
214 if (r < 0) {
215 log_warning("Failed to parse copy to kmsg line.");
216 return -EINVAL;
217 }
218
219 s->forward_to_kmsg = !!r;
220 s->state = STDOUT_STREAM_FORWARD_TO_CONSOLE;
221 return 0;
222
223 case STDOUT_STREAM_FORWARD_TO_CONSOLE:
224 r = parse_boolean(p);
225 if (r < 0) {
226 log_warning("Failed to parse copy to console line.");
227 return -EINVAL;
228 }
229
230 s->forward_to_console = !!r;
231 s->state = STDOUT_STREAM_RUNNING;
232 return 0;
233
234 case STDOUT_STREAM_RUNNING:
235 return stdout_stream_log(s, p);
236 }
237
238 assert_not_reached("Unknown stream state");
239 }
240
241 static int stdout_stream_scan(StdoutStream *s, bool force_flush) {
242 char *p;
243 size_t remaining;
244 int r;
245
246 assert(s);
247
248 p = s->buffer;
249 remaining = s->length;
250 for (;;) {
251 char *end;
252 size_t skip;
253
254 end = memchr(p, '\n', remaining);
255 if (end)
256 skip = end - p + 1;
257 else if (remaining >= sizeof(s->buffer) - 1) {
258 end = p + sizeof(s->buffer) - 1;
259 skip = remaining;
260 } else
261 break;
262
263 *end = 0;
264
265 r = stdout_stream_line(s, p);
266 if (r < 0)
267 return r;
268
269 remaining -= skip;
270 p += skip;
271 }
272
273 if (force_flush && remaining > 0) {
274 p[remaining] = 0;
275 r = stdout_stream_line(s, p);
276 if (r < 0)
277 return r;
278
279 p += remaining;
280 remaining = 0;
281 }
282
283 if (p > s->buffer) {
284 memmove(s->buffer, p, remaining);
285 s->length = remaining;
286 }
287
288 return 0;
289 }
290
291 static int stdout_stream_process(sd_event_source *es, int fd, uint32_t revents, void *userdata) {
292 StdoutStream *s = userdata;
293 ssize_t l;
294 int r;
295
296 assert(s);
297
298 if ((revents|EPOLLIN|EPOLLHUP) != (EPOLLIN|EPOLLHUP)) {
299 log_error("Got invalid event from epoll for stdout stream: %"PRIx32, revents);
300 goto terminate;
301 }
302
303 l = read(s->fd, s->buffer+s->length, sizeof(s->buffer)-1-s->length);
304 if (l < 0) {
305
306 if (errno == EAGAIN)
307 return 0;
308
309 log_warning("Failed to read from stream: %m");
310 goto terminate;
311 }
312
313 if (l == 0) {
314 stdout_stream_scan(s, true);
315 goto terminate;
316 }
317
318 s->length += l;
319 r = stdout_stream_scan(s, false);
320 if (r < 0)
321 goto terminate;
322
323 return 1;
324
325 terminate:
326 stdout_stream_free(s);
327 return 0;
328 }
329
330 void stdout_stream_free(StdoutStream *s) {
331 assert(s);
332
333 if (s->server) {
334 assert(s->server->n_stdout_streams > 0);
335 s->server->n_stdout_streams --;
336 LIST_REMOVE(stdout_stream, s->server->stdout_streams, s);
337 }
338
339 if (s->event_source) {
340 sd_event_source_set_enabled(s->event_source, SD_EVENT_OFF);
341 s->event_source = sd_event_source_unref(s->event_source);
342 }
343
344 if (s->fd >= 0)
345 close_nointr_nofail(s->fd);
346
347 #ifdef HAVE_SELINUX
348 if (s->security_context)
349 freecon(s->security_context);
350 #endif
351
352 free(s->identifier);
353 free(s->unit_id);
354 free(s);
355 }
356
357 static int stdout_stream_new(sd_event_source *es, int listen_fd, uint32_t revents, void *userdata) {
358 Server *s = userdata;
359 StdoutStream *stream;
360 int fd, r;
361
362 assert(s);
363
364 if (revents != EPOLLIN) {
365 log_error("Got invalid event from epoll for stdout server fd: %"PRIx32, revents);
366 return -EIO;
367 }
368
369 fd = accept4(s->stdout_fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
370 if (fd < 0) {
371 if (errno == EAGAIN)
372 return 0;
373
374 log_error("Failed to accept stdout connection: %m");
375 return -errno;
376 }
377
378 if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) {
379 log_warning("Too many stdout streams, refusing connection.");
380 close_nointr_nofail(fd);
381 return 0;
382 }
383
384 stream = new0(StdoutStream, 1);
385 if (!stream) {
386 close_nointr_nofail(fd);
387 return log_oom();
388 }
389
390 stream->fd = fd;
391
392 r = getpeercred(fd, &stream->ucred);
393 if (r < 0) {
394 log_error("Failed to determine peer credentials: %m");
395 goto fail;
396 }
397
398 #ifdef HAVE_SELINUX
399 if (use_selinux()) {
400 if (getpeercon(fd, &stream->security_context) < 0 && errno != ENOPROTOOPT)
401 log_error("Failed to determine peer security context: %m");
402 }
403 #endif
404
405 if (shutdown(fd, SHUT_WR) < 0) {
406 log_error("Failed to shutdown writing side of socket: %m");
407 goto fail;
408 }
409
410 r = sd_event_add_io(s->event, &stream->event_source, fd, EPOLLIN, stdout_stream_process, stream);
411 if (r < 0) {
412 log_error("Failed to add stream to event loop: %s", strerror(-r));
413 goto fail;
414 }
415
416 r = sd_event_source_set_priority(stream->event_source, SD_EVENT_PRIORITY_NORMAL+5);
417 if (r < 0) {
418 log_error("Failed to adjust stdout event source priority: %s", strerror(-r));
419 goto fail;
420 }
421
422 stream->server = s;
423 LIST_PREPEND(stdout_stream, s->stdout_streams, stream);
424 s->n_stdout_streams ++;
425
426 return 0;
427
428 fail:
429 stdout_stream_free(stream);
430 return 0;
431 }
432
433 int server_open_stdout_socket(Server *s) {
434 int r;
435
436 assert(s);
437
438 if (s->stdout_fd < 0) {
439 union sockaddr_union sa = {
440 .un.sun_family = AF_UNIX,
441 .un.sun_path = "/run/systemd/journal/stdout",
442 };
443
444 s->stdout_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
445 if (s->stdout_fd < 0) {
446 log_error("socket() failed: %m");
447 return -errno;
448 }
449
450 unlink(sa.un.sun_path);
451
452 r = bind(s->stdout_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
453 if (r < 0) {
454 log_error("bind() failed: %m");
455 return -errno;
456 }
457
458 chmod(sa.un.sun_path, 0666);
459
460 if (listen(s->stdout_fd, SOMAXCONN) < 0) {
461 log_error("listen() failed: %m");
462 return -errno;
463 }
464 } else
465 fd_nonblock(s->stdout_fd, 1);
466
467 r = sd_event_add_io(s->event, &s->stdout_event_source, s->stdout_fd, EPOLLIN, stdout_stream_new, s);
468 if (r < 0) {
469 log_error("Failed to add stdout server fd to event source: %s", strerror(-r));
470 return r;
471 }
472
473 r = sd_event_source_set_priority(s->stdout_event_source, SD_EVENT_PRIORITY_NORMAL+10);
474 if (r < 0) {
475 log_error("Failed to adjust priority of stdout server event source: %s", strerror(-r));
476 return r;
477 }
478
479 return 0;
480 }