]> git.ipfire.org Git - thirdparty/systemd.git/blob - src/journal-remote/journal-remote.c
journal: Serialize __MONOTONIC_TIMESTAMP metadata field as well
[thirdparty/systemd.git] / src / journal-remote / journal-remote.c
1 /* SPDX-License-Identifier: LGPL-2.1-or-later */
2
3 #include <errno.h>
4 #include <fcntl.h>
5 #include <stdlib.h>
6 #include <sys/prctl.h>
7 #include <stdint.h>
8
9 #include "sd-daemon.h"
10
11 #include "af-list.h"
12 #include "alloc-util.h"
13 #include "constants.h"
14 #include "errno-util.h"
15 #include "escape.h"
16 #include "fd-util.h"
17 #include "journal-file-util.h"
18 #include "journal-remote-write.h"
19 #include "journal-remote.h"
20 #include "macro.h"
21 #include "parse-util.h"
22 #include "parse-helpers.h"
23 #include "process-util.h"
24 #include "socket-util.h"
25 #include "stdio-util.h"
26 #include "string-util.h"
27 #include "strv.h"
28
/* Directory used for received journals when the caller does not pass an explicit output path
 * (see journal_remote_server_init()). */
#define REMOTE_JOURNAL_PATH "/var/log/journal/remote"

/* Runs s through xescape() with "/" and " " as the characters to escape; used to turn a host
 * name into a single file name component. The result is heap-allocated (NULL on failure). */
