]> git.ipfire.org Git - thirdparty/systemd.git/blob - src/journal-remote/journal-remote.c
06be2d08e2bb5e143c754c3898bd2979038ef6e3
[thirdparty/systemd.git] / src / journal-remote / journal-remote.c
1 /* SPDX-License-Identifier: LGPL-2.1-or-later */
2
3 #include <errno.h>
4 #include <fcntl.h>
5 #include <stdlib.h>
6 #include <sys/prctl.h>
7 #include <stdint.h>
8
9 #include "sd-daemon.h"
10
11 #include "af-list.h"
12 #include "alloc-util.h"
13 #include "constants.h"
14 #include "errno-util.h"
15 #include "escape.h"
16 #include "fd-util.h"
17 #include "journal-file-util.h"
18 #include "journal-remote-write.h"
19 #include "journal-remote.h"
20 #include "journald-native.h"
21 #include "macro.h"
22 #include "parse-util.h"
23 #include "parse-helpers.h"
24 #include "process-util.h"
25 #include "socket-util.h"
26 #include "stdio-util.h"
27 #include "string-util.h"
28 #include "strv.h"
29
30 #define REMOTE_JOURNAL_PATH "/var/log/journal/remote"
31
32 #define filename_escape(s) xescape((s), "/ ")
33
34 #if HAVE_MICROHTTPD
35 MHDDaemonWrapper *MHDDaemonWrapper_free(MHDDaemonWrapper *d) {
36 if (!d)
37 return NULL;
38
39 if (d->daemon)
40 MHD_stop_daemon(d->daemon);
41 sd_event_source_unref(d->io_event);
42 sd_event_source_unref(d->timer_event);
43
44 return mfree(d);
45 }
46 #endif
47
48 static int open_output(RemoteServer *s, Writer *w, const char* host) {
49 _cleanup_free_ char *_filename = NULL;
50 const char *filename;
51 int r;
52
53 assert(s);
54 assert(w);
55
56 switch (s->split_mode) {
57 case JOURNAL_WRITE_SPLIT_NONE:
58 filename = s->output;
59 break;
60
61 case JOURNAL_WRITE_SPLIT_HOST: {
62 _cleanup_free_ char *name = NULL;
63
64 assert(host);
65
66 name = filename_escape(host);
67 if (!name)
68 return log_oom();
69
70 r = asprintf(&_filename, "%s/remote-%s.journal", s->output, name);
71 if (r < 0)
72 return log_oom();
73
74 filename = _filename;
75 break;
76 }
77
78 default:
79 assert_not_reached();
80 }
81
82 r = journal_file_open_reliably(
83 filename,
84 O_RDWR|O_CREAT,
85 s->file_flags,
86 0640,
87 UINT64_MAX,
88 &w->metrics,
89 w->mmap,
90 NULL,
91 &w->journal);
92 if (r < 0)
93 return log_error_errno(r, "Failed to open output journal %s: %m", filename);
94
95 log_debug("Opened output file %s", w->journal->path);
96 return 0;
97 }
98
99 /**********************************************************************
100 **********************************************************************
101 **********************************************************************/
102
103 static int init_writer_hashmap(RemoteServer *s) {
104 static const struct hash_ops* const hash_ops[] = {
105 [JOURNAL_WRITE_SPLIT_NONE] = NULL,
106 [JOURNAL_WRITE_SPLIT_HOST] = &string_hash_ops,
107 };
108
109 assert(s);
110 assert(s->split_mode >= 0 && s->split_mode < (int) ELEMENTSOF(hash_ops));
111
112 s->writers = hashmap_new(hash_ops[s->split_mode]);
113 if (!s->writers)
114 return log_oom();
115
116 return 0;
117 }
118
119 int journal_remote_get_writer(RemoteServer *s, const char *host, Writer **writer) {
120 _cleanup_(writer_unrefp) Writer *w = NULL;
121 const void *key;
122 int r;
123
124 assert(s);
125 assert(writer);
126
127 switch (s->split_mode) {
128 case JOURNAL_WRITE_SPLIT_NONE:
129 key = "one and only";
130 break;
131
132 case JOURNAL_WRITE_SPLIT_HOST:
133 assert(host);
134 key = host;
135 break;
136
137 default:
138 assert_not_reached();
139 }
140
141 w = hashmap_get(s->writers, key);
142 if (w)
143 writer_ref(w);
144 else {
145 r = writer_new(s, &w);
146 if (r < 0)
147 return r;
148
149 if (s->split_mode == JOURNAL_WRITE_SPLIT_HOST) {
150 w->hashmap_key = strdup(key);
151 if (!w->hashmap_key)
152 return -ENOMEM;
153 }
154
155 r = open_output(s, w, host);
156 if (r < 0)
157 return r;
158
159 r = hashmap_put(s->writers, w->hashmap_key ?: key, w);
160 if (r < 0)
161 return r;
162 }
163
164 *writer = TAKE_PTR(w);
165 return 0;
166 }
167
168 /**********************************************************************
169 **********************************************************************
170 **********************************************************************/
171
172 /* This should go away as soon as μhttpd allows state to be passed around. */
173 RemoteServer *journal_remote_server_global;
174
175 static int dispatch_raw_source_event(sd_event_source *event,
176 int fd,
177 uint32_t revents,
178 void *userdata);
179 static int dispatch_raw_source_until_block(sd_event_source *event,
180 void *userdata);
181 static int dispatch_blocking_source_event(sd_event_source *event,
182 void *userdata);
183 static int dispatch_raw_connection_event(sd_event_source *event,
184 int fd,
185 uint32_t revents,
186 void *userdata);
187
188 static int get_source_for_fd(RemoteServer *s,
189 int fd, char *name, RemoteSource **source) {
190 Writer *writer;
191 int r;
192
193 /* This takes ownership of name, but only on success. */
194
195 assert(s);
196 assert(fd >= 0);
197 assert(source);
198
199 if (!GREEDY_REALLOC0(s->sources, fd + 1))
200 return log_oom();
201
202 r = journal_remote_get_writer(s, name, &writer);
203 if (r < 0)
204 return log_warning_errno(r, "Failed to get writer for source %s: %m",
205 name);
206
207 if (!s->sources[fd]) {
208 s->sources[fd] = source_new(fd, false, name, writer);
209 if (!s->sources[fd]) {
210 writer_unref(writer);
211 return log_oom();
212 }
213
214 s->active++;
215 }
216
217 *source = s->sources[fd];
218 return 0;
219 }
220
221 static int remove_source(RemoteServer *s, int fd) {
222 RemoteSource *source;
223
224 assert(s);
225 assert(fd >= 0 && fd < (ssize_t) MALLOC_ELEMENTSOF(s->sources));
226
227 source = s->sources[fd];
228 if (source) {
229 /* this closes fd too */
230 source_free(source);
231 s->sources[fd] = NULL;
232 s->active--;
233 }
234
235 return 0;
236 }
237
238 int journal_remote_add_source(RemoteServer *s, int fd, char* name, bool own_name) {
239 RemoteSource *source = NULL;
240 int r;
241
242 /* This takes ownership of name, even on failure, if own_name is true. */
243
244 assert(s);
245 assert(fd >= 0);
246 assert(name);
247
248 if (!own_name) {
249 name = strdup(name);
250 if (!name)
251 return log_oom();
252 }
253
254 r = get_source_for_fd(s, fd, name, &source);
255 if (r < 0) {
256 log_error_errno(r, "Failed to create source for fd:%d (%s): %m",
257 fd, name);
258 free(name);
259 return r;
260 }
261
262 r = sd_event_add_io(s->events, &source->event,
263 fd, EPOLLIN|EPOLLRDHUP|EPOLLPRI,
264 dispatch_raw_source_event, source);
265 if (r == 0) {
266 /* Add additional source for buffer processing. It will be
267 * enabled later. */
268 r = sd_event_add_defer(s->events, &source->buffer_event,
269 dispatch_raw_source_until_block, source);
270 if (r == 0)
271 r = sd_event_source_set_enabled(source->buffer_event, SD_EVENT_OFF);
272 } else if (r == -EPERM) {
273 log_debug("Falling back to sd_event_add_defer for fd:%d (%s)", fd, name);
274 r = sd_event_add_defer(s->events, &source->event,
275 dispatch_blocking_source_event, source);
276 if (r == 0)
277 r = sd_event_source_set_enabled(source->event, SD_EVENT_ON);
278 }
279 if (r < 0) {
280 log_error_errno(r, "Failed to register event source for fd:%d: %m",
281 fd);
282 goto error;
283 }
284
285 r = sd_event_source_set_description(source->event, name);
286 if (r < 0) {
287 log_error_errno(r, "Failed to set source name for fd:%d: %m", fd);
288 goto error;
289 }
290
291 return 1; /* work to do */
292
293 error:
294 remove_source(s, fd);
295 return r;
296 }
297
298 int journal_remote_add_raw_socket(RemoteServer *s, int fd) {
299 _unused_ _cleanup_close_ int fd_ = fd;
300 char name[STRLEN("raw-socket-") + DECIMAL_STR_MAX(int) + 1];
301 int r;
302
303 assert(s);
304 assert(fd >= 0);
305
306 r = sd_event_add_io(s->events, &s->listen_event,
307 fd, EPOLLIN,
308 dispatch_raw_connection_event, s);
309 if (r < 0)
310 return r;
311
312 xsprintf(name, "raw-socket-%d", fd);
313
314 r = sd_event_source_set_description(s->listen_event, name);
315 if (r < 0)
316 return r;
317
318 TAKE_FD(fd_);
319 s->active++;
320 return 0;
321 }
322
323 /**********************************************************************
324 **********************************************************************
325 **********************************************************************/
326
327 int journal_remote_server_init(
328 RemoteServer *s,
329 const char *output,
330 JournalWriteSplitMode split_mode,
331 JournalFileFlags file_flags) {
332
333 int r;
334
335 assert(s);
336
337 assert(journal_remote_server_global == NULL);
338 journal_remote_server_global = s;
339
340 s->split_mode = split_mode;
341 s->file_flags = file_flags;
342
343 if (output)
344 s->output = output;
345 else if (split_mode == JOURNAL_WRITE_SPLIT_NONE)
346 s->output = REMOTE_JOURNAL_PATH "/remote.journal";
347 else if (split_mode == JOURNAL_WRITE_SPLIT_HOST)
348 s->output = REMOTE_JOURNAL_PATH;
349 else
350 assert_not_reached();
351
352 r = sd_event_default(&s->events);
353 if (r < 0)
354 return log_error_errno(r, "Failed to allocate event loop: %m");
355
356 r = init_writer_hashmap(s);
357 if (r < 0)
358 return r;
359
360 return 0;
361 }
362
363 void journal_remote_server_destroy(RemoteServer *s) {
364 size_t i;
365
366 if (!s)
367 return;
368
369 #if HAVE_MICROHTTPD
370 hashmap_free_with_destructor(s->daemons, MHDDaemonWrapper_free);
371 #endif
372
373 for (i = 0; i < MALLOC_ELEMENTSOF(s->sources); i++)
374 remove_source(s, i);
375 free(s->sources);
376
377 writer_unref(s->_single_writer);
378 hashmap_free(s->writers);
379
380 sd_event_source_unref(s->sigterm_event);
381 sd_event_source_unref(s->sigint_event);
382 sd_event_source_unref(s->listen_event);
383 sd_event_unref(s->events);
384
385 if (s == journal_remote_server_global)
386 journal_remote_server_global = NULL;
387
388 /* fds that we're listening on remain open... */
389 }
390
391 /**********************************************************************
392 **********************************************************************
393 **********************************************************************/
394
395 int journal_remote_handle_raw_source(
396 sd_event_source *event,
397 int fd,
398 uint32_t revents,
399 RemoteServer *s) {
400
401 RemoteSource *source;
402 int r;
403
404 /* Returns 1 if there might be more data pending,
405 * 0 if data is currently exhausted, negative on error.
406 */
407
408 assert(s);
409 assert(fd >= 0 && fd < (ssize_t) MALLOC_ELEMENTSOF(s->sources));
410 source = s->sources[fd];
411 assert(source->importer.fd == fd);
412
413 r = process_source(source, s->file_flags);
414 if (journal_importer_eof(&source->importer)) {
415 size_t remaining;
416
417 log_debug("EOF reached with source %s (fd=%d)",
418 source->importer.name, source->importer.fd);
419
420 remaining = journal_importer_bytes_remaining(&source->importer);
421 if (remaining > 0)
422 log_notice("Premature EOF. %zu bytes lost.", remaining);
423 remove_source(s, source->importer.fd);
424 log_debug("%zu active sources remaining", s->active);
425 return 0;
426 } else if (r == -E2BIG) {
427 log_notice("Entry with too many fields, skipped");
428 return 1;
429 } else if (r == -ENOBUFS) {
430 log_notice("Entry too big, skipped");
431 return 1;
432 } else if (r == -EAGAIN) {
433 return 0;
434 } else if (r < 0) {
435 log_debug_errno(r, "Closing connection: %m");
436 remove_source(s, fd);
437 return 0;
438 } else
439 return 1;
440 }
441
442 static int dispatch_raw_source_until_block(sd_event_source *event,
443 void *userdata) {
444 RemoteSource *source = ASSERT_PTR(userdata);
445 int r;
446
447 assert(event);
448
449 /* Make sure event stays around even if source is destroyed */
450 sd_event_source_ref(event);
451
452 r = journal_remote_handle_raw_source(event, source->importer.fd, EPOLLIN, journal_remote_server_global);
453 if (r != 1) {
454 int k;
455
456 /* No more data for now */
457 k = sd_event_source_set_enabled(event, SD_EVENT_OFF);
458 if (k < 0)
459 r = k;
460 }
461
462 sd_event_source_unref(event);
463
464 return r;
465 }
466
467 static int dispatch_raw_source_event(sd_event_source *event,
468 int fd,
469 uint32_t revents,
470 void *userdata) {
471 RemoteSource *source = ASSERT_PTR(userdata);
472 int r;
473
474 assert(source->event);
475 assert(source->buffer_event);
476
477 r = journal_remote_handle_raw_source(event, fd, EPOLLIN, journal_remote_server_global);
478 if (r == 1) {
479 int k;
480
481 /* Might have more data. We need to rerun the handler
482 * until we are sure the buffer is exhausted. */
483 k = sd_event_source_set_enabled(source->buffer_event, SD_EVENT_ON);
484 if (k < 0)
485 r = k;
486 }
487
488 return r;
489 }
490
491 static int dispatch_blocking_source_event(sd_event_source *event,
492 void *userdata) {
493 RemoteSource *source = ASSERT_PTR(userdata);
494
495 return journal_remote_handle_raw_source(event, source->importer.fd, EPOLLIN, journal_remote_server_global);
496 }
497
498 static int accept_connection(
499 const char* type,
500 int fd,
501 SocketAddress *addr,
502 char **hostname) {
503
504 _cleanup_close_ int fd2 = -EBADF;
505 int r;
506
507 assert(addr);
508 assert(hostname);
509
510 log_debug("Accepting new %s connection on fd:%d", type, fd);
511 fd2 = accept4(fd, &addr->sockaddr.sa, &addr->size, SOCK_NONBLOCK|SOCK_CLOEXEC);
512 if (fd2 < 0) {
513 if (ERRNO_IS_ACCEPT_AGAIN(errno))
514 return -EAGAIN;
515
516 return log_error_errno(errno, "accept() on fd:%d failed: %m", fd);
517 }
518
519 switch (socket_address_family(addr)) {
520 case AF_INET:
521 case AF_INET6: {
522 _cleanup_free_ char *a = NULL;
523 char *b;
524
525 r = socket_address_print(addr, &a);
526 if (r < 0)
527 return log_error_errno(r, "socket_address_print(): %m");
528
529 r = socknameinfo_pretty(&addr->sockaddr, addr->size, &b);
530 if (r < 0)
531 return log_error_errno(r, "Resolving hostname failed: %m");
532
533 log_debug("Accepted %s %s connection from %s",
534 type,
535 af_to_ipv4_ipv6(socket_address_family(addr)),
536 a);
537
538 *hostname = b;
539 return TAKE_FD(fd2);
540 }
541
542 default:
543 return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
544 "Rejected %s connection with unsupported family %d",
545 type, socket_address_family(addr));
546 }
547 }
548
549 static int dispatch_raw_connection_event(
550 sd_event_source *event,
551 int fd,
552 uint32_t revents,
553 void *userdata) {
554
555 RemoteServer *s = ASSERT_PTR(userdata);
556 int fd2;
557 SocketAddress addr = {
558 .size = sizeof(union sockaddr_union),
559 .type = SOCK_STREAM,
560 };
561 char *hostname = NULL;
562
563 fd2 = accept_connection("raw", fd, &addr, &hostname);
564 if (fd2 == -EAGAIN)
565 return 0;
566 if (fd2 < 0)
567 return fd2;
568
569 return journal_remote_add_source(s, fd2, hostname, true);
570 }