]> git.ipfire.org Git - thirdparty/systemd.git/blob - src/journal-remote/journal-remote.c
journal-remote: simplify error handling a bit
[thirdparty/systemd.git] / src / journal-remote / journal-remote.c
1 /* SPDX-License-Identifier: LGPL-2.1-or-later */
2
3 #include <errno.h>
4 #include <fcntl.h>
5 #include <stdlib.h>
6 #include <sys/prctl.h>
7 #include <stdint.h>
8
9 #include "sd-daemon.h"
10
11 #include "af-list.h"
12 #include "alloc-util.h"
13 #include "constants.h"
14 #include "errno-util.h"
15 #include "escape.h"
16 #include "fd-util.h"
17 #include "journal-remote-write.h"
18 #include "journal-remote.h"
19 #include "journald-native.h"
20 #include "macro.h"
21 #include "managed-journal-file.h"
22 #include "parse-util.h"
23 #include "process-util.h"
24 #include "socket-util.h"
25 #include "stdio-util.h"
26 #include "string-util.h"
27 #include "strv.h"
28
29 #define REMOTE_JOURNAL_PATH "/var/log/journal/remote"
30
31 #define filename_escape(s) xescape((s), "/ ")
32
33 #if HAVE_MICROHTTPD
34 MHDDaemonWrapper *MHDDaemonWrapper_free(MHDDaemonWrapper *d) {
35 if (!d)
36 return NULL;
37
38 if (d->daemon)
39 MHD_stop_daemon(d->daemon);
40 sd_event_source_unref(d->io_event);
41 sd_event_source_unref(d->timer_event);
42
43 return mfree(d);
44 }
45 #endif
46
47 static int open_output(RemoteServer *s, Writer *w, const char* host) {
48 _cleanup_free_ char *_filename = NULL;
49 const char *filename;
50 int r;
51
52 switch (s->split_mode) {
53 case JOURNAL_WRITE_SPLIT_NONE:
54 filename = s->output;
55 break;
56
57 case JOURNAL_WRITE_SPLIT_HOST: {
58 _cleanup_free_ char *name = NULL;
59
60 assert(host);
61
62 name = filename_escape(host);
63 if (!name)
64 return log_oom();
65
66 r = asprintf(&_filename, "%s/remote-%s.journal", s->output, name);
67 if (r < 0)
68 return log_oom();
69
70 filename = _filename;
71 break;
72 }
73
74 default:
75 assert_not_reached();
76 }
77
78 r = managed_journal_file_open_reliably(
79 filename,
80 O_RDWR|O_CREAT,
81 s->file_flags,
82 0640,
83 UINT64_MAX,
84 &w->metrics,
85 w->mmap,
86 NULL,
87 NULL,
88 &w->journal);
89 if (r < 0)
90 return log_error_errno(r, "Failed to open output journal %s: %m", filename);
91
92 log_debug("Opened output file %s", w->journal->file->path);
93 return 0;
94 }
95
96 /**********************************************************************
97 **********************************************************************
98 **********************************************************************/
99
100 static int init_writer_hashmap(RemoteServer *s) {
101 static const struct hash_ops* const hash_ops[] = {
102 [JOURNAL_WRITE_SPLIT_NONE] = NULL,
103 [JOURNAL_WRITE_SPLIT_HOST] = &string_hash_ops,
104 };
105
106 assert(s);
107 assert(s->split_mode >= 0 && s->split_mode < (int) ELEMENTSOF(hash_ops));
108
109 s->writers = hashmap_new(hash_ops[s->split_mode]);
110 if (!s->writers)
111 return log_oom();
112
113 return 0;
114 }
115
116 int journal_remote_get_writer(RemoteServer *s, const char *host, Writer **writer) {
117 _cleanup_(writer_unrefp) Writer *w = NULL;
118 const void *key;
119 int r;
120
121 switch (s->split_mode) {
122 case JOURNAL_WRITE_SPLIT_NONE:
123 key = "one and only";
124 break;
125
126 case JOURNAL_WRITE_SPLIT_HOST:
127 assert(host);
128 key = host;
129 break;
130
131 default:
132 assert_not_reached();
133 }
134
135 w = hashmap_get(s->writers, key);
136 if (w)
137 writer_ref(w);
138 else {
139 r = writer_new(s, &w);
140 if (r < 0)
141 return r;
142
143 if (s->split_mode == JOURNAL_WRITE_SPLIT_HOST) {
144 w->hashmap_key = strdup(key);
145 if (!w->hashmap_key)
146 return -ENOMEM;
147 }
148
149 r = open_output(s, w, host);
150 if (r < 0)
151 return r;
152
153 r = hashmap_put(s->writers, w->hashmap_key ?: key, w);
154 if (r < 0)
155 return r;
156 }
157
158 *writer = TAKE_PTR(w);
159 return 0;
160 }
161
162 /**********************************************************************
163 **********************************************************************
164 **********************************************************************/
165
166 /* This should go away as soon as μhttpd allows state to be passed around. */
167 RemoteServer *journal_remote_server_global;
168
169 static int dispatch_raw_source_event(sd_event_source *event,
170 int fd,
171 uint32_t revents,
172 void *userdata);
173 static int dispatch_raw_source_until_block(sd_event_source *event,
174 void *userdata);
175 static int dispatch_blocking_source_event(sd_event_source *event,
176 void *userdata);
177 static int dispatch_raw_connection_event(sd_event_source *event,
178 int fd,
179 uint32_t revents,
180 void *userdata);
181
182 static int get_source_for_fd(RemoteServer *s,
183 int fd, char *name, RemoteSource **source) {
184 Writer *writer;
185 int r;
186
187 /* This takes ownership of name, but only on success. */
188
189 assert(fd >= 0);
190 assert(source);
191
192 if (!GREEDY_REALLOC0(s->sources, fd + 1))
193 return log_oom();
194
195 r = journal_remote_get_writer(s, name, &writer);
196 if (r < 0)
197 return log_warning_errno(r, "Failed to get writer for source %s: %m",
198 name);
199
200 if (!s->sources[fd]) {
201 s->sources[fd] = source_new(fd, false, name, writer);
202 if (!s->sources[fd]) {
203 writer_unref(writer);
204 return log_oom();
205 }
206
207 s->active++;
208 }
209
210 *source = s->sources[fd];
211 return 0;
212 }
213
214 static int remove_source(RemoteServer *s, int fd) {
215 RemoteSource *source;
216
217 assert(s);
218 assert(fd >= 0 && fd < (ssize_t) MALLOC_ELEMENTSOF(s->sources));
219
220 source = s->sources[fd];
221 if (source) {
222 /* this closes fd too */
223 source_free(source);
224 s->sources[fd] = NULL;
225 s->active--;
226 }
227
228 return 0;
229 }
230
231 int journal_remote_add_source(RemoteServer *s, int fd, char* name, bool own_name) {
232 RemoteSource *source = NULL;
233 int r;
234
235 /* This takes ownership of name, even on failure, if own_name is true. */
236
237 assert(s);
238 assert(fd >= 0);
239 assert(name);
240
241 if (!own_name) {
242 name = strdup(name);
243 if (!name)
244 return log_oom();
245 }
246
247 r = get_source_for_fd(s, fd, name, &source);
248 if (r < 0) {
249 log_error_errno(r, "Failed to create source for fd:%d (%s): %m",
250 fd, name);
251 free(name);
252 return r;
253 }
254
255 r = sd_event_add_io(s->events, &source->event,
256 fd, EPOLLIN|EPOLLRDHUP|EPOLLPRI,
257 dispatch_raw_source_event, source);
258 if (r == 0) {
259 /* Add additional source for buffer processing. It will be
260 * enabled later. */
261 r = sd_event_add_defer(s->events, &source->buffer_event,
262 dispatch_raw_source_until_block, source);
263 if (r == 0)
264 r = sd_event_source_set_enabled(source->buffer_event, SD_EVENT_OFF);
265 } else if (r == -EPERM) {
266 log_debug("Falling back to sd_event_add_defer for fd:%d (%s)", fd, name);
267 r = sd_event_add_defer(s->events, &source->event,
268 dispatch_blocking_source_event, source);
269 if (r == 0)
270 r = sd_event_source_set_enabled(source->event, SD_EVENT_ON);
271 }
272 if (r < 0) {
273 log_error_errno(r, "Failed to register event source for fd:%d: %m",
274 fd);
275 goto error;
276 }
277
278 r = sd_event_source_set_description(source->event, name);
279 if (r < 0) {
280 log_error_errno(r, "Failed to set source name for fd:%d: %m", fd);
281 goto error;
282 }
283
284 return 1; /* work to do */
285
286 error:
287 remove_source(s, fd);
288 return r;
289 }
290
291 int journal_remote_add_raw_socket(RemoteServer *s, int fd) {
292 int r;
293 _unused_ _cleanup_close_ int fd_ = fd;
294 char name[STRLEN("raw-socket-") + DECIMAL_STR_MAX(int) + 1];
295
296 assert(fd >= 0);
297
298 r = sd_event_add_io(s->events, &s->listen_event,
299 fd, EPOLLIN,
300 dispatch_raw_connection_event, s);
301 if (r < 0)
302 return r;
303
304 xsprintf(name, "raw-socket-%d", fd);
305
306 r = sd_event_source_set_description(s->listen_event, name);
307 if (r < 0)
308 return r;
309
310 fd_ = -EBADF;
311 s->active++;
312 return 0;
313 }
314
315 /**********************************************************************
316 **********************************************************************
317 **********************************************************************/
318
319 int journal_remote_server_init(
320 RemoteServer *s,
321 const char *output,
322 JournalWriteSplitMode split_mode,
323 JournalFileFlags file_flags) {
324
325 int r;
326
327 assert(s);
328
329 assert(journal_remote_server_global == NULL);
330 journal_remote_server_global = s;
331
332 s->split_mode = split_mode;
333 s->file_flags = file_flags;
334
335 if (output)
336 s->output = output;
337 else if (split_mode == JOURNAL_WRITE_SPLIT_NONE)
338 s->output = REMOTE_JOURNAL_PATH "/remote.journal";
339 else if (split_mode == JOURNAL_WRITE_SPLIT_HOST)
340 s->output = REMOTE_JOURNAL_PATH;
341 else
342 assert_not_reached();
343
344 r = sd_event_default(&s->events);
345 if (r < 0)
346 return log_error_errno(r, "Failed to allocate event loop: %m");
347
348 r = init_writer_hashmap(s);
349 if (r < 0)
350 return r;
351
352 return 0;
353 }
354
355 void journal_remote_server_destroy(RemoteServer *s) {
356 size_t i;
357
358 #if HAVE_MICROHTTPD
359 hashmap_free_with_destructor(s->daemons, MHDDaemonWrapper_free);
360 #endif
361
362 for (i = 0; i < MALLOC_ELEMENTSOF(s->sources); i++)
363 remove_source(s, i);
364 free(s->sources);
365
366 writer_unref(s->_single_writer);
367 hashmap_free(s->writers);
368
369 sd_event_source_unref(s->sigterm_event);
370 sd_event_source_unref(s->sigint_event);
371 sd_event_source_unref(s->listen_event);
372 sd_event_unref(s->events);
373
374 if (s == journal_remote_server_global)
375 journal_remote_server_global = NULL;
376
377 /* fds that we're listening on remain open... */
378 }
379
380 /**********************************************************************
381 **********************************************************************
382 **********************************************************************/
383
384 int journal_remote_handle_raw_source(
385 sd_event_source *event,
386 int fd,
387 uint32_t revents,
388 RemoteServer *s) {
389
390 RemoteSource *source;
391 int r;
392
393 /* Returns 1 if there might be more data pending,
394 * 0 if data is currently exhausted, negative on error.
395 */
396
397 assert(fd >= 0 && fd < (ssize_t) MALLOC_ELEMENTSOF(s->sources));
398 source = s->sources[fd];
399 assert(source->importer.fd == fd);
400
401 r = process_source(source, s->file_flags);
402 if (journal_importer_eof(&source->importer)) {
403 size_t remaining;
404
405 log_debug("EOF reached with source %s (fd=%d)",
406 source->importer.name, source->importer.fd);
407
408 remaining = journal_importer_bytes_remaining(&source->importer);
409 if (remaining > 0)
410 log_notice("Premature EOF. %zu bytes lost.", remaining);
411 remove_source(s, source->importer.fd);
412 log_debug("%zu active sources remaining", s->active);
413 return 0;
414 } else if (r == -E2BIG) {
415 log_notice("Entry with too many fields, skipped");
416 return 1;
417 } else if (r == -ENOBUFS) {
418 log_notice("Entry too big, skipped");
419 return 1;
420 } else if (r == -EAGAIN) {
421 return 0;
422 } else if (r < 0) {
423 log_debug_errno(r, "Closing connection: %m");
424 remove_source(s, fd);
425 return 0;
426 } else
427 return 1;
428 }
429
430 static int dispatch_raw_source_until_block(sd_event_source *event,
431 void *userdata) {
432 RemoteSource *source = userdata;
433 int r;
434
435 /* Make sure event stays around even if source is destroyed */
436 sd_event_source_ref(event);
437
438 r = journal_remote_handle_raw_source(event, source->importer.fd, EPOLLIN, journal_remote_server_global);
439 if (r != 1) {
440 int k;
441
442 /* No more data for now */
443 k = sd_event_source_set_enabled(event, SD_EVENT_OFF);
444 if (k < 0)
445 r = k;
446 }
447
448 sd_event_source_unref(event);
449
450 return r;
451 }
452
453 static int dispatch_raw_source_event(sd_event_source *event,
454 int fd,
455 uint32_t revents,
456 void *userdata) {
457 RemoteSource *source = userdata;
458 int r;
459
460 assert(source->event);
461 assert(source->buffer_event);
462
463 r = journal_remote_handle_raw_source(event, fd, EPOLLIN, journal_remote_server_global);
464 if (r == 1) {
465 int k;
466
467 /* Might have more data. We need to rerun the handler
468 * until we are sure the buffer is exhausted. */
469 k = sd_event_source_set_enabled(source->buffer_event, SD_EVENT_ON);
470 if (k < 0)
471 r = k;
472 }
473
474 return r;
475 }
476
477 static int dispatch_blocking_source_event(sd_event_source *event,
478 void *userdata) {
479 RemoteSource *source = userdata;
480
481 return journal_remote_handle_raw_source(event, source->importer.fd, EPOLLIN, journal_remote_server_global);
482 }
483
484 static int accept_connection(
485 const char* type,
486 int fd,
487 SocketAddress *addr,
488 char **hostname) {
489
490 _cleanup_close_ int fd2 = -EBADF;
491 int r;
492
493 log_debug("Accepting new %s connection on fd:%d", type, fd);
494 fd2 = accept4(fd, &addr->sockaddr.sa, &addr->size, SOCK_NONBLOCK|SOCK_CLOEXEC);
495 if (fd2 < 0) {
496 if (ERRNO_IS_ACCEPT_AGAIN(errno))
497 return -EAGAIN;
498
499 return log_error_errno(errno, "accept() on fd:%d failed: %m", fd);
500 }
501
502 switch (socket_address_family(addr)) {
503 case AF_INET:
504 case AF_INET6: {
505 _cleanup_free_ char *a = NULL;
506 char *b;
507
508 r = socket_address_print(addr, &a);
509 if (r < 0)
510 return log_error_errno(r, "socket_address_print(): %m");
511
512 r = socknameinfo_pretty(&addr->sockaddr, addr->size, &b);
513 if (r < 0)
514 return log_error_errno(r, "Resolving hostname failed: %m");
515
516 log_debug("Accepted %s %s connection from %s",
517 type,
518 af_to_ipv4_ipv6(socket_address_family(addr)),
519 a);
520
521 *hostname = b;
522 return TAKE_FD(fd2);
523 }
524
525 default:
526 return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
527 "Rejected %s connection with unsupported family %d",
528 type, socket_address_family(addr));
529 }
530 }
531
532 static int dispatch_raw_connection_event(
533 sd_event_source *event,
534 int fd,
535 uint32_t revents,
536 void *userdata) {
537
538 RemoteServer *s = userdata;
539 int fd2;
540 SocketAddress addr = {
541 .size = sizeof(union sockaddr_union),
542 .type = SOCK_STREAM,
543 };
544 char *hostname = NULL;
545
546 fd2 = accept_connection("raw", fd, &addr, &hostname);
547 if (fd2 == -EAGAIN)
548 return 0;
549 if (fd2 < 0)
550 return fd2;
551
552 return journal_remote_add_source(s, fd2, hostname, true);
553 }