#define filename_escape(s) xescape((s), "/ ")
32
33 #if HAVE_MICROHTTPD
34 MHDDaemonWrapper *MHDDaemonWrapper_free(MHDDaemonWrapper *d) {
35 if (!d)
36 return NULL;
37
38 if (d->daemon)
39 MHD_stop_daemon(d->daemon);
40 sd_event_source_unref(d->io_event);
41 sd_event_source_unref(d->timer_event);
42
43 return mfree(d);
44 }
45 #endif
46
47 static int open_output(RemoteServer *s, Writer *w, const char* host) {
48 _cleanup_free_ char *_filename = NULL;
49 const char *filename;
50 int r;
51
52 assert(s);
53 assert(w);
54
55 switch (s->split_mode) {
56 case JOURNAL_WRITE_SPLIT_NONE:
57 filename = s->output;
58 break;
59
60 case JOURNAL_WRITE_SPLIT_HOST: {
61 _cleanup_free_ char *name = NULL;
62
63 assert(host);
64
65 name = filename_escape(host);
66 if (!name)
67 return log_oom();
68
69 r = asprintf(&_filename, "%s/remote-%s.journal", s->output, name);
70 if (r < 0)
71 return log_oom();
72
73 filename = _filename;
74 break;
75 }
76
77 default:
78 assert_not_reached();
79 }
80
81 r = journal_file_open_reliably(
82 filename,
83 O_RDWR|O_CREAT,
84 s->file_flags,
85 0640,
86 UINT64_MAX,
87 &w->metrics,
88 w->mmap,
89 &w->journal);
90 if (r < 0)
91 return log_error_errno(r, "Failed to open output journal %s: %m", filename);
92
93 log_debug("Opened output file %s", w->journal->path);
94 return 0;
95 }
96
97 /**********************************************************************
98 **********************************************************************
99 **********************************************************************/
100
101 static int init_writer_hashmap(RemoteServer *s) {
102 static const struct hash_ops* const hash_ops[] = {
103 [JOURNAL_WRITE_SPLIT_NONE] = NULL,
104 [JOURNAL_WRITE_SPLIT_HOST] = &string_hash_ops,
105 };
106
107 assert(s);
108 assert(s->split_mode >= 0 && s->split_mode < (int) ELEMENTSOF(hash_ops));
109
110 s->writers = hashmap_new(hash_ops[s->split_mode]);
111 if (!s->writers)
112 return log_oom();
113
114 return 0;
115 }
116
117 int journal_remote_get_writer(RemoteServer *s, const char *host, Writer **writer) {
118 _cleanup_(writer_unrefp) Writer *w = NULL;
119 const void *key;
120 int r;
121
122 assert(s);
123 assert(writer);
124
125 switch (s->split_mode) {
126 case JOURNAL_WRITE_SPLIT_NONE:
127 key = "one and only";
128 break;
129
130 case JOURNAL_WRITE_SPLIT_HOST:
131 assert(host);
132 key = host;
133 break;
134
135 default:
136 assert_not_reached();
137 }
138
139 w = hashmap_get(s->writers, key);
140 if (w)
141 writer_ref(w);
142 else {
143 r = writer_new(s, &w);
144 if (r < 0)
145 return r;
146
147 if (s->split_mode == JOURNAL_WRITE_SPLIT_HOST) {
148 w->hashmap_key = strdup(key);
149 if (!w->hashmap_key)
150 return -ENOMEM;
151 }
152
153 r = open_output(s, w, host);
154 if (r < 0)
155 return r;
156
157 r = hashmap_put(s->writers, w->hashmap_key ?: key, w);
158 if (r < 0)
159 return r;
160 }
161
162 *writer = TAKE_PTR(w);
163 return 0;
164 }
165
166 /**********************************************************************
167 **********************************************************************
168 **********************************************************************/
169
170 /* This should go away as soon as μhttpd allows state to be passed around. */
171 RemoteServer *journal_remote_server_global;
172
173 static int dispatch_raw_source_event(sd_event_source *event,
174 int fd,
175 uint32_t revents,
176 void *userdata);
177 static int dispatch_raw_source_until_block(sd_event_source *event,
178 void *userdata);
179 static int dispatch_blocking_source_event(sd_event_source *event,
180 void *userdata);
181 static int dispatch_raw_connection_event(sd_event_source *event,
182 int fd,
183 uint32_t revents,
184 void *userdata);
185
186 static int get_source_for_fd(RemoteServer *s,
187 int fd, char *name, RemoteSource **source) {
188 Writer *writer;
189 int r;
190
191 /* This takes ownership of name, but only on success. */
192
193 assert(s);
194 assert(fd >= 0);
195 assert(source);
196
197 if (!GREEDY_REALLOC0(s->sources, fd + 1))
198 return log_oom();
199
200 r = journal_remote_get_writer(s, name, &writer);
201 if (r < 0)
202 return log_warning_errno(r, "Failed to get writer for source %s: %m",
203 name);
204
205 if (!s->sources[fd]) {
206 s->sources[fd] = source_new(fd, false, name, writer);
207 if (!s->sources[fd]) {
208 writer_unref(writer);
209 return log_oom();
210 }
211
212 s->active++;
213 }
214
215 *source = s->sources[fd];
216 return 0;
217 }
218
219 static int remove_source(RemoteServer *s, int fd) {
220 RemoteSource *source;
221
222 assert(s);
223 assert(fd >= 0 && fd < (ssize_t) MALLOC_ELEMENTSOF(s->sources));
224
225 source = s->sources[fd];
226 if (source) {
227 /* this closes fd too */
228 source_free(source);
229 s->sources[fd] = NULL;
230 s->active--;
231 }
232
233 return 0;
234 }
235
236 int journal_remote_add_source(RemoteServer *s, int fd, char* name, bool own_name) {
237 RemoteSource *source = NULL;
238 int r;
239
240 /* This takes ownership of name, even on failure, if own_name is true. */
241
242 assert(s);
243 assert(fd >= 0);
244 assert(name);
245
246 if (!own_name) {
247 name = strdup(name);
248 if (!name)
249 return log_oom();
250 }
251
252 r = get_source_for_fd(s, fd, name, &source);
253 if (r < 0) {
254 log_error_errno(r, "Failed to create source for fd:%d (%s): %m",
255 fd, name);
256 free(name);
257 return r;
258 }
259
260 r = sd_event_add_io(s->event, &source->event,
261 fd, EPOLLIN|EPOLLRDHUP|EPOLLPRI,
262 dispatch_raw_source_event, source);
263 if (r == 0) {
264 /* Add additional source for buffer processing. It will be
265 * enabled later. */
266 r = sd_event_add_defer(s->event, &source->buffer_event,
267 dispatch_raw_source_until_block, source);
268 if (r == 0)
269 r = sd_event_source_set_enabled(source->buffer_event, SD_EVENT_OFF);
270 } else if (r == -EPERM) {
271 log_debug("Falling back to sd_event_add_defer for fd:%d (%s)", fd, name);
272 r = sd_event_add_defer(s->event, &source->event,
273 dispatch_blocking_source_event, source);
274 if (r == 0)
275 r = sd_event_source_set_enabled(source->event, SD_EVENT_ON);
276 }
277 if (r < 0) {
278 log_error_errno(r, "Failed to register event source for fd:%d: %m",
279 fd);
280 goto error;
281 }
282
283 r = sd_event_source_set_description(source->event, name);
284 if (r < 0) {
285 log_error_errno(r, "Failed to set source name for fd:%d: %m", fd);
286 goto error;
287 }
288
289 return 1; /* work to do */
290
291 error:
292 remove_source(s, fd);
293 return r;
294 }
295
296 int journal_remote_add_raw_socket(RemoteServer *s, int fd) {
297 _unused_ _cleanup_close_ int fd_ = fd;
298 char name[STRLEN("raw-socket-") + DECIMAL_STR_MAX(int) + 1];
299 int r;
300
301 assert(s);
302 assert(fd >= 0);
303
304 r = sd_event_add_io(s->event, &s->listen_event,
305 fd, EPOLLIN,
306 dispatch_raw_connection_event, s);
307 if (r < 0)
308 return r;
309
310 xsprintf(name, "raw-socket-%d", fd);
311
312 r = sd_event_source_set_description(s->listen_event, name);
313 if (r < 0)
314 return r;
315
316 TAKE_FD(fd_);
317 s->active++;
318 return 0;
319 }
320
321 /**********************************************************************
322 **********************************************************************
323 **********************************************************************/
324
325 int journal_remote_server_init(
326 RemoteServer *s,
327 const char *output,
328 JournalWriteSplitMode split_mode,
329 JournalFileFlags file_flags) {
330
331 int r;
332
333 assert(s);
334
335 assert(journal_remote_server_global == NULL);
336 journal_remote_server_global = s;
337
338 s->split_mode = split_mode;
339 s->file_flags = file_flags;
340
341 if (output)
342 s->output = output;
343 else if (split_mode == JOURNAL_WRITE_SPLIT_NONE)
344 s->output = REMOTE_JOURNAL_PATH "/remote.journal";
345 else if (split_mode == JOURNAL_WRITE_SPLIT_HOST)
346 s->output = REMOTE_JOURNAL_PATH;
347 else
348 assert_not_reached();
349
350 r = sd_event_default(&s->event);
351 if (r < 0)
352 return log_error_errno(r, "Failed to allocate event loop: %m");
353
354 r = init_writer_hashmap(s);
355 if (r < 0)
356 return r;
357
358 return 0;
359 }
360
361 void journal_remote_server_destroy(RemoteServer *s) {
362 size_t i;
363
364 if (!s)
365 return;
366
367 #if HAVE_MICROHTTPD
368 hashmap_free_with_destructor(s->daemons, MHDDaemonWrapper_free);
369 #endif
370
371 for (i = 0; i < MALLOC_ELEMENTSOF(s->sources); i++)
372 remove_source(s, i);
373 free(s->sources);
374
375 writer_unref(s->_single_writer);
376 hashmap_free(s->writers);
377
378 sd_event_source_unref(s->listen_event);
379 sd_event_unref(s->event);
380
381 if (s == journal_remote_server_global)
382 journal_remote_server_global = NULL;
383
384 /* fds that we're listening on remain open... */
385 }
386
387 /**********************************************************************
388 **********************************************************************
389 **********************************************************************/
390
391 int journal_remote_handle_raw_source(
392 sd_event_source *event,
393 int fd,
394 uint32_t revents,
395 RemoteServer *s) {
396
397 RemoteSource *source;
398 int r;
399
400 /* Returns 1 if there might be more data pending,
401 * 0 if data is currently exhausted, negative on error.
402 */
403
404 assert(s);
405 assert(fd >= 0 && fd < (ssize_t) MALLOC_ELEMENTSOF(s->sources));
406 source = s->sources[fd];
407 assert(source->importer.fd == fd);
408
409 r = process_source(source, s->file_flags);
410 if (journal_importer_eof(&source->importer)) {
411 size_t remaining;
412
413 log_debug("EOF reached with source %s (fd=%d)",
414 source->importer.name, source->importer.fd);
415
416 remaining = journal_importer_bytes_remaining(&source->importer);
417 if (remaining > 0)
418 log_notice("Premature EOF. %zu bytes lost.", remaining);
419 remove_source(s, source->importer.fd);
420 log_debug("%zu active sources remaining", s->active);
421 return 0;
422 } else if (r == -E2BIG) {
423 log_notice("Entry with too many fields, skipped");
424 return 1;
425 } else if (r == -ENOBUFS) {
426 log_notice("Entry too big, skipped");
427 return 1;
428 } else if (r == -EAGAIN) {
429 return 0;
430 } else if (r < 0) {
431 log_debug_errno(r, "Closing connection: %m");
432 remove_source(s, fd);
433 return 0;
434 } else
435 return 1;
436 }
437
438 static int dispatch_raw_source_until_block(sd_event_source *event,
439 void *userdata) {
440 RemoteSource *source = ASSERT_PTR(userdata);
441 int r;
442
443 assert(event);
444
445 /* Make sure event stays around even if source is destroyed */
446 sd_event_source_ref(event);
447
448 r = journal_remote_handle_raw_source(event, source->importer.fd, EPOLLIN, journal_remote_server_global);
449 if (r != 1) {
450 int k;
451
452 /* No more data for now */
453 k = sd_event_source_set_enabled(event, SD_EVENT_OFF);
454 if (k < 0)
455 r = k;
456 }
457
458 sd_event_source_unref(event);
459
460 return r;
461 }
462
463 static int dispatch_raw_source_event(sd_event_source *event,
464 int fd,
465 uint32_t revents,
466 void *userdata) {
467 RemoteSource *source = ASSERT_PTR(userdata);
468 int r;
469
470 assert(source->event);
471 assert(source->buffer_event);
472
473 r = journal_remote_handle_raw_source(event, fd, EPOLLIN, journal_remote_server_global);
474 if (r == 1) {
475 int k;
476
477 /* Might have more data. We need to rerun the handler
478 * until we are sure the buffer is exhausted. */
479 k = sd_event_source_set_enabled(source->buffer_event, SD_EVENT_ON);
480 if (k < 0)
481 r = k;
482 }
483
484 return r;
485 }
486
487 static int dispatch_blocking_source_event(sd_event_source *event,
488 void *userdata) {
489 RemoteSource *source = ASSERT_PTR(userdata);
490
491 return journal_remote_handle_raw_source(event, source->importer.fd, EPOLLIN, journal_remote_server_global);
492 }
493
494 static int accept_connection(
495 const char* type,
496 int fd,
497 SocketAddress *addr,
498 char **hostname) {
499
500 _cleanup_close_ int fd2 = -EBADF;
501 int r;
502
503 assert(addr);
504 assert(hostname);
505
506 log_debug("Accepting new %s connection on fd:%d", type, fd);
507 fd2 = accept4(fd, &addr->sockaddr.sa, &addr->size, SOCK_NONBLOCK|SOCK_CLOEXEC);
508 if (fd2 < 0) {
509 if (ERRNO_IS_ACCEPT_AGAIN(errno))
510 return -EAGAIN;
511
512 return log_error_errno(errno, "accept() on fd:%d failed: %m", fd);
513 }
514
515 switch (socket_address_family(addr)) {
516 case AF_INET:
517 case AF_INET6:
518 case AF_VSOCK:
519 case AF_UNIX: {
520 _cleanup_free_ char *a = NULL;
521 char *b;
522
523 r = socket_address_print(addr, &a);
524 if (r < 0)
525 return log_error_errno(r, "socket_address_print(): %m");
526
527 r = socknameinfo_pretty(&addr->sockaddr.sa, addr->size, &b);
528 if (r < 0)
529 return log_error_errno(r, "Resolving hostname failed: %m");
530
531 log_debug("Accepted %s %s connection from %s",
532 type,
533 af_to_ipv4_ipv6(socket_address_family(addr)),
534 a);
535
536 *hostname = b;
537 return TAKE_FD(fd2);
538 }
539
540 default:
541 return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
542 "Rejected %s connection with unsupported family %d",
543 type, socket_address_family(addr));
544 }
545 }
546
547 static int dispatch_raw_connection_event(
548 sd_event_source *event,
549 int fd,
550 uint32_t revents,
551 void *userdata) {
552
553 RemoteServer *s = ASSERT_PTR(userdata);
554 int fd2;
555 SocketAddress addr = {
556 .size = sizeof(union sockaddr_union),
557 .type = SOCK_STREAM,
558 };
559 char *hostname = NULL;
560
561 fd2 = accept_connection("raw", fd, &addr, &hostname);
562 if (fd2 == -EAGAIN)
563 return 0;
564 if (fd2 < 0)
565 return fd2;
566
567 return journal_remote_add_source(s, fd2, hostname, true);
568 }