]> git.ipfire.org Git - thirdparty/systemd.git/blob - src/journal-remote/journal-remote.c
tree-wide: beautify remaining copyright statements
[thirdparty/systemd.git] / src / journal-remote / journal-remote.c
1 /* SPDX-License-Identifier: LGPL-2.1+ */
2 /***
3 Copyright © 2012 Zbigniew Jędrzejewski-Szmek
4 ***/
5
6 #include <errno.h>
7 #include <fcntl.h>
8 #include <stdio.h>
9 #include <stdlib.h>
10 #include <string.h>
11 #include <sys/prctl.h>
12 #include <sys/socket.h>
13 #include <stdint.h>
14
15 #include "sd-daemon.h"
16
17 #include "alloc-util.h"
18 #include "def.h"
19 #include "escape.h"
20 #include "fd-util.h"
21 #include "journal-file.h"
22 #include "journal-remote-write.h"
23 #include "journal-remote.h"
24 #include "journald-native.h"
25 #include "macro.h"
26 #include "parse-util.h"
27 #include "process-util.h"
28 #include "socket-util.h"
29 #include "stdio-util.h"
30 #include "string-util.h"
31 #include "strv.h"
32
33 #define REMOTE_JOURNAL_PATH "/var/log/journal/remote"
34
35 #define filename_escape(s) xescape((s), "/ ")
36
37 static int open_output(RemoteServer *s, Writer *w, const char* host) {
38 _cleanup_free_ char *_filename = NULL;
39 const char *filename;
40 int r;
41
42 switch (s->split_mode) {
43 case JOURNAL_WRITE_SPLIT_NONE:
44 filename = s->output;
45 break;
46
47 case JOURNAL_WRITE_SPLIT_HOST: {
48 _cleanup_free_ char *name;
49
50 assert(host);
51
52 name = filename_escape(host);
53 if (!name)
54 return log_oom();
55
56 r = asprintf(&_filename, "%s/remote-%s.journal", s->output, name);
57 if (r < 0)
58 return log_oom();
59
60 filename = _filename;
61 break;
62 }
63
64 default:
65 assert_not_reached("what?");
66 }
67
68 r = journal_file_open_reliably(filename,
69 O_RDWR|O_CREAT, 0640,
70 s->compress, (uint64_t) -1, s->seal,
71 &w->metrics,
72 w->mmap, NULL,
73 NULL, &w->journal);
74 if (r < 0)
75 return log_error_errno(r, "Failed to open output journal %s: %m", filename);
76
77 log_debug("Opened output file %s", w->journal->path);
78 return 0;
79 }
80
81 /**********************************************************************
82 **********************************************************************
83 **********************************************************************/
84
85 static int init_writer_hashmap(RemoteServer *s) {
86 static const struct hash_ops* const hash_ops[] = {
87 [JOURNAL_WRITE_SPLIT_NONE] = NULL,
88 [JOURNAL_WRITE_SPLIT_HOST] = &string_hash_ops,
89 };
90
91 assert(s);
92 assert(s->split_mode >= 0 && s->split_mode < (int) ELEMENTSOF(hash_ops));
93
94 s->writers = hashmap_new(hash_ops[s->split_mode]);
95 if (!s->writers)
96 return log_oom();
97
98 return 0;
99 }
100
101 int journal_remote_get_writer(RemoteServer *s, const char *host, Writer **writer) {
102 _cleanup_(writer_unrefp) Writer *w = NULL;
103 const void *key;
104 int r;
105
106 switch(s->split_mode) {
107 case JOURNAL_WRITE_SPLIT_NONE:
108 key = "one and only";
109 break;
110
111 case JOURNAL_WRITE_SPLIT_HOST:
112 assert(host);
113 key = host;
114 break;
115
116 default:
117 assert_not_reached("what split mode?");
118 }
119
120 w = hashmap_get(s->writers, key);
121 if (w)
122 writer_ref(w);
123 else {
124 w = writer_new(s);
125 if (!w)
126 return log_oom();
127
128 if (s->split_mode == JOURNAL_WRITE_SPLIT_HOST) {
129 w->hashmap_key = strdup(key);
130 if (!w->hashmap_key)
131 return log_oom();
132 }
133
134 r = open_output(s, w, host);
135 if (r < 0)
136 return r;
137
138 r = hashmap_put(s->writers, w->hashmap_key ?: key, w);
139 if (r < 0)
140 return r;
141 }
142
143 *writer = TAKE_PTR(w);
144
145 return 0;
146 }
147
148 /**********************************************************************
149 **********************************************************************
150 **********************************************************************/
151
152 /* This should go away as soon as µhttpd allows state to be passed around. */
153 RemoteServer *journal_remote_server_global;
154
155 static int dispatch_raw_source_event(sd_event_source *event,
156 int fd,
157 uint32_t revents,
158 void *userdata);
159 static int dispatch_raw_source_until_block(sd_event_source *event,
160 void *userdata);
161 static int dispatch_blocking_source_event(sd_event_source *event,
162 void *userdata);
163 static int dispatch_raw_connection_event(sd_event_source *event,
164 int fd,
165 uint32_t revents,
166 void *userdata);
167
168 static int get_source_for_fd(RemoteServer *s,
169 int fd, char *name, RemoteSource **source) {
170 Writer *writer;
171 int r;
172
173 /* This takes ownership of name, but only on success. */
174
175 assert(fd >= 0);
176 assert(source);
177
178 if (!GREEDY_REALLOC0(s->sources, s->sources_size, fd + 1))
179 return log_oom();
180
181 r = journal_remote_get_writer(s, name, &writer);
182 if (r < 0)
183 return log_warning_errno(r, "Failed to get writer for source %s: %m",
184 name);
185
186 if (s->sources[fd] == NULL) {
187 s->sources[fd] = source_new(fd, false, name, writer);
188 if (!s->sources[fd]) {
189 writer_unref(writer);
190 return log_oom();
191 }
192
193 s->active++;
194 }
195
196 *source = s->sources[fd];
197 return 0;
198 }
199
200 static int remove_source(RemoteServer *s, int fd) {
201 RemoteSource *source;
202
203 assert(s);
204 assert(fd >= 0 && fd < (ssize_t) s->sources_size);
205
206 source = s->sources[fd];
207 if (source) {
208 /* this closes fd too */
209 source_free(source);
210 s->sources[fd] = NULL;
211 s->active--;
212 }
213
214 return 0;
215 }
216
217 int journal_remote_add_source(RemoteServer *s, int fd, char* name, bool own_name) {
218 RemoteSource *source = NULL;
219 int r;
220
221 /* This takes ownership of name, even on failure, if own_name is true. */
222
223 assert(s);
224 assert(fd >= 0);
225 assert(name);
226
227 if (!own_name) {
228 name = strdup(name);
229 if (!name)
230 return log_oom();
231 }
232
233 r = get_source_for_fd(s, fd, name, &source);
234 if (r < 0) {
235 log_error_errno(r, "Failed to create source for fd:%d (%s): %m",
236 fd, name);
237 free(name);
238 return r;
239 }
240
241 r = sd_event_add_io(s->events, &source->event,
242 fd, EPOLLIN|EPOLLRDHUP|EPOLLPRI,
243 dispatch_raw_source_event, source);
244 if (r == 0) {
245 /* Add additional source for buffer processing. It will be
246 * enabled later. */
247 r = sd_event_add_defer(s->events, &source->buffer_event,
248 dispatch_raw_source_until_block, source);
249 if (r == 0)
250 sd_event_source_set_enabled(source->buffer_event, SD_EVENT_OFF);
251 } else if (r == -EPERM) {
252 log_debug("Falling back to sd_event_add_defer for fd:%d (%s)", fd, name);
253 r = sd_event_add_defer(s->events, &source->event,
254 dispatch_blocking_source_event, source);
255 if (r == 0)
256 sd_event_source_set_enabled(source->event, SD_EVENT_ON);
257 }
258 if (r < 0) {
259 log_error_errno(r, "Failed to register event source for fd:%d: %m",
260 fd);
261 goto error;
262 }
263
264 r = sd_event_source_set_description(source->event, name);
265 if (r < 0) {
266 log_error_errno(r, "Failed to set source name for fd:%d: %m", fd);
267 goto error;
268 }
269
270 return 1; /* work to do */
271
272 error:
273 remove_source(s, fd);
274 return r;
275 }
276
277 int journal_remote_add_raw_socket(RemoteServer *s, int fd) {
278 int r;
279 _cleanup_close_ int fd_ = fd;
280 char name[STRLEN("raw-socket-") + DECIMAL_STR_MAX(int) + 1];
281
282 assert(fd >= 0);
283
284 r = sd_event_add_io(s->events, &s->listen_event,
285 fd, EPOLLIN,
286 dispatch_raw_connection_event, s);
287 if (r < 0)
288 return r;
289
290 xsprintf(name, "raw-socket-%d", fd);
291
292 r = sd_event_source_set_description(s->listen_event, name);
293 if (r < 0)
294 return r;
295
296 fd_ = -1;
297 s->active++;
298 return 0;
299 }
300
301 /**********************************************************************
302 **********************************************************************
303 **********************************************************************/
304
305 int journal_remote_server_init(
306 RemoteServer *s,
307 const char *output,
308 JournalWriteSplitMode split_mode,
309 bool compress,
310 bool seal) {
311
312 int r;
313
314 assert(s);
315
316 assert(journal_remote_server_global == NULL);
317 journal_remote_server_global = s;
318
319 s->split_mode = split_mode;
320 s->compress = compress;
321 s->seal = seal;
322
323 if (output)
324 s->output = output;
325 else if (split_mode == JOURNAL_WRITE_SPLIT_NONE)
326 s->output = REMOTE_JOURNAL_PATH "/remote.journal";
327 else if (split_mode == JOURNAL_WRITE_SPLIT_HOST)
328 s->output = REMOTE_JOURNAL_PATH;
329 else
330 assert_not_reached("bad split mode");
331
332 r = sd_event_default(&s->events);
333 if (r < 0)
334 return log_error_errno(r, "Failed to allocate event loop: %m");
335
336 r = init_writer_hashmap(s);
337 if (r < 0)
338 return r;
339
340 return 0;
341 }
342
343 #if HAVE_MICROHTTPD
344 static void MHDDaemonWrapper_free(MHDDaemonWrapper *d) {
345 MHD_stop_daemon(d->daemon);
346 sd_event_source_unref(d->io_event);
347 sd_event_source_unref(d->timer_event);
348 free(d);
349 }
350 #endif
351
352 RemoteServer* journal_remote_server_destroy(RemoteServer *s) {
353 size_t i;
354
355 #if HAVE_MICROHTTPD
356 hashmap_free_with_destructor(s->daemons, MHDDaemonWrapper_free);
357 #endif
358
359 assert(s->sources_size == 0 || s->sources);
360 for (i = 0; i < s->sources_size; i++)
361 remove_source(s, i);
362 free(s->sources);
363
364 writer_unref(s->_single_writer);
365 hashmap_free(s->writers);
366
367 sd_event_source_unref(s->sigterm_event);
368 sd_event_source_unref(s->sigint_event);
369 sd_event_source_unref(s->listen_event);
370 sd_event_unref(s->events);
371
372 if (s == journal_remote_server_global)
373 journal_remote_server_global = NULL;
374
375 /* fds that we're listening on remain open... */
376 return NULL;
377 }
378
379 /**********************************************************************
380 **********************************************************************
381 **********************************************************************/
382
383 int journal_remote_handle_raw_source(
384 sd_event_source *event,
385 int fd,
386 uint32_t revents,
387 RemoteServer *s) {
388
389 RemoteSource *source;
390 int r;
391
392 /* Returns 1 if there might be more data pending,
393 * 0 if data is currently exhausted, negative on error.
394 */
395
396 assert(fd >= 0 && fd < (ssize_t) s->sources_size);
397 source = s->sources[fd];
398 assert(source->importer.fd == fd);
399
400 r = process_source(source, s->compress, s->seal);
401 if (journal_importer_eof(&source->importer)) {
402 size_t remaining;
403
404 log_debug("EOF reached with source %s (fd=%d)",
405 source->importer.name, source->importer.fd);
406
407 remaining = journal_importer_bytes_remaining(&source->importer);
408 if (remaining > 0)
409 log_notice("Premature EOF. %zu bytes lost.", remaining);
410 remove_source(s, source->importer.fd);
411 log_debug("%zu active sources remaining", s->active);
412 return 0;
413 } else if (r == -E2BIG) {
414 log_notice_errno(E2BIG, "Entry too big, skipped");
415 return 1;
416 } else if (r == -EAGAIN) {
417 return 0;
418 } else if (r < 0) {
419 log_debug_errno(r, "Closing connection: %m");
420 remove_source(s, fd);
421 return 0;
422 } else
423 return 1;
424 }
425
426 static int dispatch_raw_source_until_block(sd_event_source *event,
427 void *userdata) {
428 RemoteSource *source = userdata;
429 int r;
430
431 /* Make sure event stays around even if source is destroyed */
432 sd_event_source_ref(event);
433
434 r = journal_remote_handle_raw_source(event, source->importer.fd, EPOLLIN, journal_remote_server_global);
435 if (r != 1)
436 /* No more data for now */
437 sd_event_source_set_enabled(event, SD_EVENT_OFF);
438
439 sd_event_source_unref(event);
440
441 return r;
442 }
443
444 static int dispatch_raw_source_event(sd_event_source *event,
445 int fd,
446 uint32_t revents,
447 void *userdata) {
448 RemoteSource *source = userdata;
449 int r;
450
451 assert(source->event);
452 assert(source->buffer_event);
453
454 r = journal_remote_handle_raw_source(event, fd, EPOLLIN, journal_remote_server_global);
455 if (r == 1)
456 /* Might have more data. We need to rerun the handler
457 * until we are sure the buffer is exhausted. */
458 sd_event_source_set_enabled(source->buffer_event, SD_EVENT_ON);
459
460 return r;
461 }
462
463 static int dispatch_blocking_source_event(sd_event_source *event,
464 void *userdata) {
465 RemoteSource *source = userdata;
466
467 return journal_remote_handle_raw_source(event, source->importer.fd, EPOLLIN, journal_remote_server_global);
468 }
469
470 static int accept_connection(const char* type, int fd,
471 SocketAddress *addr, char **hostname) {
472 int fd2, r;
473
474 log_debug("Accepting new %s connection on fd:%d", type, fd);
475 fd2 = accept4(fd, &addr->sockaddr.sa, &addr->size, SOCK_NONBLOCK|SOCK_CLOEXEC);
476 if (fd2 < 0)
477 return log_error_errno(errno, "accept() on fd:%d failed: %m", fd);
478
479 switch(socket_address_family(addr)) {
480 case AF_INET:
481 case AF_INET6: {
482 _cleanup_free_ char *a = NULL;
483 char *b;
484
485 r = socket_address_print(addr, &a);
486 if (r < 0) {
487 log_error_errno(r, "socket_address_print(): %m");
488 close(fd2);
489 return r;
490 }
491
492 r = socknameinfo_pretty(&addr->sockaddr, addr->size, &b);
493 if (r < 0) {
494 log_error_errno(r, "Resolving hostname failed: %m");
495 close(fd2);
496 return r;
497 }
498
499 log_debug("Accepted %s %s connection from %s",
500 type,
501 socket_address_family(addr) == AF_INET ? "IP" : "IPv6",
502 a);
503
504 *hostname = b;
505
506 return fd2;
507 };
508 default:
509 log_error("Rejected %s connection with unsupported family %d",
510 type, socket_address_family(addr));
511 close(fd2);
512
513 return -EINVAL;
514 }
515 }
516
517 static int dispatch_raw_connection_event(sd_event_source *event,
518 int fd,
519 uint32_t revents,
520 void *userdata) {
521 RemoteServer *s = userdata;
522 int fd2;
523 SocketAddress addr = {
524 .size = sizeof(union sockaddr_union),
525 .type = SOCK_STREAM,
526 };
527 char *hostname = NULL;
528
529 fd2 = accept_connection("raw", fd, &addr, &hostname);
530 if (fd2 < 0)
531 return fd2;
532
533 return journal_remote_add_source(s, fd2, hostname, true);
534 }